rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring

博客地址:朝·闻·道​www.wuwenliang.net

本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友对DefaultMQPushConsumer进行薄封装,让我们在Spring中更容易对消息进行消费。DefaultMQPushConsumer不区分普通消息和事务消息,即我们能够利用DefaultMQPushConsumer实现对普通消息和事务消息的消费。

通过DefaultMQProducer消费消息

首先,声明一个DefaultMQPushConsumer客户端,并通过构造器初始化,构造参数为消费者组。官方建议消费者组以“CID_”开头。

DefaultMQPushConsumer consumer =

new DefaultMQPushConsumer("CID_SNOWALKER");

设置NameServer地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

设置Consumer第一次启动从队列头部开始消费

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

设置消费模式为集群方式,CLUSTERING模式下每条消息只会被一个Consumer消费一次,如果设置为BROADCASTING则为广播模式,每个消费者都会将消息消费至少一次。一般我们使用的均为CLUSTERING模式。

defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

注册消息监听器,这里需要实现MessageListenerConcurrently接口,并实现consumeMessage(List msgs, ConsumeConcurrentlyContext context) 方法,我这里的demo是lambda形式,实际上是一样的。如果你不喜欢lambda形式,可以继续使用匿名内部类或者自行定义一个类实现该接口。

defaultMQPushConsumer.registerMessageListener(

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

)

这里注意,当消费逻辑执行成功,则返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,后续将不再对该消息进行消费。如果消费逻辑失败,则需要设置为ConsumeConcurrentlyStatus.RECONSUME_LATER, RocketMQ会对消息进行重新推送,默认推送16次,目的是尽量保证消息消费成功,如果达到最大重试次数,还是失败则进入死信队列,等待人工干预。

调用start()方法,启动对队列的监听,开始进行消息的消费。

defaultMQPushConsumer.start();

我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:

2019-01-23 09:55:25.022 INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher :

消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK

2019-01-23 09:55:27.519 INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer :

当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}

可以看到broker推送消息至消费端,并且被成功消费。

Spring框架整合DefaultMQPushConsumer

我们仍然基于Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 对DefaultMQPushConsumer进行整合,相关代码已经上传至github

这里对核心代码进行讲解。

首先定义RocketMQPushConsumerAgent.java并将其声明为spring的bean,作用域为prototype,即多例形式。

@Scope("prototype")

@Component

public class RocketMQPushConsumerAgent {

声明消息监听器及消息消费者

private MessageListenerConcurrently messageListener;

private DefaultMQPushConsumer defaultMQPushConsumer;

init()方法为核心的初始化逻辑,在该方法中,初始化了DefaultMQPushConsumer,并设置NameServer地址、消费模式以及将外部实现的监听器设置给内部的messageListener引用。

接着对消息主题进行订阅,对该主题下所有的消息进行监听,这里有待优化,后续将把消息的过滤表达式也暴露给调用者。

所有的配置参数均通过RocketMQConsumerConfig进行设置,保证接口的整洁性,RocketMQConsumerConfig将在附录中进行简单讲解。

public RocketMQPushConsumerAgent init(RocketMQConsumerConfig consumerConfig, MessageListenerConcurrently messageListener) throws MQClientException {

defaultMQPushConsumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());

defaultMQPushConsumer.setNamesrvAddr(consumerConfig.getNameSrvAddr());

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 消费模式

if (consumerConfig.getMessageModel() != null) {

defaultMQPushConsumer.setMessageModel(consumerConfig.getMessageModel());

}

// 注册监听器

this.messageListener = messageListener;

defaultMQPushConsumer.registerMessageListener(this.messageListener);

defaultMQPushConsumer.subscribe(consumerConfig.getTopic(), "*");

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端组装完成");

return this;

}

独立的启动方法

public void start() throws MQClientException {

this.defaultMQPushConsumer.start();

}

独立的关闭方法

public void destroy() {

defaultMQPushConsumer.shutdown();

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端[已关闭]");

}

为方便外部对消费者进行进一步的自定义设置,提供外部获取defaultMQPushConsumer的接口。

public DefaultMQPushConsumer getConsumer() {

return defaultMQPushConsumer;

}

RocketMQPushConsumerAgent使用案例

仍然依据开头的示例进行改造。

@Component

