手拉手springboot整合kafka

前期准备安装kafka

启动Kafka本地环境需Java 8+以上

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。

Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

解压tar -xzf kafka_2.13-3.7.0.tgz

一、Zookeeper启动Kafka(kafka内置zookeeper)

Kafka依赖Zookeeper

1、启动Zookeeper 2、启动Kafka

使用kafka自带Zookeeper启动

./zookeeper-server-start.sh ../config/zookeeper.properties &

./zookeeper-server-stop.sh ../config/zookeeper.properties

./kafka-server-start.sh ../config/server.properties &

./kafka-server-stop.sh ../config/server.properties

二、Zookeeper服务器启动Kafka

Zookeeper服务器安装

https://zookeeper.apache.org/

https://dlcdn.apache.org/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz

tar zxvf apache-zookeeper-3.9.2-bin.tar.gz

配置Zookeeper服务器

cp zoo_sample.cfg zoo.cfg

启动Zookeeper服务器

./zkServer.sh start

修改Zookeeper端口

Zoo.cfg添加内容

admin.serverPort=8099

apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper

Zookeeper服务器启动kafka

/opt/kafka_2.13-3.7.0/bin目录下

./kafka-server-start.sh ../config/server.properties &

Kafka配置文件server.properties

三、使用KRaft启动Kafka

UUID通用唯一识别码(Universally Unique Identifier)

1、生成Cluster UUID(集群UUID):./kafka-storage.sh random-uuid

2.格式化kafka日志目录:./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties

3.启动kafka:./kafka-server-start.sh ../config/kraft/server.properties &

springboot集成kafka

创建topic时,若不指定topic的分区(partition)数量使,则默认为1个分区(partition)

修改server.properties文件

vim server.properties

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://192.168.68.133:9092

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

代码语言:java

spring:kafka:bootstrap-servers: 192.168.68.133:9092consumer:auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

脚本重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/832259.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法面试题目

一面 说一下 Transformer 的整体结构 了解有哪些位置编码方式吗&#xff1f; 说一下 LLaMA 中的旋转位置编码 为什么现在的大模型大多是decoder-only的架构&#xff1f; LLM中的因果语言建模与掩码语言建模有什么区别&#xff1f; 如何减轻LLM中的幻觉现象&#xff1f; 如何评估…

PHP定时任务框架taskPHP3.0学习记录7宝塔面板手动可以执行自动无法执行问题排查及解决方案(sh脚本、删除超过特定天数的日志文件、kill -9)

PHP定时任务框架taskPHP3.0学习记录 PHP定时任务框架taskPHP3.0学习记录1&#xff08;TaskPHP、执行任务类的实操代码实例&#xff09;PHP定时任务框架taskPHP3.0学习记录2&#xff08;环境要求、配置Redis、crontab执行时间语法、命令操作以及Screen全屏窗口管理器&#xff0…

深入剖析Tomcat(六) Tomcat各组件的生命周期控制

Catalina中有很多组件&#xff0c;像上一章提到的四种容器&#xff0c;载入器&#xff0c;映射器等都是一种组件。每个组件在对外提供服务之前都需要有个启动过程&#xff1b;组件在销毁之前&#xff0c;也需要有个关闭过程&#xff1b;例如servlet容器关闭时&#xff0c;需要调…

Linux的shell外壳

Shell外壳 在计算机领域&#xff0c;“shell”&#xff08;外壳&#xff09;是指一种用户界面&#xff0c;提供了访问操作系统服务的方式。Shell 是用户与操作系统之间的桥梁&#xff0c;它解释并执行用户输入的命令。 Shell 的主要功能包括&#xff1a; 命令解释&#xff1a…

drain3学习笔记

介绍 由于众所周知的原因&#xff0c;Github访问不稳定。官网虽然介绍&#xff0c;但是案例连接无效&#xff0c;因此保存于此&#xff0c;方便参考学习。 配置 官网样例&#xff08;GitHub&#xff09; drain3.ini [SNAPSHOT] snapshot_interval_minutes 10 compress_st…

设计模式(软件设计师第5版)

创建型&#xff08;5种&#xff09; 1.他们都将关于该系统使用哪些具体的类的信息封转起来。 2.他们隐藏这些类的实例是如何被创建和放在一起的。整个系统关于这些对象所知道的是由抽象类所定义的接口。 创建型模式在什么被创建&#xff0c;谁创建它&#xff0c;它是怎样被创…

从同一文件中导出和导入多个组件

从同一文件中导出和导入多个组件 如果你只想展示一个 Profile 组&#xff0c;而不展示整个图集。你也可以导出 Profile 组件。但 Gallery.js 中已包含 默认 导出&#xff0c;此时&#xff0c;你不能定义 两个 默认导出。但你可以将其在新文件中进行默认导出&#xff0c;或者将…

