RocketMq局部顺序消息

package com.ldj.rocketmq.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** User: ldj* Date: 2024/5/26* Time: 15:09* Description: 局部顺序消息*/
public class OrderProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("produce-group-order");producer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");producer.start();/*** 局部顺序消息的要点的分2级,拿最外层的id作为计算队列下标,让相同一级的消息进入同一个队列*/for (int i = 0; i < 5; i++) {for (int j = 0; j < 5; j++) {int orderId = i;Message message = new Message("OrderTopic", "orderMessage", ("order_step[" + orderId + "-" + j + "]").getBytes(StandardCharsets.UTF_8));producer.send(message, (mqs, msg, arg) -> {Integer id = (Integer) arg;if (id != null) {return mqs.get(id.hashCode() % mqs.size());}throw new RuntimeException("缺少决定消息顺序的id!");}, orderId);}}}}

 

将一级分类下的所有信息收集进map,模拟顺序消费信息

package com.ldj.rocketmq.consumer;import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** User: ldj* Date: 2024/5/26* Time: 16:08* Description: 顺序消息消费者*/
public class OrderConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-order");consumer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");consumer.subscribe("OrderTopic", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.CLUSTERING);//订单id做为key,消息作为valueMap<String, List<String>> concurrentHashMap = new ConcurrentHashMap<>();consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {msgs.forEach(msg -> {String str = new String(msg.getBody());int left = str.indexOf("[");int right = str.indexOf("]");String substring = str.substring(left + 1, right);String[] k_v = substring.split("-");List<String> messages = concurrentHashMap.get(k_v[0]);if (CollectionUtils.isEmpty(messages)) {List<String> msgFirstList = new ArrayList<>();msgFirstList.add(str);concurrentHashMap.put(k_v[0], msgFirstList);} else {List<String> msgAddList = concurrentHashMap.get(k_v[0]);msgAddList.add(str);concurrentHashMap.put(k_v[0], msgAddList);}});System.out.println(JSON.toJSONString(concurrentHashMap));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消费者准备就绪...");}
}

 复制最后一行,并格式化

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/16122.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css卡片翻转 父元素翻转子元素不翻转效果

css卡片翻转 父元素翻转子元素不翻转效果 vue <div class"moduleBox"><div class"headTitle"><span class"headName">大额案例</span></div><div class"moduleItem"><span class"module…

three.js判断物体在人的前面,还是后面

three.js判断物体在人的前面&#xff0c;还是后面 const player new THREE.Vectors(10, 0, 5); const mesh new THREE.Vectors(15, 0, 6);上面&#xff0c;两个变量分别表示&#xff0c;玩家的位置&#xff0c;物体的位置。 从这发现&#xff0c;当玩家和物体的角度关系 小…

spring boot 整合j2cache 项目启动警告 Redis mode [null] not defined. Using ‘single‘

好 之前的文章 spring boot 整合j2cache 基础操作 在spring boot环境中整合了 j2cache 我们 项目启动时 日志会有一个关键信息 Redis的模式 没有定义 默认使用 single Redis 的这个模式有四种 大家可以自己去网上找一下 做个了解 不用很纠结 我们直接在 j2cache.properties …

一文读懂Apollo客户端配置加载流程

本文基于 apollo-client 2.1.0 版本源码进行分析 Apollo 是携程开源的配置中心&#xff0c;能够集中化管理应用不同环境、不同集群的配置&#xff0c;配置修改后能够实时推送到应用端&#xff0c;并且具备规范的权限、流程治理等特性。 Apollo支持4个维度管理Key-Value格式的配…

比勤奋更重要的是系统思考的能力

不要在接近你问题症状的地方寻找解决办法&#xff0c;要追溯过去&#xff0c;查找问题的根源。通常&#xff0c;最有效的活动是最微妙的。有时最好按兵不动&#xff0c;使系统自我修正&#xff0c;或让系统引导行动。有时会发现&#xff0c;最好的解决办法出现在完全出乎预料的…

HTML蓝色爱心

目录 写在前面 HTML入门 完整代码 代码分析 运行结果 系列推荐 写在后面 写在前面 最近好冷吖&#xff0c;小编给大家准备了一个超级炫酷的爱心&#xff0c;一起来看看吧&#xff01; HTML入门 HTML全称为HyperText Markup Language&#xff0c;是一种标记语言&#…

C++-指针

在C中&#xff0c;指针是至关重要的组成部分。它是C语言最强大的功能之一&#xff0c;也是最棘手的功能之一。 指针具有强大的能力&#xff0c;其本质是协助程序员完成内存的直接操纵。 指针&#xff1a;特定类型数据在内存中的存储地址&#xff0c;即内存地址。 指针变量的定…

2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(下):多智能体开发

