kafka-顺序消息实现

kafka-顺序消息实现

场景

在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证

解决方案

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

在这里插入图片描述

代码实现

引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.2</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties><java.version>17</java.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.39</version></dependency>
</dependencies>
使用到的DTO
@Data
public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;@Overridepublic String getUniqueNo() {return getOrderNo();}
}@Data
public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;@Overridepublic String getUniqueNo() {return getOrderNo();}
}public interface OrderMessage {/*** 线程池路由key* @return*/String getUniqueNo();}
定义topic

这里是 3个分区,2个副本

@Configuration
public class KafkaConfiguration {@Beanpublic NewTopic topic(){return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);}
}public interface Constants {String TOPIC_ORDER = "order";
}
消费者

消费者:OrderListener

@Component
@Slf4j
public class OrderListener {@Autowiredprivate OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;@KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")public void logListener(ConsumerRecord<String, String> record) {log.debug("> receive log event: {}-{}", record.partition(), record.value());try {OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);InterOrderDto interOrderDto = new InterOrderDto();BeanUtils.copyProperties(orderDto, interOrderDto);interOrderDto.setPartition(record.partition() + "");orderThreadPool.dispatch(interOrderDto);} catch (Exception e) {log.error("# kafka log listener error: {}", record.value(), e);}}}

线程池: OrderThreadPool

/*** @Date: 2024/1/24 10:23* 线程池实现** @param W: worker* @param D: message*/
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {private List<W> workers;private int size;public OrderThreadPool(int size, Supplier<W> provider) {this.size = size;workers = new ArrayList<>(size);for (int i = 0; i < size; i++) {workers.add(provider.get());}if (CollectionUtils.isEmpty(workers)) {throw new RuntimeException("worker size is 0");}start();}/*** route message to single thread** @param data*/public void dispatch(D data) {W w = getUniqueQueue(data.getUniqueNo());w.offer(data);}private W getUniqueQueue(String uniqueNo) {int queueNo = uniqueNo.hashCode() % size;for (W worker : workers) {if (queueNo == worker.getQueueNo()) {return worker;}}throw new RuntimeException("worker 路由失败");}/*** start worker, only start once*/private void start() {for (W worker : workers) {new Thread(worker, "OWorder-" + worker.getQueueNo()).start();}}/*** 关闭所有 workder, 等待所有任务执行完*/public void shutdown() {for (W worker : workers) {worker.shutdown();}}}

工作线程:SingleThreadWorker, 内部使用阻塞队列使其串行化

/*** @Date: 2024/1/24 10:58* single thread with a blocking-queue*/
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {private static AtomicInteger cnt = new AtomicInteger(0);private BlockingQueue<T> queue;private boolean started = true;/*** worker 唯一id*/@Getterprivate int queueNo;public SingleThreadWorker(int size) {this.queue = new LinkedBlockingQueue<>(size);this.queueNo = cnt.getAndIncrement();log.info("init worker {}", this.queueNo);}/*** 提交消息** @param data*/public void offer(T data) {try {queue.put(data);} catch (InterruptedException e) {log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);}}@Overridepublic void run() {log.info("{} worker start take ", Thread.currentThread().getName());while (started) {try {T data = queue.take();doConsumer(data);} catch (InterruptedException e) {log.error("queue take error", e);}}}/*** do real consume message** @param data*/protected abstract void doConsumer(T data);/*** consume rest of message in the queue when thread-pool shutdown*/public void shutdown() {this.started = false;ArrayList<T> rest = new ArrayList<>();int i = queue.drainTo(rest);if (i > 0) {log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);for (T t : rest) {doConsumer(t);}}}}

工作线程实现:OrderWorker, 这里就单独处理订单事件

/*** @Date: 2024/1/24 13:42* 具体消费者*/
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto>{public OrderWorker(int size) {super(size);}@Overrideprotected void doConsumer(InterOrderDto data) {log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));}
}

生产者

生产者:OrderController, 模拟发送不同的事件类型的订单

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}

application.properties 配置

