Golang 整合RocketMQ

RocketMQ 相关知识汇总

RocketMQ 是什么

RocketMQ 是阿里巴巴开源的一款 MQ 框架,被广泛的使用于不同的业务场景,同时也有非常好的生态系统支持,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等功能。

RocketMQ核心概念
  • 名称服务(NameServer): 可以理解为注册中心,主要用来保存topic路由消息,管理Broker,在NameServer的集群中,NameServer彼此之间没有任何的通信。

  • 代理服务器(BrokerServer): 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

  • 生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

  • 生产者组(Producer Group): 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

  • 消费者(Consumer): 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

  • 消费者组(Consumer Group): 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

  • 主题(Topic): 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

  • 标签(Tag): 为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

扩展概念

  • 消息模型(Message Model): RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。

  • 消息(message): 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

  • 拉取消费(Pull Consumer): Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

  • 推动式消费(Push Consumer): Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

  • 集群消费(Clustering): 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

  • 广播消费(Broadcasting): 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

  • 普通顺序消息(Normal Ordered Message): 普通顺序消费模式下,消费者通过同一个消息队列(Topic分区,称作Message Queue)收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

  • 严格顺序消息(Strictly Ordered Message): 严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

RocketMQ搭建

现在我们需要在本地搭建一个rokcetMQ的开发环境,我们搭建的方式是基于docker-compose技术来实现, docker-compose.yaml文件的内容如下:

version: '3'
services:rmqnamesrv:image: rocketmqinc/rocketmqcontainer_name: rmqnamesrvcommand: sh mqnamesrvports:- "9876:9876"volumes:- ./namesrv/logs:/root/logsrmqbroker:image: rocketmqinc/rocketmqcontainer_name: rmqbrokercommand: sh mqbroker -c /opt/rocketmq-4.9.1/conf/broker.confdepends_on:- rmqnamesrvenvironment:- NAMESRV_ADDR=rmqnamesrv:9876ports:- "10909:10909"- "10911:10911"volumes:- ./broker/conf:/opt/rocketmq-4.9.1/conf- ./broker/logs:/opt/rocketmq-4.9.1/logs

在创建完成上述内容还需要创建一个rocketmq的配置文件broker.conf文件, 文件的映射路径:./broker/conf,相对于配置文件的路径, 在修改下BrokeIP1的对象地址即可,文件内的内容如下:

brokerName = broker-a  
brokerId = 0  
deleteWhen = 04  
fileReservedTime = 48  
brokerRole = ASYNC_MASTER  
flushDiskType = ASYNC_FLUSH  
brokerIP1=192.168.18.135

搭建完成后,使用docker-compose up -d 命令启动就可以了。

生产者和消费者案例
  • 生产者
package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer""os"
)func main() {p, _ := rocketmq.NewProducer(producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), // 接入地址producer.WithRetry(2),          // 重试次数producer.WithGroupName("test"), // 分组名称)err := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}tags := []string{"TagA", "TagB", "TagC"}for i := 0; i < 3; i++ {tag := tags[i%3]msg := primitive.NewMessage("test",[]byte("Hello RocketMQ Go Client!"))msg.WithTag(tag)res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", res.String())}}err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
}
  • 消费者
package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {c, err := rocketmq.NewPushConsumer(consumer.WithGroupName("test"),consumer.WithNameServer([]string{"127.0.0.1:9876"}),)if err != nil {panic(err)}err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for _, msg := range msgs {fmt.Printf("subscribe callback: %+v \n", msg)}return consumer.ConsumeSuccess, nil})if err != nil {panic(err)}err = c.Start()if err != nil {panic(err)}defer func() {err = c.Shutdown()if err != nil {fmt.Printf("shutdown Consumer error: %s", err.Error())}}()<-(chan interface{})(nil)}
参考资料
  1. https://zhuanlan.zhihu.com/p/528956421

  2. https://mp.weixin.qq.com/s/iRCP6hEiKOLEp8QRm_OsWQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/140411.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【已验证】php配置连接sql server中文乱码(解决方法)更改utf-8格式

