Go实现RabbitMQ消息模式

【目标】

  1. go实现RabbitMQ简单模式和work工作模式

  2. go实现RabbitMQ 消息持久化和手动应答

  3. go实现RabbitMQ 发布订阅模式

  4. go使用MQ实现评论后排行榜更新

1. go实现简单模式

编写路由实现生产消息

实现生产消息

MQ消息执行为命令行执行,所以创建命令行执行函数main,用来消费消息

创建mq/demo/main.go

浏览器中访问路由,执行生产者生产消息

打开http://localhost:15672/#/queues, 查看RabbitMQ客户端查看是否消息

执行消费者,实现消息消费

进入 mq/demo/中,执行bee run

2. go实现work工作模式

在启动另一个窗口,实现第二个消费者

生产消息

打开RabbitMQ客户端,查看消费者

查看work消费

两个work时,轮询执行消费

2.1 go实现RabbitMQ消息持久化和手动应答

消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化

生产者实现消息持久化

第二个参数设置为true,即durable=true.

消费者实现消息持久化

在RabbitMQ服务重启或者服务宕机的情况下,也不会丢失消息。

可以将Queue与Message都设置为可持久化(durable),这样可以保证绝大部分情况下RabbitMQ消息不会丢失。

手动应答

RabbitMQ 消息应答机制

消费者处理一个任务是需要一段时间的,如果有一个消费者正在处理一个比较耗时的任务并且只处理了一部分,突然这个时候消费者宕机了,那么会出现什么情况呢?

如果是自动应答模式,消费者在处理任务的过程中宕机了,那么消息将会丢失,而手动应答则能够保证消息不会被丢失,所以在实际的应用当中绝大多数都采用手动应答

为了保证消息从队列可靠地达到消费者并且被消费者消费处理,RabbitMQ 提供了消息应答机制,RabbitMQ 有两种应答机制,自动应答和手动应答

1、自动应答

RabbitMQ 只要将消息分发给消费者就被认为消息传递成功,就会将内存中的消息删除,而不管消费者有没有处理完消息

2、手动应答

RabbitMQ 将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后才会被认为消息传递成功了,然后才会将内存中的消息删除

消息应答:

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

手动应答优点:

可以批量应答并且减少网络拥堵

消费方法中设置手动应答

效果:

关闭自动应答

RabbitMQ中查看

开启手动应答后,才返回消息执行成功,保证了消息不会被丢失

3. go实现RabbitMQ 发布订阅模式

RabbitMq消息模式的核心思想是:

一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

  实际上,生产者只能把消息发送给一个exchange(交换机),exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息。

有四种类型的交换器,分别是:direct、topic、headers、fanous(广播模式)

广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

go实现RabbitMQ 发布订阅模式 RabbitMQ tutorial - Publish/Subscribe | RabbitMQ

实现广播模式(发布订阅模式)demo

生产者向交换机中发送消息

和简单模式、work模式相比,多了创建交换机

消费者拉取交换机中消息实现消费

和简单模式、work模式相比,多了创建交换机、创建了临时队列、绑定临时队列

临时队列

  我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

 demo中的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,而不是部分。并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

  首先,无论何时我们的消费者连接到RabbitMq,我们都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列,或者让RabbitMq生成任意的队列名字。

  其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

  通过queueDeclare()来创建一个非持久化、专有的、自动删除的、名字随机生成的队列。

实现发布订阅模式:

创建消息路由

控制器中实现生产者消息推送到交换机

创建mq/fanout/main.go,实现消费者从交换机中获取消息实现消费

效果:

执行生产者,实现消息生产

打开RabbitMQ客户端,查看消息状态

执行消费者,实现消费

注:因为是发布订阅模式。所以我们启动两个消费者实现多个用户消费同一消息

消费者1

消费者2

当生产者生产消息时,所订阅的消费者会执行消费

消费者1

消费者2

4. go实现RabbitMQ 路由模式


一个通过路由把One的消息取出来,另一个通过路由把two的消息取出来,一个队列打印奇数,一个队列打印偶数

生产者代码

消费者代码奇数代码


消费者代码偶数代码

运行效果

5. go实现RabbitMQ 主题模式


生产者代码

// topic主题push
// @router /mq/topic/push [*]
func (this *MqDemoController) GetTopic() {//创建线程执行(发送自增的数字到队列中)go func() {count := 0for {if count%2 == 0 {//strconv.Itoa 把count转化为字符串mq.PublishEx("wsyb.demo.topic", "topic", "wsyb.video", "wsyb.video"+strconv.Itoa(count))} else {mq.PublishEx("wsyb.demo.topic", "topic", "user.wsyb", "user.wsyb"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topic")
}// topic主题push
// @router /mq/topictwo/push [*]
func (this *MqDemoController) GetTopicTwo() {//创建线程执行(发送自增的数字到队列中)go func() {count := 0for {if count%2 == 0 {//strconv.Itoa 把count转化为字符串mq.PublishEx("wsyb.demo.topic", "topic", "a.frog.name", "a.frog.name"+strconv.Itoa(count))} else {mq.PublishEx("wsyb.demo.topic", "topic", "b.frog.uid", "b.frog.uid"+strconv.Itoa(count))}count++time.Sleep(1 * time.Second)}}()this.Ctx.WriteString("topic")
}

