通常来说,Kafka中的一条消息在同一个消费组(group)中只能被一个消费者消费,这种场景在应用端集群部署的时候非常适用。
还有一种业务场景,需要让每台机器都消费topic中的消息。例如本地缓存的场景,在应用集群部署的环境下,需要把数据库里面的内容缓存到每台机器的本地。数据库内容变更的时候,发送kafka消息,每台机器都需要更新本地的缓存。
一种思路是让每台机器的groupid不一致,例如group name加上本机IP, 这样就能实现所有的机器都能消费到同一个topic了。可以直接在@KafkaListener中的id属性上利用SpEL(Spring Expression Language)来实现这一目标。在SpEL中直接使用InetAddress来获取ip
例如我们的group前缀为 "ppp_" , topic名字为"topic_1",注解如下,即可实现
@KafkaListener(id = "ppp_#{T(java.net.InetAddress).getLocalHost().getHostAddress()}", topics = {"topic_1"})public void onMessage(String message) {System.out.println("received singe message >> " + message);}