go rabbitmq 操作

go rabbitmq 操作

go 依赖包github.com/streadway/amqp

docker快速部署

docker pull rabbitmq:management
docker run -d rabbitmq:management # 先跑一个看看监听了哪些端口
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq #5672 go 程序连接,15672是管理页面

写个最基本生产者消费者demo(headers 模式)

package testimport ("fmt""log""os""os/signal""strconv""syscall""testing""time""github.com/streadway/amqp"
)var (obj *MQOBJ
)type MQOBJ struct {*amqp.Connection*amqp.Channel
}func (mq *MQOBJ) Close() error {mq.Connection.Close()mq.Channel.Close()return nil
}
func init() {var mqurl = "amqp://cho:123@192.168.101.7:5672"con, err := amqp.Dial(mqurl)if err != nil {log.Fatalln(err)}ch, err := con.Channel()if err != nil {log.Fatalln(err)}obj = &MQOBJ{Connection: con, Channel: ch}}
func producer() {_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)if err != nil {return}err = obj.ExchangeDeclare("go-test-exchange2", amqp.ExchangeHeaders, true, false, false, false, nil)if err != nil {log.Fatalln(err)}//这个queue绑定,你也可以放消费者那边绑定,更灵活err = obj.Channel.QueueBind("go-test2", "go-test2", "go-test-exchange2", false, amqp.Table{"name": "jesko"})if err != nil {log.Fatalln(err)}ticker := time.NewTicker(time.Millisecond * 300)var i intfor {select {case <-ticker.C:err = obj.Publish("", "go-test2", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain", Headers: amqp.Table{"x-match": "any", "name": "jesko", "age": 22}})if err != nil {log.Fatalln(err)}i++}}}
func customer() {_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)if err != nil {log.Fatalln(err)}msgch, err := obj.Channel.Consume("go-test2", "", true, false, true, false, nil)if err != nil {log.Fatalln(err)}ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)for {select {case msg := <-msgch:fmt.Println("accept msg " + string(msg.Body))case <-ch:return}}
}
func TestAmqp(t *testing.T) {defer obj.Close()go func() {producer()}()time.Sleep(2 * time.Second)customer()
}

请添加图片描述
这里可以看到我们创建的queue
请添加图片描述

topic模式

topic模式不用绑定headers去匹配

package testimport ("fmt""log""os""os/signal""strconv""syscall""testing""time""github.com/streadway/amqp"
)var (obj    *MQOBJlogger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)func Fataln(a ...any) {logger.Println(a...)os.Exit(0)
}type MQOBJ struct {*amqp.Connection*amqp.Channel
}func (mq *MQOBJ) Close() error {mq.Connection.Close()mq.Channel.Close()return nil
}
func init() {var mqurl = "amqp://cho:123@192.168.101.7:5672"con, err := amqp.Dial(mqurl)if err != nil {Fataln(err)}ch, err := con.Channel()if err != nil {Fataln(err)}obj = &MQOBJ{Connection: con, Channel: ch}fmt.Println("init success")}
func producer() {err := obj.ExchangeDeclare("go-test-exchange", amqp.ExchangeTopic, true, false, false, false, nil)if err != nil {Fataln(err)}ticker := time.NewTicker(time.Millisecond * 300)var i intfor {select {case <-ticker.C:err = obj.Publish("go-test-exchange", "go-test", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})if err != nil {Fataln(err)}i++}}}type Empty struct{}func customer(name string, stopchan <-chan Empty) {ch, err := obj.Connection.Channel()if err != nil {Fataln(err)}defer ch.Close()_, err = ch.QueueDeclare(name, true, false, false, false, nil)if err != nil {Fataln("queue declare failed", err)}err = ch.QueueBind(name, name, "go-test-exchange", false, nil)if err != nil {fmt.Fprintln(os.Stderr, "queue bind failed", err)return}msgch, err := ch.Consume(name, "", true, false, true, false, nil)if err != nil {Fataln("consume failed", err)}for {select {case msg := <-msgch:fmt.Println("accept msg " + name + " " + string(msg.Body))case <-stopchan:return}}
}
func TestAmqp(t *testing.T) {defer obj.Close()go func() {producer()}()time.Sleep(2 * time.Second)stopchanlist := make([]chan Empty, 2)stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)go func() {<-chfor _, c := range stopchanlist {c <- Empty{}}}()go customer("go-test", stopchanlist[0])customer("go-test2", stopchanlist[1])
}

go-test 有信息(topic 匹配),go-test2没信息(topic未匹配)。

direct模式

