KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

文章目录

          • 一、基础集成
            • 1. 技术选型
            • 2. 导入依赖
            • 3. kafka配置
            • 4. auto-offset-reset 简述
            • 5. 新增一个订单类
            • 6. 生产者(异步)
            • 7. 消费者
            • 8. kafka配置类
            • 9.单元测试
            • 9. 效果图
            • 10. 源码地址
            • 11.微服务专栏

一、基础集成
1. 技术选型
软件/框架版本
jdk1.8.0_202
springboot2.5.4
kafka serverkafka_2.12-2.8.0
kafka client2.7.1
zookeeper3.7.0
2. 导入依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3. kafka配置

properties版本

spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000

yml版本项目内部配置

server:port: 8002
spring:application:# 应用名称name: ly-kafkaprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos-config 服务端配置

在这里插入代码片
4. auto-offset-reset 简述

关于
auto.offset.reset 配置有3个值可以设置,分别如下:

earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset时,从头开始消费;
latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据;
none: topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常;
默认建议用 earliest, 设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。

而 latest 这个设置容易丢失消息,假如 kafka 出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费, 中间出问题的哪些就不管了。

none 这个设置没有用过,兼容性太差,经常出问题。

5. 新增一个订单类

模拟业务系统中,用户每下一笔订单,就发送一个消息,供其他服务消费

package com.gblfy.kafka.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.time.LocalDateTime;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {/*** 订单id*/private long orderId;/*** 订单号*/private String orderNum;/*** 订单创建时间*/private LocalDateTime createTime;
}
6. 生产者(异步)
package com.gblfy.lykafka.provider;import com.alibaba.fastjson.JSONObject;
import com.gblfy.common.constant.KafkaTopicConstants;
import com.gblfy.common.entity.Order;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.time.LocalDateTime;/*** Kafka生产者** @author gblfy* @date 2021-09-28*/
@Service
public class KafkaProvider {private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 构建一个订单类Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送消息,订单类的 json 作为消息体ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{}  ",metadata.topic(), metadata.partition(), metadata.offset());}});}
}
7. 消费者
package com.gblfy.lykafka.controller;import com.gblfy.lykafka.provider.KafkaProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@RestController
@RequestMapping("/kafka")
public class KafkaProviderController {@Autowiredprivate KafkaProvider kafkaProvider;@GetMapping("/sendMQ")public String sendMQContent() {kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());return "OK";}
}

通过 @KafkaListener注解,我们可以指定需要监听的 topic 以及 groupId, 注意,这里的 topics 是个数组,意味着我们可以指定多个 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。

注意:消息发布者的 TOPIC 需要保持与消费者监听的 TOPIC 一致,否者消费不到消息。

8. kafka配置类
package com.gblfy.common.constant;public class KafkaTopicConstants {//kafka发送消息主题public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";// kafka消费者组需要和yml文件中的 kafka.consumer.group-id的值保持一致public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";
}
9.单元测试

新建单元测试,功能测试消息发布,以及消费。

package com.gblfy.kafka;import com.gblfy.kafka.controller.KafkaProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@SpringBootTest
class KafkaSpringbootApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {// 发送 1000 个消息for (int i = 0; i < 1000; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);}
}
9. 效果图

在这里插入图片描述
在这里插入图片描述

10. 源码地址

https://gitee.com/gb_90/kafka-parent

11.微服务专栏

https://gitee.com/gb_90/micro-service-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517246.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

看懂别人的代码,只是成为高效程序员的第一步!

作者 | SeattleDataGuy译者 | 弯月&#xff0c;责编 | 屠敏出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;在为面试做准备的时候&#xff0c;很多软件工程师都花费了大量时间做编程题和完善简历。最终在找到一份工作后&#xff0c;无论是在创业公司、Google、亚马…

响应速度不给力?解锁正确缓存姿势

