Kafka 之事务消息

前言:

在分布式消息系统中,事务消息也是一个热门课题,在项目的实际业务场景中,如果用到事务消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持事务消息,本篇我们将对 Kafka 的事务消息展开讨论。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

Kafka 之顺序消息

事务消息的使用场景

事务消息的使用场景众多,这里我简单列举几个如下:

  • 金融交易处理:在金融领域,每笔交易都必须具备原子性,确保不发生不一致或重复的交易,事务性消息可用于金融交易的场景,来保证交易的完整性。
  • 订单处理:用户订单处理必须是完整的,需要保订单的创建、支付和发货不会出现问题,事务性消息可以用于订单场景,来保证整个流程的完整性。
  • 。。。。等等等场景。

什么是 Kafka 的事务消息?

Kafka 事务性消息是一种机制,用于确保消息的可靠性传递和处理,与非事务性消息相比,它们在数据处理中提供了额外的保证,一旦消息被写入Kafka 集群,它们将被认为是已经处理,无论发生了什么,Kafka 的事务不同于 RocketMQ,RocketMQ 是保障本地事务与 RocketMQ 消息发送的事务一致性,而 Kafka 的事务消息主要是保障一次发送多条消息的事务一致性,发送的消息要么同时成功要么同时失败。

Kafka 事务消息的特性?

  • 原子性:Kafka 事务性消息要么完全成功,要么完全失败,保障了消息不会被部分处理。
  • 可靠性:Kafka 事务性消息一旦写入Kafka,它们将被视为已经处理,即使发生了应用程序或系统故障。
  • 幂等性:Kafka 生产者可以配置为幂等,确保相同的消息不会被重复发送,对于 Kafka 事务消息通常会配置为幂等。
  • Exactly Once 语义:仅一次,Kafka 事务性消息支持仅一次,即消息要么完全到达一次,要么不到达。

Kafka的消息传输保障
Kafka 的消息传输保障分为 3个层级:
at most once:至多一次,消息可能丢失,但绝不会重复。
at least once:最少一次,消息绝不丢失,但可能重复传输。
exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次。

Kafka 事务消息的使用

Kafka 事务消息的配置

Kafka 的事务消息需要对生产者和消费者进行一些配置来启用 Kafka 对事务消息的支持。

生产者配置

  • acks:这个参数我们前面有见到过,是有关生产者接收到确认之后才认为消息发送成功的设置,对于事务性消息,通常将其设置为acks=all(表示需要等到消息写入到所有 ISR 同步副本中,设置为 -1 也是一样的效果),以确保消息在事务完全提交后才被视为成功发送。
  • transactional.id:事务id,这是用于标识生产者实例的唯一ID,在配置文件中设置 transactional.id 是启用事务性消息的核心步骤。

消费者配置

  • isolation.level:Kafka 消费者隔离级别的设置,对于事务性消息,通常将其设置为isolation.level=read_committed,确保只读取已经提交的事务消息。
  • auto.offset.reset:这是消费者启动时从哪里开始读取消息的设置,通常将其设置为auto.offset.reset=earliest(如果分区下有提交的 offset 从提交的 offset 开始消费,如果没有提交的 offset,从头开始消费),以确保不会错过任何已提交的消息。

具体配置如下:

#消息应答
spring.kafka.producer.acks = -1
#事务id配置
spring.kafka.producer.transaction-id-prefix=my-transaction-
#读取已提交的消息
spring.kafka.consumer.isolation-level=read_committed

Kafka 事务消息案例演示

Producer 代码案例

对于生产者我们两种类型可以选择,使用 KafkaTemplate 进行事务消息发送和使用原生 API 进行事务消息发送。

使用 KafkaTemplate 完成事务消息发送代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;/*** @ClassName: TransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者*/
@Slf4j
@Component
public class TransactionalProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//发送事务消息(编程式)public void sendTransactionalMessage(int number) {try {kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, String> kafkaOperations) {kafkaOperations.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaOperations.send("transactional-topic", "我是第二条事务消息");return true;}});} catch (Exception e) {e.printStackTrace();}}//发送事务消息(注解方式)@Transactional(rollbackFor = RuntimeException.class)public void sendTransactionalMessageByAnnotation(int number) {kafkaTemplate.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaTemplate.send("transactional-topic", "我是第二条事务消息");}}