解决数据库中的中文数据在页面显示乱码的问题 在连接的$connectionInfo中设置"CharacterSet" > "UTF-8"&#xff0c;指定编码方式即可 $connectionInfo array("UID">$uid, "PWD">$pwd, "Database">$database…

【springboot】Failed to start bean ‘webServerStartStop‘;

新同事新建了一个项目springboot项目&#xff0c;启动时候报错。 具体错误如下&#xff1a; Failed to start bean webServerStartStop; nested exception is org.springframework.boot.web.server.WebServerException: Unable to start embedded Tomcat server 未能启动bea…

(论文阅读26/100)Weakly-supervised learning with convolutional neural networks

26.文献阅读笔记 简介 题目 Weakly-supervised learning with convolutional neural networks 作者 Maxime Oquab&#xff0c;Leon Bottou&#xff0c;Ivan Laptev&#xff0c;Josef Sivic&#xff0c;CVPR&#xff0c;2015 原文链接 http://www.cv-foundation.org/open…

深度探究深度学习常见数据类型INT8 FP32 FP16的区别即优缺点

定点和浮点都是数值的表示&#xff08;representation&#xff09;&#xff0c;它们区别在于&#xff0c;将整数&#xff08;integer&#xff09;部分和小数&#xff08;fractional&#xff09;部分分开的点&#xff0c;点在哪里。定点保留特定位数整数和小数&#xff0c;而浮点…

C++——const成员

这里先用队列举例&#xff1a; #define _CRT_SECURE_NO_WARNINGS 1 #include <iostream> #include <assert.h> using namespace std; class SeqList { public:void pushBack(int data){if (_size _capacity){int* tmp (int*)realloc(a, sizeof(int) * 4);if (tm…

excel记录wFm数值(推理过程)

1 导入计算wfm库2 实例化具体的指标 3 列表循环之前&#xff0c;设置空list 4 单图评测-将图号、图片名、数值记录 列表里面存储dict 5 将excel列表结果逐个存入excel.xlsx文件 完整代码 test_CPD.py ### test_CPD.py ### import torch import torch.nn.functional as Fimpor…

算法leetcode|88. 合并两个有序数组(rust重拳出击)

文章目录 88. 合并两个有序数组&#xff1a;样例 1&#xff1a;样例 2&#xff1a;样例 3&#xff1a;提示&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 88. 合并两个有序数组&#xff1a; …

flv.js在vue中的使用

Flv.js 是 HTML5 Flash 视频&#xff08;FLV&#xff09;播放器&#xff0c;纯原生 JavaScript 开发&#xff0c;没有用到 Flash。由 bilibili 网站开源。它的工作原理是将 FLV 文件流转码复用成 ISO BMFF&#xff08;MP4 碎片&#xff09;片段&#xff0c;然后通过 Media Sour…

​《水经注全国三维离线GIS系统》硬件安装教程

有些工作&#xff0c;是需要一些外在动力才能完成的。 为什么这么讲呢&#xff1f; 因为正是在客户的要求下&#xff0c;我们才撰写了《水经注全国三维离线GIS系统》的硬件安装教程&#xff0c;而且还录制了视频教程。 当用户收到货物以后&#xff0c;就可以通过本教程清点货…

信驰达科技加入车联网联盟(CCC),推进数字钥匙发展与应用

CCC)的会员。 图 1 深圳信驰达正式成为车联网联盟(CCC)会员 车联网联盟(CCC)是一个跨行业组织&#xff0c;致力于推动智能手机与汽车连接解决方案的技术发展。CCC涵盖了全球汽车和智能手机行业的大部分企业&#xff0c;拥有150多家成员公司。CCC成员公司包括智能手机和汽车制造…

