SpringBoot 集成 RocketMQ

一、RocketMQ基本概念

消息模型(Message Model)

RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。

 1、在springBoot项目中添加Maven依赖 

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency>

2、添加配置:

application.yml 文件中添加如下配置:

rocketmq:name-server: 192.168.152.165:9876producer:group: my-group

SpringBoot 集成 RocketMQ代码:

生产者: 消息发送的三种方式

package com.rocketmq.springbootrocketmq;import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.TimeUnit;@RunWith(SpringRunner.class)
@SpringBootTest
public class T {@Autowiredprivate RocketMQTemplate rocketMQTemplate;//同步消息@Testpublic void testRocketMQ() {Message msg = MessageBuilder.withPayload("boot发送同步消息").build();rocketMQTemplate.send("helloTopicBoot", msg);System.out.println("success send");}//异步消息@Testpublic void sendASYCMsg() throws InterruptedException {Message message = MessageBuilder.withPayload("boot发送异步消息").build();rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送状态:"+sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败");}});TimeUnit.SECONDS.sleep(5);}//一次性消息@Testpublic void sendOneWayRocketMQ() {Message msg = MessageBuilder.withPayload("boot发送一次性消息").build();rocketMQTemplate.sendOneWay("helloTopicBoot", msg);}}

消费者:

package com.example.springbooTRocketMQConsumer.listener;import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
public class HelloTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));}
}

消息消费的两种模式

集群模式:默认模式
广播模式:

消费者:messageModel = MessageModel.BROADCASTING

package com.example.springbooTRocketMQConsumer.listener;import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING)
public class HelloTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));}
}

顺序消息

生产者:

    //顺序消息@Testpublic void sendOrderlyMsg(){//设置队列选择器rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {String orderIdStr = (String) o;long orderId = Long.parseLong(orderIdStr);int index = (int)orderId % list.size();return list.get(index);}});List<OrderStep> orderSteps = OrderUtil.buildOrders();for (OrderStep orderStep : orderSteps) {Message msg = MessageBuilder.withPayload(orderStep.toString()).build();rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot",msg,String.valueOf(orderStep.getOrderId()));}}

消费者:

package com.example.springbooTRocketMQConsumer.listener;import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY)
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("当前线程:" + Thread.currentThread() + "队列ID"+messageExt.getQueueId() + ",消息内容:" + new String(messageExt.getBody(),Charset.defaultCharset()));}
}

延迟消息

生产者:

    //延迟消息@Testpublic void sendDelayRocketMQ() {Message msg = MessageBuilder.withPayload("boot发送延时消息,发送时间:"+new Date()).build();rocketMQTemplate.syncSend("helloTopicBoot", msg,3000,3);}

消费者:

package com.example.springbooTRocketMQConsumer.listener;import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;
import java.util.Date;@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
public class DelayTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));}
}

消息Tag条件过滤

生成者

    //Tag消息@Testpublic void sendTagFilterRocketMQ() {Message msg1 = MessageBuilder.withPayload("消息A").build();rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1);Message msg2 = MessageBuilder.withPayload("消息B").build();rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2);Message msg3 = MessageBuilder.withPayload("消息C").build();rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3);}

消费者:

package com.example.springbooTRocketMQConsumer.listener;import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;
import java.util.Date;@Component
@RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC")
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));}
}

SQL92消息过滤

生产者:

    //SQL92消息@Testpublic void sendSQL92FilterRocketMQ() {Message msg1 = MessageBuilder.withPayload("小红,年龄22,体重45").setHeader("age","22").setHeader("weight",45).build();rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1);Message msg2 = MessageBuilder.withPayload("小明,年龄25,体重60").setHeader("age","25").setHeader("weight",60).build();rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2);Message msg3 = MessageBuilder.withPayload("小蓝,年龄40,体重70").setHeader("age","40").setHeader("weight",70).build();rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3);}

消费者:

package com.example.springbooTRocketMQConsumer.listener;import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;
import java.util.Date;@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age > 23 and weight > 60")
public class SQL92FilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148710.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs+vue实验室上机管理系统的设计与实现-微信小程序-安卓-python-PHP-计算机毕业设计

用户&#xff1a;管理员、教师、学生 基础功能&#xff1a;管理课表、管理机房情况、预约机房预约&#xff1b;权限不同&#xff0c;预约类型不同&#xff0c;教师可选课堂预约和个人&#xff1b;课堂预约。 在实验室上机前&#xff0c;实验室管理员需要对教务处发来的上机课表…

浅析AcrelEMS-CIA机场智慧能源管平台解决方案-安科瑞 蒋静

1 概述 机场智慧能源管平台解决方案对机场范围内变电站内的高低压配电设备 、 发电机、变压器 、UPS、EPS 、广场照明 、 室内照明 、通风及排水等机电设备进行实时分布式监控和集中管理 , 实现无人值守 , 确保高速公路安全畅通 , 提高 自动化管理水平 , 降低机电设备的运行维…

k8s运维管理

node管理 node隔离与恢复 1.使用patch命令 #实现node隔离调度 kubectl patch node node1 -p {"spec":{"unschedulable":true}} #节点信息多了一项SchedulingDisabled node1 Ready,SchedulingDisabled <none> 7d3h v1.18.20 #解除 kubectl …

C语言二叉树的建立和遍历

