Springboot整合kafka基本使用

项目搭建

同样的,需要我们搭建一个maven工程,整合非常的简单,需要用到:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

来一起看下完整的pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springboot-kafka-all</artifactId><version>1.0-SNAPSHOT</version><properties><java.version>1.8</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version></parent><dependencies><!--web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--test--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!-- kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--Hutool依赖--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.4</version></dependency><!--fast-json--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId> org.slf4j </groupId><artifactId> slf4j-api </artifactId><version> 1.6.4 </version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.3.RELEASE</version></plugin></plugins></build></project>

配置也很简单 application.yml

server:port: 8081spring:kafka:producer:bootstrap-servers: 127.0.0.1:9092

然后新建一个启动类,看下控制台是否成功链接了Kafka,在启动之前别忘了开启Kafka集群

基本使用

先从一个简单的例子,来快速体验一下Kafka,新建HelloController

@Slf4j
@RestController
public class HelloController {private static final String topic = "test";@Autowiredprivate KafkaTemplate<Object, Object> kafkaTemplate;// 接收消息@KafkaListener(id = "helloGroup", topics = topic)public void listen(String msg) {log.info("hello receive value: {}" , msg);// hello receive value: hello kafka}@GetMapping("/hello")public String hello() {// 发送消息kafkaTemplate.send(topic, "hello kafka");return "hello";}
}

我们通过KafkaTemplate进行消息的发送, 通过@KafkaListener进行消息的消费,我们可以指定消费者ID以及监听的topic,请求localhost:8081/hello观察控制台的变化。请求后,发现消息发送和接收的非常快,我们也可以观察UI后台的消息详情,同步对比

topic创建

之前我们的topic是在UI后台创建的,那么在SpringBoot中如何创建呢? 下面我们试着发送一个不存在的topic

 // 当topic不存在时 会默认创建一个topic// num.partitions = 1 #默认Topic分区数// num.replica.fetchers = 1 #默认副本数@GetMapping("/hello1")public String hello1() {// 发送消息kafkaTemplate.send("hello1", "hello1");return "hello1";}// 接收消息@KafkaListener(id = "hello1Group", topics = "hello1")public void listen1(String msg) {log.info("hello1 receive value: {}" , msg);// hello1 receive value: hello1}

请求之后,观察控制台以及管理后台,发现并没有报错,并且给我们自动创建了一个topic,在自动创建下,默认的参数是:

  num.partitions = 1 #默认Topic分区数num.replica.fetchers = 1 #默认副本数

如果我想手动创建呢?我们可以通过NewTopic来手动创建:

@Configuration
public class KafkaConfig {@Beanpublic KafkaAdmin admin(KafkaProperties properties){KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());// 默认False,在Broker不可用时,如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为Trueadmin.setFatalIfBrokerNotAvailable(true);// setAutoCreate(false) : 默认值为True,也就是Kafka实例化后会自动创建已经实例化的NewTopic对象// initialize():当setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象return admin;}/*** 创建指定参数的 topic* @return*/@Beanpublic NewTopic topic() {return new NewTopic("hello2", 0, (short) 0);}
}

如果要更新呢?也非常的简单

 /*** 更新 topic* @return*/@Beanpublic NewTopic topicUpdate() {return new NewTopic("hello2", 1, (short) 1);}

注意这里的参数只能+不能-

这种方式太简单了,如果我想在代码逻辑中来创建呢?我们可以通过AdminClient来手动创建

  /*** AdminClient 创建*/@Autowiredprivate KafkaProperties properties;@GetMapping("/create/{topicName}")public String createTopic(@PathVariable String topicName) {AdminClient client = AdminClient.create(properties.buildAdminProperties());if(client !=null){try {Collection<NewTopic> newTopics = new ArrayList<>(1);newTopics.add(new NewTopic(topicName,1,(short) 1));client.createTopics(newTopics);}catch (Throwable e){e.printStackTrace();}finally {client.close();}}return topicName;}

观察下管理后台,发现topic都创建成功了

获取消息发送的结果

有时候我们发送消息不知道是不是发成功了,需要有一个结果通知。有两种方式,一种是同步一种是异步

同步获取结果