消费所有主题代码(#)

// 包名必须是main否则消费不成功
package mainimport ("fmt""wsybapi/services/mq"
)func main() {//执行消费  # 代表获取所有的数据mq.ConsumerEx("wsyb.demo.topic", "topic", "#", callback)
}// 回调函数
func callback(s string) {//打印消费结果fmt.Printf("topic all msg is :%s\n", s)
}

匹配多个规则进行消费

// 包名必须是main否则消费不成功
package mainimport ("fmt""wsybapi/services/mq"
)func main() {//执行消费 * 匹配一个或者多个符合规则的数据mq.ConsumerEx("wsyb.demo.topic", "topic", "*.frog.*", callback)
}// 回调函数
func callback(s string) {//打印消费结果fmt.Printf("topic frog msg is :%s\n", s)
}

匹配一个规则进行消费

// 包名必须是main否则消费不成功
package mainimport ("fmt""wsybapi/services/mq"
)func main() {//执行消费mq.ConsumerEx("wsyb.demo.topic", "topic", "wsyb.*", callback)
}// 回调函数
func callback(s string) {//打印消费结果fmt.Printf("tpic wsyb msg is :%s\n", s)
}

运行结果

6. rabbitmq死信队列

6.1应用场景:
  1. 发送消息规定10分钟以后发送给用户
  2. 规定消息每天固定的时间发送
    3.下了订单没有支付,30分钟以后就会取消订单
    4.订单相关的,下单以后会定时收到会系统的提示消息
6.2什么是死信队列呢:

死信队列产生的条件,不仅是ttl时间过期了,还有消息被拒绝,队列达到最大长度,都会产生死信,相信大家已经明白了

7. go使用MQ实现评论后排行榜更新

修改逻辑,新增评论时更新redis排行榜的数据

发布评论

打开MQ客户端,查看队列状态

创建mq/top/main.go,连接数据库

在消费回调函数中,编写消费者逻辑实现排行榜更新

执行消费者

效果:

先评论内容

打开redis可视化界面,查看排行榜评论数

再次评论

打开redis可视化界面,查看排行榜评论数是否实现更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/880765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【React】react项目中的redux使用

1. store目录结构设计 2. react组件中使用store中的数据——useSelector 3. react组件中修改store中的数据——useDispatch 4. 示例 react-basic\src\store\moduels\counterStore.js import { createSlice } from reduxjs/toolkitconst counterStore createSlice({name: cou…

Flutter屏幕适配

我们可以根据下面有适配属性的Widget来进行屏幕适配 1.MediaQuery 通过它可以直接获得屏幕的大小(宽度 / 高度)和方向(纵向 / 横向) Size screenSize MediaQuery.of(context).size; double width screenSize.width; double h…

【Linux:线程概念】

目录 概念: 创建线程的函数:​编辑 ​编辑 有多进程为什么还需要有多线程? 线程调度的成本为什么低? 进程与线程的区别: 概念: 线程是CPU的基本调度单位,在进程内部运行。在内核中&#xff…

CSS 效果:实现动态展示双箭头

最近写了一段 CSS 样式,虽然不难,但实现过程比较繁琐。这个效果结合了两个箭头,一个突出,一个内缩,非常适合用于步骤导航或选项卡切换等场景。样式不仅仅是静态的,还可以通过点击 click 或者 hover 事件&am…

Java的栈帧和动态链接是什么?

在 Java 的面试过程中,不可避免的一个面试题那就是 JVM,而 JVM 的面试题中,有各种,比如在堆中会被问到的关于垃圾回收机制的相关问题,在栈中会被问到入栈以及出栈的过程,来聊一下关于栈的相关问题&#xff…

C0008.Clion利用C++开发Qt界面,使用OpenCV时,配置OpenCV方法

安装OpenCV 配置环境 配置Clion中的CMakeLists.txt文件 # 设置OpenCV的安装路径 set(OpenCV_DIR "D:/OpenCv_Win/opencv/build/x64/vc16/lib"

分糖果C++

题目&#xff1a; 样例解释&#xff1a; 样例1解释 拿 k20 块糖放入篮子里。 篮子里现在糖果数 20≥n7&#xff0c;因此所有小朋友获得一块糖&#xff1b; 篮子里现在糖果数变成 13≥n7&#xff0c;因此所有小朋友获得一块糖&#xff1b; 篮子里现在糖果数变成 6<n7&#xf…

【算法竞赛】堆

堆是一种树形结构,树的根是堆顶,堆顶始终保持为所有元素的最优值。 有最大堆和最小堆,最大堆的根节点是最大值,最小堆的根节点是最小值。 本节都以最小堆为例进行讲解。 堆一般用二叉树实现,称为二叉堆。 二叉堆的典型应用有堆排序和优先队列。 二叉堆的概念 二叉堆是一棵…

定时器定时中断定时器外部中断

基础背景&#xff1a;TIM定时中断-CSDN博客 TIM的函数 // 恢复缺省设置 void TIM_DeInit(TIM_TypeDef* TIMx); // 时基单元初始化&#xff0c;第一个参数TIMx选择某个定时器&#xff0c;第二个参数是结构体&#xff0c;包含了配置时基单元的一些参数。 void TIM_TimeBaseInit…

c++ string 以 空格 拆分

在C中&#xff0c;你可以使用std::istringstream和std::getline来以空格为分隔符拆分字符串。以下是一个简单的函数&#xff0c;它将字符串拆分为单词的std::vector<std::string>。 #include <iostream> #include <sstream> #include <vector> #inclu…

blender解决缩放到某个距离就不能继续缩放

threejs中也存在同样的问题&#xff0c;原因相同&#xff0c;都是因为相机位置和相机观察点距离太近导致的。 threejs解决缩放到某个距离就不能继续缩放-CSDN博客 blender中的解决方案 1、视图中心->视图锁定->选择你想看的物体

【无标题】observer: error while loading shared libraries: libmariadb.so.3处理办法

文章目录 1.记录新装的oceanbase,使用observer帮助时&#xff0c;出现lib文件无法找到的处理过程 ./observer --help ./observer: error while loading shared libraries: libmariadb.so.3: cannot open shared object file: No such file or directory2.做一个strace跟踪&…

day01——登录功能

逻辑&#xff1a; 前端将登录信息通过报文的形式&#xff0c;发送给后端。后端进行登陆验证 2.1 根据接受的用户名&#xff0c;查询数据表。 若不存在该用户的记录&#xff0c;返回用户不存在。 若用户存在&#xff0c;判断数据库中的密码和接收的是否一致&#xff0c;不一致则…

OpenStack 部署实践与原理解析 - Ubuntu 22.04 部署 (DevStack)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言OpenStack 原理详解1. OpenStack 的架构2. OpenStack 的工作原理3. OpenStack 的 API4. 扩展性和模块化 OpenStack 安装方式比较1. DevStack2. Kolla3. OpenSta…

Java高效编程(7):消除过时的对象引用

解锁Python编程的无限可能&#xff1a;《奇妙的Python》带你漫游代码世界 在从手动管理内存的语言&#xff08;如C或C&#xff09;转向垃圾回收语言&#xff08;如Java&#xff09;时&#xff0c;程序员的工作变得容易得多&#xff0c;因为对象在不再使用时会被自动回收。然而…

图解C#高级教程(三):泛型

本讲用许多代码示例介绍了 C# 语言当中的泛型&#xff0c;主要包括泛型类、接口、结构、委托和方法。 文章目录 1. 为什么需要泛型&#xff1f;2. 泛型类的定义2.1 泛型类的定义2.2 使用泛型类创建变量和实例 3. 使用泛型类实现一个简单的栈3.1 类型参数的约束3.2 Where 子句3…

CI/CD详细流程

CI/CD&#xff08;持续集成/持续交付或持续部署&#xff09;是一种软件开发实践&#xff0c;旨在通过自动化软件构建、测试和部署的过程&#xff0c;提高开发效率和软件质量。以下是CI/CD流程的详细说明&#xff1a; 1. 持续集成&#xff08;CI&#xff09; 持续集成的核心思想…

安装图片标识工具anylabeling

目录 下载压缩包 创建环境 安装opencv 安装第三方库 运行setup.py文件 安装过程可能会出现的错误&#xff1a; 错误1 错误2 安装完成 图标更换 之前提到的嵌入式开发】可编程4k蓝牙摄像头点击器还可以训练模型&#xff0c;使图像识别精度提高 现在讲解&#xff0c;如…

uniapp微信小程序,获取上一页面路由

在进入当前页面的时候&#xff0c;判断是不是从某个页面跳转过来的&#xff08;一般是当前页面为公共页面是出现的&#xff09;&#xff0c;比如 A-->B C-->B ,那么 要在 C跳转到B页面的时候多个提示语什么的 而在A跳转到B时不需要&#xff0c;那么就要判断 上一页面的…

先进制造aps专题二十六 基于强化学习的人工智能ai生产排程aps模型简介

基于强化学习的人工智能ai生产排程模型简介 人工智能ai能不能做生产排程&#xff1f; 答案是肯定的。 ai的算法分两类&#xff0c;一类是学习&#xff0c;一类是搜索。 而生产排程问题&#xff0c;它是一个搜索问题&#xff0c;本质上&#xff0c;它和下围棋是一样的 我们…