上述消息生产者代码中我们使用了两种方式实现,分别是编程式事务和注解式事务消息,大家根据自己的喜好来选择,两种方式都是可以的。

Consumer 代码案例

事务消息的 Consumer 代码没有什么特殊之处,我们使用 Manual ACK 即可。

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: TransactionalConsumer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息消费者*/
@Slf4j
@Component
public class TransactionalConsumer {@KafkaListener(id = "transactional-consumer",groupId = "transactional-consumer-groupId",topics = "transactional-topic",containerFactory = "myContainerFactory")public void listen(String message, Acknowledgment acknowledgment) {log.info("事务消息消费成功,消息内容:{}", message);//手动提交 ACKacknowledgment.acknowledge();}
}

事务消息结果验证

我们触发注解式事务消息发送,numer 传值 1,得到如下结果:

2024-11-05 19:48:32.649  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息

结果符合预期

2024-11-05 19:52:27.444  INFO 20652 --- [nio-8086-exec-8] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-my-transaction-0, transactionalId=my-transaction-0] Aborting incomplete transaction
2024-11-05 19:52:27.451 ERROR 20652 --- [y-transaction-0] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='我是注解式事务第一条事务消息' to topic transactional-topic:org.apache.kafka.common.KafkaException: Failing batch since transaction was abortedat org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]2024-11-05 19:52:27.456 ERROR 20652 --- [nio-8086-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root causejava.lang.ArithmeticException: / by zeroat com.order.service.kafka.producer.TransactionalProducer.sendTransactionalMessageByAnnotation(TransactionalProducer.java:49) ~[classes/:na]at com.order.service.kafka.producer.TransactionalProducer$$FastClassBySpringCGLIB$$dbc0d44d.invoke(<generated>) ~[classes/:na]at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.6.jar:5.3.6]at com.order.service.kafka.producer.TransactionalProducer$$EnhancerBySpringCGLIB$$e4350198.sendTransactionalMessageByAnnotation(<generated>) ~[classes/:na]at com.order.service.controller.KafkaController.sendTransactionalMessageByAnnotation(KafkaController.java:96) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

从日志结果我们看到消费者没有消费到任何一条消息(这里我们是在第一条消息发送结束后模拟的异常),结果符合预期。

编程式事务消息这里就不在验证了,有兴趣的朋友可以自己去测试一下。

Producer 使用原生 API 发送事务消息

使用原生 API 发送事务消息,需要有一个 KafkaProducer 对象,我这里使用注入 Bean 的方式,具体如下:

@Bean
public KafkaProducer<String, String> kafkaProducer() {return new KafkaProducer<>(getMyKafkaProps());
}

有了 KafkaProducer 只有我们调用其 API 即可完成事务消息发送,KafkaProducer 有几个重要的 API 如下:

//初始化事务
public void initTransactions();//开启事务
public void beginTransaction() throws ProducerFencedException ;//在事务内提交已经消费的偏移量
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;//提交事务
public void commitTransaction() throws ProducerFencedException;//丢弃事务
public void abortTransaction() throws ProducerFencedException ;

使用原生 API 进行事务消息发送代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: NativeTransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者 原生API*/
@Slf4j
@Component
public class NativeTransactionalProducer {@Autowiredprivate KafkaProducer<String, String> kafkaProducer;//发送事务消息(原生API)public void sendTransactionalMessageByNativeApi(int number) {try {//初始化一个事务kafkaProducer.initTransactions();//开启事务kafkaProducer.beginTransaction();//发送消息kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第一条事务消息"));if (number == 0) {//模拟异常int a = 1 / 0;}kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第二条事务消息"));//提交事务消息kafkaProducer.commitTransaction();} catch (ProducerFencedException e) {//关闭资源kafkaProducer.close();} finally {//关闭资源kafkaProducer.close();}}}

原生 API 发送事务消息结果验证

我们触发原生 API 发送事务消息发送,numer 传值 1,得到如下结果:

2024-11-05 19:48:32.649  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息

结果符合预期,异常的情况我这里就不演示了,有兴趣的朋友可以自己去验证。


事务消息注意事项

只要开启了事务消息功能,不管是 KafkaProducer 还是 KafkaTemplate 发送的所有消息,都必须处于 Kafka 事务之中,否则会抛出如下异常:

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a recordat org.springframework.util.Assert.state(Assert.java:76)at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:640)at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:552)at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)at com.order.service.kafka.producer.ManualKafkaProducer.sendManualMessage(ManualKafkaProducer.java:34)at com.order.service.controller.KafkaController.sendManualMessage(KafkaController.java:87)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)at javax.servlet.http.HttpServlet.service(HttpServlet.java:626)at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)at java.lang.Thread.run(Thread.java:745)