阿里妹导读&#xff1a;响应时间长&#xff0c;遇到性能瓶颈时&#xff0c;开发者第一个想到的总是性能优化。《什么技能产品经理不会提&#xff0c;但技术人必须懂&#xff1f;》讲到了什么时候需要使用缓存。但缓存的用法是什么&#xff1f;一旦缓存使用不当&#xff0c;或稍…

Spring Boot2 集成 jasypt 3.0.4 配置文件敏感信息加密

文章目录1. 导入依赖2. yml中添加配置文件3. 加解密工具类4. 敏感信息替换5. 编译打包6. 启动项目1. 导入依赖 <!--敏感信息加密--><dependency><groupId>com.github.ulisesbocchio</groupId><artifactId>jasypt-spring-boot-starter</artifa…

技术直播:1小时突击Java工程师面试核心(限免报名)

后疫情时代&#xff0c;连程序员这个多金的职业也遭受到了一定程度的打击。从各大招聘网站和多次面试经历中&#xff0c;相信大家已经意识到&#xff0c;面试官对程序员技能体系和项目经验考核似乎更严苛了。你在面试中常常为什么苦恼呢&#xff1f;简历撰写&#xff1f;数据算…

重塑云上的 Java 语言

音乐无国界&#xff0c;但是音乐人有国界。 云原生亦如此。虽没有限定的编程语言&#xff0c;但应用所使用的编程语言已经决定了应用部署运行的行为。 Java 诞生于20年前&#xff0c;拥有大量优秀的企业级框架&#xff0c;践行 OOP 理念&#xff0c;更多体现的是严谨以及在长…

5分钟带你看懂 GCanvas渲染引擎的演进

本文内容大纲&#xff1a; 1、轻量级图形渲染引擎与应用 2、渲染引擎演进与优化之路 3、渲染引擎未来的发展方向 GCanvas 的定位是遵循 w3c 标准的跨平台的轻量级图形渲染引擎。有清晰的定位和目标&#xff0c;并且紧贴现有的业务&#xff0c;为业务提供丰富表现形式。 GCa…

免费技术直播:唐宇迪带你一节课了解机器学习经典算法

常常有小伙伴在后台反馈&#xff1a;机器学习经典算法有哪些&#xff1f;自学难度大又没有效果&#xff0c;该怎么办&#xff1f;CSDN为了解决这个难题&#xff0c;联合唐宇迪老师为大家带来了一场精彩的直播【一节课掌握机器学习经典算法-线性回归模型】。本次直播将帮大家了解…

Centos7 安装Go环境

文章目录1. 下载2. 解压 和目录创建3. 配置环境变量4. 刷新环境变量5. 验证1. 下载 https://golang.google.cn/dl/ wget https://golang.google.cn/dl/go1.17.1.linux-amd64.tar.gz2. 解压 和目录创建 tar -zxvf go1.17.1.linux-amd64.tar.gz -C /usr/local/ mkdir gocode3…

深度学习在商户挂牌语义理解的实践

​导读&#xff1a;高德地图拥有几千万的POI兴趣点&#xff0c;例如大厦、底商、学校等数据&#xff0c;而且每天不断有新的POI出现。为了维持POI数据的鲜度&#xff0c;高德会通过大量的数据采集来覆盖和更新。现实中POI名称复杂&#xff0c;多变&#xff0c;同时&#xff0c;…

云计算与星辰大海的结合——不要回答,来自百亿光年外的未知信号

作者 | 硬核云顶宫责编 | Carol出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;今年在疫情的影响下&#xff0c;各国的经济发展都遇到了一些困难&#xff0c;甚至除中国以外的主要经济体都会进入了负增长的情况&#xff0c;不过越是这样的时候&#xff0c;越…

让大数据分析更简单,4步教你玩转MongoDB BI Connector

MongoDB使用BI Connector支持BI组件直接使用SQL或ODBC数据源方式直接访问MongoDB,在早期MongoDB直接使用Postgresql FDW实现 SQL到MQL的转换,后来实现更加轻量级的mongosqld支持BI工具的连接。 安装 BI Connector 参考 Install BI Connectorhttps://docs.mongodb.com/bi-conne…