tree.h typedef struct TreeNode_s {char val;//数据域struct TreeNode_s *pLeft;//左结点struct TreeNode_s *pRight;//右边结点 } TreeNode_t, *pTreeNode_t;//辅助队列建立二叉树 typedef struct QueueNode_s {pTreeNode_t NodeAddr; //数据域存储一个二叉树结点的地址 str…

SpringBoot常见注解

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a;每天一个知识点 ✨特色专栏&#xff1a…

程序员告诉你:人工智能是什么?

随着科技的快速发展&#xff0c;人工智能这个词汇已经逐渐融入了我们的日常生活。然而&#xff0c;对于大多数人来说&#xff0c;人工智能仍然是一个相对模糊的概念。 首先&#xff0c;让我们从人工智能的定义开始。人工智能是一种模拟人类智能的技术&#xff0c;它涵盖了多个领…

C++中将 sizeof() 用于类

C中将 sizeof() 用于类 您知道&#xff0c;通过使用关键字 class 声明自定义类型&#xff0c;可封装数据属性和使用数据的方法。运算符 sizeof( )用于确定指定类型需要多少内存&#xff0c;单位为字节。这个运算符也可用于类&#xff0c;在这种情况下&#xff0c;它将指出类声…

flink入门

1.安装flink&#xff0c;启动flink 文档地址&#xff1a;Apache Flink 1.3-SNAPSHOT 中文文档: Apache Flink 中文文档 代码&#xff1a;GitHub - apache/flink: Apache Flink 2. 打开端口 端口号&#xff0c; 启动jar ### 切换到flink 目录bin下 [rootlocalhost ~]# cd /…

参考文献格式

目录 期刊会议预印本&#xff08;如arxiv&#xff09; 期刊 找不到页码可以在文献中查看bibtex格式&#xff0c;其中有 外文期刊可在web of science中查找卷号、期号和所在页数&#xff1a; [1] ZHANG F, HU Z Q, FU Y K, et al. A New Identification Method for Surface …

python 词云 wordcloud使用paddle模式 庆余年人物分析--不是特别准,可以看着玩一玩

看完之后你也可以生成自己的词云 提供一个过滤人名的英中词性分析对应&#xff0c;更多的可以去我的码云上看看 https://gitee.com/billion_lines_of_code/learn-wordcloud # 只过滤人名 En2Cn_name {nr: 名词-人名,nr1: 名词-汉语姓氏,nr2: 名词-汉语名字,nrf: 名词-音译人…

【0到1学习Unity脚本编程】第一人称视角的角色控制器

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;【0…

图像滤波处理

滤波处理是图像处理中常用的技术之一&#xff0c;用于去除图像中的噪声、平滑图像、边缘检测等。以下是几种常见的滤波处理方法&#xff1a; 1. 均值滤波 (Mean Filtering) 原理&#xff1a; 均值滤波使用一个固定大小的滤波器&#xff0c;在图像上滑动并取周围像素的平均值来…

ClickHouse SQL 查询优化

1 单表查询 1.1 Prewhere替代where Prewhere和where语句的作用相同&#xff0c;用来过滤数据。不同之处在于prewhere只支持 *MergeTree 族系列引擎的表&#xff0c;首先会读取指定的列数据&#xff0c;来判断数据过滤&#xff0c;等待数据过滤之后再读取select 声明的列字段来补…

自动驾驶学习笔记(九)——车辆控制

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo Beta宣讲和线下沙龙》免费报名—>传送门 文章目录 前言 控制器设计 比例积分微分控制 线性…

在 Linux 环境下的简单调试技巧

在 Linux 环境下的简单调试技巧 GDB&#xff08;GNU调试器&#xff09;是一个强大的命令行调试工具&#xff0c;用于调试C、C等程序。下面是使用GDB的一些基本步骤&#xff1a; 编译程序时包含调试信息 确保在编译程序时使用 -g 选项来包含调试信息。例如&#xff1a; gcc -g …

Kotlin 知识体系

Kotlin 知识体系 1、Kotlin 文档2、Kotlin 基础3、桌面应用程序4、Android 与 iOS 应用程序 1、Kotlin 文档 Kotlin 是一门现代但已成熟的编程语言&#xff0c;旨在让开发人员更幸福快乐。 它简洁、安全、可与 Java 及其他语言互操作&#xff0c;并提供了多种方式在多个平台间复…

『亚马逊云科技产品测评』活动征文|借助AWS EC2搭建服务器群组运维系统Zabbix+spug

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道。 本文基于以下软硬件工具&#xff1a; aws ec2 frp-0.52.3 zabbix 6…

LRU最近最少使用算法

LRU(LeastRecentlyUsed)“最近最少使用”算法&#xff1a; 1.当缓存空间已满耗用时&#xff0c;淘汰最近最少使用数据的缓存对象以释放更多的缓存空间(用于历史缓存对象的维护)。 2. 哈希表:快速查找缓存对象&#xff1b;双向链表:维护 历史数据所在的节点顺序。 步骤&#xff…

掌握深度学习利器——TensorFlow 2.x实战应用与进阶

掌握深度学习利器——TensorFlow 2.x实战应用与进阶 摘要&#xff1a;随着人工智能技术的飞速发展&#xff0c;深度学习已成为当下最热门的领域之一。作为深度学习领域的重要工具&#xff0c;TensorFlow 2.x 备受关注。本文将通过介绍TensorFlow 2.x的基本概念和特性&#xff…

在 Linux 上搭建 Java Web 项目环境(最简单的进行搭建)

要在 Linux 上安装的程序有 1.JDK (要想运行 java 程序 JDK 是必不可少的) 2.Tomcat &#xff08;HTTP 服务器&#xff0c;是管理 Web 项目的常用工具&#xff09; 3. mysql &#xff08;数据库&#xff09; 一.安装 JDK 博主使用的 Linux 发行版是 centos &#xff0c;cen…