[每周一更]-(第38期):Go常见的操作消息队列

在这里插入图片描述

在Go语言中,常见的消息队列有以下几种:

  • RabbitMQ:RabbitMQ是一个开源的AMQP(高级消息队列协议)消息代理软件,用于支持多种编程语言,包括Go语言。RabbitMQ提供了可靠的消息传递机制和灵活的路由规则,可以用于处理大量的消息和任务。
  • Apache Kafka:Apache Kafka是一个开源的分布式流处理平台,也可以用作消息队列,用于处理高容量的消息流和实时数据。Kafka提供了高吞吐量、低延迟的消息传递机制,并且具有良好的可伸缩性和可靠性。
  • NSQ:NSQ是一个开源的实时分布式消息平台,用于处理大规模的消息和数据流。NSQ提供了高可靠性和低延迟的消息传递机制,并且具有良好的可扩展性和可伸缩性。
  • NATS:NATS是一个轻量级、高性能的消息系统,用于支持分布式应用程序和微服务。NATS提供了简单易用的API和协议,具有高可靠性、低延迟和高吞吐量的消息传递机制。
  • Redis:Redis是一个开源的内存数据库,也可以用作消息队列。Redis提供了支持发布订阅模式、阻塞队列等特性,可以用于处理实时数据和大量的消息。
  • ActiveMQ:ActiveMQ是一个开源的消息代理软件,用于支持多种消息传递协议和编程语言。ActiveMQ提供了高可靠性、可伸缩性和可扩展性的消息传递机制,可以用于处理大规模的消息和任务。

除了以上常见的消息队列,还有一些其他的开源消息系统和组件,例如RocketMQ、ZeroMQ等,也可以用于处理消息和任务。

在选择消息队列时,需要根据具体的业务需求和性能要求进行选择,并且需要考虑安全性、可靠性和扩展性等因素,确保消息传递的可靠性和性能。

消息队列的使用场景有哪些?

不同的消息队列适用于不同的场景,以下是常见的使用场景:

  • RabbitMQ:RabbitMQ适用于需要可靠的消息传递和灵活的路由规则的场景,例如电商网站的订单处理、银行的支付处理等。
  • Apache Kafka:Kafka适用于处理大规模的消息和数据流,例如社交媒体的实时消息、大型网站的日志处理等。
  • NSQ:NSQ适用于需要高可靠性和低延迟的场景,例如在线游戏的实时消息、金融交易的实时处理等。
  • NATS:NATS适用于需要高可靠性、低延迟和高吞吐量的场景,例如移动互联网应用的实时通信、物联网设备的数据传输等。
  • Redis:Redis适用于需要快速处理大量消息的场景,例如在线聊天、实时数据分析等。
  • ActiveMQ:ActiveMQ适用于需要支持多种消息传递协议和编程语言的场景,例如企业应用集成、分布式系统的消息传递等。

当然,这些场景只是一些常见的示例,具体的使用场景还需要根据业务需求和性能要求来选择。需要根据消息队列的特性、优缺点和性能指标进行评估和比较,选择最适合自己业务需求的消息队列。

使用示例

操作RabbitMQ

我们有一个需求,需要向多个客户端发送消息,可以使用RabbitMQ作为消息队列,Go作为开发语言。

1、安装RabbitMQ并启动服务。

2、安装amqp库:


go get github.com/streadway/amqp

3、生产者向消息队列中发送消息:


package mainimport ("fmt""log""github.com/streadway/amqp"
)func main() {// 连接RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %s", err)}defer conn.Close()// 创建一个channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %s", err)}defer ch.Close()// 声明一个名为"hello"的queueq, err := ch.QueueDeclare("hello", // 队列名false,   // 是否持久化false,   // 是否自动删除false,   // 是否具有排他性false,   // 是否阻塞nil,     // 额外的参数)if err != nil {log.Fatalf("Failed to declare a queue: %s", err)}// 发送消息到队列body := "Hello, World!"err = ch.Publish("",     // exchange名q.Name, // queue名false,  // 是否强制发送false,  // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)if err != nil {log.Fatalf("Failed to publish a message: %s", err)}fmt.Println("Message sent successfully")
}