chose_xml

import os import shutil # 定义函数&#xff0c;用于遍历文件夹并复制文件 def copy_files(src_folder, dst_folder, file_type): # 遍历文件夹 for root, dirs, files in os.walk(src_folder): # 遍历文件 for file in files: # 判断文…

Springboot+vue的人力资源管理系统(有报告)。Javaee项目,springboot vue前后端分离项目

演示视频&#xff1a; Springbootvue的人力资源管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot vue前后端分离项目 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的企业资产管理系统&#xff0c;采用M&#xff08;model&…

react+星火大模型,构建上下文ai问答页面(可扩展)

前言 最近写的开源项目核心功能跑通了&#xff0c;前两天突发奇想。关于项目可否介入大模型来辅助用户使用平台&#xff0c;就跑去研究了最近比较活火的国内大模型–讯飞星火大模型。 大模型api获取 控制台登录 地址&#xff1a;https://console.xfyun.cn/app/myapp 新建应…

迅为龙芯2K1000开发板虚拟机ubuntu启动root用户

作为嵌入式开发人员&#xff0c;系统的所有权限都要为我们打开&#xff0c;所以我们不必像运维那样&#xff0c;对 root 用户非常敏感&#xff0c;所以安装完 ubuntu 系统以后&#xff0c;我们要启用 root 用户。 首先我们打开 ubuntu 控制终端&#xff0c;然后在终端里面输入…

[SOC] MBIST (Memory Built-In Self Test) and Memory Built-in Self Repair (BISR)

存储器构成了 VLSI 电路的很大一部分。存储系统设计的目的 是存储大量数据。[1] 存储器不包括逻辑门和触发器。因此&#xff0c;需要不同的故障模型和测试算法来测试存储器。 MBIST 是一种自测试和修复机制&#xff0c;它通过一组有效的算法来测试存储器&#xff0c;以检测典型…

【阿里云】任务2-OSS对象存储教程(找我参加活动可获得京东卡奖励)

目录 前言说明第一步第二步第三步&#xff1a;开通并使用OSS传输加速三、清理第四步-提交作品第五步-提交记录到小程序 前言 本次任务是阿里云官方发出的&#xff0c;每个任务30软妹币&#xff0c;欢迎大家加入我的活动群&#xff0c;门槛很低&#xff0c;所有人都可以参加&…

代码随想录算法训练营第五十天丨 动态规划part13

300.最长递增子序列 思路 首先通过本题大家要明确什么是子序列&#xff0c;“子序列是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序”。 本题也是代码随想录中子序列问题的第一题&#xff0c;如果没接触过这种…

能跟“猫主子”聊天了!生成式AI最快5年内破译第一种动物语言

image.png ChatGPT用它自己的方式来理解世界&#xff0c;类似的技术是否也能用来学习动物的语言&#xff1f; 所罗门能够与动物交流并不是因为他拥有魔法物品&#xff0c;而是因为他有观察的天赋。 ——康拉德・劳伦兹《所罗门王的指环》 在《狮子王》、《疯狂动物城》等以动…

Java通过JNI技术调用C++动态链接库的helloword测试

JNI调用原理 原理就不细说了&#xff0c;其实就是写个库给Java调&#xff0c;可以百度一下Java JNI&#xff0c;下面是HelloWorld代码测试 编写一个本地测试类 package com.my.study.cpp_jni;/*** 测试Java调用C库* <p>使用命令javac -h . NativeTest.java自动生成C头…

红海云签约湘湖实验室,助力科研机构人力资源数字化全面升级

湘湖实验室&#xff08;农业浙江省实验室&#xff09;是由浙江省农业科学院和萧山区人民政府共同举办的新型研发机构&#xff0c;定位为农业核心种质资源生物制造与生物互作科学问题和核心技术研究&#xff0c;瞄准世界农业科技发展前沿&#xff0c;面向国家重大战略&#xff0…