传送门&#xff1a; 《2024.5组队学习——MetaGPT&#xff08;0.8.1&#xff09;智能体理论与实战&#xff08;上&#xff09;&#xff1a;MetaGPT安装、单智能体开发》《2024.5组队学习——MetaGPT&#xff08;0.8.1&#xff09;智能体理论与实战&#xff08;中&#xff09;&…

ModelBuilder之GDP空间化——批量值提取

一、前言 前面明确说到对于空间化过程中其实只有两个过程可以进行批量操作,一个是我们灯光指数提取过程和批量进行值提取,这里补充一点,对于灯光指数计算可以实现批量计算总灯光指数和平均灯光指数,综合灯光指数需要用平均灯光指数乘以面积占比求得,面积比就是(DN大于0的…

VS2022通过C++网络库Boost.asio搭建一个简单TCP异步服务器和客户端

基本介绍 上一篇博客我们介绍了通过Boost.asio搭建一个TCP同步服务器和客户端&#xff0c;这次我们再通过asio搭建一个异步通信的服务器和客户端系统&#xff0c;由于这是一个简单异步服务器&#xff0c;所以我们的异步特指异步服务器而不是异步客户端&#xff0c;同步服务器在…

BGP选路规则

配置地址&#xff0c;AS123使用ospf保证通讯&#xff0c;修改接口类型保证ospf学习环回20.0,30.0,100.0 地址时&#xff0c;是以24位掩码学习&#xff0c;R1&#xff0c;R2&#xff0c;R3都处于BGP边界&#xff0c;各自都需要宣告三者的私网环回 1&#xff0c; [R4]ip ip-prefi…

【Qnx 】Qnx IPC通信PPS

Qnx IPC通信PPS Qnx自带PPS服务&#xff0c;PPS全称Persistent Publish/Subscribe Service&#xff0c;就是常见的P/S通信模式。 Qnx PPS的通信模式是异步的&#xff0c;Publisher和Subscriber也无需关心对方是否存在。 利用Qnx提供的PPS服务&#xff0c;Publisher可以通知多…

嵌入式进阶——LED呼吸灯(PWM)

&#x1f3ac; 秋野酱&#xff1a;《个人主页》 &#x1f525; 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 PWM基础概念STC8H芯片PWMA应用PWM配置详解占空比 PWM基础概念 PWM全称是脉宽调制&#xff08;Pulse Width Modulation&#xff09…

Arduino下载与安装(Windows 10)

Arduino下载与安装(Windows 10) 官网 下载安装 打开官网&#xff0c;点击SOFTWARE&#xff0c;进入到软件下载界面&#xff0c;选择Windows 选择JUST DOWNLOAD 在弹出的界面中&#xff0c;填入电子邮件地址&#xff0c;勾选Privacy Policy&#xff0c;点击JUST DOWNLOAD即可 …

【脚本篇】---spyglass lint脚本

目录结构 sg_lint.tcl &#xff08;顶层&#xff09; #1.source env #date set WORK_HOME . set REPORT_PATH ${WORK_HOME}/reports puts [clock format [clock second] -format "%Y-%m-%d %H:%M:%S"] #2.generate source filelist #3.set top module puts "##…

qt-C++笔记之QThread使用

qt-C笔记之QThread使用 ——2024-05-26 下午 code review! 参考博文&#xff1a; qt-C笔记之使用QtConcurrent异步地执行槽函数中的内容&#xff0c;使其不阻塞主界面 qt-C笔记之QThread使用 文章目录 qt-C笔记之QThread使用一:Qt中几种多线程方法1.1. 使用 QThread 和 Lambda…

ubuntu server 24.04 网络 SSH等基础配置

1 安装参考上一篇: VMware Workstation 虚拟机安装 ubuntu 24.04 server 详细教程 服务器安装图形化界面-CSDN博客 2 网络配置 #安装 sudo apt install net-tools#查看 ifconfig #修改网络配置 sudo vim /etc/netplan/50-cloud-init.yaml network:version: 2ethernets:en…

飞鸡:从小训练飞行的鸡能飞行吗?为什么野鸡能飞吗?是同一品种吗?今天自由思考

鸡的飞行能力在很大程度上受到其生理结构的限制。尽管鸡有翅膀&#xff0c;但与能够长时间飞行的鸟类相比&#xff0c;鸡的翅膀相对较小&#xff0c;且胸部肌肉较弱。再加上鸡的身体较重&#xff0c;这些因素共同限制了鸡的飞行能力。通常&#xff0c;鸡只能进行短暂的、低空的…

【wiki知识库】01.wiki知识库前后端项目搭建(SpringBoot+Vue3)

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 &#x1f33c;环境准备 想要搭建自己的wiki知识库&#xff0c;要提前搭建好自己的开发环境&#xff0c;后端我使用的是SpringBoot&#xff0c;前端使用的是Vue3&#xff0c;采用前后端分离的技术实现。同时使用了Mysql数…