在Kafka上异步发送数据

对于一个项目,我试图记录用户的基本交易,例如添加和删除一个项目以及多种类型的项目,并为每笔交易向kafka发送一条消息。 日志机制的准确性不是至关重要的,在kafka服务器停机的情况下,我不希望它阻止我的业务代码。 在这种情况下,将数据发送到kafka的异步方法是一种更好的方法。

我的kafka生产者代码在其引导项目中。 为了使其异步,我只需要添加两个注释:@EnableAsync和@Async。

@EnableAsync将在您的配置类中使用(还要记住,带有@SpringBootApplication的类也是配置类),并将尝试查找TaskExecutor bean。 如果没有,它将创建一个SimpleAsyncTaskExecutor。 SimpleAsyncTaskExecutor适用于玩具项目,但对于任何大于此的项目都存在一定的风险,因为它不限制并发线程,也不会重用线程。 为了安全起见,我们还将添加一个任务执行者bean。

所以,

 @SpringBootApplication  public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); }  } 

会变成

 @EnableAsync  @SpringBootApplication  public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize( 2 ); executor.setMaxPoolSize( 2 ); executor.setQueueCapacity( 500 ); executor.setThreadNamePrefix( "KafkaMsgExecutor-" ); executor.initialize(); return executor; }  } 

如您所见,这里没有太多变化。 我设置的默认值应根据您的应用程序需求进行调整。

我们需要的第二件事是添加@Async。

我的旧代码是:

 @Service  public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate<String, KafkaInfo> kafkaTemplate; @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus); }  } 

