spring-boot2.x整合Kafka步骤

1.pom依赖添加

<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version><relativePath /> <!-- lookup parent from repository --></parent><dependencies><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></exclusion><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context-support</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>

2. 配置文件添加配置:

server:port: 8080spring:application:name: application-kafkakafka:bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 #这个是kafka的地址,对应你server.properties中配置的producer:batch-size: 16384 #批量大小acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)retries: 10 # 消息发送重试次数#transaction-id-prefix: transactionbuffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延迟#partitioner: #指定分区器#class: com.test.config.CustomerPartitionHandlerconsumer:group-id: testGroup,testg2 #默认的消费组IDenable-auto-commit: true #是否自动提交offsetauto-commit-interval: 2000 #提交offset延时# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latestmax-poll-records: 500 #单次拉取消息的最大条数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000 # 消费请求的超时时间listener:missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#      type: batchlogging:config: classpath:log4j2.xml

3. 日志配置

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO"><!--全局参数--><Properties><Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
<!--         <Property name="logDir">/data/logs/logViewer</Property> --><Property name="logDir">logs</Property></Properties><Appenders><!-- 定义输出到控制台 --><Console name="console" target="SYSTEM_OUT" follow="true"><!--控制台只输出level及以上级别的信息-->
<!--             <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> --><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout></Console><!-- 同一来源的Appender可以定义多个RollingFile,定义按天存储日志 --><RollingFile name="rolling_file"fileName="${logDir}/logViewer.log"filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log"><ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout><Policies><TimeBasedTriggeringPolicy interval="1"/></Policies><!-- 日志保留策略,配置只保留七天 --><DefaultRolloverStrategy><Delete basePath="${logDir}/" maxDepth="1"><IfFileName glob="logViewer_*.log" /><IfLastModified age="7d" /></Delete></DefaultRolloverStrategy></RollingFile></Appenders><Loggers><Root level="INFO"><AppenderRef ref="console"/><AppenderRef ref="rolling_file"/></Root></Loggers>
</Configuration>

4. controller入口类,其它应用通过该接口直接将数据写入kafka

@RequestMapping(value="/kafka")
@Controller
public class ProducerController {@Autowiredprivate KafkaTemplate kafkaTemplate;//    模拟发送消息@RequestMapping(value = "/send",method = RequestMethod.GET)public String sendMessage(@PathParam(value = "msg") String msg) {System.out.println("收到get请求。。。");kafkaTemplate.send("test",msg);return "成功";}

5. kafka回调方法(需要回调通知时使用该方式):

    @GetMapping("/kafka/callbackTwo/{message}")public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});}

6.kafka消费者注册

@Component
public class KafkaMessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);/*** containerFactory* 消息过滤器消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。* @param record*/@KafkaListener(topics = {"test","test2"},groupId = "testGroup")public void listenTestStatus(ConsumerRecord<?, ?> record) {LOGGER.info("接收到消息:开始业务处理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空数据,跳过");}else {LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());}}@KafkaListener(topics = {"test"},groupId = "testg2")public void listenTest2(ConsumerRecord<?, ?> record) {LOGGER.info("###listenTest2接收到消息:开始业务处理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空数据,跳过");}else {LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());}}/*** id:消费者IDgroupId:消费组IDtopics:监听的topic,可监听多个topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区* @param records*///批量消费@KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup",errorHandler = "consumerAwareErrorHandler")public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());for (ConsumerRecord<String, Object> record : records) {System.out.println(record.value());}}

7.非spring-boot环境下使用java原生API手写kafka生产消息:

 public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");}//消息处理完才停止发送者。producer.close();}

8.非spring-boot环境下使用java原生API手写java手写kafka消费者:

 public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}

9.非spring-boot环境下使用java原生API手写异步发送kafka:

 public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:发送消息//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/45909.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自学鸿蒙HarmonyOS的ArkTS语言<十二>wrapBuilder:组件工厂类封装

// FactoryComponent.ets Builder function Radio1() {Column() {Text(单选组件&#xff1a;)Row() {Radio({ value: 1, group: radioGroup })Text(选项1)}Row() {Radio({ value: 2, group: radioGroup })Text(选项2)}}.margin(10) }Builder function Checkbox1() {Column() {T…

DP(5) | 完全背包 | Java | 卡码52, LeetCode 518, 377, 70 做题总结

完全背包 感觉越写越糊涂了&#xff0c;初始化怎么做的&#xff1f;递推公式怎么来的&#xff1f; 卡码52. 携带研究材料 https://kamacoder.com/problempage.php?pid1052 import java.util.*;public class Main {public static void main(String[] args) {Scanner sc new …

Java面试八股之Redis集群是怎么选择数据库的

在Redis集群中&#xff0c;数据被水平分割&#xff08;sharding&#xff09;到各个节点上&#xff0c;这意味着所有的键空间被分成16384个哈希槽&#xff08;hash slots&#xff09;&#xff0c;这些槽均匀地分布在集群中的各个节点上。Redis集群并不支持传统的数据库切换&…

xiuno兔兔超级SEO插件(精简版)

xiuno论坛是一个一款轻论坛产品的论坛&#xff0c;但是对于这个论坛基本上都是用插件实现&#xff0c;一个论坛怎么能离开网站seo&#xff0c;本篇分享一个超级seo插件&#xff0c;自动sitemap、主动提交、自动Ping提交。 插件下载:tt_seo.zip

实验11 数据库日志及数据库恢复

