背景
作为一个合格的码农,当然要学会CV大法
了,可是CV
也是有风险的,别以为前任写的已经上线那么久了没有问题…
我们需要将埋点信息上报到一个三方平台(S2S)接口,三方平台对时间有要求,同一个用户上报同一个埋点事件的时间只能按照顺序上报,如果后面上报的事件的时间早于之前已经上报的埋点事件时间,那么会被拒绝。
系统:
- 定时任务系统(负责查询数据、发送kafka消息)
- Kafka消费系统(负责消费kafka消息,对消息进行业务处理)
我在查询数据时就已经对时间进行排序了,保证查询出来的列表是根据时间进行排序的,然后进行for循环一条一条的发送kafka消息。然后再kafka消费系统进行消费消息进行上报给平台,通过日志看到有很多报错日志提示:上报事件的时间早于之前上报的事件时间,奇了怪了。我甚至怀疑到kafka消费是不是顺序消费,但是通过分析之后,发现:是发送kafka消息出现了问题…
发送消息 – 错误示例
要是,不了解springboot集成的kafka 默认的方法 send(String topic, @Nullable V data)
发送消息底层逻辑,他也是和我一样的懵逼的。
这个问题就出现在发送kafka的身上…
发送消息-- 正确示例
总结:
为什么kafka发送消息无法保证顺序问题?
kafka使用send
方法,不调用get()
的话,默认是进行异步批量发送消息的(这是kafka高吞吐量
的一个手段之一),这样是无法保证你先调用send
方法发送的消息先发送的情况,需要进行get方法进行获取到响应(也就是同步
的意思),达到发送的消息有序性的效果。当然:同步消耗的性能和时间肯定是没有异步更快的。
为什么kafka消费是可以保证消费消息的有序性?
这是个不准确的问题
因为只有在同一个topic下的同一个partition(分区)下的消息才能保证有序性,原因:每个主题(Topic)可以被分割成多个partition 分区,而消息在发布时会被分区器(Partitioner)根据某个属性key进行路由追加到某个partition分区上。而每个partition分区会保证消息的顺序来进行存储,因此保证了分区内的消息顺序性。
然后kafka消费会根据partition(分区)下存储的消息进行按照顺序从前往后进行消费,消费后进行提交offset(偏移量),用于确认消费到哪条消息。
实现消息的顺序消费办法:
1、在一个topic中,只创建一个partition,这样这个topic下的消息都会按照顺序保存在同一个
partition中,这就保证了消息的顺序消费。
2、发送消息的时候指定partition,如果一个topic下有多个partition分区,我们可以把需要保证顺序
的消息都发送到同一个partition中,也能做到顺序消费,所以上述代码中,我需要对同一个用户的事件进行生成一个key,保证发送到同一个分区上,确保负载均衡和消息的顺序性。