SpringBoot2.1.9 多Kafka消费者配置

一、配置文件

pom.xml

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

application.yml

spring:application:name: double-kafka-consumerprofiles:active: devjackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8server:port: 8008sys:kafka:one:bootstrap-servers: 192.168.1.2:5021consumer:group-id: one-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: one-kafka-testtwo:bootstrap-servers: 192.168.1.3:5021consumer:group-id: two-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: two-kafka-test

 

二、配置Configuration Bean

(1)第一个kafka配置


public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//手动提交return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}}

(2)第二个kafka配置(主)


public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary//必须指定一个默认的消费者工厂@Beanpublic ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary//必须指定一个默认的kafka配置@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}}

(3)kakfka配置导入

@Configuration
@Import({OneKafkaConfig.class, TwoKafkaConfig.class})
public class KafkaConfig {@Beanpublic KafkaConsumer kafkaConsumer(){KafkaConsumer consumer = new KafkaConsumer();return consumer;}}

(4) 消费者监听

public class KafkaConsumer {@KafkaListener(containerFactory = "oneKafkaFactory", topics = "${sys.kafka.one.topic}")public void oneKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing }@KafkaListener(containerFactory = "twoKafkaFactory", topics = "${sys.kafka.two.topic}")public void twoKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing}}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如果不懂Service mesh,就不要谈微服务了

提到微服务&#xff0c;spring cloud等经典框架被使用的最为广泛&#xff0c;但是在2016年才被提起的Service Mesh&#xff0c;已经被Paypal、Lyft、Ticketmaster和Credit Karma等等一些大流量平台所使用&#xff0c;在生产应用中添加了Service mesh。今年随着Linkerd传入国内&…

欢乐纪中某B组赛【2019.1.29】

前言 Rank1Rank1Rank1耶 成绩 RankRankRank是有算别人的 RankRankRankPersonPersonPersonScoreScoreScoreAAABBBCCC1112017myself2017myself2017myself2802802801001001008080801001001003332017xjq2017xjq2017xjq2002002001001001000001001001001212122017hjq2017hjq2017hjq15…

mysq和mysqli关系

1、在php5版本之前&#xff0c;一般是用php的mysql函数去驱动mysql数据库的&#xff0c;比如mysql_query()的函数&#xff0c;属于面向过程. 2、在php5版本以后&#xff0c;增加了mysqli的函数功能&#xff0c;某种意义上讲&#xff0c;它是mysql系统函数的增强版&#xff0c;更…

SpringBoot2.1.9 分布式锁ShedLock

一、分布式锁配置 &#xff08;1&#xff09;redis锁 pom.xml <dependency><groupId>net.javacrumbs.shedlock</groupId><artifactId>shedlock-spring</artifactId><version>2.5.0</version> </dependency><dependency&…

使用AspectCore动态代理

前言 最近越来越多的同学关注到AspectCore&#xff0c;并且提出不少中肯的建议&#xff0c;其中最多的提议是希望能够看到更多的关于AspectCore使用方式的文章和Demo。那么在这篇文章里&#xff0c;我们就来聊聊AspectCore核心之一的动态代理。 动态代理 在.NET平台中&#xff…

P1131-[ZJOI2007]时态同步【树形dp】

正题 题目大意 一棵树&#xff0c;可以增长边权长度&#xff0c;要求根节点要每个叶子节点路径长度相等&#xff0c;求最少增加次数。 解题思路 肯定优先修改上面的边&#xff0c;因为这样可以影响最多的点&#xff0c;那么对于每个节点我们都要使它到每个它子树中叶子节点的…

已经安装完成mysql后wamp怎么配置

如果之前安装过mysql&#xff0c;然后想要安装wamp&#xff0c;那么怎么配置呢 先安装好wamp&#xff0c;然后在以下目录中修改my.ini 将密码改为自己的mysql密码即可 这时你发现启动wamp还是黄的 不要慌&#xff0c;因为你已经安装过了wamp&#xff0c;所以wamp自己的mys…

SpringBoot2.1.9 分布式锁ShedLock不执行坑

