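Kafka: Implementing Ordered Messages
Scenario
During checkout, an order passes through several states, each with a corresponding status event, such as pending payment, payment succeeded, and payment failed. We push these events to a message queue, and downstream services run different business logic depending on the order status. This requires that the status events of an order be consumed in the same order in which they were produced.
Solution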
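- The producer sends order status events that share a key to the same Kafka partition (Kafka preserves order only within a partition)
- The Kafka consumer receives the messages
- The consumer hands each message to a thread pool
- The thread pool applies a routing strategy to pick one worker thread per message, so that events with the same routing key go to the same thread's blocking queue
- Each worker thread keeps taking order status messages from its own blocking queue and consumes them one by one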
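The routing idea in the last three steps can be sketched with JDK classes alone. This is a minimal sketch, not the implementation used later in this article: it swaps the hand-written workers for `Executors.newSingleThreadExecutor()`, and the names `KeyedDispatcher`, `laneOf` and `shutdownAndWait` are hypothetical, introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: tasks with the same key always run on the same single-thread executor. */
class KeyedDispatcher {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedDispatcher(int size) {
        for (int i = 0; i < size; i++) {
            // Each lane has exactly one thread and a FIFO queue, so it runs tasks in submission order
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Index of the lane that owns this key; floorMod keeps it non-negative. */
    int laneOf(String key) {
        return Math.floorMod(key.hashCode(), lanes.size());
    }

    void dispatch(String key, Runnable task) {
        lanes.get(laneOf(key)).execute(task);
    }

    /** Stops accepting tasks and waits for the queued ones; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                if (!lane.awaitTermination(timeout, unit)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        KeyedDispatcher dispatcher = new KeyedDispatcher(3);
        String[] states = {"PENDING", "PAID", "FAILED"};
        for (int order = 0; order < 4; order++) {
            String orderNo = String.valueOf(order);
            for (String state : states) {
                dispatcher.dispatch(orderNo, () -> System.out.println(
                        Thread.currentThread().getName() + " order=" + orderNo + " state=" + state));
            }
        }
        dispatcher.shutdownAndWait(5, TimeUnit.SECONDS);
    }
}
```

Events of different orders may interleave across lanes, but the three events of any single order are always handled by one thread in the order they were dispatched.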
Implementation
Dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.39</version>
    </dependency>
</dependencies>
DTOs used
import lombok.Data;

// OrderDto (definition not shown in this article) carries the orderNo, payStatus
// and timestamp fields that the producer sets.
@Data
public class InterOrderDto extends OrderDto implements OrderMessage {

    /** Partition the record was read from */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}

public interface OrderMessage {

    /**
     * Routing key used by the thread pool
     *
     * @return the routing key
     */
    String getUniqueNo();
}
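Defining the topic
The topic is created with 3 partitions and a replication factor of 2, so the cluster needs at least 2 brokers.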
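import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfiguration {

    @Bean
    public NewTopic topic() {
        // 3 partitions, replication factor 2
        return new NewTopic(Constants.TOPIC_ORDER, 3, (short) 2);
    }
}

public interface Constants {
    String TOPIC_ORDER = "order";
}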
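Why does the same key always reach the same partition? For records with a non-null key, Kafka's default partitioner hashes the serialized key (with murmur2) and takes the result modulo the number of partitions. The sketch below illustrates only that shape; `PartitionSketch` and `partitionFor` are hypothetical names, and `Arrays.hashCode` is a stand-in for the real hash, so the partition numbers it prints will not match what Kafka actually chooses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustration only: the hash here is a stand-in, NOT Kafka's murmur2. */
class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the remainder is never negative
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String orderNo : new String[] {"0", "3", "6"}) {
            System.out.println("order " + orderNo + " -> partition " + partitionFor(orderNo, 3));
        }
    }
}
```

Because the result depends on the partition count, adding partitions to an existing topic changes where a key lands, which can break per-key ordering for events already in flight.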
Consumer
Consumer: OrderListener
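import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderListener {

    @Autowired
    private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;

    // concurrency = "3" runs up to three consumer threads, matching the topic's 3 partitions
    @KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")
    public void logListener(ConsumerRecord<String, String> record) {
        log.debug("> receive log event: {}-{}", record.partition(), record.value());
        try {
            OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);
            InterOrderDto interOrderDto = new InterOrderDto();
            BeanUtils.copyProperties(orderDto, interOrderDto);
            interOrderDto.setPartition(record.partition() + "");
            // Routed by order number, so all events of one order reach the same worker.
            // Note: the listener returns before the worker has processed the message.
            orderThreadPool.dispatch(interOrderDto);
        } catch (Exception e) {
            log.error("# kafka log listener error: {}", record.value(), e);
        }
    }
}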
Thread pool: OrderThreadPool
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;

/**
 * Thread pool implementation
 * Date: 2024/1/24 10:23
 *
 * @param <W> worker type
 * @param <D> message type
 */
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {

    private final List<W> workers;
    private final int size;

    public OrderThreadPool(int size, Supplier<W> provider) {
        if (size <= 0) {
            throw new IllegalArgumentException("worker size must be positive");
        }
        this.size = size;
        workers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            workers.add(provider.get());
        }
        start();
    }

    /**
     * Route a message to a single worker thread
     *
     * @param data the message to dispatch
     */
    public void dispatch(D data) {
        W w = getUniqueQueue(data.getUniqueNo());
        w.offer(data);
    }

    private W getUniqueQueue(String uniqueNo) {
        // hashCode() may be negative; floorMod keeps the index in [0, size).
        // Indexing the list also avoids relying on the workers' global queueNo counter.
        return workers.get(Math.floorMod(uniqueNo.hashCode(), size));
    }

    /**
     * Start the workers; called only once, from the constructor
     */
    private void start() {
        for (W worker : workers) {
            new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
        }
    }

    /**
     * Shut down all workers and process the messages still queued
     */
    public void shutdown() {
        for (W worker : workers) {
            worker.shutdown();
        }
    }
}
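One detail worth checking when routing by hash: `String.hashCode()` can be negative, and Java's `%` keeps the sign of the dividend, so a plain `hashCode() % size` can produce a negative index that matches no worker. `Math.abs` is not a complete fix either, because `Math.abs(Integer.MIN_VALUE)` is still negative. A small sketch of the difference (`RouteIndex`, `naive` and `safe` are hypothetical names used only here):

```java
/** Compares the remainder operator with Math.floorMod for computing a worker index. */
class RouteIndex {

    /** Remainder: the result has the sign of the hash, so it can be negative. */
    static int naive(int hash, int size) {
        return hash % size;
    }

    /** Floor modulus: for a positive size the result is always in [0, size). */
    static int safe(int hash, int size) {
        return Math.floorMod(hash, size);
    }

    public static void main(String[] args) {
        System.out.println(naive(-7, 3)); // -1
        System.out.println(safe(-7, 3));  // 2
        // Math.abs cannot repair the most negative int
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0); // true
    }
}
```

Numeric order numbers such as "0" or "42" happen to hash to non-negative values, which is why the problem may stay hidden until keys of another shape (UUIDs, prefixed ids) appear.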
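Worker thread: SingleThreadWorker, which uses an internal blocking queue to serialize message processing
import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * Single thread with a blocking queue
 * Date: 2024/1/24 10:58
 */
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {

    private static final AtomicInteger cnt = new AtomicInteger(0);

    private final BlockingQueue<T> queue;

    // volatile so the worker thread sees the value written by shutdown()
    private volatile boolean started = true;

    /**
     * Unique worker id
     */
    @Getter
    private final int queueNo;

    public SingleThreadWorker(int size) {
        this.queue = new LinkedBlockingQueue<>(size);
        this.queueNo = cnt.getAndIncrement();
        log.info("init worker {}", this.queueNo);
    }

    /**
     * Submit a message; blocks while the queue is full
     *
     * @param data the message
     */
    public void offer(T data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can react to it
            Thread.currentThread().interrupt();
            log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);
        }
    }

    @Override
    public void run() {
        log.info("{} worker start take ", Thread.currentThread().getName());
        while (started) {
            try {
                // take() blocks while the queue is empty
                T data = queue.take();
                doConsumer(data);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the worker instead of looping again
                Thread.currentThread().interrupt();
                log.error("queue take error", e);
                return;
            }
        }
    }

    /**
     * Do the real message consumption
     *
     * @param data the message
     */
    protected abstract void doConsumer(T data);

    /**
     * Consume the messages left in the queue when the thread pool shuts down.
     * This runs on the caller's thread (the shutdown hook), not on the worker thread.
     */
    public void shutdown() {
        this.started = false;
        ArrayList<T> rest = new ArrayList<>();
        int i = queue.drainTo(rest);
        if (i > 0) {
            log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);
            for (T t : rest) {
                doConsumer(t);
            }
        }
    }
}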
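Because `shutdown()` above drains the queue on the calling thread, a message the worker thread is still processing can overlap with the drained ones. One possible alternative, shown here as a sketch under assumptions rather than as the article's design, lets the worker thread drain its own queue and has the caller wait for it. `DrainingWorker` and its method names are hypothetical, and the sketch assumes no message is offered after `shutdown` is called.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical worker: a single thread handles every message, including those left at shutdown. */
class DrainingWorker<T> implements Runnable {
    private final BlockingQueue<T> queue;
    private final Consumer<T> handler;
    private final Thread thread;
    private volatile boolean running = true;

    DrainingWorker(int capacity, Consumer<T> handler, String name) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.handler = handler;
        this.thread = new Thread(this, name);
    }

    void start() {
        thread.start();
    }

    /** Blocks while the queue is full; returns false if the caller was interrupted. */
    boolean offer(T data) {
        try {
            queue.put(data);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void run() {
        try {
            // Keep going until shutdown was requested AND the queue is empty
            while (running || !queue.isEmpty()) {
                // poll with a timeout so the loop condition is re-checked regularly
                T data = queue.poll(100, TimeUnit.MILLISECONDS);
                if (data != null) {
                    handler.accept(data);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Requests shutdown and waits for the worker thread; returns true if it finished in time. */
    boolean shutdown(long timeoutMillis) {
        running = false;
        try {
            thread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        DrainingWorker<String> worker =
                new DrainingWorker<>(16, msg -> System.out.println("consume " + msg), "worker-0");
        worker.start();
        worker.offer("PENDING");
        worker.offer("PAID");
        System.out.println("drained: " + worker.shutdown(5000));
    }
}
```

The trade-off is a short polling delay (100 ms here, an arbitrary choice) before an idle worker notices the shutdown request.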
Worker implementation: OrderWorker, which handles only order events
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;

/**
 * Concrete consumer
 * Date: 2024/1/24 13:42
 */
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto> {

    public OrderWorker(int size) {
        super(size);
    }

    @Override
    protected void doConsumer(InterOrderDto data) {
        log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));
    }
}
Producer
Producer: OrderController, which simulates sending orders with different event types
import com.alibaba.fastjson.JSON;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public String send() throws InterruptedException {
        int size = 1000;
        for (int i = 0; i < size; i++) {
            OrderDto orderDto = new InterOrderDto();
            orderDto.setOrderNo(i + "");
            orderDto.setPayStatus(getStatus(0));
            orderDto.setTimestamp(System.currentTimeMillis());
            // Records with the same key (the order number) go to the same partition
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);

            orderDto.setPayStatus(getStatus(1));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);

            orderDto.setPayStatus(getStatus(2));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
        }
        return "success";
    }

    // "待支付" = pending payment, "已支付" = paid, "支付失败" = payment failed
    private String getStatus(int status) {
        return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
    }
}
application.properties configuration
# Kafka broker address
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
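Consumer-side ordering only helps if the producer also writes each key's events in order. When retries are combined with more than one in-flight request per connection, a retried batch can land after a later one unless idempotence is enabled. The lines below are a hedged sketch of producer settings commonly used for this; they are not part of the original configuration, and recent Kafka clients may already default to them, so check the defaults of the client version in use.

```properties
# Assumption: added for illustration, not in the original setup
# Wait for all in-sync replicas to acknowledge each write
spring.kafka.producer.acks=all
# Idempotent producer: the broker rejects duplicates and keeps per-partition order on retry
spring.kafka.producer.properties.enable.idempotence=true
# Idempotence requires at most 5 in-flight requests per connection
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
```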
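Application class
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@Slf4j
@SpringBootApplication
public class BootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootKafkaApplication.class, args);
    }

    /**
     * Configure the thread pool: 3 workers, each with a queue capacity of 100
     *
     * @return the order thread pool
     */
    @Bean
    public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool() {
        OrderThreadPool<OrderWorker, InterOrderDto> threadPool =
                new OrderThreadPool<>(3, () -> new OrderWorker(100));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("shutdown orderThreadPool");
            // On shutdown, consume the messages still queued in the workers
            threadPool.shutdown();
        }));
        return threadPool;
    }
}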
Testing
Visit http://localhost:8080/send. Result:
OWorder-0 worker start take
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}
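As the output shows, the events of each order are consumed in order within the worker thread: pending payment, then paid, then payment failed.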
good luck!