如您所见,同步代码非常简单。 它只需要kafkaTemplate并将消息对象发送到“ logs”主题。 我的新代码比这更长。

 @Service  public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate kafkaTemplate; @Async @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus)); future.addCallback( new ListenableFutureCallback<>() { @Override public void onSuccess( final SendResult<String, KafkaInfo> message) { // left empty intentionally } @Override public void onFailure( final Throwable throwable) { // left empty intentionally } }); }  } 

在这里,onSuccess()对我而言并不真正有意义。 但是onFailure()可以记录异常,因此可以通知我我的kafka服务器是否存在问题。

我还要与您分享另一件事。 为了通过kafkatemplate发送对象,我必须为其配备序列化文件。

 public class KafkaInfoSerializer implements Serializer<kafkainfo> { @Override public void configure(Map map, boolean b) { } @Override public byte [] serialize(String arg0, KafkaInfo info) { byte [] retVal = null ; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(info).getBytes(); } catch (Exception e) { // log the exception } return retVal; } @Override public void close() { }  } 

另外,不要忘记为其添加配置。 有几种定义kafka的序列化器的方法。 最简单的方法之一是将其添加到application.properties。

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer

现在,您有了一个启动项目,该项目可以将异步对象发送到所需的主题。

翻译自: https://www.javacodegeeks.com/2020/01/send-your-data-async-on-kafka.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/341029.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

devc++工程提示“源文件未编译”的可能问题

博主使用devc5.11&#xff0c;win7&#xff0c;创建了一个c工程以后编译一直提示“源文件未编译”&#xff0c;查了两小时博客期间反复重装&#xff0c;最后还是靠自己误打误撞解决了&#xff0c;解决步骤如下&#xff1a; 1、安装时语言选择English。 网上各种教程教新人安装…

vmware网络桥接模式无法上网的解决办法

1.vmware->编辑->虚拟网络编辑器->桥接模式->选择有线网卡 2.VMware-虚拟机设置-网络适配器-桥接模式-复制物理网络连接状态、启动时连接 3.重启虚拟机&#xff0c;完成 如果出现连接到网络但是无法上网的情况&#xff0c;还需做如下处理 1.sudo gedit /etc/res…

oracle idm_深入了解Oracle IDM审核

oracle idm在处理敏感信息的任何产品中&#xff0c; 报告都是至关重要的功能。 同样适用于身份和访问管理工具。 Oracle IDM的审核模块是其OOTB报告功能的基础。 让我们快速了解一下审核引擎以及它如何促进OIM中的报告功能。 这里介绍的用例很简单– 在OIM中更改为用户记录。 …

C语言编写贪吃蛇游戏

自己用C语言编写一个贪吃蛇游戏&#xff0c;效果如图&#xff1a; 源代码可免费下载&#xff0c;传送门如下&#xff1a; 点击下载贪吃蛇游戏和源代码

JDK 13中的JEP 355文本块

JDK 13已于2019年9月17日上线GA&#xff0c; 此处列出了重要的新功能。 新功能之一是“文本块”。 这样可以轻松地编写多行字符串&#xff0c;而不必在拆分成不同的行时进行串联。 让我们快速了解创建多行字符串的不同方法&#xff1a; String aBlock """ SE…

java8 默认方法_默认方法:Java 8的无名英雄

java8 默认方法几周前&#xff0c;我写了一个博客&#xff0c;说开发人员学习新语言是因为它们很酷。 我仍然坚持这个主张&#xff0c;因为关于Java 8的事情真的很酷。 尽管毫无疑问&#xff0c;该节目的明星是添加了Lambdas和将函数提升为一等变量&#xff0c;但我目前最喜欢的…

两个常见的并发错误

作为Baeldung的编辑&#xff0c;我很高兴与一位作者一起撰写有关Java通用并发陷阱的文章。 这是一本不错的书&#xff0c;但是假设开发人员具有一定的能力。 我已经看到了几件即时并发失败的事情。 它们很容易添加到代码中&#xff0c;并保证为您提供奇怪的结果。 开发人员仍会…

kotlin自定义View出现 java.lang.ClassNotFoundException

问题1&#xff1a;找不到所引用的自定义View Didn’t find class “dxf.example.dxf.customviewdemo.MyTextView” on path: DexPathList 原因&#xff1a;build.gradle中 应用插件 解决&#xff1a;添加 apply plugin: ‘kotlin-android’ 问题2&#xff1a; java.lang.Cl…

javafx swing_JavaFX技巧9:请勿混用Swing / JavaFX

javafx swingJavaFX团队非常努力地说服我们&#xff0c;因为可以将Swing内容嵌入JavaFX UI中&#xff0c;反之亦然&#xff0c;因此从Swing迁移到JavaFX很容易。 我必须承认&#xff0c;我从来没有尝试过&#xff0c;但是根据我从客户那里得到的反馈&#xff0c;我只能建议不要…

kotlin-unresolved reference daclaredFunctions

问题&#xff1a;如题 原因&#xff1a; 默认编译时不导入kotlin-reflect.jar包导致&#xff0c;所以在该包中的默认不能使用 解决 需额外在dependencies中添加 kotlin-reflect的编译&#xff0c; compile “org.jetbrains.kotlin:kotlin-reflect:$kotlin-version”

AWS Loft的数据库周

这是我的笔记&#xff1a; https://databaseweekoctober2019sf.splashthat.com AWS上的数据库&#xff1a;正确工作的正确工具 在许多此类谈话中&#xff0c;我并没有做过深刻的记录。 我正在关注重点。 PostgreSQL排在MySQL之后。 AWS上8种类型的数据库&#xff1a; 关系…

MockWebServer[45678] connection from null failed: java.net.SocketException

MockWebServer使用中的异常 MockWebServer: MockWebServer[45678] connection from null failed: java.net.SocketException: sendto failed: EBADF (Bad file number) java.io.IOException: unexpected end of stream on Connection Caused by: java.io.EOFException: \n n…

ExternalDocumentationLinkImpl(url=https://developer.android.com/reference/, packageListUrl=https://d

dokka 问题 java.lang.RuntimeException: Exception while loading package-list from ExternalDocumentationLinkImpl(urlhttps://developer.android.com/reference/, packageListUrlhttps://developer.android.com/reference/package-list) 原因 使用了下面这个任务定义…

IntellijIDEA插件编写-删除/插入/替换文档内容

错误 ERROR - plication.impl.ApplicationImpl - Assertion failed: Write2018-03-24 01:57:49,835 [ 138880] ERROR - plication.impl.ApplicationImpl - Assertion failed: Write access is allowed inside write-action only (see com.intellij.openapi.application.Appli…

jaxb_JAXB –新手的观点,第1部分

jaxb我知道你们中的很多人已经在想什么&#xff0c;所以让我们摆脱这个问题&#xff1a;“ JAXB&#xff1f; 如XML&#xff1f; 来吧&#xff0c;所有很棒的孩子都在使用JSON。” 关于XML与JSON的辩论以及许多促成它的论据都得到了很好的记录。 我不会花很多时间在这里重新整…

Log4j Bug –减慢您的应用程序

最近&#xff0c;我们正在对流行的SaaS应用程序进行故障排除。 该应用程序间歇性地减慢了速度。 要从问题中恢复&#xff0c;必须重新启动应用程序。 在高流量期间&#xff0c;此应用有时会变慢&#xff1b; 有时在交通繁忙时也是如此。 没有凝聚力模式。 这种应用程序变慢并重…

androidstudio --debug 出现 source code not match bytecode

问题 如标题 原因 编译api版本与运行app的模拟器api版本不一致 解决 换成API版本一致的模拟器运行app即可

android monitor突然不能显示

问题 如题 原因 不清楚。。。。 解决 方式一-有副作用&#xff08;原因不明&#xff09;-debug可能会一直 wait attach&#xff0c;不能进入调试模式 先将Android NDK support plugin禁用&#xff08;重启androidstudio&#xff09;右击android monitor,选择remove from …

jvm虚拟机 基于栈_守护基于JVM的应用程序

jvm虚拟机 基于栈部署体系结构设计是任何定制服务器端应用程序开发项目的重要组成部分。 由于其重要性&#xff0c;部署架构设计应尽早开始&#xff0c;并与其他开发活动一起进行。 部署体系结构设计的复杂性取决于许多方面&#xff0c;包括所提供服务的可伸缩性和可用性目标&a…

ListView中让TextView中的文字进行单独滚动

TextView不能在ListeView中单独的滚动原因 默认ListView将会拦截MOVE事件向下传递 参见源码 case MotionEvent.ACTION_MOVE: {switch (mTouchMode) {case TOUCH_MODE_DOWN:int pointerIndex ev.findPointerIndex(mActivePointerId);if (pointerIndex -1) {pointerIndex 0;…