4、消费者从消息队列中获取消息并处理:


package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %s", err)}defer conn.Close()// 创建一个channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %s", err)}defer ch.Close()// 声明一个名为"hello"的queueq, err := ch.QueueDeclare("hello", // 队列名false,   // 是否持久化false,   // 是否自动删除false,   // 是否具有排他性false,   // 是否阻塞nil,     // 额外的参数)if err != nil {log.Fatalf("Failed to declare a queue: %s", err)}// 消费队列中的消息msgs, err := ch.Consume(q.Name, // queue名"",     // 消费者名true,   // 是否自动应答

操作Kafka

我们使用 github.com/segmentio/kafka-go 库作为 Kafka Go 客户端。在生产者示例中,我们通过 kafka.DialLeader 方法连接到 Kafka 集群,
然后使用 conn.WriteMessages 方法向 Kafka 集群发送消息。在消费者示例中,我们通过 kafka.NewReader 方法创建一个 Kafka 消费者,
然后通过 r.ReadMessage 方法从 Kafka 集群读取消息。

在示例中,我们还使用了一个 sigchan 信号通道来监听操作系统的信号并退出程序。

安装 Kafka Go 客户端:

go get -u github.com/segmentio/kafka-go

生产者示例:


package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {topic := "my-topic"partition := 0conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}defer conn.Close()// 发送消息msg := kafka.Message{Value: []byte("Hello, Kafka!"),}_, err = conn.WriteMessages(msg)if err != nil {log.Fatal("failed to write message:", err)}fmt.Println("message sent")
}

消费者示例:

package mainimport ("context""fmt""log""os""os/signal""syscall""github.com/segmentio/kafka-go"
)func main() {topic := "my-topic"partition := 0offset := kafka.LastOffsetr := kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"},Topic:   topic,Partition: partition,MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MBMaxWait:  10 * time.Second,})// 接收消息sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)for {select {case <-sigchan:log.Println("received signal, exiting...")returndefault:msg, err := r.ReadMessage(context.Background())if err != nil {log.Fatal("failed to read message:", err)}fmt.Println(string(msg.Value))}}
}

操作NSQ

使用 github.com/nsqio/go-nsq 库作为 NSQ Go 客户端。在生产者示例中,我们通过 nsq.NewProducer 方法创建一个 NSQ 生产者,

然后使用 producer.Publish 方法向 NSQ 集群发送消息。在消费者示例中,我们通过 nsq.NewConsumer 方法创建一个 NSQ 消费者,

然后使用 consumer.AddHandler 方法设置消息处理函数。最后,我们通过 consumer.ConnectToNSQLookupd 方法连接到 NSQ 集群,

并使用 select {} 语句保持消费者程序不退出。

需要注意的是,上述示例中我们使用了一个空的 select {} 语句来保持消费者程序不退出。在实际生产环境中,我们需要在程序中添加正确的退出逻辑。

例如,使用一个 sigchan 信号通道来监听操作系统的信号并退出程序。

安装 NSQ Go 客户端:


安装 NSQ Go 客户端:

生产者示例:

package mainimport ("fmt""github.com/nsqio/go-nsq"
)func main() {producer, err := nsq.NewProducer("localhost:4150", nsq.NewConfig())if err != nil {panic(err)}// 发送消息err = producer.Publish("my-topic", []byte("Hello, NSQ!"))if err != nil {panic(err)}producer.Stop()fmt.Println("message sent")
}

消费者示例:

package mainimport ("fmt""log""github.com/nsqio/go-nsq"
)type MyHandler struct{}func (*MyHandler) HandleMessage(msg *nsq.Message) error {fmt.Println(string(msg.Body))return nil
}func main() {consumer, err := nsq.NewConsumer("my-topic", "my-channel", nsq.NewConfig())if err != nil {panic(err)}consumer.AddHandler(&MyHandler{})err = consumer.ConnectToNSQLookupd("localhost:4161")if err != nil {panic(err)}fmt.Println("consumer started")select {}
}

操作NATS