如果业务中,存在需要事务的情况,也存在不需要事务的情况,那么则需要分别定义两个 KafkaTemplate(Kafka Producer),按需使用。

总结:本篇分享了 Kafka 的事务消息的使用,可以感受到 Kafka 的事务消息和 RocketMQ 是完全不一样的,希望可以帮助到需要用到的伙伴们。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60133.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

创建者模式之【建造者模式】

建造者模式 概述 将一个复杂对象的构建与表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。 分离了部件的构造(由Builder来负责)和装配(由Director负责)。 从而可以构造出复杂的对象。这个模式适用于&#xff1a;某个对象的构建过程复杂的情况。由于实现了构建和…

pyspark入门基础详细讲解

1.前言介绍 学习目标&#xff1a;了解什么是Speak、PySpark&#xff0c;了解为什么学习PySpark&#xff0c;了解课程是如何和大数据开发方向进行衔接 使用pyspark库所写出来的代码&#xff0c;既可以在电脑上简单运行&#xff0c;进行数据分析处理&#xff0c;又可以把代码无缝…

5. 类加载子系统

一、前言 前面我们了解了字节码文件的大致组成部分&#xff0c;那么 JVM 是如何加载 .class字节码文件的&#xff1f;加载到.class字节码文件后又做了哪些事情呢&#xff1f; 二、类加载子系统初步认识 首先类加载子系统作为虚拟机和外界的一个对接口&#xff0c;主要负责以…

AI 写作(六):核心技术与多元应用(6/10)

一、AI 写作的核心技术概述 AI 写作在当今数字化时代正发挥着越来越重要的作用。它不仅极大地提高了写作效率&#xff0c;还为不同领域带来了创新的可能性。 AI 写作的核心技术主要包括基于模板的文本生成和基于深度学习的文本生成。基于模板的文本生成通常依赖预先设定的模板…

Java-如何实现实现两个异步带有@Async注解的方法按顺序执行

要实现两个带有 @Async 注解的方法按顺序执行,可以使用 CompletableFuture 来管理异步任务 的依赖关系。下面是一个完整的示例,展示了如何确保 method1 执行完成后,再执行 method2。 import org.springframework.scheduling.annotation.Async; import org.springframework…

显卡驱动版本过低怎么办?显卡驱动升级的方法

显卡驱动程序是计算机与显卡之间沟通的桥梁&#xff0c;它负责将操作系统发出的指令翻译成显卡可以理解的语言&#xff0c;从而确保图形显示的流畅与高效。当您遇到显卡驱动版本过低的问题时&#xff0c;升级驱动程序不仅能够提升电脑的图形处理能力&#xff0c;还能解决兼容性…

Qt 环境实现视频和音频播放

在这个示例中&#xff0c;我们将使用 FFmpeg 进行视频和音频的解码&#xff0c;并使用 Qt 的界面进行显示和控制。为了实现音频和视频的解码以及同步显示&#xff0c;我们需要使用 FFmpeg 的解码库进行视频和音频解码&#xff0c;使用 Qt 的 QLabel 显示解码后的视频帧&#xf…

java导出word文件(手绘)

文章目录 代码细节效果图参考资料 代码细节 使用的hutool的WordUtil&#xff0c;WordUtil对poi进行封装&#xff0c;但是这一块的官方封装的很少&#xff0c;很多细节都没有。代码中是常见的绘制段落&#xff0c;标题、表格等常用api Word07Writer writer WordUtil.getWriter(…

UML的另一个主角——用例图

顺序图和类图已经出过单集&#xff0c;本贴要分享的是用例图。 类图https://blog.csdn.net/jsl123x/article/details/143526286?spm1001.2014.3001.5501顺序图https://jslhyh32.blog.csdn.net/article/details/134350587 目录 一.系统 二.参与者 1.主要参与者 2.次要参与…

