springboot 使用RocketMQ客户端生产消费消息DEMO

创建springboot项目省略

项目依赖

注意:当前客户端版本是 5.1.3 ,安装的rocketmq服务的版本要与其对应

	<properties><java.version>11</java.version><rocketmq-client-java-version>5.1.3</rocketmq-client-java-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq-client-java-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

创建 JmsConfig


public class JmsConfig {//roketmq 服务地址public static String nameServerAddr = "192.168.2.109:9876";//主题public static String TOPIC = "test_topic";
}

创建生产者 Producer

package com.example.springbootrocketmq.jms;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;@Component
public class PayProducer {//生产组private String producerGroup = "test_group";private DefaultMQProducer producer;public PayProducer() {producer = new DefaultMQProducer(producerGroup);//多个NameServer地址 多个地址 ; 号隔开producer.setNamesrvAddr(JmsConfig.nameServerAddr);start();}/*** 开始*/public void start(){try {this.producer.start();} catch (MQClientException e) {e.printStackTrace();}}public DefaultMQProducer getProducer(){return this.producer;}/*** 一般关闭上下文是关闭*/@PreDestroypublic void shutdown(){System.out.println("关闭....");this.producer.shutdown();}
}

创建消费者 Consumer


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class Consumer {private DefaultMQPushConsumer consumer;private String consumerGroup = "test_consumer_group";public PayConsumer() throws Exception{consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(JmsConfig.nameServerAddr);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe(JmsConfig.TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {try {Message msg = msgs.get(0);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));String topic = msg.getTopic();String body = new String(msg.getBody(), "utf-8");String tags = msg.getTags();String keys = msg.getKeys();System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (UnsupportedEncodingException e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}
}

配置 TestController


import com.example.springbootrocketmq.jms.JmsConfig;
import com.example.springbootrocketmq.jms.Producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class TestController{@Autowiredprivate Producer producer;@RequestMapping("/api/v1/test_cb")public Object callback(String text) throws Exception {Message message = new Message(JmsConfig.TOPIC,"taga",("hello rocketmq = "+ text).getBytes());SendResult sendResult = producer.getProducer().send(message);log.info(sendResult.toString());return null;}
}

测试结果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/106077.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Annoy vs Milvus:哪个向量数据库更适合您的AI应用?知其然知其所以然

1. Annoy vs Milvus简介 Annoy 和 Milvus 都是用于向量索引和相似度搜索的开源库&#xff0c;它们可以高效地处理大规模的向量数据。 Annoy&#xff08;Approximate Nearest Neighbors Oh Yeah&#xff09;&#xff1a; Annoy 是一种近似最近邻搜索算法&#xff0c;它通过构…

Flink中KeyBy、分区、分组的正确理解

1.Flink中的KeyBy 在Flink中&#xff0c;KeyBy作为我们常用的一个聚合类型算子&#xff0c;它可以按照相同的Key对数据进行重新分区&#xff0c;分区之后分配到对应的子任务当中去。 源码解析 keyBy 得到的结果将不再是 DataStream&#xff0c;而是会将 DataStream 转换为 Key…

同源策略和跨域问题

1.跨域问题产生的原因 浏览器的同源策略影响&#xff0c;同源策略是一种安全机制&#xff0c;它限制了一个网页中的脚本只能访问同源的资源。 跨源网络访问的三种方式&#xff1a;跨域写操作&#xff0c;跨域资源嵌入&#xff0c;跨域读操作 2.跨域问题案例 ip和域名不一致…

微信小程序底部tabBar不显示图标

现场还原 在设置微信小程序底部tabBar导航图标时&#xff0c;无论如何操作均无法显示在界面上 解决思路 问题1 图标类型 一开始以为不支持png类型&#xff0c;但查看官方API仅提示ICON尺寸大小 打开其他项目可以正常展示&#xff0c;排除图标类型问题 iconPath string 否 …

UE4和C++ 开发-常用的宏(二)UPROPERTY(类似于Unity中C#的特性[SerializeField])

UPROPERTY的作用类似于Unity中C#的特性[SerializeField]或者Godot中的export。目的就是通过反射把属性暴露在蓝图或实例的细节面板。 属性说明符&#xff08;Property Specifiers&#xff09;

unity2022版本 实现手机虚拟操作杆

简介 在许多移动游戏中&#xff0c;虚拟操纵杆是一个重要的用户界面元素&#xff0c;用于控制角色或物体的移动。本文将介绍如何在Unity中实现虚拟操纵杆&#xff0c;提供了一段用于移动控制的代码。我们将讨论不同类型的虚拟操纵杆&#xff0c;如固定和跟随&#xff0c;以及如…

进阶JAVA篇- DateTimeFormatter 类与 Period 类、Duration类的常用API(八)

目录 1.0 DateTimeFormatter 类的说明 1.1 如何创建格式化器的对象呢&#xff1f; 1.2 DateTimeFormatter 类中的 format&#xff08;LocalDateTime ldt&#xff09; 实例方法 2.0 Period 类的说明 2.1 Period 类中的 between(localDate1,localDate2) 静态方法来创建对象。 3.…

京东优惠券怎么找?

京东优惠券怎么找&#xff1f; 1、手机安装「草柴」后&#xff0c;打开京东挑选要购买的商品&#xff1b; 2、挑选好京东商品后&#xff0c;点击右上角的「分享」&#xff0c;并点击「复制链接」&#xff1b; 3、将复制的京东商品链接&#xff0c;粘贴到草柴输入框&#xff0c…

antd pro form 数组套数组 form数组动态赋值 shouldUpdate 使用

antd form中数组套数组 form数组动态变化 动态赋值 需求如上&#xff0c;同时添加多个产品&#xff0c;同时每个产品可以增加多台设备&#xff0c;根据设备增加相应编号&#xff0c;所以存在数组套数组&#xff0c;根据数组值动态变化 使用的知识点 form.list form中的数组…

十六、代码校验(5)

本章概要 基准测试 微基准测试JMH 的引入 基准测试 我们应该忘掉微小的效率提升&#xff0c;说的就是这些 97% 的时间做的事&#xff1a;过早的优化是万恶之源。—— Donald Knuth 如果你发现自己正在过早优化的滑坡上&#xff0c;你可能浪费了几个月的时间(如果你雄心勃勃的…

【AI视野·今日Robot 机器人论文速览 第五十四期】Fri, 13 Oct 2023

AI视野今日CS.Robotics 机器人学论文速览 Fri, 13 Oct 2023 Totally 45 papers &#x1f449;上期速览✈更多精彩请移步主页 Interesting: &#x1f4da;AI与机器人安全, 从攻击界面、伦理法律和人机交互层面进行了论述。(from 密西西比大学) &#x1f4da;机器人与图机器学…

华为云云耀云服务器L实例评测|企业项目最佳实践之建议与总结(十二)

华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践系列&#xff1a; 华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践之云服务器介绍(一) 华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践之华为云介绍(二) 华为云云耀云服务器L实例评测&#xff5…

2021年12月 Python(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python编程&#xff08;1~6级&#xff09;全部真题・点这里 C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 第1题 执行以下程序 a[33,55,22,77] a.sort() for i in a:print(i)运行…

Nginx:反向代理(示意图+配置)

示意图&#xff1a; 反向代理 反向代理&#xff08;Reverse Proxy&#xff09;是代理服务器的一种&#xff0c;它代表服务器接收客户端的请求&#xff0c;并将这些请求转发到适当的服务器。当请求在后端服务器完成之后&#xff0c;反向代理搜集请求的响应并将其传输给客户端。…

MFC-对话框

目录 1、模态和非模态对话框&#xff1a; &#xff08;1&#xff09;、对话框的创建 &#xff08;2&#xff09;、更改默认的对话框名称 &#xff08;3&#xff09;、创建模态对话框 1&#xff09;、创建按钮跳转的界面 2&#xff09;、在跳转的窗口添加类 3&#xff0…

docker安装nessus

注册地址:https://zh-tw.tenable.com/products/nessus/nessus-essentials 临时邮箱:http://24mail.chacuo.net/ 帮助文档:https://docs.tenable.com/nessus/Content/DeployNessusDocker.htmdocker pull tenableofficial/nessusdocker run --name "my-nessus" -d -p 8…

vscode用密钥文件连接ssh:如果一直要输密码怎么办

commandshiftP&#xff1a;打开ssh配置文件 加上这么一段&#xff0c;host就是你给主机起的名字 对IdentityFile进行更改&#xff0c;改成相应的密钥文件 然后commandshiftP链接到主机就可以了 但是有时候它会让输入密码 这是由于你给这个IdentityFile的权限太多了&#xf…

数据结构 - 2(顺序表10000字详解)

一&#xff1a;List 1.1 什么是List 在集合框架中&#xff0c;List是一个接口&#xff0c;继承自Collection。 Collection也是一个接口&#xff0c;该接口中规范了后序容器中常用的一些方法&#xff0c;具体如下所示&#xff1a; Iterable也是一个接口&#xff0c;Iterabl…

【Vue面试题二十三】、你了解vue的diff算法吗?说说看

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;你了解vue的diff算法吗&…

【Vue面试题二十一】、Vue中的过滤器了解吗?过滤器的应用场景有哪些?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;Vue中的过滤器了解吗&am…