go rabbitmq 操作
go 依赖包github.com/streadway/amqp
docker快速部署
docker pull rabbitmq:management
docker run -d rabbitmq:management # 先跑一个看看监听了哪些端口
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq #5672 go 程序连接,15672是管理页面
写个最基本生产者消费者demo(headers 模式)
package testimport ("fmt""log""os""os/signal""strconv""syscall""testing""time""github.com/streadway/amqp"
)var (obj *MQOBJ
)type MQOBJ struct {*amqp.Connection*amqp.Channel
}func (mq *MQOBJ) Close() error {mq.Connection.Close()mq.Channel.Close()return nil
}
func init() {var mqurl = "amqp://cho:123@192.168.101.7:5672"con, err := amqp.Dial(mqurl)if err != nil {log.Fatalln(err)}ch, err := con.Channel()if err != nil {log.Fatalln(err)}obj = &MQOBJ{Connection: con, Channel: ch}}
func producer() {_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)if err != nil {return}err = obj.ExchangeDeclare("go-test-exchange2", amqp.ExchangeHeaders, true, false, false, false, nil)if err != nil {log.Fatalln(err)}//这个queue绑定,你也可以放消费者那边绑定,更灵活err = obj.Channel.QueueBind("go-test2", "go-test2", "go-test-exchange2", false, amqp.Table{"name": "jesko"})if err != nil {log.Fatalln(err)}ticker := time.NewTicker(time.Millisecond * 300)var i intfor {select {case <-ticker.C:err = obj.Publish("", "go-test2", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain", Headers: amqp.Table{"x-match": "any", "name": "jesko", "age": 22}})if err != nil {log.Fatalln(err)}i++}}}
func customer() {_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)if err != nil {log.Fatalln(err)}msgch, err := obj.Channel.Consume("go-test2", "", true, false, true, false, nil)if err != nil {log.Fatalln(err)}ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)for {select {case msg := <-msgch:fmt.Println("accept msg " + string(msg.Body))case <-ch:return}}
}
func TestAmqp(t *testing.T) {defer obj.Close()go func() {producer()}()time.Sleep(2 * time.Second)customer()
}
这里可以看到我们创建的queue
topic模式
topic模式不用绑定headers去匹配
package testimport ("fmt""log""os""os/signal""strconv""syscall""testing""time""github.com/streadway/amqp"
)var (obj *MQOBJlogger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)func Fataln(a ...any) {logger.Println(a...)os.Exit(0)
}type MQOBJ struct {*amqp.Connection*amqp.Channel
}func (mq *MQOBJ) Close() error {mq.Connection.Close()mq.Channel.Close()return nil
}
func init() {var mqurl = "amqp://cho:123@192.168.101.7:5672"con, err := amqp.Dial(mqurl)if err != nil {Fataln(err)}ch, err := con.Channel()if err != nil {Fataln(err)}obj = &MQOBJ{Connection: con, Channel: ch}fmt.Println("init success")}
func producer() {err := obj.ExchangeDeclare("go-test-exchange", amqp.ExchangeTopic, true, false, false, false, nil)if err != nil {Fataln(err)}ticker := time.NewTicker(time.Millisecond * 300)var i intfor {select {case <-ticker.C:err = obj.Publish("go-test-exchange", "go-test", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})if err != nil {Fataln(err)}i++}}}type Empty struct{}func customer(name string, stopchan <-chan Empty) {ch, err := obj.Connection.Channel()if err != nil {Fataln(err)}defer ch.Close()_, err = ch.QueueDeclare(name, true, false, false, false, nil)if err != nil {Fataln("queue declare failed", err)}err = ch.QueueBind(name, name, "go-test-exchange", false, nil)if err != nil {fmt.Fprintln(os.Stderr, "queue bind failed", err)return}msgch, err := ch.Consume(name, "", true, false, true, false, nil)if err != nil {Fataln("consume failed", err)}for {select {case msg := <-msgch:fmt.Println("accept msg " + name + " " + string(msg.Body))case <-stopchan:return}}
}
func TestAmqp(t *testing.T) {defer obj.Close()go func() {producer()}()time.Sleep(2 * time.Second)stopchanlist := make([]chan Empty, 2)stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)go func() {<-chfor _, c := range stopchanlist {c <- Empty{}}}()go customer("go-test", stopchanlist[0])customer("go-test2", stopchanlist[1])
}
go-test 有信息(topic 匹配),go-test2没信息(topic未匹配)。
direct模式
package testimport ("fmt""log""os""os/signal""strconv""syscall""testing""time""github.com/streadway/amqp"
)var (obj *MQOBJlogger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)func Fataln(a ...any) {logger.Println(a...)os.Exit(0)
}type MQOBJ struct {*amqp.Connection*amqp.Channel
}func (mq *MQOBJ) Close() error {mq.Connection.Close()mq.Channel.Close()return nil
}
func init() {var mqurl = "amqp://cho:123@192.168.101.7:5672"con, err := amqp.Dial(mqurl)if err != nil {Fataln(err)}ch, err := con.Channel()if err != nil {Fataln(err)}obj = &MQOBJ{Connection: con, Channel: ch}fmt.Println("init success")}
func producer() {err := obj.ExchangeDeclare("go-test-exchange3", amqp.ExchangeDirect, true, false, false, false, nil)if err != nil {Fataln(err)}ticker := time.NewTicker(time.Millisecond * 300)var i intfor {select {case <-ticker.C:err = obj.Publish("go-test-exchange3", "", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})if err != nil {Fataln(err)}i++}}}type Empty struct{}func customer(name string, stopchan <-chan Empty) {ch, err := obj.Connection.Channel()if err != nil {Fataln(err)}defer ch.Close()_, err = ch.QueueDeclare(name, true, false, false, false, nil)if err != nil {Fataln("queue declare failed", err)}err = ch.QueueBind(name, "", "go-test-exchange3", false, nil)if err != nil {fmt.Fprintln(os.Stderr, "queue bind failed", err)return}msgch, err := ch.Consume(name, "", true, false, true, false, nil)if err != nil {Fataln("consume failed", err)}for {select {case msg := <-msgch:fmt.Println("accept msg " + name + " " + string(msg.Body))case <-stopchan:return}}
}
func TestAmqp(t *testing.T) {defer obj.Close()go func() {producer()}()time.Sleep(2 * time.Second)stopchanlist := make([]chan Empty, 2)stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)go func() {<-chfor _, c := range stopchanlist {c <- Empty{}}}()go customer("go-test", stopchanlist[0])customer("go-test2", stopchanlist[1])
}
demo测试命令
go test -v amqp_test.go