顺序消息及代码实现

顺序消费:分区有序(不同queue之间的消息是同一topic类型,但没有联系,不同queue中的消息之间没有先后顺序,同一queue中的消息必须有先后顺序):当有多个queue参与时,实现queue的分区有序,在定义producer时可以指定消息队列选择器;定义选择器的选择算法时,一般需要使用选择key可以是消息key也可以是其他数据,无论谁做选择key,都是唯一,不能重复的;  一般选择算法是,让选择key(或者hash值)与该topic的queue数量取模,结果为选择出的queue的queueId。

当选择key数量很多时,不同的选择key跟queue取模的结果可能是相同的。一个queue中会出现多个不同选课key的消息,一个consumer也会消费到不同选择key的消息。一般做法是:从消息中获取选择key,对其判断。若是当前consumer需要消费的消息就直接消费,不是就不消费。这种方式要求选择key能随着消息一起被consumer获取,此时使用消息key作为选择key是最好的。

package com.example.test.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;public class OrderProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer");producer.setSendMsgTimeout(10000);producer.setNamesrvAddr("127.0.0.1:9876");//若为全局有序,则需设置queue数量为1//producer.setDefaultTopicQueueNums(1);producer.start();try {for (int i = 0; i < 10; i++) {//为了演示简单,这里用整数作为orderId,实际上并不是整数。Integer orderId = i;byte[] body = ("Hi" + i).getBytes();Message msg = new Message("TopicTest", "tagA", body);msg.setKeys(orderId.toString());//这是同步发送,加上new CallBack参数为异步发送SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//具体的选择算法在方法中定义//方法中的参数Object o 就是该send方法中的一个参数orderId@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object o) {//这是让for循环中定义的orderId作为选择key。String keys = msg.getKeys();Integer id = Integer.valueOf(keys);//下面是选择 o作为选择key,看的是该send方法中的第三个参数//Integer id=(Integer) o;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.println(sendResult);}} catch (Exception e) {e.printStackTrace();}finally {producer.shutdown();}}
}

如果当前consumer拉取到了不需要消费的选择key的消息,不做处理,但该消息会被视为已被消息,该consumer group的另一个消费该选择key的consumer不能再获取该消息。但每个consumer group之间是互相隔离的且同样可以获取到queue中的全部消息,这样未被做处理的消息可以被其他的消费者组进行处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/781309.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接口自动化测试问题汇总

本篇文章分享几个接口自动化用例编写过程遇到的问题总结&#xff0c;希望能对初次探索接口自动化测试的小伙伴们解决问题上提供一小部分思路。 sql语句内容出现错误 空格&#xff1a;由于有些字段判断是变量&#xff0c;需要将sql拼接起来&#xff0c;但是在拼接字符串时没有…

npm mongoose包下载冲突解决之道

我在新电脑下载完项目代码后,运行 npm install --registryhttps://registry.npm.taobao.org 1运行就报错&#xff1a; npm ERR! code ERESOLVE npm ERR! ERESOLVE unable to resolve dependency tree npm ERR! npm ERR! While resolving: lowcode-form-backend1.0.0 npm …

Python | 单变量时间序列分析与预测

时间序列数据是机器学习中最具挑战性的任务之一&#xff0c;也是与数据相关的现实问题之一&#xff0c;因为数据实体不仅取决于物理因素&#xff0c;而且主要取决于它们发生的时间顺序。我们可以基于一个单变量特征和两个双变量或多变量特征来预测时间序列中的目标值&#xff0…

合集:JS异步的六个解决方案详解。

Hello&#xff0c;各位老铁&#xff0c;最近发表了js异步的解决方案&#xff0c;是分开发的&#xff0c;这次我把他汇总起来&#xff0c;方便大家收藏、查看&#xff0c;欢迎点赞评论私信交流。 01.详解&#xff1a;JS异步解决方案之回调函数&#xff0c;及其弊端 02.详解&…

全套医院手术麻醉系统源码 人工智能麻醉系统源码 医疗管理系统源码

全套医院手术麻醉系统源码 人工智能麻醉系统源码 医疗管理系统源码 手术麻醉临床信息系统有着完善的临床业务功能&#xff0c;能够涵盖整个围术期的工作&#xff0c;能够采集、汇总、存储、处理、展现所有的临床诊疗资料。通过该系统的实施&#xff0c;能够规范麻醉科的工作流…

【LV16 day2 平台总线驱动开发---名称匹配】

一、总线、设备、驱动 硬编码式的驱动开发带来的问题&#xff1a; 垃圾代码太多结构不清晰一些统一设备功能难以支持开发效率低下 1.1 初期解决思路&#xff1a;设备和驱动分离 ​ struct device来表示一个具体设备&#xff0c;主要提供具体设备相关的资源&#xff08;如寄…

[深度学习]yolov8+pyqt5搭建精美界面GUI设计源码实现二

【简单介绍】 基于目标检测算法YOLOv8和灵活的PyQt5界面开发框架&#xff0c;我们精心打造了一款集直观性、易用性和功能性于一体的目标检测GUI界面。通过深度整合YOLOv8在目标识别上的卓越能力与PyQt5的精致界面设计&#xff0c;我们成功研发出一款既高效又稳定的软件GUI。 …