public class DemoConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);

使用@Resource(name = “rocketMQPushConsumerAgent”)或者直接@Autowired将自定义的消息消费者注入。

@Resource(name = "rocketMQPushConsumerAgent")

RocketMQPushConsumerAgent rocketMQConsumerAgent;

调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQConsumerAgent的初始化。当spring在加载过程中,DemoConsumer初始化之前会调用该init()方法初始化rocketMQConsumerAgent。通过start()链式调用,启动消息消费者,内部是调用的defaultMQPushConsumer.start()方法。

@PostConstruct

void init() {

try {

rocketMQConsumerAgent.init(

new RocketMQConsumerConfig(

"snowalker-consumer-group",

"172.30.83.100:9876",

"SNOWALKER_TEST",

MessageModel.CLUSTERING),

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

).start();

LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者完成");

} catch (MQClientException e) {

e.printStackTrace();

LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者失败");

}

}

}

在init()方法中同时将消息监听器的实现逻辑注入,消费者会加载该接口的实现。

附录:RocketMQConsumerConfig配置类

public class RocketMQConsumerConfig {

/**消费者组*/

private String consumerGroup;

/**nameServer地址*/

private String nameSrvAddr;

/**消息消费主题*/

private String topic;

private MessageModel messageModel;

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

}

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic, MessageModel messageModel) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

Preconditions.checkNotNull(messageModel);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

this.messageModel = messageModel;

}

public String getConsumerGroup() {

return consumerGroup;

}

public String getNameSrvAddr() {

return nameSrvAddr;

}

public String getTopic() {

return topic;

}

public MessageModel getMessageModel() {

return messageModel;

}

}

该配置类封装了消费者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/496646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sklearn 逻辑回归Demo

逻辑回归案例 假设表示 基于上述情况,要使分类器的输出在[0,1]之间,可以采用假设表示的方法。 设 h θ ( x ) g ( θ T x ) h_θ (x)g(θ^T x) hθ​(x)g(θTx), 其中 g ( z ) 1 ( 1 e − z ) g(z)\frac{1}{(1e^{−z} )} g(z)(1e−z)1​…

URL原理、URL编码、URL特殊字符、输入URL到页面显示

​From:http://blog.csdn.net/zmx729618/article/details/51381655 From:http://www.cnblogs.com/coco1s/p/5038412.html HTML URL 编码参考手册:https://www.w3cschool.cn/htmltags/html-urlencode.html http://www.w3school.com.cn/t…

记忆模糊、记忆泛化的关键分子开关被发现

来源:brainnews2018年3月12日,Nature Medicine杂志在线刊登了麻省总医院Amar Sahay研究组的最新重要工作,他们发现了一种细胞骨架蛋白Actin-binding LIM protein 3 (ABLIM3),降低该蛋白的表达水平可以增强海马齿状回细胞&#xff…

240多个jQuery插件 (转)