/*** 获取通知结果* @return*/@GetMapping("/hello2")public String hello2() {// 同步获取结果ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send("hello2","hello2");try {SendResult<Object,Object> result = future.get();log.info("success >>> {}", result.getRecordMetadata().topic()); // success >>> hello2}catch (Throwable e){e.printStackTrace();}return "hello2";}

异步获取

/*** 获取通知结果* @return*/@GetMapping("/hello2")public String hello2() {// 发送消息 - 异步获取通知结果kafkaTemplate.send("hello2", "async hello2").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("fail >>>>{}", throwable.getMessage());}@Overridepublic void onSuccess(SendResult<Object, Object> objectObjectSendResult) {log.info("async success >>> {}", objectObjectSendResult.getRecordMetadata().topic()); // async success >>> hello2}});return "hello2";}

Kafka事务

同样的,消息也会存在事务,如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息,下面通过一个简单的例子体会一下

@GetMapping("/hello3")
public String hello3() {kafkaTemplate.executeInTransaction(t -> {t.send("hello3","msg1");if(true)throw new RuntimeException("failed");t.send("hello3","msg2");return true;});return "hello3";
}// 接收消息
@KafkaListener(id = "hello3Group", topics = "hello3")
public void listen3(String msg) {log.info("hello3 receive value: {}" , msg);
}

默认情况下,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的。我们需要添加transaction-id-prefix来激活它

spring:kafka:producer:bootstrap-servers: 127.0.0.1:9092transaction-id-prefix: kafka_.

启动之后,观察控制台的变化~ ,除此之外,还可以使用注解的方式@Transactional来开启事务

// 注解方式@Transactional(rollbackFor = RuntimeException.class)@GetMapping("/hello4")public String hello4() {kafkaTemplate.send("hello3","msg1");if(true)throw new RuntimeException("failed");kafkaTemplate.send("hello3","msg2");return "hello4";}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/235241.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

macOS系统下载百度网盘的操作流程

第一步 进入百度网盘的官网&#xff0c;链接&#xff1a;百度网盘-免费云盘丨文件共享软件丨超大容量丨存储安全​​​​​​​&#xff0c;选择“客户端下载” 第二步 根据自己的电脑配置选择版本进行下载。芯片的查看路径是系统设置-通用-关于本机 第三步 点击右上角的图标…

自监督学习综述

1.简介 其实自监督学习的核心思想很简单&#xff0c;利用大量的无标签数据训练模型&#xff0c;然后将其作为预训练模型在下游任务上进行微调&#xff08;有标签&#xff09;。在用无标签图像训练模型时主要通过设计辅助任务&#xff0c;用图像自身的信息作为标签训练。常见的…

linux存储管理

固态硬盘SSD SSD的优势 SSD采用电子存储介质进行数据存储和读取的一种技术&#xff0c;拥有极高的存储性能&#xff0c;被认为是存储技术发展的未来新星。 内存直接当成硬盘使用 与传统硬盘相比&#xff0c;SSD固态电子盘具有以下优点 第一&#xff0c;SSD完全的半导体化&…

什么是深度学习的无监督学习与有监督学习

无监督学习&#xff1a; 深度学习中的无监督学习方法是一种训练算法&#xff0c;它在没有标注输出的情况下从输入数据中学习模式和特征。这种方法的核心是探索和理解数据的内在结构和分布&#xff0c;而不是通过已知的输出来指导学习过程。无监督学习在深度学习领域有许多不同…

go语言函数二、init函数定义与作用

go语言init函数定义与作用 在go语言中&#xff0c;每一个源文件都可以包含一个init函数&#xff0c;这个函数会在main函数执行前&#xff0c;被go运行框架调用&#xff0c;注意是在main函数执行前。 package main import ("fmt" )func init() {fmt.Println("i…

实战案例:缓存不一致问题的解决(redis+本地缓存caffine)

一.问题引入 目前在写项目的时候&#xff0c;在B端查看文章&#xff0c;A端修改文章。为了增加效率&#xff0c;以及防止堆内存溢出&#xff0c;在B端选择本地缓存文章的方案。但是目前出现了A端对文章修改之后&#xff0c;B端读的还是旧数据&#xff0c;出现了缓存不一致的问…

阿赵UE学习笔记——2、新建项目和项目设置

阿赵UE学习笔记目录 大家好&#xff0c;我是阿赵。继续来学习虚幻引擎的使用。这次介绍一下新建项目和项目设置。 一、新建项目 通过桌面快捷方式&#xff0c;或者EPIC Games Loader&#xff0c;启动虚幻引擎。 启动完成后&#xff0c;会打开项目管理的界面&#xff0c;可以看…

C++共享和保护——(5)编译预处理命令

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 耕耘者的汗水是哺育种子成长的乳汁&am…

Android 基础篇

Android 应用框架 Android 应用组件 对Android应用程序而言&#xff0c;应用组件主要由Activity、Service、Broadcast Receivers、Intent、Content Providers、AndroidManifest等构成。 Activity是与用户直接交互UI组件&#xff1b; Service是运行在后台、用户不可见的服务组…

超结MOS/低压MOS在5G基站电源上的应用-REASUNOS瑞森半导体

一、前言 5G基站是5G网络的核心设备&#xff0c;实现有线通信网络与无线终端之间的无线信号传输&#xff0c;5G基站主要分为宏基站和小基站。5G基站由于通信设备功耗大&#xff0c;采用由电源插座、交直流配电、防雷器、整流模块和监控模块组成的电气柜。所以顾名思义&#xf…

vue中的侦听器和组件之间的通信

目录 一、侦听器 监听基本数据类型&#xff1a; 监听引用数据类型&#xff1a; 计算属性和watch区别&#xff1f; 二、组件通信/传值方式 1.父子组件传值 父组件给子组件传值&#xff1a; &#xff08;1&#xff09;props &#xff08;2&#xff09;provide inject &…

玩转大数据20:大数据应用容器化与部署实践

一、容器化技术介绍和优势 随着云计算的快速发展&#xff0c;容器化技术作为一种轻量级虚拟化技术&#xff0c;已经成为应用部署的主要方式。容器化技术通过共享操作系统&#xff0c;将应用程序及其依赖项打包成一个独立的、可移植的容器&#xff0c;从而实现应用的快速部署和…

Android Studio: 解决Gradle sync failed 错误

文章目录 1. 前言2. 错误情况3. 解决办法3.1 获取gradle下载地址3.2 获取gradle存放目录3.3 替换并删除临时文件3.4 触发Try Again 4. 执行成功 1. 前言 今天调试项目&#xff0c;发现新装的AS&#xff0c;在下载gradle的过程中&#xff0c;一直显示连接失败&#xff0c;Gradl…

构建陪诊预约系统:技术实战指南

在医疗科技的飞速发展中&#xff0c;陪诊预约系统的应用为患者和陪诊人员提供了更为便捷和贴心的服务。本文将带领您通过技术实现&#xff0c;构建一个简单而实用的陪诊预约系统&#xff0c;以提升医疗服务的效率和用户体验。 技术栈选择 在开始之前&#xff0c;我们需要选择…

AUTOSAR CanTSyn模块配置与代码实现(二)

AUTOSAR CanTSyn模块配置与代码实现 1、FUP message处理 CanTSyn_process_FUP_message 先比较和SYNC报文的Sequence是否相等&#xff0c;如果不相等则不接受该报文。 然后调用CanTSyn_unpack_store_fup处理fup报文。 获取接收到FUP时的本地时间&#xff0c;并与接收到的SYNC…

Linux静态ip

Linux静态ip Ⅰ、修改静态ip Ⅰ、修改静态ip 修改静态ip必须是root用户 su root //切换root用户 ip a //查看修改前的动态ipvi /etc/sysconfig/network-scripts/ifcfg-ens33 //打开网卡配置文件&#xff0c;修改一处&#xff0c;新增四处 BOOTPROTO&quo…

3 使用postman批量创建测试数据

上一篇:2 使用postman进行接口测试-CSDN博客 在软件测试实际工作中,因测试需要,我们要批量创建测试数据。如果某些接口不允许输入重复数据,我们在做批量请求时就要做参数处理了。 比如在上一篇介绍的用户注册接口,一般注册的时候用户名是不允许重复的,如果要批量创…

如何入门 GPT 并快速跟上当前的大语言模型 LLM 进展?

入门GPT 首先说第一个问题&#xff1a;如何入门GPT模型&#xff1f; 最直接的方式当然是去阅读官方的论文。GPT模型从2018年的GPT-1到现在的GPT-4已经迭代了好几个版本&#xff0c;通过官方团队发表的论文是最能准确理清其发展脉络的途径&#xff0c;其中包括GPT模型本身和一…

算法基础之约数个数

约数个数 核心思想&#xff1a; 用哈希表存每个质因数的指数 然后套公式 #include <iostream>#include <algorithm>#include <unordered_map>#include <vector>using namespace std;const int N 110 , mod 1e9 7;typedef long long LL; //long l…

orangepi5plus刷自编译armbian系统

准备好一个编译主机&#xff0c;配置尽量高一点。尽可能有上google的环境配置。 主要步骤 1. 克隆源码 armbian源码仓库 2. 配置apt源 更改/etc/apt/sources.list为国内源&#xff0c;比如我这里ubuntu主机配置清华源。 然后执行apt-get -y update && apt-get -y…