谷歌排名第一的编程语言,收下这份资料,小白也能学的会!

学习 Python 的过程中你是否有过这样的问题&#xff1a; 应用方向太多了&#xff0c;不知道该选择哪个&#xff0c;也不知道学习路径是什么。 入门简单&#xff0c;但是精通很难&#xff0c;每次学完做练习项目时都头疼&#xff0c;没思路&#xff0c;甚至怀疑自己不适合编程。…

SpringBoot2 集成xJar插件 动态解密jar包,避免源码泄露或反编译

文章目录一、集成1. 官方介绍地址2. 添加仓库和插件3. 编译打包二、安装go环境和编译2.1. 安装go2.2. 编译三、运行3.1. 正常运行3.2. 二次加密运行3.3. 测试结果四、IntelliJ IDE 反编译测试4.1. 将加密的jar进行解压4.2. 打开解压后的文件夹4.3. class文件查看4.4. 配置文件反…

每秒7亿次请求,阿里新一代数据库如何支撑?

阿里妹导读&#xff1a;Lindorm&#xff0c;就是云操作系统飞天中面向大数据存储处理的重要组成部分。Lindorm是基于HBase研发的、面向大数据领域的分布式NoSQL数据库&#xff0c;集大规模、高吞吐、快速灵活、实时混合能力于一身&#xff0c;面向海量数据场景提供世界领先的高…

拼不过 GO?阿里如何重塑云上的 Java

阿里妹导读&#xff1a;Java 诞生于20年前&#xff0c;拥有大量优秀的企业级框架&#xff0c;践行 OOP 理念&#xff0c;更多体现的是严谨以及在长时间运行条件下的稳定性和高性能。反观如今&#xff0c;在要求快速迭代交付的云场景下&#xff0c;语言的简单性似乎成了首要的要…

android studio打包纯H5项目(集成5+SDK)

下载地址 http://ask.dcloud.net.cn/docs/#//ask.dcloud.net.cn/article/103 我们下载5SDK直接复制demo出来 可以自行修改名字&#xff0c;上面两个dome都可以使用 使用Android Studio打开后 默认目录结构如下 替换HTML文件 找到目录下app/src/main/assets/apps/HelloH5/www…

Apache Flink CEP 实战

本文根据Apache Flink 实战&进阶篇系列直播课程整理而成&#xff0c;由哈啰出行大数据实时平台资深开发刘博分享。通过一些简单的实际例子&#xff0c;从概念原理&#xff0c;到如何使用&#xff0c;再到功能的扩展&#xff0c;希望能够给打算使用或者已经使用的同学一些帮…

图神经网络(AliGraph)在阿里巴巴的发展与应用

背景 为什么做GNN? 在大数据的背景下&#xff0c;利用高速计算机去发现数据中的规律似乎是最有效的手段。为了让机器计算的有目的性&#xff0c;需要将人的知识作为输入。我们先后经历了专家系统、经典机器学习、深度学习三个阶段&#xff0c;输入的知识由具体到抽象&#xf…

实用!五款新型 Linux 命令行工具

作者 | Ricardo Gerardi译者 | 弯月&#xff0c;责编 | 屠敏出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;在Linux/Unix系统的日常使用中&#xff0c;我们需要使用很多命令行工具来完成工作&#xff0c;以及理解和管理我们的系统&#xff0c;例如使用du来监视磁盘…

从零开始入门 K8s | 手把手带你理解 etcd

导读&#xff1a;etcd 是用于共享配置和服务发现的分布式、一致性的 KV 存储系统。本文从 etcd 项目发展所经历的几个重要时刻开始&#xff0c;为大家介绍了 etcd 的总体架构及其设计中的基本原理。希望能够帮助大家更好的理解和使用 etcd。 一、etcd 项目的发展历程 etcd 诞生…