Springboot整合kafka简单使用

kafka

一,介绍

Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开发并贡献给 Apache 软件基金会。它设计用于构建高性能、持久性、可伸缩和容错的实时数据管道和流处理应用程序。

以下是 Kafka 的一些关键特点和概念:

  1. 发布-订阅模型:Kafka 使用发布-订阅模型,其中消息生产者将消息发布到主题(topic),而消息消费者从主题订阅消息。这种模型可以支持多个消费者同时消费消息,并且具有良好的扩展性。
  2. 主题(Topic):主题是消息的逻辑分类,相当于一个消息队列。消息被发布到主题,并且消费者从主题订阅消息。
  3. 分区(Partition):每个主题可以划分为一个或多个分区,每个分区是一个有序的日志。分区可以分布在集群的不同节点上,以提供水平扩展和负载均衡。
  4. 副本(Replication):Kafka 支持将每个分区的数据复制到多个副本,以提供容错性和数据可靠性。副本位于集群的不同节点上,确保即使某个节点故障,数据仍然可用。
  5. 生产者(Producer):生产者负责将消息发布到主题。它们可以选择将消息发送到特定的分区,也可以根据负载均衡策略将消息均匀分布到不同的分区中。
  6. 消费者(Consumer):消费者从主题订阅消息,并按照其偏移量(offset)顺序消费消息。消费者组(Consumer Group)是一组共享相同主题的消费者,它们协调以确保每个分区的消息只有一个消费者消费。
  7. ZooKeeper:Kafka 使用 ZooKeeper 来进行分布式协调和管理,如协调生产者和消费者、维护集群元数据等。

Kafka 提供了高性能、可靠、持久的消息传递系统,适用于大规模的实时数据处理和流式处理应用程序。它已经成为许多企业构建实时数据管道和流处理应用程序的首选工具之一。

二,运行原理

Kafka 的运行原理涉及多个组件和过程,主要包括生产者发送消息、消息存储在代理 (Broker) 中的分区中、消费者从分区中读取消息等。以下是 Kafka 的基本运行原理:

  1. 生产者发送消息:
    • 生产者将消息发送到 Kafka 的特定主题 (Topic) 中。
    • 生产者可以选择性地将消息发送到特定的分区 (Partition),或者使用 Kafka 的默认分区分配策略,由 Kafka 在发送时决定将消息发送到哪个分区。
  2. 消息存储在分区中:
    • 主题可以被分成多个分区,每个分区是一个有序的消息序列。
    • 每条消息在分区内有一个唯一的偏移量 (Offset),用于标识消息在分区中的位置。
    • 消息被持久化在 Kafka 的分区中,直到满足一定的保留策略(如时间或者大小限制)。
  3. 消费者从分区中读取消息:
    • 消费者从 Kafka 的特定主题中读取消息。
    • 消费者可以以不同的方式订阅主题,例如:
      • 指定订阅的主题和分区。
      • 加入一个消费者组 (Consumer Group),使得消费者可以以并行的方式消费主题中的消息。
    • 每个消费者在消费主题时,会维护自己的消费偏移量 (Offset),用于记录已经消费的消息位置。
  4. 分区和副本管理:
    • Kafka 使用分区来实现并行处理和水平扩展。
    • 每个分区可以有多个副本,其中一个是领导者副本 (Leader),其余的是追随者副本 (Follower)。
    • 领导者副本负责处理读写请求,追随者副本用于备份数据。
    • 如果领导者副本失效,Kafka 会从追随者副本中选举新的领导者。
  5. ZooKeeper 协调:
    • Kafka 集群依赖 ZooKeeper 来进行集群管理、元数据存储和领导者选举等任务。
    • ZooKeeper 存储了 Kafka 集群的元数据,包括主题、分区、消费者组等信息,同时也用于监控和管理集群的健康状态。

通过这些组件和过程,Kafka 实现了高吞吐量、持久性、分布式和水平扩展等特性,使得它成为处理大规模实时数据流的理想选择。