使用 github.com/nats-io/nats.go 库作为 NATS Go 客户端。在生产者示例中,我们通过 nats.Connect 方法创建一个 NATS 连接,

然后使用 nc.Publish 方法向 NATS 集群发送消息。在消费者示例中,我们通过 nc.Subscribe 方法订阅 my-topic 主题,并使用一个回调函数处理接收到的消息。

最后,我们使用一个空的 select {} 语句保持消费者程序不退出。

需要注意的是,上述示例中我们使用了一个空的 select {} 语句来保持消费者程序不退出。在实际生产环境中,

我们需要在程序中添加正确的退出逻辑。例如,使用一个 sigchan 信号通道来监听操作系统的信号并退出程序。

安装 NATS Go 客户端:

go get github.com/nats-io/nats.go

生产者示例:

package mainimport ("fmt""time""github.com/nats-io/nats.go"
)func main() {nc, err := nats.Connect("nats://localhost:4222")if err != nil {panic(err)}defer nc.Close()// 发送消息err = nc.Publish("my-topic", []byte("Hello, NATS!"))if err != nil {panic(err)}fmt.Println("message sent")
}

消费者示例:

package mainimport ("fmt""log""time""github.com/nats-io/nats.go"
)func main() {nc, err := nats.Connect("nats://localhost:4222")if err != nil {panic(err)}defer nc.Close()// 订阅消息_, err = nc.Subscribe("my-topic", func(msg *nats.Msg) {fmt.Println(string(msg.Data))})if err != nil {panic(err)}fmt.Println("consumer started")select {}
}

操作Redis

安装 Redis Go 客户端:

go get github.com/go-redis/redis

连接 Redis:

package mainimport ("fmt""github.com/go-redis/redis"
)func main() {// 创建 Redis 客户端client := redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // 没有设置密码DB:       0,  // 使用默认数据库})// 检查 Redis 是否正常连接pong, err := client.Ping().Result()fmt.Println(pong, err)
}

Redis String 类型操作:

package mainimport ("fmt""github.com/go-redis/redis"
)func main() {// 创建 Redis 客户端client := redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // 没有设置密码DB:       0,  // 使用默认数据库})// 设置字符串err := client.Set("key", "value", 0).Err()if err != nil {panic(err)}// 获取字符串val, err := client.Get("key").Result()if err != nil {panic(err)}fmt.Println("key", val)// 删除字符串err = client.Del("key").Err()if err != nil {panic(err)}
}

Redis List 类型操作:

package mainimport ("fmt""github.com/go-redis/redis"
)func main() {// 创建 Redis 客户端client := redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // 没有设置密码DB:       0,  // 使用默认数据库})// 将元素添加到列表err := client.RPush("list", "a", "b", "c").Err()if err != nil {panic(err)}// 获取列表长度length, err := client.LLen("list").Result()if err != nil {panic(err)}fmt.Println("list length:", length)// 获取列表中的元素val, err := client.LRange("list", 0, -1).Result()if err != nil {panic(err)}fmt.Println("list elements:", val)// 弹出列表左侧元素elem, err := client.LPop("list").Result()if err != nil {panic(err)}fmt.Println("popped element:", elem)
}

操作ActiveMQ

在Go中操作ActiveMQ,可以使用go-stomp库。以下是一个简单的示例代码,用于连接到ActiveMQ,并向队列发送消息:

package mainimport ("fmt""github.com/go-stomp/stomp"
)func main() {conn, err := stomp.Dial("tcp", "localhost:61613")if err != nil {fmt.Println(err)return}defer conn.Disconnect()msg := "hello, activemq"err = conn.Send("/queue/test", "text/plain", []byte(msg), nil)if err != nil {fmt.Println(err)return}fmt.Printf("Message sent: %s\n", msg)
}

在该示例中,我们通过stomp.Dial()方法连接到ActiveMQ的默认端口61613。然后,我们使用conn.Send()方法向队列“/queue/test”发送消息。

要从队列中接收消息,请使用conn.Subscribe()方法。以下是一个示例代码:

package mainimport ("fmt""github.com/go-stomp/stomp"
)func main() {conn, err := stomp.Dial("tcp", "localhost:61613")if err != nil {fmt.Println(err)return}defer conn.Disconnect()sub, err := conn.Subscribe("/queue/test", stomp.AckAuto)if err != nil {fmt.Println(err)return}defer sub.Unsubscribe()for {msg := <-sub.Cfmt.Printf("Received message: %s\n", string(msg.Body))}
}

在该示例中,我们使用conn.Subscribe()方法订阅队列“/queue/test”。然后,我们通过使用sub.C通道来接收来自队列的消息。收到消息后,我们使用string(msg.Body)将其转换为字符串,并打印到控制台上。

需要注意的是,这只是一个简单的示例代码。在实际应用中,您需要考虑诸如异常处理、连接丢失、重连等问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/240325.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SSM的在线学习系统的设计与实现论文

基于SSM的在线学习系统的设计与实现 摘要 随着信息互联网购物的飞速发展&#xff0c;一般企业都去创建属于自己的管理系统。本文介绍了在线学习系统的开发全过程。通过分析企业对于在线学习系统的需求&#xff0c;创建了一个计算机管理在线学习系统的方案。文章介绍了在线学习…

【习题】运行Hello World工程

判断题 1. DevEco Studio是开发HarmonyOS应用的一站式集成开发环境。 正确(True)错误(False) 正确(True) 2. main_pages.json存放页面page路径配置信息。 正确(True)错误(False) 正确(True) 单选题 1. 在stage模型中&#xff0c;下列配置文件属于AppScope文件夹的是&am…

信号与线性系统翻转课堂笔记7——信号正交与傅里叶级数

信号与线性系统翻转课堂笔记7——信号正交与傅里叶级数 The Flipped Classroom7 of Signals and Linear Systems 对应教材&#xff1a;《信号与线性系统分析&#xff08;第五版&#xff09;》高等教育出版社&#xff0c;吴大正著 一、要点 &#xff08;1&#xff0c;重点&a…

安全、效率、成本:混合云数据库管理的三重挑战!

随着业务需求的不断演变&#xff0c;数据在多云平台之间流动&#xff0c;给数据库管控带来了新的层次和复杂性。这给数据库管控带来了前所未有的挑战。企业可能面临着一系列问题&#xff0c;包括安全性挑战、管理复杂性、性能与效率问题、成本控制难题、缺乏统一的管理视图以及…

php反序列化漏洞原理、利用方法、危害

文章目录 PHP反序列化漏洞1. 什么是PHP反序列化漏洞&#xff1f;2. PHP反序列化如何工作&#xff1f;3. PHP反序列化漏洞是如何利用的&#xff1f;4. PHP反序列化漏洞的危害是什么&#xff1f;5. 如何防止PHP反序列化漏洞&#xff1f;6. PHP反序列化漏洞示例常见例子利用方法PH…

elementUI CDN引入本地文件报错,刷新页面报错

报错原因&#xff1a;vue.config.js的externals 配置中有外部cdn引入配置&#xff0c;而当前场景我的element是直接下载放在本地的&#xff0c;这时就需要将配置注释或者删除 webpack 中的 externals 配置项用于指定在打包时需要排除掉的模块&#xff0c;这些模块会被视为外部依…

Rust中peekable的使用

在 Rust 中&#xff0c;从迭代器中获取&#xff08;也就是“消费”&#xff09;一个元素时&#xff0c;每次调用 next 方法都会“消费”迭代器的一个元素&#xff0c;这意味着此元素被从迭代器中移除并返回给调用者&#xff0c; 一旦一个元素被消费&#xff0c;它就不能再次从同…

maven下载jar包失败

配置国内镜像 设置国内的仓库,比如: <!--阿里仓库--><mirror><id>alimaven</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public/</url><mirrorOf>central</mirrorOf></mirror>…

医学实验室检验科LIS信息系统源码

实验室信息管理是专为医院检验科设计的一套实验室信息管理系统&#xff0c;能将实验仪器与计算机组成网络&#xff0c;使病人样品登录、实验数据存取、报告审核、打印分发&#xff0c;实验数据统计分析等繁杂的操作过程实现了智能化、自动化和规范化管理。 实验室管理系统功能介…

