kafka-顺序消息实现

kafka-顺序消息实现

场景

在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证

解决方案

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

在这里插入图片描述

代码实现

引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.2</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties><java.version>17</java.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.39</version></dependency>
</dependencies>
使用到的DTO
@Data
public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;@Overridepublic String getUniqueNo() {return getOrderNo();}
}@Data
public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;@Overridepublic String getUniqueNo() {return getOrderNo();}
}public interface OrderMessage {/*** 线程池路由key* @return*/String getUniqueNo();}
定义topic

这里是 3个分区,2个副本

@Configuration
public class KafkaConfiguration {@Beanpublic NewTopic topic(){return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);}
}public interface Constants {String TOPIC_ORDER = "order";
}
消费者

消费者:OrderListener

@Component
@Slf4j
public class OrderListener {@Autowiredprivate OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;@KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")public void logListener(ConsumerRecord<String, String> record) {log.debug("> receive log event: {}-{}", record.partition(), record.value());try {OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);InterOrderDto interOrderDto = new InterOrderDto();BeanUtils.copyProperties(orderDto, interOrderDto);interOrderDto.setPartition(record.partition() + "");orderThreadPool.dispatch(interOrderDto);} catch (Exception e) {log.error("# kafka log listener error: {}", record.value(), e);}}}

线程池: OrderThreadPool

/*** @Date: 2024/1/24 10:23* 线程池实现** @param W: worker* @param D: message*/
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {private List<W> workers;private int size;public OrderThreadPool(int size, Supplier<W> provider) {this.size = size;workers = new ArrayList<>(size);for (int i = 0; i < size; i++) {workers.add(provider.get());}if (CollectionUtils.isEmpty(workers)) {throw new RuntimeException("worker size is 0");}start();}/*** route message to single thread** @param data*/public void dispatch(D data) {W w = getUniqueQueue(data.getUniqueNo());w.offer(data);}private W getUniqueQueue(String uniqueNo) {int queueNo = uniqueNo.hashCode() % size;for (W worker : workers) {if (queueNo == worker.getQueueNo()) {return worker;}}throw new RuntimeException("worker 路由失败");}/*** start worker, only start once*/private void start() {for (W worker : workers) {new Thread(worker, "OWorder-" + worker.getQueueNo()).start();}}/*** 关闭所有 workder, 等待所有任务执行完*/public void shutdown() {for (W worker : workers) {worker.shutdown();}}}

工作线程:SingleThreadWorker, 内部使用阻塞队列使其串行化

/*** @Date: 2024/1/24 10:58* single thread with a blocking-queue*/
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {private static AtomicInteger cnt = new AtomicInteger(0);private BlockingQueue<T> queue;private boolean started = true;/*** worker 唯一id*/@Getterprivate int queueNo;public SingleThreadWorker(int size) {this.queue = new LinkedBlockingQueue<>(size);this.queueNo = cnt.getAndIncrement();log.info("init worker {}", this.queueNo);}/*** 提交消息** @param data*/public void offer(T data) {try {queue.put(data);} catch (InterruptedException e) {log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);}}@Overridepublic void run() {log.info("{} worker start take ", Thread.currentThread().getName());while (started) {try {T data = queue.take();doConsumer(data);} catch (InterruptedException e) {log.error("queue take error", e);}}}/*** do real consume message** @param data*/protected abstract void doConsumer(T data);/*** consume rest of message in the queue when thread-pool shutdown*/public void shutdown() {this.started = false;ArrayList<T> rest = new ArrayList<>();int i = queue.drainTo(rest);if (i > 0) {log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);for (T t : rest) {doConsumer(t);}}}}

工作线程实现:OrderWorker, 这里就单独处理订单事件

/*** @Date: 2024/1/24 13:42* 具体消费者*/
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto>{public OrderWorker(int size) {super(size);}@Overrideprotected void doConsumer(InterOrderDto data) {log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));}
}

生产者

生产者:OrderController, 模拟发送不同的事件类型的订单

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}

application.properties 配置