三,Spring Boot 项目中整合 Kafka (简单使用)

1.添加 Maven 依赖

首先,在你的 Spring Boot 项目的 pom.xml 文件中添加 Spring Kafka 的依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.配置 Kafka 连接信息

application.propertiesapplication.yml 中添加 Kafka 服务器的连接信息:

  kafka:bootstrap-servers: 192.168.193.131:9092producer:  #生产者序列化器retries: 10 #如果发生故障,生产者将尝试重新发送消息的次数。key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生产者消息键的类。value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生产者消息值的类。ack-mode: manualconsumer: #消费者序列化器group-id: ${spring.application.name}-test # 消费者组的唯一标识符。在消费者组中的所有消费者将共享消费者组的工作负载。key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消费者消息键的类。value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消费者消息值的类。listener: #配置了监听器相关的设置。ack-mode: manual #开启手动确认 设置为手动,表示消费者将等待手动确认来确定是否已成功处理消息。
3.创建kafka配置类:

配置 Kafka 主题(topic)的创建

@Configuration
public class KafkaConfig {/*** @return org.apache.kafka.clients.admin.NewTopic* @date 2024/5/31 14:42* @Description: TODO @Bean 注解* 作用:将方法返回的对象注册为 Spring 容器中的一个 Bean。* 返回值类型:NewTopic,表示 Kafka 主题的配置信息。*/@Beanpublic NewTopic viewUserTopic(){/*第一个参数:主题名称,这里是 "viewUserTopic"。第二个参数:分区数量,这里设置为 1。第三个参数:副本数量,这里设置为 1。*/return new NewTopic("viewUserTopic",1,(short) 1);}}
4.注入kafka模板类
@Autowired
private KafkaTemplate kafkaTemplate;
5.发送消息
//定义消息的唯一ID 防止消息重复消费
String msgId = "msg-" + UUID.randomUUID().toString();
//定义消息内容
String msgBody = JSON.toJSONString(tbUser);//将消息唯一表示存入redis缓存  防止消息重复消费
stringRedisTemplate.opsForValue().set(msgId, msgBody);/*组装消息体 发送消息队列*/
MessageVO messageVO = new MessageVO();
messageVO.setMsgID(msgId);
messageVO.setMsgBody(msgBody);//向名为 "viewUserTopic" 的 Kafka 主题发送消息。//参数一: 表示目标 Kafka 主题的名称。   参数二:消息内容kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo))//通过该方法设置回调函数,用于处理消息发送的成功和失败情况。.addCallback(//成功回调函数,处理消息发送成功的情况。new SuccessCallback() {@Overridepublic void onSuccess(Object o) {// 消息发送成功System.out.println("kafka 消息发送成功了~~~~~~~~~~~~");}},//失败回调函数,处理消息发送失败的情况。new FailureCallback() {@Overridepublic void onFailure(Throwable throwable) {// 消息发送失败了,再次发送System.out.println("kafka 消息发送失败了,再次发送");kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo));}});
6.消息的接收(监听)
/*** @param message 表示接收到的消息内容,这里是 JSON 格式的字符串。* @param acknowledgment 用于手动提交消费者偏移量的对象。* @date 2024/5/31 15:13* @Description: TODO* @KafkaListener 通过该注解指定了监听的 Kafka 主题为 "viewUserTopic"。*/@KafkaListener(topics = "viewUserTopic")public void recvViewUserMessage(String message, Acknowledgment acknowledgment) {//--1 接收消息MessageVO messageVo = JSON.parseObject(message, MessageVO.class);//--2 根据消息的唯一ID,判断消息是否重复String msgId = messageVo.getMsgID();if (!stringRedisTemplate.hasKey(msgId)) {// 消息重复了System.out.println("kafka 消息重复了");// 使用 acknowledgment.acknowledge() 方法手动确认消费完成,通知 Kafka 服务器该消息已经被处理。acknowledgment.acknowledge();return;}//--3 消费消息(处理消息)String msgBody = messageVo.getMsgBody();TbLog tbLog = JSON.parseObject(msgBody, TbLog.class);tbLog.setCreateTime(new Date());tbLogMapper.insert(tbLog);//--4 手动确认消息//手动确认消费完成,通知 Kafka 服务器该消息已经被处理。acknowledgment.acknowledge();//--5 删除消息的唯一ID,防止消息重复消费stringRedisTemplate.delete(msgId);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/21157.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SPWM载波调制方式-三电平杂记1

方法一&#xff1a; P2 O1 N0 方法二&#xff1a;双载波直接发波 方法三&#xff1a;负轴载波和调制波往上抬升1&#xff0c;得到使用同一个载波 在正半周在P和O切换&#xff0c;在下半轴式O和N切换

自动评论自动私信引流系统,自动化时代的挑战与机遇

随着科技的飞速发展&#xff0c;自动化技术已经渗透到我们生活的方方面面。从工业生产线上的机械臂到家庭中的智能助手&#xff0c;自动化不仅改变了我们的工作方式&#xff0c;也在重塑着社会的面貌。然而&#xff0c;在享受自动化带来的便利和效率的同时&#xff0c;我们也必…

961题库 北航计算机 MIPS基础选择题 附答案 选择题形式

有题目和答案&#xff0c;没有解析&#xff0c;不懂的题问大模型即可&#xff0c;无偿分享。 第1组 习题 MIPS处理器五级流水线中&#xff0c;涉及DRAM的是 A. 取指阶段 B. 译码阶段 C. 执行阶段 D. 访存阶段 MIPS处理器五级流水线中&#xff0c;R型指令保存结果的阶段是 A.…

关于高版本 Plant Simulation 每次保存是 提示提交comm对话框的处理方法

关于高版本 Plant Simulation 每次保存是 提示提交comm对话框的处理方法 如下图 将model saving history 修改为None即可 关于AutoCAD 2022 丢失模板库的问题 从新从以下地址打开即可&#xff1a; D:\Program Files\Autodesk\AutoCAD 2022\UserDataCache\zh-cn\Template

Visual Studio Installer 点击闪退

Visual Studio Installer 点击闪退问题 1. 问题描述2. 错误类型3. 解决方法4. 结果5. 说明6. 参考 1. 问题描述 重装了系统后&#xff08;系统版本&#xff1a;如下图所示&#xff09;&#xff0c;我从官方网站&#xff08;https://visualstudio.microsoft.com/ ) 下载了安装程…

Leetcode:正则表达式匹配

目录 普通版本&#xff08;动态规划&#xff09; 状态表示 状态转移方程 优化③①情况 数学化简分析 结合实际情况画图化简分析 总结 最终代码 题目链接&#xff1a;10. 正则表达式匹配 - 力扣&#xff08;LeetCode&#xff09; 好像是leetcode前100道里面最难的一道&a…

方法引用与构造方法引用

目录 方法引用 什么是方法引用 构造方法引用 构造方法引用&#xff08;也可以称作构造器引用&#xff09; 数组构造方法引用 方法引用 什么是方法引用 当要传递给 Lambda 体的操作&#xff0c;已经有实现的方法了&#xff0c;可以使用方法引用。 方法引用可以看做是 La…

PHAR反序列化

PHAR PHAR&#xff08;PHP Archive&#xff09;文件是一种归档文件格式&#xff0c;phar文件本质上是一种压缩文件&#xff0c;会以序列化的形式存储用户自定义的meta-data。当受影响的文件操作函数调用phar文件时&#xff0c;会自动反序列化meta-data内的内容,这里就是我们反序…

头歌页面置换算法第3关:计算LRU算法缺页率

2 任务:LRU算法 2.1 任务描述 设计LRU页面置换算法模拟程序:从键盘输入访问串。计算LRU算法在不同内存页框数时的缺页数和缺页率。要求程序模拟驻留集变化过程,即能模拟页框装入与释放过程。 2.2任务要求 输入串长度作为总页框数目,补充程序完成LRU算法。 2.3算法思路 LRU算…

jmeter常用的断言

包括&#xff08;Contains&#xff09;&#xff1a;响应内容包括需要匹配的内容即代表响应成功&#xff0c;支持正则表达式 匹配&#xff08;Matches&#xff09;&#xff1a;响应内容要完全匹配需要匹配的内容即代表响应成功&#xff0c;大小写不敏感&#xff0c;支持正则表达…

vue html2canvas生成base64图片和图片高度

vue html2canvas生成图片 exportAsBase64() {const ele document.getElementById(content);html2canvas(ele, {dpi: 96, // 分辨率 scale: 2, // 设置缩放 useCORS: true, // 允许canvas画布内跨域请求外部链接图片 bgcolor: #ffffff, // 背景颜色 logging: false, // 不…

rust之cargo install cargo-binstall 是什么

cargo-binstall 是什么 官方&#xff1a;https://lib.rs/crates/cargo-binstall Binstall 提供了一种低复杂性的机制来安装 Rust 二进制文件&#xff0c;作为从源代码&#xff08;通过 cargo install &#xff09;构建或手动下载软件包的替代方案。这旨在与现有的 CI 工件和基…

Windows安装ElasticSearch版本7.17.0

在Windows系统上本地安装Elasticsearch的详细步骤如下&#xff1a; 1. 下载Elasticsearch 访问 Elasticsearch下载页面。选择适用于Windows的版本7.17.0&#xff0c;并下载ZIP文件。 2. 解压文件 下载完成后&#xff0c;找到ZIP文件&#xff08;例如 elasticsearch-7.17.0.…

【算法篇】冒泡排序算法JavaScript版

冒泡排序算法&#xff1a;原理与实现 冒泡排序&#xff08;Bubble Sort&#xff09;是一种简单的排序算法&#xff0c;它重复地遍历要排序的数列&#xff0c;一次比较两个元素&#xff0c;如果它们的顺序错误就把它们交换过来。遍历数列的工作是重复地进行直到没有再需要交换&…

spoon基础使用-第一个转换文件

新建一个转换&#xff0c;文件->新建->转换&#xff0c;也可以直接ctralN新建。 从右边主对象树拖拽一个输入->表输入&#xff1b;输出->文本文档输出&#xff1b;也可以直接在搜索框搜素表输入、文本文档输出。 双击表输入新建一个数据库连接 确定后就可以在S…

【人工智能】第二部分:ChatGPT的架构设计和训练过程

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

Java | Leetcode Java题解之第126题单词接龙II

题目&#xff1a; 题解&#xff1a; class Solution {public List<List<String>> findLadders(String beginWord, String endWord, List<String> wordList) {List<List<String>> res new ArrayList<>();// 因为需要快速判断扩展出的单词…

传输中的串扰(八)

串扰指的是有害信号从一个线网传递到相邻线网上。通常把噪声源所在的线网称为动态线或攻击线网&#xff0c;而把有噪声形成的线网称为静态线或受害线网。 静态线上的噪声电压的表现与信号电压完全一样。一旦在静态线上产生噪声电压&#xff0c;它们就会传播并在阻抗突变处出现反…

html解决浏览器记住密码输入框的问题

当浏览器记住密码并自动填充到表单的密码输入框时&#xff0c;这通常是浏览器为了提供便利而采取的功能。然而&#xff0c;有时这可能不是用户所期望的&#xff0c;或者你可能希望在某些情况下禁用此功能。 虽然HTML本身并没有直接提供禁用浏览器自动填充密码输入框的标准方法…

常见算法(基本查找、二分查找、分块查找冒泡、选择、插入、快速排序和递归算法)

一、常见算法-01-基本、二分、插值和斐波那契查找 1、基本查找/顺序查找 需求1&#xff1a;定义一个方法利用基本查找&#xff0c;查询某个元素是否存在 数据如下&#xff1a;{131&#xff0c;127&#xff0c;147&#xff0c;81&#xff0c;103&#xff0c;23&#xff0c;7&am…