文章目录
- 1. 技术选型
- 2. 导入依赖
- 3. kafka配置
- 4. 生产者(同步)
- 5. 生产者(异步)
- 6. 消费者
1. 技术选型
| 软件/框架 | 版本 |
| --- | --- |
| jdk | 1.8.0_202 |
| springboot | 2.5.4 |
| kafka server | kafka_2.12-2.8.0 |
| kafka client | 2.7.1 |
| zookeeper | 3.7.0 |
2. 导入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3. kafka配置
properties版本
spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092
# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多放多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为 32M = 32 * 1024 * 1024
spring.kafka.producer.buffer-memory=33554432
# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000
yml版本
server:
  port: 8080
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: 192.168.92.104:9092
    consumer:
      auto-commit-interval: 1000
      auto-offset-reset: earliest
      enable-auto-commit: true
      group-id: springboot-consumer-02
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
4. 生产者(同步)
package com.gblfy.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

/**
 * Producer endpoint that sends a message to Kafka <em>synchronously</em>:
 * the request thread blocks on the send future until the broker
 * acknowledges the record, then the record metadata is logged.
 */
@RestController
public class KafkaSyncController {

    private final static Logger log = LoggerFactory.getLogger(KafkaSyncController.class);

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Sends {@code message} to topic {@code topic-springboot-01}
     * (partition 0, key 0) and waits for the broker acknowledgement.
     *
     * @param message payload taken from the URL path
     * @return the literal string "success" (returned even if the send
     *         failed — the failure is only logged, matching the original
     *         endpoint contract)
     */
    @RequestMapping("/send/sync/{message}")
    public String send(@PathVariable String message) {
        // Synchronous send: block on the returned future for the result.
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send("topic-springboot-01", 0, 0, message);
        try {
            SendResult<Integer, String> sendResult = future.get();
            RecordMetadata metadata = sendResult.getRecordMetadata();
            log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it,
            // so callers/executors can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("发送消息被中断", e);
        } catch (ExecutionException e) {
            // Use the logger (with full stack trace) instead of printStackTrace().
            log.error("发送消息失败", e);
        }
        return "success";
    }
}
5. 生产者(异步)
package com.gblfy.demo.controller;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaAsyncController {private final static Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;//设置回调函数,异步等待broker端的返回结束@RequestMapping("/send/async/{message}")public String sendAsync(@PathVariable String message) {//同步发送消息ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 1, message);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());}});return "success";}
}
6. 消费者
package com.gblfy.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer component: listens on topic {@code topic-springboot-01}
 * and logs the metadata and payload of every record received.
 */
@Component
public class KafkaConsumer {

    private final static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * Invoked by Spring Kafka for each record delivered on the topic.
     *
     * @param record the consumed record (Integer key, String value)
     */
    @KafkaListener(topics = {"topic-springboot-01"})
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // Pull the fields into named locals before logging.
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        Integer key = record.key();
        String value = record.value();
        log.info("消费者接收到消息主题:{} ,消息的分区:{} ,消息偏移量:{} ,消息key: {} ,消息values:{} ",
                topic, partition, offset, key, value);
    }
}