《TCP/IP网络编程》学习笔记 | Chapter 4:基于TCP的服务器端/客户端(1)

《TCP/IP网络编程》学习笔记 | Chapter 4&#xff1a;基于TCP的服务器端/客户端&#xff08;1&#xff09; 《TCP/IP网络编程》学习笔记 | Chapter 4&#xff1a;基于TCP的服务器端/客户端&#xff08;1&#xff09;理解TCP和UDPTCP/IP协议栈TCP/IP协议的诞生背景链路层网络层T…

【基于PSINS工具箱】以速度为观测量的SINS/GNSS组合导航,UKF滤波

基于【PSINS工具箱】&#xff0c;提供一个MATLAB例程&#xff0c;仅以速度为观测量的SINS/GNSS组合导航&#xff08;滤波方式为UKF&#xff09; 文章目录 工具箱程序简述运行结果 代码程序讲解MATLAB 代码教程&#xff1a;使用UKF进行速度观测1. 引言与基本设置2. 初始设置3. U…

【Vue】Vue3.0(十七)Vue 3.0中Pinia的深度使用指南(基于setup语法糖)

上篇文章&#xff1a; 【Vue】Vue3.0&#xff08;十一&#xff09;Vue 3.0 中 computed 计算属性概念、使用及示例 &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Vue专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年11月10日15点23分 文章…

通过C++跨平台的预编译宏来区分不同的操作系统:Win32/Win64/Unix/Linux/MacOS

因为 C 具有跨平台的特性&#xff0c;所以有些需求一套代码就多端使用&#xff0c;比如我最近在学习的 OpenGL ES。 但是&#xff0c;不同平台还是具有一定差异性&#xff0c;所以我们首先得判断出是什么平台&#xff1f; 比如 iOS 系统和 Android 系统。 那么如何判断呢&…

工程认证导向的Spring Boot计算机课程管理平台

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理基于工程教育认证的计算机课程管理平台的相…

跨境云专线:构建高速、安全的全球业务网络

在企业出海加速的背景下&#xff0c;越来越多的企业需要在全球范围内部署业务&#xff0c;特别是在多个国家和地区之间进行数据传输。然而&#xff0c;跨境网络连接常常面临带宽不足、延迟高、数据安全性差等问题&#xff0c;这给企业的业务运营带来了巨大挑战。为了解决这些问…

分布式——BASE理论

简单来说&#xff1a; BASE&#xff08;Basically Available、Soft state、Eventual consistency&#xff09;是基于CAP理论逐步演化而来的&#xff0c;核心思想是即便不能达到强一致性&#xff08;Strong consistency&#xff09;&#xff0c;也可以根据应用特点采用适当的方…

【后端速成Vue】模拟实现翻译功能

前言&#xff1a; 本期将会介绍 Vue 中的 watch 侦听器&#xff0c;它语法是怎么样的呢&#xff1f;具有怎样的功能呢&#xff1f;最后用模拟实现百度翻译来更进一步练习 watch 侦听器 篮球哥找工作专属IT岗位内部推荐&#xff1a; 专属内推链接&#xff1a;内推通道 1、认识翻…

UE5.4 PCG 获取地形Layer

使用AttributeFilter&#xff1a;属性过滤器 节点 设置地形Layer名称和权重 效果&#xff1a;

使用wordpress搭建简易的信息查询系统

背景 当前有这样的一个需求&#xff0c;要实现让客户能够自助登录系统查询一些个人的信息&#xff0c;市面上没有特别符合我的需求的产品&#xff0c;经过一段时间的研究&#xff0c;想出了一个用wordpress实现简易信息查询系统&#xff0c;有两种方式。 方式一&#xff1a;使…

EasyUI弹出框行编辑,通过下拉框实现内容联动

EasyUI弹出框行编辑&#xff0c;通过下拉框实现内容联动 需求 实现用户支付方式配置&#xff0c;当弹出框加载出来的时候&#xff0c;显示用户现有的支付方式&#xff0c;datagrid的第一列为conbobox,下来选择之后实现后面的数据直接填充&#xff1b; 点击新增&#xff1a;新…