支持自动重连,随时 Ctrl+C 退出程序

package main

import (
	"bufio"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/streadway/amqp"
)

var (
	conn    *amqp.Connection
	channel *amqp.Channel
	stop    = make(chan struct{})
)

func main() {
	user := flag.String("user", "user", "用户名")
	pass := flag.String("pass", "password", "密码")
	host := flag.String("host", "rabbitmq.ha.svc.cluster.local", "地址")
	port := flag.String("port", "5672", "端口")
	mode := flag.String("mode", "send", "模式: send/recv")
	queue := flag.String("queue", "test_cluster_queue", "队列名")
	flag.Parse()

	rabbitURL := fmt.Sprintf("amqp://%s:%s@%s:%s/", *user, *pass, *host, *port)
	setupCloseHandler()

	for {
		select {
		case <-stop:
			return
		default:
		}

		connect(rabbitURL, *queue)

		if *mode == "send" {
			runSender(*queue)
		}

		if *mode == "recv" {
			runReceiver(*queue)
		}

		closeAll()
		fmt.Println("连接断开,将自动重连...")
		time.Sleep(1 * time.Second)
	}
}

func runSender(queue string) {
	fmt.Println("✅ 进入发送模式,输入消息发送,按 Ctrl+C 退出")

	scanner := bufio.NewScanner(os.Stdin)

	for {
		select {
		case <-stop:
			return
		default:
		}

		fmt.Print("> ")

		if !scanner.Scan() {
			return
		}

		msg := scanner.Text()
		if msg == "" {
			continue
		}

		ok := sendMsg(queue, msg)
		if !ok {
			fmt.Println("连接异常,退出发送循环,准备重连...")
			return
		}
	}
}

func runReceiver(queue string) {
	fmt.Println("✅ 进入接收模式,等待消息... Ctrl+C 退出")

	msgs, err := channel.Consume(
		queue,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return
	}

	for {
		select {
		case <-stop:
			return
		case d, ok := <-msgs:
			if !ok {
				return
			}
			fmt.Println("收到消息:", string(d.Body))
		}
	}
}

func sendMsg(queue, body string) bool {
	err := channel.Publish(
		"",
		queue,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)

	if err != nil {
		fmt.Println("❌ 发送失败:", err)
		return false
	}

	fmt.Println("✅ 发送成功:", body)
	return true
}

func connect(url string, queue string) {
	for {
		select {
		case <-stop:
			return
		default:
		}

		var err error

		conn, err = amqp.Dial(url)
		if err != nil {
			fmt.Printf("⚠️ 连接失败,5秒后重试... (Ctrl+C退出)\n")
			waitWithStop(5 * time.Second)
			continue
		}

		channel, err = conn.Channel()
		if err != nil {
			closeAll()
			waitWithStop(5 * time.Second)
			continue
		}

		_, err = channel.QueueDeclare(
			queue,
			false,
			true,
			false,
			false,
			nil,
		)

		if err != nil {
			closeAll()
			waitWithStop(5 * time.Second)
			continue
		}

		fmt.Println("✅ 连接成功!")
		return
	}
}

func setupCloseHandler() {
	c := make(chan os.Signal, 1)

	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-c
		fmt.Println("\n退出程序...")
		close(stop)
		closeAll()
		os.Exit(0)
	}()
}

func waitWithStop(d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-stop:
	}
}

func closeAll() {
	if channel != nil {
		_ = channel.Close()
		channel = nil
	}

	if conn != nil {
		_ = conn.Close()
		conn = nil
	}
}

测试如图

Image

Image