Preface
How Kafka works
We call the party that publishes messages the producer, the party that subscribes to messages the consumer, and the storage layer sitting in between the broker. With those three roles the overall picture looks roughly like this:
The producer produces data and hands it to the broker for storage; when the consumer needs data, it pulls it from the broker and then runs whatever processing it needs on it.
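Before moving on to Spring Boot, here is a minimal sketch of that producer → broker → consumer path using the plain Kafka client API (kafka-clients). It is only an illustration: the class name, group id and the five-second poll are arbitrary choices, and the broker address is the one used later in this article. The rest of the article uses spring-kafka instead.
// Illustrative sketch of the produce -> broker -> consume flow with the plain Kafka client API.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PlainKafkaDemo {

    public static void main(String[] args) {
        // Producer: hand a record to the broker for storage.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.105:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("test", "hello from the producer"));
        } // close() flushes any pending records

        // Consumer: subscribe to the topic and pull the stored records back out.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.105:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "demoGroup");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.println("consumed: " + r.value()));
        }
    }
}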
1. Add the spring-kafka dependency
Add the spring-kafka dependency to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>SpringBootKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafka</name>
    <description>SpringBootKafka</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>central</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public/</url>
            <layout>default</layout>
            <!-- whether to allow downloading release artifacts -->
            <releases>
                <enabled>true</enabled>
            </releases>
            <!-- whether to allow downloading snapshot artifacts -->
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Write the configuration
Configure Kafka in src/main/resources/application.yml, covering both the producer and the consumer:
spring:
  kafka:
    bootstrap-servers: 192.168.110.105:9092
    #streams:
    #  application-id: my-streams-app
    consumer:
      group-id: myGroupId
      auto-offset-reset: latest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 5
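The examples below all publish to and consume from a topic named test, but the article never shows how that topic is created. If the broker does not auto-create topics, one option (a sketch, not part of the original project; the package, class name and partition/replica counts are assumptions) is to declare a NewTopic bean, which Spring Boot's auto-configured KafkaAdmin will create on startup:
package com.example.springbootkafka.config; // hypothetical package

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the "test" topic; KafkaAdmin creates it at startup if it does not exist yet.
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")
                .partitions(1)   // assumed values, adjust for your cluster
                .replicas(1)
                .build();
    }
}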
3. Write the producer
Messages are sent with org.springframework.kafka.core.KafkaTemplate. send() is asynchronous and returns a future, which the code below waits on to obtain the send result:
package com.example.springbootkafka.service;

import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@Slf4j
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    public void sendMessage(String message) {
        log.info("KafkaProducer message:{}", message);
        Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);
        // Wrap the send future in a CompletableFuture so we can react to the result.
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // wait for the original future to complete
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                // send failed
                System.out.println("Error occurred: " + ex.getMessage());
            } else {
                // sent successfully
                System.out.println("Completed successfully");
            }
        });
    }

    public void sendUser(User user) throws JsonProcessingException {
        String userJson = objectMapper.writeValueAsString(user);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);
        /* Alternative: register callbacks directly on the ListenableFuture.
        future.addCallback(
                success -> System.out.println("Message sent successfully: " + userJson),
                failure -> System.err.println("Failed to send message: " + failure.getMessage()));
        */
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // wait for the original future to complete
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                // send failed
                System.out.println("Error occurred: " + ex.getMessage());
            } else {
                // sent successfully
                System.out.println("Completed successfully");
            }
        });
    }
}
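The producer (and the controller in step 5) uses a com.example.springbootkafka.entity.User built with a Lombok builder, but that class is not shown in the article. A minimal sketch, with the fields inferred from the name(...) and dept(...) calls and the Lombok annotations chosen as an assumption, could look like this:
package com.example.springbootkafka.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Hypothetical entity, reconstructed from how it is used in the producer and controller.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private String name;
    private String dept;
}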
4. Write the consumer
Messages are received by annotating a listener method with org.springframework.kafka.annotation.KafkaListener:
package com.example.springbootkafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test", groupId = "myGroupId")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        log.info("KafkaConsumer message:{}", message);
    }
}
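The listener above only logs the raw string. As a possible extension (not part of the original code), a second listener in its own, hypothetical consumer group could turn the JSON written by sendUser back into a User with Jackson. Because it uses a separate group id it receives every record in addition to the consumer above; note that a plain-text record such as "hello, Kafka!" would fail to parse here.
package com.example.springbootkafka.service; // hypothetical companion class

import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaUserConsumer {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Assumes the payload is the JSON written by KafkaProducer.sendUser;
    // non-JSON records on the topic would make readValue throw.
    @KafkaListener(topics = "test", groupId = "myUserGroupId")
    public void consumeUser(String message) throws JsonProcessingException {
        User user = objectMapper.readValue(message, User.class);
        log.info("KafkaUserConsumer user:{}", user);
    }
}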
5. Test producing and sending messages
A simple REST controller exposes two endpoints that trigger the producer:
package com.example.springbootkafka.controller;

import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MessageController {

    private final KafkaProducer producer;

    @Autowired
    public MessageController(KafkaProducer producer) {
        this.producer = producer;
    }

    @GetMapping("/send-message")
    public String sendMessage() {
        log.info("MessageController sendMessage start!");
        producer.sendMessage("hello, Kafka!");
        log.info("MessageController sendMessage end!");
        return "Message sent successfully.";
    }

    @GetMapping("/send")
    public String sendMessage1() {
        log.info("MessageController sendMessage1 start!");
        User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();
        try {
            producer.sendUser(user);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        log.info("MessageController sendMessage1 end!");
        return "Message sendMessage1 successfully.";
    }
}
6. Check the results
Call the /send-message and /send endpoints; the controller returns a success message and the consumer logs every record it receives (see KafkaConsumer above).
The full source code is available at https://gitee.com/dylan_2017/springboot-kafka.git