项目搭建
同样的,需要我们搭建一个maven
工程,整合非常的简单,需要用到:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
来一起看下完整的pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springboot-kafka-all</artifactId><version>1.0-SNAPSHOT</version><properties><java.version>1.8</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version></parent><dependencies><!--web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--test--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--Hutool依赖--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.4</version></dependency><!--fast-json--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId> org.slf4j </groupId><artifactId> slf4j-api </artifactId><version> 1.6.4 </version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.3.RELEASE</version></plugin></plugins></build></project>
配置也很简单 application.yml
server:port: 8081spring:kafka:producer:bootstrap-servers: 127.0.0.1:9092
然后新建一个启动类,看下控制台是否成功链接了Kafka
,在启动之前别忘了开启Kafka集群
基本使用
先从一个简单的例子,来快速体验一下Kafka
,新建HelloController
@Slf4j
@RestController
public class HelloController {private static final String topic = "test";@Autowiredprivate KafkaTemplate<Object, Object> kafkaTemplate;// 接收消息@KafkaListener(id = "helloGroup", topics = topic)public void listen(String msg) {log.info("hello receive value: {}" , msg);// hello receive value: hello kafka}@GetMapping("/hello")public String hello() {// 发送消息kafkaTemplate.send(topic, "hello kafka");return "hello";}
}
我们通过KafkaTemplate
进行消息的发送, 通过@KafkaListener
进行消息的消费,我们可以指定消费者ID
以及监听的topic
,请求localhost:8081/hello
观察控制台的变化。请求后,发现消息发送和接收的非常快,我们也可以观察UI
后台的消息详情,同步对比
topic创建
之前我们的topic
是在UI
后台创建的,那么在SpringBoot
中如何创建呢? 下面我们试着发送一个不存在的topic
// 当topic不存在时 会默认创建一个topic// num.partitions = 1 #默认Topic分区数// num.replica.fetchers = 1 #默认副本数@GetMapping("/hello1")public String hello1() {// 发送消息kafkaTemplate.send("hello1", "hello1");return "hello1";}// 接收消息@KafkaListener(id = "hello1Group", topics = "hello1")public void listen1(String msg) {log.info("hello1 receive value: {}" , msg);// hello1 receive value: hello1}
请求之后,观察控制台以及管理后台,发现并没有报错,并且给我们自动创建了一个topic
,在自动创建下,默认的参数是:
num.partitions = 1 #默认Topic分区数num.replica.fetchers = 1 #默认副本数
如果我想手动创建呢?我们可以通过NewTopic
来手动创建:
@Configuration
public class KafkaConfig {@Beanpublic KafkaAdmin admin(KafkaProperties properties){KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());// 默认False,在Broker不可用时,如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为Trueadmin.setFatalIfBrokerNotAvailable(true);// setAutoCreate(false) : 默认值为True,也就是Kafka实例化后会自动创建已经实例化的NewTopic对象// initialize():当setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象return admin;}/*** 创建指定参数的 topic* @return*/@Beanpublic NewTopic topic() {return new NewTopic("hello2", 0, (short) 0);}
}
如果要更新呢?也非常的简单
/*** 更新 topic* @return*/@Beanpublic NewTopic topicUpdate() {return new NewTopic("hello2", 1, (short) 1);}
注意这里的参数只能+
不能-
这种方式太简单了,如果我想在代码逻辑中来创建呢?我们可以通过AdminClient
来手动创建
/*** AdminClient 创建*/@Autowiredprivate KafkaProperties properties;@GetMapping("/create/{topicName}")public String createTopic(@PathVariable String topicName) {AdminClient client = AdminClient.create(properties.buildAdminProperties());if(client !=null){try {Collection<NewTopic> newTopics = new ArrayList<>(1);newTopics.add(new NewTopic(topicName,1,(short) 1));client.createTopics(newTopics);}catch (Throwable e){e.printStackTrace();}finally {client.close();}}return topicName;}
观察下管理后台,发现topic
都创建成功了
获取消息发送的结果
有时候我们发送消息不知道是不是发成功了,需要有一个结果通知。有两种方式,一种是同步
一种是异步
同步获取结果
/*** 获取通知结果* @return*/@GetMapping("/hello2")public String hello2() {// 同步获取结果ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send("hello2","hello2");try {SendResult<Object,Object> result = future.get();log.info("success >>> {}", result.getRecordMetadata().topic()); // success >>> hello2}catch (Throwable e){e.printStackTrace();}return "hello2";}
异步获取
/*** 获取通知结果* @return*/@GetMapping("/hello2")public String hello2() {// 发送消息 - 异步获取通知结果kafkaTemplate.send("hello2", "async hello2").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("fail >>>>{}", throwable.getMessage());}@Overridepublic void onSuccess(SendResult<Object, Object> objectObjectSendResult) {log.info("async success >>> {}", objectObjectSendResult.getRecordMetadata().topic()); // async success >>> hello2}});return "hello2";}
Kafka事务
同样的,消息也会存在事务
,如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息,下面通过一个简单的例子体会一下
@GetMapping("/hello3")
public String hello3() {kafkaTemplate.executeInTransaction(t -> {t.send("hello3","msg1");if(true)throw new RuntimeException("failed");t.send("hello3","msg2");return true;});return "hello3";
}// 接收消息
@KafkaListener(id = "hello3Group", topics = "hello3")
public void listen3(String msg) {log.info("hello3 receive value: {}" , msg);
}
默认情况下,Spring-kafka
自动生成的KafkaTemplate
实例,是不具有事务消息发送能力的。我们需要添加transaction-id-prefix
来激活它
spring:kafka:producer:bootstrap-servers: 127.0.0.1:9092transaction-id-prefix: kafka_.
启动之后,观察控制台的变化~ ,除此之外,还可以使用注解的方式@Transactional
来开启事务
// 注解方式@Transactional(rollbackFor = RuntimeException.class)@GetMapping("/hello4")public String hello4() {kafkaTemplate.send("hello3","msg1");if(true)throw new RuntimeException("failed");kafkaTemplate.send("hello3","msg2");return "hello4";}