nuxt打包占用磁盘IO

目录 前言排除过程 前言 jenkins运行打包&#xff0c;总是要卡一段时间&#xff0c;磁盘IO很高。我手动执行后的确发现了这个问题&#xff0c;如下图所示。 排除过程 我的方案很原始&#xff0c;利用git恢复到以前的版本&#xff0c;抽检&#xff0c;搞了差不多两个小时&am…

vue中添加change的js事件并根据下拉框内容动态改变另一个组件中的数据(亲测有效)

vue中添加change的js事件并根据下拉框内容动态改变另一个组件中的数据 话不多说看我怎么完成的&#xff0c;以我当前实现的例子演示 我想根据班级下拉框来动态改变报名费内容 具体步骤如下&#xff1a; 1.首先给下拉框的组件加一个change change“changeFeiByclass” 2.在met…

关于Triple DES(3DES)对称加密算法

一、引言 在网络安全领域&#xff0c;对称加密算法作为一种常见的加密手段&#xff0c;被广泛应用于保障数据传输的保密性和完整性。其中&#xff0c;DES&#xff08;Data Encryption Standard&#xff09;算法作为一种经典的对称加密算法&#xff0c;由IBM于1970年代开发&…

mac上使用 Downie 下载网页视频

在今天的数字时代&#xff0c;视频内容在互联网上的传播变得更加普遍和便捷。然而&#xff0c;有时我们可能希望将网页上的视频保存在本地&#xff0c;以便离线观看或与他人分享。Downie 是一款强大而简便的工具&#xff0c;专门设计用于下载网页上的视频内容。本文将介绍 Down…

多维时序 | MATLAB实现SSA-CNN-SVM麻雀算法优化卷积神经网络-支持向量机多变量时间序列预测

多维时序 | MATLAB实现SSA-CNN-SVM麻雀算法优化卷积神经网络-支持向量机多变量时间序列预测 目录 多维时序 | MATLAB实现SSA-CNN-SVM麻雀算法优化卷积神经网络-支持向量机多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | MATLAB实现…

【智慧办公】如何让智能会议室的电子标签实现远程、批量更新信息?东胜物联网硬件网关让解决方案更具竞争力

近年来&#xff0c;为了减少办公耗能、节能环保、降本增效&#xff0c;越来越多的企业开始从传统的办公模式转向智慧办公。 以智能会议室为例&#xff0c;会议是企业业务中不可或缺的一部分&#xff0c;但在传统办公模式下&#xff0c;一来会议前行政人员需要提前准备会议材料…

多相机系统通用视觉 SLAM 框架的设计与评估

Design and Evaluation of a Generic Visual SLAM Framework for Multi-Camera Systems PDF https://arxiv.org/abs/2210.07315 Code https://github.com/neufieldrobotics/MultiCamSLAM Data https://tinyurl.com/mwfkrj8k 程序设置 主要目标是开发一个与摄像头系统配置无关…

极智项目 | 实战Pytorch戴口罩检测

欢迎关注我的公众号 [极智视界]&#xff0c;获取我的更多项目分享 大家好&#xff0c;我是极智视界&#xff0c;本文来介绍 实战 Pytorch 戴口罩检测项目。 本文介绍的 实战 Pytorch 戴口罩检测项目&#xff0c;提供完整的可以一键执行的项目工程源码&#xff0c;获取方式有两…

Diary26-Vue综合案例1-书籍购物车

Vue综合案例1-书籍购物车 案例要求: 代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewpor…

网络爬虫之多任务数据采集(多线程、多进程、协程)

进程&#xff1a;是操作系统中资源分配的基本单位 线程&#xff1a;使用进程分配的资源处理具体任务 一个进程中可以有多个线程&#xff1a;进程相当于一个公司&#xff0c;线程就是公司里面的员工。 一 多线程 多线程都是关于功能的并发执行。而异步编程是关于函数之间的非…

C语言——最古老的树

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 缺乏明确的目标&#xff0c;一生将庸庸…