字符串循环左移

#include <iostream> #include <string> using namespace std;int main() {string s1, s2;getline(cin, s1);int n;cin >> n;if(n>s1.size()){nn-s1.size();s2 s1.substr(0, n);s1.erase(0, n);cout << s1s2;}else{// 提取s1的前n个字符到s2中s2 …

MyBatis 多表映射及动态语句

三、MyBatis多表映射 3.1 多表映射概念 多表查询结果映射思路 前面说明中&#xff0c;我全面梳理了单表的mybatis操作&#xff01;但是开发中更多的是多表查询需求&#xff0c;这种情况我们如何让进行处理&#xff1f;MyBatis 思想是&#xff1a;数据库不可能永远是你所想或…

金融行业专题|信托超融合架构转型与场景探索合集

文章包含 15 信托用户基于超融合实现私有云建设、平台云下迁、信创云转型、容器云探索等场景实践分享。下载《【核心业务篇】金融核心生产业务场景探索文章合集》、《【信创转型与架构升级篇】金融核心生产业务场景探索文章合集》、《【数据库与数据仓库篇】金融核心生产业务场…

编程入门(六)【Linux系统基础操作一】

读者大大们好呀&#xff01;&#xff01;!☀️☀️☀️ &#x1f525; 欢迎来到我的博客 &#x1f440;期待大大的关注哦❗️❗️❗️ &#x1f680;欢迎收看我的主页文章➡️寻至善的主页 文章目录 &#x1f525;前言&#x1f680;Linux操作系统介绍与环境准备Linux操作系统介…

【PyTorch】5-进阶训练技巧(损失函数、学习率、模型微调、半精度训练、数据增强、超参数设置)

PyTorch&#xff1a;5-进阶训练技巧 注&#xff1a;所有资料来源且归属于thorough-pytorch(https://datawhalechina.github.io/thorough-pytorch/)&#xff0c;下文仅为学习记录 5.1&#xff1a;自定义损失函数 PyTorch在torch.nn模块提供了许多常用的损失函数&#xff0c;比…

Windows远程桌面实现之十四:实现AirPlay接收端,让苹果设备(iOS,iPad等)屏幕镜像到PC端

by fanxiushu 2024-05-04 转载或引用请注明原始作者。 这个课题已经持续了好几年&#xff0c;已经可以说是很长时间了。 实现的程序是 xdisp_virt&#xff0c; 可以去github下载使用:GitHub - fanxiushu/xdisp_virt: xfsredir file system 一开始是基于测试镜像驱动的目的随便开…

Vue前端环境准备

vue-cli Vue-cli是Vue官方提供的脚手架&#xff0c;用于快速生成一个Vue项目模板 提供功能&#xff1a; 统一的目录结构 本地调试 热部署 单元测试 集成打包上线 依赖环境&#xff1a;NodeJs 安装NodeJs与Vue-Cli 1、安装nodejs&#xff08;已经安装就不用了&#xff09; node-…

探索系统限流的艺术:滑动与滚动时间窗口的奥秘

在互联网的汪洋大海中&#xff0c;系统如同航行的巨轮&#xff0c;面对着波涛汹涌的流量浪涌。为了保障这艘巨轮稳定前行&#xff0c;"限流"便成了必备的导航仪器&#xff0c;而滑动时间窗口与滚动时间窗口则是其中最为精湛的两大技术。本文将为你揭示它们的奥秘&…

linux文本三剑客之grep

目录 1、三剑客特点和应用场景 2、三件客之grep 1) -v 参数使用示例&#xff1a; 1、三剑客特点和应用场景 命令特点场景grep过滤grep命令过滤速度最快sed替换&#xff0c;修改文件内容&#xff0c;取行 如果要进替换/修改文件内容 取出某个范围的内容&#xff08;从中午12.到…

【stomp 实战】spring websocket用户消息发送源码分析

这一节&#xff0c;我们学习用户消息是如何发送的。 消息的分类 spring websocket将消息分为两种&#xff0c;一种是给指定的用户发送&#xff08;用户消息&#xff09;&#xff0c;一种是广播消息&#xff0c;即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了…

我们说的数据分析,到底要分析些什么?

作者 Gam 本文为CDA志愿者投稿作品 “我们说数据分析&#xff0c;到底要分析些什么&#xff1f;” 数据分析这个话题自从进入人们的视线以来&#xff0c;这个话题就成为人们茶余饭后的谈资&#xff0c;但是一千个人眼中就有一千个哈姆雷特&#xff0c;就意味着每个人对数据分…