一、起由 Configuration EnableScheduling EnableSchedulerLock(defaultLockAtMostFor "PT30S") public class TimerTaskConfig implements SchedulingConfigurer {Beanpublic LockProvider scheduledLockConfiguration(RedisConnectionFactory redisConn) {return…

P3891-[GDOI2014]采集资源【背包,dp】

正题 题目大意 nnn个苦力&#xff0c;aia_iai​资源招募&#xff0c;每sss生产bib_ibi​资源。开始有mmm点资源&#xff0c;求最短时间内生产ttt点资源。 解题思路 先计算fif_ifi​表示花费iii点资源最多可以获得多少生产力。 然后gi,jg_{i,j}gi,j​表示前iii个单位时间资源为…

通过 Visual Studio 的“代码度量值”来改进代码质量

1 软件度量值指标 1.1 可维护性指数 表示源代码的可维护性&#xff0c;数值越高可维护性越好。该值介于0到100之间。绿色评级在20到100之间&#xff0c;表明该代码具有高度的可维护性&#xff1b;黄色评级在10到19之间&#xff0c;表示该代码适度可维护&#xff1b;红色评级在0…

php如何接收前端返回的各种类型的数据

之前学习node后端的时候&#xff0c;因为始终无法在网上找到接收json数据的函数&#xff0c;所以后来就放弃了。最近又心血来潮&#xff0c;想学习php. 这次已经有了之前学习php的基础&#xff0c;所以直接入手thinkphp5.0 这次php的学习&#xff0c;主要是为了解决之前遗留的问…

面试官让我讲讲Java中的锁,我笑了

转载自 面试官让我讲讲Java中的锁&#xff0c;我笑了 在读很多并发文章中&#xff0c;会提及各种各样锁如公平锁&#xff0c;乐观锁等等&#xff0c;这篇文章介绍各种锁的分类。介绍的内容如下&#xff1a; 公平锁/非公平锁 可重入锁 独享锁/共享锁 互斥锁/读写锁 乐观锁…

ASP.NET Core 认证与授权[5]:初识授权

经过前面几章的姗姗学步&#xff0c;我们了解了在 ASP.NET Core 中是如何认证的&#xff0c;终于来到了授权阶段。在认证阶段我们通过用户令牌获取到用户的Claims&#xff0c;而授权便是对这些的Claims的验证&#xff0c;如&#xff1a;是否拥有Admin的角色&#xff0c;姓名是否…

P1850-换教室【数学期望,dp,Floyd】

正题 题目大意 一张图&#xff0c;nnn次&#xff0c;每次在cic_ici​上课&#xff0c;可以申请换课室到did_idi​&#xff0c;成功概率kik_iki​。求最短需要走的路径的期望长度 解题思路 先FlodyFlodyFlody预处理多源最短路&#xff0c;然后考虑dpdpdp 设fi,j,0/1f_{i,j,0/…

uni-app打包h5

如果我们想打包成直接浏览的h5&#xff0c;我们需要配置manifest.json这个文件&#xff0c;在其中的h5配置中加入publicPath配置&#xff0c;配置如下&#xff1a; 代码为&#xff1a; "h5" : {"publicPath": "./"},配置好这个后&#xff0c;以后…

SpringBoot2.1.9 多MongoDB配置template

一、配置文件 pom.xml <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> application.yml spring:application:name: double-mongoprofiles:active…

[52ABP实战系列] .NET CORE实战入门视频课程出来啦

“ .NET CORE实战入门视频&#xff0c;要是有讲的不好的地方&#xff0c;还请留言。” 早安&#xff01; 各位道友好&#xff0c;.NET CORE入门视频的第一章已经录制完毕了。视频会放在传课网、网易云课堂及segment fault。 本来想的是第一章合计6个小节就可以播放完毕的&#…

P2473-[SCOI2008]奖励关【数学期望,状压dp】

正题 题目大意 kkk轮&#xff0c;每轮抛出随机一个宝物&#xff0c;若一个宝物的前提集合SiS_iSi​之前都拿到过&#xff0c;那么就可以拿这个并获得pip_ipi​分。 求最大期望分数 解题思路 考虑状态压缩dpdpdp&#xff0c;状态表示宝物是否拿过。 fi,jf_{i,j}fi,j​表示第iii…

vue项目总结

axios axios.defaults.baseURL "http://localhost/public/index/video/" axios.defaults.headers.post[Content-Type] application/json Vue.prototype.$http axios this.$http.post(dunmylist,this.form).then(res>{console.log(res.data);})表单验证 1、r…

SpringBoot2.1.9 多Redis Lettuce配置

一、配置文件 pom.xml <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> application.yml spring:application:name: double-redis-lettuceprofiles:…