KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)

文章目录

            • 1. 技术选型
            • 2. 导入依赖
            • 3. kafka配置
            • 4. 生产者(同步)
            • 5. 生产者(异步)
            • 6. 消费者

1. 技术选型
软件/框架版本
jdk1.8.0_202
springboot2.5.4
kafka serverkafka_2.12-2.8.0
kafka client2.7.1
zookeeper3.7.0
2. 导入依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3. kafka配置

properties版本

spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000

yml版本

server:port: 8080
spring:application:name: springboot-kafkakafka:bootstrap-servers: 192.168.92.104:9092consumer:auto-commit-interval: 1000auto-offset-reset: earliestenable-auto-commit: truegroup-id: springboot-consumer-02key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:batch-size: 16384buffer-memory: 33544432key-serializer: org.apache.kafka.common.serialization.IntegerSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
4. 生产者(同步)
package com.gblfy.demo.controller;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutionException;@RestController
public class KafkaSyncController {private final static Logger log = LoggerFactory.getLogger(KafkaSyncController.class);@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/send/sync/{message}")public String send(@PathVariable String message) {//同步发送消息ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 0, message);try {SendResult<Integer, String> sendResult = future.get();RecordMetadata metadata = sendResult.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());// System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}}
5. 生产者(异步)
package com.gblfy.demo.controller;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaAsyncController {private final static Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;//设置回调函数,异步等待broker端的返回结束@RequestMapping("/send/async/{message}")public String sendAsync(@PathVariable String message) {//同步发送消息ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 1, message);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());}});return "success";}
}
6. 消费者
package com.gblfy.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {private final static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topics = {"topic-springboot-01"})public void onMessage(ConsumerRecord<Integer, String> record) {log.info("消费者接收到消息主题:{} ,消息的分区:{} ,消息偏移量:{}  ,消息key: {}  ,消息values:{} ",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517263.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jquery标题左右移动动画

标题会在红框范围内来回移动 html和css代码 <div class"menu-notice" click"check_cart"><div class"menu-notice-logo"></div><div class"menu-notice-title" ref"noticeTitle">{{storeinfo[0] ?…

解密 云HBase 冷热分离技术原理

前言 HBase是当下流行的一款海量数据存储的分布式数据库。往往海量数据存储会涉及到一个成本问题&#xff0c;如何降低成本。常见的方案就是通过冷热分离来治理数据。冷数据可以用更高的压缩比算法&#xff08;ZSTD&#xff09;&#xff0c;更低副本数算法&#xff08;Erasure…

再见,工资!2020年6月程序员工资统计,平均14404元,网友:又跌了!

见了鬼&#xff01;工资竟然又跌了2020 年 6 月全国招收程序员 313739 人。2020 年 6 月全国程序员平均工资 14404 元&#xff0c;工资中位数 12500 元&#xff0c;其中 95% 的人的工资介于 5250 元到 35000 元。怪不得小陈发现最近猎头的“骚扰”电话越来越少了&#xff0c;这…

mysql创建function 报错误1418 - This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in

解决方法&#xff1a; 执行这条sql就可以了&#xff1a; set global log_bin_trust_function_creators1;运行结果&#xff1a; 函数创建成功了

一个实时精准触达系统的自我修养

问题定义 在互联网行业&#xff0c;唯一不变的就是一直在变化。作为技术同学&#xff0c;我们经常会碰到以下几种需求&#xff1a; 当用户收藏的商品降价后及时通知用户&#xff0c;促进双方交易达成&#xff1b;新用户或90天内未成交的用户浏览多个商品后引导用户主动和卖家聊…

vue-datepicker的使用

写这个文章主要是记录下用法&#xff0c;官网已经说的很详细了 npm install vue-datepicker --savehtml代码 <myDatepicker :date"startTime" :option"multiOption" :limit"limit"></myDatepicker> <myDatepicker :date"e…

数据库怎么选择?终于有人讲明白了

作者 | Alex Petrov所有数据库管理系统的主要工作都是可靠地存储数据并使其对用户可用。我们使用数据库作为数据的主要来源&#xff0c;帮助我们在应用程序的不同部分之间共享数据。我们使用数据库&#xff0c;而不是在每次创建新应用程序时寻找存储和检索信息的方法&#xff0…

医疗数据典型特征及架构发展方向研究

前言 医疗健康产业目前呈高速发展状态&#xff0c;处在互联网对医疗行业赋能的关键阶段&#xff0c;由于医疗行业数据的隐私性较强&#xff0c;通过传统方式很难获取公开的医疗健康数据进行研究&#xff0c;根据阿里云天池比赛赛题设置研究及提供的脱敏数据集着手进行分析是比…

分布式事务 GTS 的价值和原理浅析

GTS 今年双 11 的成绩 今年 2684 亿的背后&#xff0c;有一个默默支撑&#xff0c;低调到几乎被遗忘的中间件云产品——GTS&#xff08;全局事务服务&#xff0c;Global Transaction Service&#xff09;&#xff0c;稳稳地通过了自 2014 年诞生以来的第 5 次“大考”。 2019 …

vue的transition组件的使用

主要实现的是页面跳转的时候一个页面从左边出去&#xff0c;一个页面从右边进来&#xff0c;通过css3实现的&#xff0c;可以自己自行修改 html代码 <template><div id"app"><div class"router-parent"><transition :name"tra…

kafka java.net.UnknownHostException: node4 Error connecting to node node4:9092

解决&#xff1a;修改kafka的server.properties文件 vim /kafka安装路径/config/server.properties 去除下面这行配置的注释&#xff0c;并设置对应的ip地址 #advertised.listenersPLAINTEXT://your.host.name:9092 advertised.listenersPLAINTEXT://192.168.92.104:9092 重启…

看全新升级的KubeSphere 3.0 如何助力企业在容器混合云时代乘风破浪?

数据时代&#xff0c;层出不穷的创新型业务对企业IT提出了更高的要求&#xff0c;业务、技术和管理方面的挑战也逐渐显现。对此&#xff0c;越来越多的企业希望能够快速、简单地创建企业应用&#xff0c;敏捷地满足业务创新的需求&#xff0c;同时还能维持极高的企业级服务水平…

5G的7大用途,你知道几个?

阿里妹导读&#xff1a;5G时代悄悄来临&#xff0c;甚至成为街头巷尾都在讨论的话题。相信你一定有过一些疑问&#xff1a;什么是5G&#xff1f;仅仅只是网速更快吗&#xff1f;5G如何做到毫秒级的延迟&#xff1f;网络切片是什么&#xff1f;5G的标准之争是怎么回事&#xff0…

ALive:淘宝双11直播,技术同学却可以“偷懒”?

“疯狂的”淘宝直播间 今年直播又火了&#xff01; 2019年双11淘宝直播带来近 200亿 成交&#xff0c;以天猫双11交易总额2684亿计算&#xff0c;直播已经占总成交额的近 7.45%&#xff01; 今年的变化 除了以往的手淘和猫客&#xff0c;现在 UC 浏览器、新浪微博、支付宝、…

虚拟机安装centos

到官网下载centos系统&#xff1a; https://www.centos.org/download/ 有三种选择&#xff08;DVD IOS&#xff0c;Everything IOS, Minimal IOS(精简版的)&#xff09;&#xff0c;建议使用DVD IOS 安装虚拟机&#xff1a; #选择典型安装&#xff1a; #选择稍后安装&#…

KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

文章目录一、基础集成1. 技术选型2. 导入依赖3. kafka配置4. auto-offset-reset 简述5. 新增一个订单类6. 生产者&#xff08;异步&#xff09;7. 消费者8. kafka配置类9.单元测试9. 效果图10. 源码地址11.微服务专栏一、基础集成 1. 技术选型 软件/框架版本jdk1.8.0_202spri…

看懂别人的代码,只是成为高效程序员的第一步!

作者 | SeattleDataGuy译者 | 弯月&#xff0c;责编 | 屠敏出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;在为面试做准备的时候&#xff0c;很多软件工程师都花费了大量时间做编程题和完善简历。最终在找到一份工作后&#xff0c;无论是在创业公司、Google、亚马…

开发者解读:为什么蚂蚁要用融合计算这种新计算模式?

导读&#xff1a;如今大部分人工智能应用是基于监督学习范式开发的&#xff0c;即模型在线下进行训练&#xff0c;然后部署到服务器上进行线上预测&#xff0c;这样的开发方式在实时响应上存在较大的局限。随着计算和 AI 体系逐步成熟&#xff0c;我们希望机器学习应用能更多地…

Vue-touch的使用

#####有时候我们不止需要有返回键&#xff0c;也要有手势滑动切换页面的功能时&#xff0c;这个时候vue-touch就派上用场了 #####API地址&#xff1a; https://github.com/vuejs/vue-touch/tree/next#####安装 npm install vue-touchnext --save //main.js中引入&#xff1a;…