概述 jQuery 是继 prototype 之后又一个优秀的 Javascript 框架。其宗旨是—写更少的代码,做更多的事情。它是轻量级的 js 库(压缩后只有21k) ,这是其它的 js 库所不及的,它兼容 CSS3,还兼容各种浏览器(IE 6.0, FF 1.5, Safari 2.…

Exchanger及其用法

01 Exchanger 作用 使两个线程之间进行数据传递。(对是两个之间而不是三个或者更多个线程之间) 02 常用方法 exchange() 阻塞当前线程并等待其他线程来取得数据,若没有其他线程来取数据则一直等待。 exchange&…

SM3算法

文章目录前言一、SM3是什么?二、go语言实现前言 提示:以下是本篇文章正文内容,下面案例可供参考 一、SM3是什么? SM3是中华人民共和国政府采用的一种密码散列函数标准,由国家密码管理局于2010年12月17日发布。相关标…

2 如何设置窗口title_如何设置华为4G路由2的WiFi黑白名单【设置方法】

不想让自家的Wi-Fi被蹭网,除了将Wi-Fi隐藏起来,您还可以设置Wi-Fi黑白名单。如果您发现有人蹭网了,可以将蹭网设备直接加入黑名单,这样就可以禁止这个设备再连接到您的Wi-Fi。如果您将家人、朋友的设备加入了白名单,那…

复选框怎么点td选中_jQuery点击tr实现checkbox选中的方法

标题描述的有点不贴切,但希望大家能够明白,为了更形像的表达,我特意录制了一张GIF动画图片。我不知道实际开发中有没有用到这种效果,但我个人认为,这种方式更人性化,因为只要点到一行,就可以使C…

谷歌大脑AutoML最新进展:用进化算法发现神经网络架构

来源:AI中国大脑的进化进程持续已久,从5亿年前的蠕虫大脑到现如今各种现代结构。例如,人类的大脑可以完成各种各样的活动,其中许多活动都是毫不费力的。例如,分辨一个视觉场景中是否包含动物或建筑物对我们来说是微不足…

股票名词解释

开盘价:   是指当日开盘后该股票的第一笔交易成交的价格。如果开市后30分钟内无成交价,则以前日的收盘价作为开盘价。 收盘价:   指每天成交中最后一笔股票的价格,也就是收盘价格。 最高价:   是指当日所成交的…

linux 的 ip 命令 和 ifconfig 命令

From(试试Linux下的ip命令,ifconfig已经过时了): https://linux.cn/article-3144-1.html From(linux网络配置命令之ifconfig、ip和route): http://chrinux.blog.51cto.com/6466723/1188108 From…

SM4算法

文章目录前言一、SM4是什么?二、go语言实现前言 提示:以下是本篇文章正文内容,下面案例可供参考 一、SM4是什么? SM4.0(原名SMS4.0)是中华人民共和国政府采用的一种分组密码标准,由国家密码管…

Java并发编程实战~软件事务内存

很多同学反馈说,工作了挺长时间但是没有机会接触并发编程,实际上我们天天都在写并发程序,只不过并发相关的问题都被类似 Tomcat 这样的 Web 服务器以及 MySQL 这样的数据库解决了。尤其是数据库,在解决并发问题方面,可…

python list去重时间复杂度_List集合去重的一种方法 z

需要对一个List集合去重,情况是该集合中会出现多个Name属性值相同的,但是其他属性值不同的数据。在这种情况下,需求要只保留其中一个就好。我觉得遍历和HashSet都不是我想要的,便采用了一下方式定义Compare类,继承IEqu…

对于Office Live平台的思考

刚接触计算机编程的时候,脑子里想法比肚子里的墨水多得多,那时候想通过网络成立一个游戏开发团队,将不少人都很喜欢的一款FC游戏“重装机兵”(Metal Max)移植到电脑上来。当时的想法很激进也很宏大,我想的不…

中国学者用人工光感受器助失明小鼠复明

来源:《自然—通讯》中国研究人员在英国《自然通讯》杂志上发表报告说,他们通过在失明小鼠眼底植入一种新研发的人工光感受器,让它们的视觉得以恢复。如果这种技术发展成熟,未来或许能帮助因黄斑变性等疾病而视力下降或失明的患者…

PoW算法

文章目录前言一、PoW——工作量证明二、go语言简单案例前言 提示:以下是本篇文章正文内容,下面案例可供参考 一、PoW——工作量证明 ⚫ Proof-of-Work 简称 PoW,即为工作量证明 ⚫ 通过计算一个数值,使得拼揍上交易数据后内容的…

systemctl 命令完全指南

From:https://linux.cn/article-5926-1.html systemctl命令是系统服务管理器指令,它实际上将 service 和 chkconfig 这两个命令组合到一起。 任务 旧指令 新指令 使某服务自动启动 chkconfig --level 3 httpd on systemctl enable httpd.service 使某服务不自…

boundcolumn 根据值进行判断_Excel使用函数进行条件判断的方法步骤

Excel中的函数具体该如何进行判断数据的条件是否达到要求呢?下面是学习关于excel使用函数进行条件判断的教程,希望阅读过后对你有所启发! excel使用函数进行条件判断的教程 函数条件判断步骤1:如何计算成绩是否合格 函数条件判断步骤2:选中要…

城市大脑不仅是AI系统,更是结合人类智慧的混合智能巨系统

作者:刘锋 《互联网进化论》作者从2015年开始,智慧城市的类脑化进程不断加速,包括城市大脑,城市云脑,城市神经系统,智慧城市脑,交通大脑等概念不断涌现,人工智能成为当前科技热点的今…