package testimport ("fmt""log""os""os/signal""strconv""syscall""testing""time""github.com/streadway/amqp"
)var (obj    *MQOBJlogger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)func Fataln(a ...any) {logger.Println(a...)os.Exit(0)
}type MQOBJ struct {*amqp.Connection*amqp.Channel
}func (mq *MQOBJ) Close() error {mq.Connection.Close()mq.Channel.Close()return nil
}
func init() {var mqurl = "amqp://cho:123@192.168.101.7:5672"con, err := amqp.Dial(mqurl)if err != nil {Fataln(err)}ch, err := con.Channel()if err != nil {Fataln(err)}obj = &MQOBJ{Connection: con, Channel: ch}fmt.Println("init success")}
func producer() {err := obj.ExchangeDeclare("go-test-exchange3", amqp.ExchangeDirect, true, false, false, false, nil)if err != nil {Fataln(err)}ticker := time.NewTicker(time.Millisecond * 300)var i intfor {select {case <-ticker.C:err = obj.Publish("go-test-exchange3", "", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})if err != nil {Fataln(err)}i++}}}type Empty struct{}func customer(name string, stopchan <-chan Empty) {ch, err := obj.Connection.Channel()if err != nil {Fataln(err)}defer ch.Close()_, err = ch.QueueDeclare(name, true, false, false, false, nil)if err != nil {Fataln("queue declare failed", err)}err = ch.QueueBind(name, "", "go-test-exchange3", false, nil)if err != nil {fmt.Fprintln(os.Stderr, "queue bind failed", err)return}msgch, err := ch.Consume(name, "", true, false, true, false, nil)if err != nil {Fataln("consume failed", err)}for {select {case msg := <-msgch:fmt.Println("accept msg " + name + " " + string(msg.Body))case <-stopchan:return}}
}
func TestAmqp(t *testing.T) {defer obj.Close()go func() {producer()}()time.Sleep(2 * time.Second)stopchanlist := make([]chan Empty, 2)stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)go func() {<-chfor _, c := range stopchanlist {c <- Empty{}}}()go customer("go-test", stopchanlist[0])customer("go-test2", stopchanlist[1])
}

demo测试命令

go test -v amqp_test.go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/751507.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java-并发编程--ThreadLocal、InheritableThreadLocal

1.ThreadLocal 作用 作用&#xff1a;为变量在线程中都创建副本&#xff0c;线程可访问自己内部的副本变量。该类提供了线程局部 (thread-local) 变量&#xff0c;访问这个变量&#xff08;通过其 get 或 set 方法&#xff09;的每个线程都有自己的局部变量&#xff0c;它独立…

CleanMyMac X2024免费绿色版安装包下载

在当今这个数字化时代&#xff0c;我们的生活和工作中离不开电脑&#xff0c;尤其是对于Mac用户而言&#xff0c;更是需要一个轻巧、快捷的解决方案来保持电脑的高效运转。CleanMyMac X正是为此而生&#xff0c;它将帮助您清理Mac中的垃圾文件、优化系统性能&#xff0c;让您的…

前台于后台项目

一&#xff1a;技术栈 前台&#xff1a;vue3element plus 后台&#xff1a;reactant desgin 二&#xff1a;项目中的问题&#xff1a; 多人协同开发导致样式冲突 ui框架中组件的使用 ui框架中组件样式的修改 精度缺失问题 框架的使用 三&#xff1a;解决方案&#xff1a; …

Python电梯楼层数字识别

程序示例精选 Python电梯楼层数字识别 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对《Python电梯楼层数字识别》编写代码&#xff0c;代码整洁&#xff0c;规则&#xff0c;易读。 学习与应…

STM32的简单介绍

STM32是一种基于ARM Cortex-M内核的32位微控制器&#xff0c;由意法半导体公司开发和生产。STM32具有丰富的外设和功能&#xff0c;适用于各种应用场合&#xff0c;如工业控制、消费电子、物联网、人机交互等。STM32的优势包括低功耗、高性能、高可靠性、易于开发等。STM32的系…

嵌入式学习39-程序创建数据库及查找

1.sqlite3_open int sqlite3_open( const char *filename, /* Database filename (UTF-8) */ sqlite3 **ppDb /* OUT: SQLite db handle */ ); 功能: 打开 数据库文件(创建一个数据库连接) 参数: filename: …

用 C 语言模拟 Rust 的 Option 枚举类型

Rust 的 Option<i32> 类型是一个枚举类型&#xff0c;它表示可能有一个 i32 类型的值&#xff0c;或者没有值&#xff08;即 None&#xff09;。这在 Rust 中是一个常见的模式&#xff0c;用于处理可能不存在的值&#xff0c;避免了使用裸指针或引入 NULL 的概念。 在 C…

深度解析Transformer编码器:理论与实践(附代码示例)

在自然语言处理领域&#xff0c;Transformer 模型以其革命性的架构和卓越的性能&#xff0c;成为了研究和应用的热门选择。其中&#xff0c;Transformer 编码器和解码器作为其重要组成部分&#xff0c;编码器在文本表示和特征提取方面发挥着重要作用&#xff0c;而解码器将编码…

excel封装和ddt D17