# kafka地址
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
启动类
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {public static void main(String[] args) {SpringApplication.run(BootKafkaApplication.class, args);}/*** 配置线程池* @return*/@Beanpublic OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool(){OrderThreadPool<OrderWorker, InterOrderDto> threadPool =new OrderThreadPool<>(3, () -> new OrderWorker(100));Runtime.getRuntime().addShutdownHook(new Thread(() -> {log.info("shutdown orderThreadPool");//容器关闭时让工作线程中的任务都被消费完threadPool.shutdown();}));return threadPool;}}

测试

访问: http://localhost:8080/send, 结果:

OWorder-0 worker start take 
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}

可以发现,在我们工作线程中,事件消费是有序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/652716.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS基础细节学习

目录 一.CSS--网页的美容师 二.语法规范及选择器的介绍 一.CSS--网页的美容师 CSS是层叠样式表( Cascading Style Sheets )的简称&#xff0c;有时我们也会称之为CSS样式表或级联样式表。 CSS是也是一种标记语言&#xff0c;CSS主要用于设置HTML页面中的文本内容(字体、大小…

log4j2 java api 入门介绍

概述 Log4j 2 API 提供了应用程序应该编码的接口&#xff0c;并提供了实现者创建日志实现所需的适配器组件。 虽然 Log4j 2 在 API 和实现之间被分解&#xff0c;但这样做的主要目的不是允许多个实现&#xff0c;尽管这当然是可能的&#xff0c;而是明确定义在“正常”应用程…

应急响应-内存分析

在应急响应过程中&#xff0c;除了上述几个通用的排查项&#xff0c;有时也需要对应响应服务器进行内存的提权&#xff0c;从而分析其中的隐藏进程。 内存的获取 内存的获取方法有如下几种&#xff1a; 基于用户模式程序的内存获取&#xff1b;基于内核模式程序的内存获取&a…

常用MQ产品的对比

常用MQ产品的对比 本文整理了常用MQ之间的对比&#xff0c;旨在帮助大家在实际项目中选择MQ产品。 消息队列对比参照表 注&#xff1a; 对照表来自&#xff1a;消息队列对比参照表 &#xff0c;对比维度比较全面&#xff0c;结果个人比较认同&#xff0c;强烈建议参考。 Rock…

Docker安装RcoketMQ

1、Docker安装RcoketMQ-4.9.4 在同级文件夹创建目录config&#xff0c;并在里面创建文件broker.conf&#xff0c;文件内容如下&#xff1a; brokerClusterNameDefaultCluster brokerNamebroker-a brokerId0 deleteWhen04 fileReservedTime48 brokerRoleASYNC_MASTER flushDis…

Python网络爬虫实战——实验5:Python爬虫之selenium动态数据采集实战

【实验内容】 本实验主要介绍和使用selenium库在js动态加载网页中数据采集的作用。 【实验目的】 1、理解动态加载网页的概念 2、学习Selenium库基本使用 3、掌握动态加载数据采集流程 【实验步骤】 步骤1理解动态加载网页 步骤2学习使用Selenium库 步骤3 采集河北政府采购…

Python初学者学习记录——python基础综合案例:数据可视化——地图可视化

一、基础地图使用 1、基础地图演示 2、基础地图演示——视觉映射器 from pyecharts.charts import Map from pyecharts.options import VisualMapOpts# 准备地图对象 map Map() # 准备数据 data [("北京市", 99),("上海市", 199),("湖南省", 2…

1 月 28日算法练习-前缀和

小郑的蓝桥平衡串 思路&#xff1a;把 L 看成 1&#xff0c;Q 看成 -1&#xff0c;利用前缀和来得到输入串的前缀子串中LQ 的和&#xff0c;利用前缀和差的性质得到子串&#xff0c;通过枚举看它是否平衡。 将L看做1&#xff0c;Q看做&#xff0d;1&#xff0c;只有当某个区间…

如何快速上手一个vue框架

安装nvm 下载nvm-setup.zip&#xff1a; https://github.com/coreybutler/nvm-windows/releases 解压安装nvm&#xff1a; 创建两个文件夹&#xff0c;一个是nvm的安装位置&#xff0c;另一个是node.js的下载位置。不需要配置环境变量和修改setting文件了 检查nvm是否安装成功…

不常见知识点汇总

目录 1.关于输入流&#xff08;cin&#xff09;1.1 cin.fail()1.2 cin.clear()1.3 cin.ignore() 随时补充&#xff01;&#xff01;&#xff01; 1.关于输入流&#xff08;cin&#xff09; 问题描述&#xff1a; int input 0; cin >> input; while (cin.fail()) {cin.…

Linux 增加 SWAP 空间

一、需求 通过阿里云启动项目时&#xff0c;使用Vuepress build编译静态页面时内存需要800MB&#xff0c;导致内存不够&#xff0c;因此考虑使用swap方式&#xff0c;置换一些内存资源存放swap磁盘。 [rootxxx myblog]# npm run docs:dev> myblog1.0.0 docs:dev > vuep…

全角色服务、全场景支撑、全业务应用的新一代智慧教室

新一代智慧教室以“数智化助力高质量人才培养”为核心目标&#xff0c;以AI赋能的智能硬件为基础构建多形态智慧教学环境&#xff0c;以中台为支撑实现数据、设备、系统、业务的互联互通、开放共享&#xff0c;以平台全面覆盖教学应用&#xff0c;采集、汇聚、挖掘、分析课前课…

【解决】IntelliJ IDEA 重命名 Shift + F6 失效

IntelliJ IDEA 重命名 Shift F6 失效 问题解决 问题 Idea 重命名 Shift F6 &#xff0c;一直没反应 解决 调查发现原因是微软新版的输入法冲突了。需要设置【使用以前版本的微软拼音输入法】解决兼容性。 设置 -> 时间和语言 -> 区域 -> 语言选项 -> 键盘选项…

公众号迁移公证书模板在哪里下载?

公众号迁移有什么作用&#xff1f;只能变更主体吗&#xff1f;公众号迁移的作用可不止变更主体这一个哦&#xff0c;它还可以把个人公众号变成企业公众号&#xff0c;或者把服务号变成订阅号&#xff0c;甚至还能开通留言功能。不过要注意&#xff0c;现在订阅号已经不能变成服…

软考计算题注意事项总结

1、沟通渠道中&#xff0c;N涉及多少人&#xff1f;是有所增加还是增至多少人&#xff1f; 2、在EMV中&#xff0c;关注的是成本还是收益&#xff1f;若是考虑成本和时间&#xff0c;则选择较小的方案&#xff1b;若是关注收益&#xff0c;则选择较大的方案。 3、在PERT中&am…

什么是数据库设计?基本步骤有哪些?

数据库设计结构图 实线代表输入&#xff0c;虚线代表输出&#xff0c;每个节点的输出作为下一个节点的输入。 基本步骤 1.需求分析阶段 数据需求分析是在项目确定之后&#xff0c;用户和设计人员对数据库应用系统所要涉及的内容&#xff08;数据&#xff09;和功能&#xff0…

JSP在线阅读系统myeclipse定制开发SQLServer数据库网页模式java编程jdbc

一、源码特点 JSP 小说在线阅读系统是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库 &#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为SQLServer2008&#…

安装 java 的 JDK

前几天重装系统以后想起来要重装 java 的JDK 安装地址 Windows 下的 JDK 数据包有三种可选的下载&#xff1a;Windows x64 Compressed Archive、Windows x64 Installer 和 Windows x64 MSI Installer。其中&#xff0c;前者为 JDK 的免安装版本&#xff0c;后两者均为 JDK 的离…

数仓治理-计算资源治理

注&#xff1a;文章参考: 数据治理实践 | 网易某业务线的计算资源治理从计算资源治理实践出发&#xff0c;带大家清楚认识计算资源治理到底该如何进行&#xff0c;并如何应用到其他项目中https://mp.weixin.qq.com/s/w6d5zhDaaavNhW_DMEkPsQ 目录 一、计算资源治理的背景 二…

༺༽༾ཊ—Unity之-04-工厂方法模式—ཏ༿༼༻

首先创建一个项目&#xff0c; 在这个初始界面我们需要做一些准备工作&#xff0c; 建基础通用文件夹&#xff0c; 创建一个Plane 重置后 缩放100倍 加一个颜色&#xff0c; 任务&#xff1a;使用工厂方法模式 创建 飞船模型&#xff0c; 首先资源商店下载飞船模型&#xff0c…