java全排列(力扣Leetcode46)

全排列 力扣原题链接 问题描述 给定一个不含重复数字的数组 nums&#xff0c;返回其所有可能的全排列。你可以按任意顺序返回答案。 示例 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[1,2,3],[1,3,2],[2,1,3],[2,3,1],[3,1,2],[3,2,1]] 示例 2…

GIT新建远程分支的操作过程

创建一个新的分支&#xff1a;git branch 新分支名 &#xff08;创建并立即切换到新分支&#xff1a;git checkout -b 新分支名&#xff09; &#xff08;创建&#xff09; &#xff08;所创建的新分支的代码和当前分支代码相同&#xff0c;会显示远程Git仓库的地址&#xff09…

OpenHarmony error: signature verification failed due to not trusted app source

问题&#xff1a;error: signature verification failed due to not trusted app source 今天在做OpenHarmony App开发&#xff0c;之前一直用的设备A在测试开效果&#xff0c;今天换成了设备B&#xff0c;通过DevEco Studio安装应用程序的时候&#xff0c;就出现错误&#xf…

gpt 3d三角形 重心坐标填充 沿x轴炫赵师傅

go import pygame from pygame.locals import * import sys import math# 初始化Pygame pygame.init()# 设置窗口大小 width, height 800, 600 screen pygame.display.set_mode((width, height)) pygame.display.set_caption(3D Triangle Fill with Barycentric Coordinates)…

Windows 远程访问 Ubuntu Desktop - 虚拟网络控制台 (Virtual Network Console,VNC)

Windows 远程访问 Ubuntu Desktop - 虚拟网络控制台 [Virtual Network Console&#xff0c;VNC] References 1. Desktop Sharing 2. Desktop Sharing Preferences 勾选 允许其他人查看您的桌面 勾选 要求远程用户输入此密码 取消勾选 必须为对本机器的每次访问进行确定 3. 虚拟…

Java通过反射机制获取类对象下的属性值

目录 以类USER为例&#xff1a; 使用Java的反射机制获取Column的name为“user_name”的类属性值 以类USER为例&#xff1a; import lombok.Data; import javax.persistence.*; import java.io.Serializable;Data Table(name "user_info") public class User imple…

vue3+vite 模板vue3-element-admin框架如何关闭当前页面跳转 tabs

使用模版: 有来开源组织 / vue3-element-admin 需要关闭的.vue 页面增加以下方法 //setup 里import {LocationQuery, useRoute, useRouter} from "vue-router"; const router useRouter(); function close() {console.log(|--router.currentRoute.value, router.cur…

椋鸟数据结构笔记#3:链表

萌新的学习笔记&#xff0c;写错了恳请斧正。 目录 链表的定义 链表的分类 方向&#xff08;单向还是双向&#xff09; 头节点&#xff08;哨兵节点&#xff09;的有无 循环或不循环 8种分类 不带头单向不循环链表的实现 带头单向循环链表的实现 链表与顺序表的差异 链…

FPGA高端项目:解码索尼IMX390 MIPI相机转HDMI输出,提供FPGA开发板+2套工程源码+技术支持

目录 1、前言2、相关方案推荐本博主所有FPGA工程项目-->汇总目录我这里已有的 MIPI 编解码方案 3、本 MIPI CSI-RX IP 介绍4、个人 FPGA高端图像处理开发板简介5、详细设计方案设计原理框图IMX390 及其配置MIPI CSI RX图像 ISP 处理图像缓存HDMI输出工程源码架构 6、工程源码…

每日一题 --- 四数相加 II[力扣][Go]

454. 四数相加 II 题目&#xff1a;454. 四数相加 II 给你四个整数数组 nums1、nums2、nums3 和 nums4 &#xff0c;数组长度都是 n &#xff0c;请你计算有多少个元组 (i, j, k, l) 能满足&#xff1a; 0 < i, j, k, l < nnums1[i] nums2[j] nums3[k] nums4[l] 0…

Mistral 7B v0.2 基础模型开源,大模型微调实践来了

Mistral AI在3月24日突然发布并开源了 Mistral 7B v0.2模型&#xff0c;有如下几个特点&#xff1a; 和上一代Mistral v0.1版本相比&#xff0c;上下文窗口长度从8k提升到32k&#xff0c;上下文窗口&#xff08;context window&#xff09;是指语言模型在进行预测或生成文本时&…

暴雨服务器X7740赋能大模型训练

数字经济浪潮愈演愈烈,大模型训练对服务器的要求也越来越高。在此背景下,暴雨信息发布专门为大规模模型训练而设计的全新旗舰GPU服务器—X7740,以卓越的计算性能、高速网络通信能力以及创新的能效表现,有效赋能大模型训练。 X7740 搭载了暴雨信息最新一代的英特尔至强可扩展处理…

银河麒麟安装回退至GCC5.4.0 添加镜像源地址并设置其优先级。问题已解决

问题&#xff1a; 因为软件依赖低版本的gcc&#xff0c;而新版银河麒麟gcc默认时高版本&#xff0c;考虑到软件适配应该考虑最低版本的麒麟系统&#xff0c;需要将gcc降级。 过程中遇到问题&#xff0c;无法直接找到gcc5.4.0 这是没有软件源&#xff0c;需要添加其他软件源&…