1&#xff09;excel封装 openpyxl的操作 2&#xff09;ddt 数据驱动测试 ## openpyxl的操作 1.安装&#xff1a;pip install openpyxl 2.导入 openpyxl&#xff1a; import openpyxl 3.workbook对象&#xff1a;工作簿&#xff0c;openpyxl.load_workbook() 4.sheet对象&a…

【Linux】基础 IO(文件描述符)-- 详解

一、前言 1、文件的宏观理解 文件在哪呢&#xff1f; 从广义上理解&#xff0c;键盘、显示器、网卡、声卡、显卡、磁盘等几乎所有的外设都可以称之为文件&#xff0c;因为 “Linux 下&#xff0c;一切皆文件”。 从狭义上的理解&#xff0c;文件在磁盘&#xff08;硬件&#…

【博士每天一篇文献-综述】Brain network communication_ concepts, models and applications

阅读时间&#xff1a;2023-12-1 1 介绍 年份&#xff1a;2023 作者&#xff1a;Caio Seguin&#xff0c;Olaf Sporns印第安纳大学心理与脑科学系 期刊&#xff1a; nature reviews neuroscience 引用量&#xff1a;33 中文翻译参考&#xff1a;https://swarma.org/?p44524 …

深度学习pytorch——Tensor维度变换(持续更新)

view()打平函数 需要注意的是打平之后的tensor是需要有物理意义的&#xff0c;根据需要进行打平&#xff0c;并且打平后总体的大小是不发生改变的。 并且一定要谨记打平会导致维度的丢失&#xff0c;造成数据污染&#xff0c;如果想要恢复到原来的数据形式&#xff0c;是需要…

强大的开源网络爬虫框架Scrapy的基本介绍(入门级)

Scrapy 是一个强大的开源网络爬虫框架&#xff0c;用于从网站上抓取数据。它基于 Twisted 异步网络框架&#xff0c;可以高效地处理并发请求和数据处理。 以下是 Scrapy 框架的一些重要特点和功能&#xff1a; 1. **灵活的架构**&#xff1a; - Scrapy 提供了灵活的架构&a…

力扣细节题:字符串中的最大奇数

奇数只要找到第一位是奇数的即可&#xff0c;不是找单个数字 //即从最低位开始&#xff0c;找到第一位为奇数的位 //然后之前的就是需要的数字char * largestOddNumber(char * num){int i strlen(num) - 1;while(i > 0){if((num[i] - 0) % 2 1)break;i--;}//先找到低位开…

Spring Boot中application配置文件的生效顺序

Spring Boot的一个重要特性就是它的自动配置&#xff0c;这一特性在很大程度上依赖于名称为application的配置文件。本文将详细介绍在Spring Boot中&#xff0c;这些配置文件的加载顺序以及每份文件的应用范围。 文章目录 配置文件的种类配置文件的加载顺序配置文件的环境切换 …

Win10系统使用IIS服务搭建WebDAV网站结合内网穿透公网访问本地文件

文章目录 推荐1. 安装IIS必要WebDav组件2. 客户端测试3. cpolar内网穿透3.1 打开Web-UI管理界面3.2 创建隧道3.3 查看在线隧道列表3.4 浏览器访问测试 4. 安装Raidrive客户端4.1 连接WebDav服务器4.2 连接成功4.2 连接成功总结&#xff1a; 推荐 前些天发现了一个巨牛的人工智能…

Python爬虫与数据可视化源码免费领取

引言 作为一名在软件技术领域深耕多年的专业人士&#xff0c;我不仅在软件开发和项目部署方面积累了丰富的实践经验&#xff0c;更以卓越的技术实力获得了&#x1f3c5;30项软件著作权证书的殊荣。这些成就不仅是对我的技术专长的肯定&#xff0c;也是对我的创新精神和专业承诺…

Java常见问题:编辑tomcat运行环境、部署若伊系统

文章目录 引言I Eclipse1.1 编辑tomcat运行环境II JDK2.1 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接2.2 restriction on required library2.3 The type javax.servlet.http.HttpServletRequest cannot be resolved.的解决方法III npm3.1 npm报错:…

Docker 哲学 - 容器操作 -cp

1、拷贝 容器绑定的 volume的 数据&#xff0c;到指定目录 2、匿名挂载 volume 只定义一个数据咋在容器内的path&#xff0c;docker自动生成一个 sha256 的key作为 volume 名字。这个 sha256 跟 commitID 一致都是唯一的所以 &#xff0c;docker利用这个机制&#xff0c;可以…

AI大浪潮,怎能少了国产HBM内存?

据有关报道显示&#xff0c;武汉新芯半导体制造有限公司&#xff08;XMC&#xff09;正在启动一项专注于开发和生产高带宽内存&#xff08;HBM&#xff09;的项目。 HBM作为一种关键的DRAM类型&#xff0c;对于人工智能&#xff08;AI&#xff09;和高性能计算&#xff08;HPC&…