# kafka地址
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
启动类
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {public static void main(String[] args) {SpringApplication.run(BootKafkaApplication.class, args);}/*** 配置线程池* @return*/@Beanpublic OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool(){OrderThreadPool<OrderWorker, InterOrderDto> threadPool =new OrderThreadPool<>(3, () -> new OrderWorker(100));Runtime.getRuntime().addShutdownHook(new Thread(() -> {log.info("shutdown orderThreadPool");//容器关闭时让工作线程中的任务都被消费完threadPool.shutdown();}));return threadPool;}}

测试

访问: http://localhost:8080/send, 结果:

OWorder-0 worker start take 
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}

可以发现,在我们工作线程中,事件消费是有序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/652716.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS基础细节学习

目录 一.CSS--网页的美容师 二.语法规范及选择器的介绍 一.CSS--网页的美容师 CSS是层叠样式表( Cascading Style Sheets )的简称&#xff0c;有时我们也会称之为CSS样式表或级联样式表。 CSS是也是一种标记语言&#xff0c;CSS主要用于设置HTML页面中的文本内容(字体、大小…

log4j2 java api 入门介绍

概述 Log4j 2 API 提供了应用程序应该编码的接口&#xff0c;并提供了实现者创建日志实现所需的适配器组件。 虽然 Log4j 2 在 API 和实现之间被分解&#xff0c;但这样做的主要目的不是允许多个实现&#xff0c;尽管这当然是可能的&#xff0c;而是明确定义在“正常”应用程…

如何高效地利用淘宝API接口获取商品数据

在电商领域&#xff0c;能够快速且准确地获取商品数据是至关重要的。淘宝作为中国领先的电商平台&#xff0c;通过其开放的API接口为商家们提供了强大的数据服务功能。本文将验证如何高效地利用淘宝API接口获取商品数据&#xff0c;并提供一套行之有效的策略和步骤。 预备工作…

应急响应-内存分析

在应急响应过程中&#xff0c;除了上述几个通用的排查项&#xff0c;有时也需要对应响应服务器进行内存的提权&#xff0c;从而分析其中的隐藏进程。 内存的获取 内存的获取方法有如下几种&#xff1a; 基于用户模式程序的内存获取&#xff1b;基于内核模式程序的内存获取&a…

常用MQ产品的对比

常用MQ产品的对比 本文整理了常用MQ之间的对比&#xff0c;旨在帮助大家在实际项目中选择MQ产品。 消息队列对比参照表 注&#xff1a; 对照表来自&#xff1a;消息队列对比参照表 &#xff0c;对比维度比较全面&#xff0c;结果个人比较认同&#xff0c;强烈建议参考。 Rock…

备份数据提示Allowed memory size of 134217728 bytes exhausted的修复方法

今日给一老数据库备份&#xff0c;发现无法备份&#xff08;有近60万条数据&#xff09;&#xff0c;查看日志&#xff0c;提示报错&#xff1a;PHP Fatal error: Allowed memory size of 134217728 bytes exhausted (tried to allocate 189263328 bytes) in 解析&#xff1a;…

go语言基础之time时间处理

1.时间类型 Go 语言中使用time.Time类型表示时间。我们可以通过time.Now函数获取当前的时间对象&#xff0c;然后从时间对象中可以获取到年、月、日、时、分、秒等信息。 // timeDemo 时间对象的年月日时分秒 func timeDemo() {now : time.Now() // 获取当前时间fmt.Printf(&…

VitisHLS中读写任意深度的图像文件

一、8bits灰度图像的读写 这里可以使用opencv的库函数&#xff0c;也可以使用赛灵思提供的库函数。实际上&#xff0c;赛灵思的 vision库也是调用opencv的imread和imwrite库函数的&#xff0c;只不过封装了一下而已。 #include <iostream> #include <stdio.h> #in…

python3-cookbook-字典的运算

第一章:数据结构和算法 Python 提供了大量的内置数据结构,包括列表,集合以及字典。大多数情况下使用这些数据结构是很简单的。但是,我们也会经常碰到到诸如查询,排序和过滤等等这些普遍存在的问题。 因此,这一章的目的就是讨论这些比较常见的问题和算法。 另外,我们也会…

R语言【taxlist】——clean_strings():清理字符串

Package taxlist version 0.2.4 Description 多个前导的和后随的空格以及错误的编码可能会在处理分类学名称的信息中导致严重的问题。clean_strings() 方法可以清除这些错误。 Usage clean_strings(x, ...)## S4 method for signature character clean_strings(x, from &quo…

Vue的状态管理Vuex

文章目录 一、介绍二、install三、store1、介绍2、创建并全局引入3、单一状态树4、多模块状态树(无命名空间)5、多模块状态树(有命名空间)一、介绍 Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式 + 库当我们的应用遇到多个组件共享状态(共享状态:多个组件维护1个变…

Docker安装RcoketMQ

1、Docker安装RcoketMQ-4.9.4 在同级文件夹创建目录config&#xff0c;并在里面创建文件broker.conf&#xff0c;文件内容如下&#xff1a; brokerClusterNameDefaultCluster brokerNamebroker-a brokerId0 deleteWhen04 fileReservedTime48 brokerRoleASYNC_MASTER flushDis…

linux系统ansible主机清单和命令

ansible主机清单和命令 主机清单配置主机清单文件配置主机清单方式常用变量 ansible命令ansible-doc命令ansible命令格式ansible配置公私钥ansible 命令集 主机清单 配置主机清单文件 /etc/ansible/hosts //配置主机清单文件配置主机清单方式 ip地址 ansible_ssh_user…

Python网络爬虫实战——实验5:Python爬虫之selenium动态数据采集实战

【实验内容】 本实验主要介绍和使用selenium库在js动态加载网页中数据采集的作用。 【实验目的】 1、理解动态加载网页的概念 2、学习Selenium库基本使用 3、掌握动态加载数据采集流程 【实验步骤】 步骤1理解动态加载网页 步骤2学习使用Selenium库 步骤3 采集河北政府采购…

Python初学者学习记录——python基础综合案例:数据可视化——地图可视化

一、基础地图使用 1、基础地图演示 2、基础地图演示——视觉映射器 from pyecharts.charts import Map from pyecharts.options import VisualMapOpts# 准备地图对象 map Map() # 准备数据 data [("北京市", 99),("上海市", 199),("湖南省", 2…

1 月 28日算法练习-前缀和

小郑的蓝桥平衡串 思路&#xff1a;把 L 看成 1&#xff0c;Q 看成 -1&#xff0c;利用前缀和来得到输入串的前缀子串中LQ 的和&#xff0c;利用前缀和差的性质得到子串&#xff0c;通过枚举看它是否平衡。 将L看做1&#xff0c;Q看做&#xff0d;1&#xff0c;只有当某个区间…

如何快速上手一个vue框架

安装nvm 下载nvm-setup.zip&#xff1a; https://github.com/coreybutler/nvm-windows/releases 解压安装nvm&#xff1a; 创建两个文件夹&#xff0c;一个是nvm的安装位置&#xff0c;另一个是node.js的下载位置。不需要配置环境变量和修改setting文件了 检查nvm是否安装成功…

不常见知识点汇总

目录 1.关于输入流&#xff08;cin&#xff09;1.1 cin.fail()1.2 cin.clear()1.3 cin.ignore() 随时补充&#xff01;&#xff01;&#xff01; 1.关于输入流&#xff08;cin&#xff09; 问题描述&#xff1a; int input 0; cin >> input; while (cin.fail()) {cin.…

Linux 增加 SWAP 空间

一、需求 通过阿里云启动项目时&#xff0c;使用Vuepress build编译静态页面时内存需要800MB&#xff0c;导致内存不够&#xff0c;因此考虑使用swap方式&#xff0c;置换一些内存资源存放swap磁盘。 [rootxxx myblog]# npm run docs:dev> myblog1.0.0 docs:dev > vuep…

全角色服务、全场景支撑、全业务应用的新一代智慧教室

新一代智慧教室以“数智化助力高质量人才培养”为核心目标&#xff0c;以AI赋能的智能硬件为基础构建多形态智慧教学环境&#xff0c;以中台为支撑实现数据、设备、系统、业务的互联互通、开放共享&#xff0c;以平台全面覆盖教学应用&#xff0c;采集、汇聚、挖掘、分析课前课…