1.引入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2.配置参数
server:port: 8080
spring:kafka:bootstrap-servers: 101.34.251.168:9092producer: # ⽣产者retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1
# 指定消息key和消息体的编解码⽅式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500listener:# 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交# TIME# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交# COUNT# TIME | COUNT 有⼀个条件满⾜时提交# COUNT_TIME# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交# MANUAL# ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE
# redis:
# host: 172.16.253.21
3.生产者搭建
package com.wen.kafka.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("msg")
public class ProducerController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@RequestMapping("/send")public String sendMessage(){kafkaTemplate.send("test", "key", "msg2");return "Send Success";}}
4.消费者搭建
package com.wen.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@KafkaListener(topics = "test", groupId = "GroupOne")public void listenGroup(ConsumerRecord<String, String> record){System.out.println(record);}
}