一、 实验目的 了解Mysql数据库系统中数据恢复机制和主要方法。 二、 实验环境 操作系统&#xff1a;Microsoft Windows 7旗舰版&#xff08;32&64位&#xff09;/Linux。 硬件&#xff1a;容量足以满足MySQL 5.7&#xff08;8.0&#xff09;安装及后续实验的使用。 软件…

Python | Leetcode Python题解之第232题用栈实现队列

题目&#xff1a; 题解&#xff1a; class MyQueue:def __init__(self):self.A, self.B [], []def push(self, x: int) -> None:self.A.append(x)def pop(self) -> int:peek self.peek()self.B.pop()return peekdef peek(self) -> int:if self.B: return self.B[-1…

什么叫图像的中值滤波,并附利用OpenCV和MATLB实现均值滤波的代码

图像的中值滤波&#xff08;Median Filtering&#xff09;是一种非线性数字滤波技术&#xff0c;常用于图像处理以减少噪声&#xff0c;同时保留图像边缘细节。其基本思想是用图像中某个窗口内像素的中值替代该窗口中心像素的值。具体步骤如下&#xff1a; 选择窗口&#xff1a…

C++树(二)【直径,中心】

目录&#xff1a; 树的直径&#xff1a; 树的直径的性质&#xff1a; 性质1&#xff1a;直径的端点一定是叶子节点 性质2&#xff1a;任意点的最长链端点一定是直径端点。 性质3&#xff1a;如果一棵树有多条直径,那么它们必然相交&#xff0c;且有极长连…

STM32中PC13引脚可以当做普通引脚使用吗?如何配置STM32的TAMPER?

1.STM32中PC13引脚可以当做普通引脚使用吗&#xff1f; 在STM32单片机中&#xff0c;PC13引脚可以作为普通IO使用&#xff0c;但需要进行一定的配置。PC13通常与RTC侵入检测功能&#xff08;TAMPER&#xff09;复用&#xff0c;因此需要关闭TAMPER功能才能将其作为普通IO使用。…

服务端渲染框架:Nuxt.js 与 Next.js 的区别和对比

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

2024国家护网面试小结

24年国护马上就要开始&#xff0c;基本上大部分蓝队红队都已经准备入场了 今年护网第一年变成常态化护网&#xff0c;由十五天突然变成了两个月常态化&#xff0c;导致今年护网有很多项目整的七零八落 博主今年参加了三家厂商蓝队护网面试&#xff0c;在这边分享一下护网面试…

掌握这些技巧,让你成为画册制作高手

在数字化的时代背景下&#xff0c;电子画册以其便捷的传播方式、丰富的视觉表现形式&#xff0c;赢得了大众的喜爱。它不仅能够在个人电脑上展现&#xff0c;还能通过智能手机、平板电脑等多种移动设备随时随地被访问和浏览。这种跨平台的支持&#xff0c;使得无论你身处何地&a…

Html_Css问答集(12)

99、将上例的0%改为30%&#xff0c;会如何变化&#xff1f; none:延迟2秒间无色&#xff0c;3.8秒&#xff08;0%-30%占1.8秒&#xff09;前无色&#xff0c;之后变红到5秒绿最后蓝&#xff0c;动画结束时恢复初始&#xff08;无色&#xff09;。 forward:延迟2秒间无色&am…

leetcode刷题总结——字符串匹配

KMP&#xff08;字符串匹配算法&#xff09; 主串或目标串&#xff1a;比较长的&#xff0c;我们就是在它里面寻找子串是否存在&#xff1b; 子串或模式串&#xff1a;比较短的。 前缀&#xff1a;字符串A和B&#xff0c;A BS&#xff0c;S非空&#xff0c;则B为A的前缀。 …

婚礼成本与筹备策略:一场梦幻婚礼的理性规划

婚礼成本与筹备策略&#xff1a;一场梦幻婚礼的理性规划 摘要 婚礼&#xff0c;作为人生中的重要仪式&#xff0c;承载着新人的爱情与梦想&#xff0c;同时也伴随着不菲的经济投入。本文旨在探讨婚礼所需的大致成本、影响成本的主要因素以及婚礼筹备过程中的关键注意事项&…

【Java--数据结构】二叉树

欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗~ 如有错误&#xff0c;欢迎指出~ 树结构 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合 注意&#xff1a;树形结构中&#xff0c;子…

Transformer模型在多任务学习中的革新应用

在深度学习领域&#xff0c;多任务学习&#xff08;Multi-task Learning, MTL&#xff09;是一种训练模型以同时执行多个任务的方法。这种方法可以提高模型的泛化能力&#xff0c;因为它允许模型在不同任务之间共享知识。近年来&#xff0c;Transformer模型因其在自然语言处理&…

【linux高级IO(三)】初识epoll

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:Linux从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学更多操作系统知识   &#x1f51d;&#x1f51d; Linux高级IO 1. 前言2. 初识e…

STM32 HRTIM生成PWM时遇到无法输出PWM脉冲波形问题

在使用HRTIM生成PWM时&#xff0c;当把周期寄存器更新的设置放到while循环中时&#xff0c;无法输出PWM脉冲波形&#xff0c;即使增加计数延时也无法输出&#xff0c;最终只能放到中断函数中执行后期寄存器值更新才能够生成PWM脉冲波形。

主流大数据调度工具DolphinScheduler之数据ETL流程

今天给大家分享主流大数据调度工具DolphinScheduler&#xff0c;以及数据的ETL流程。 一&#xff1a;调度工具DS 主流大数据调度工具DolphinScheduler&#xff0c; 其定位&#xff1a;解决数据处理流程中错综复杂的依赖关系 任务支持类型&#xff1a;支持传统的shell任务&a…