SpringBoot整合Kafka (二)

📑前言

本文主要讲了SpringBoot整合Kafka文章,如果有什么需要改进的地方还请大佬指出⛺️
上文链接:SpringBoot整合Kafka (一)

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、消费者
        • 2.1、Kafka应答机制
          • ACK应答级别
        • 2.2、Kafka消息消费确认机制
          • 自动提交
          • 手动提交
        • 2.3、指定消费
          • 监听一个主题,指定分区消费消息
  • 📑文章末尾


一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种\nconsumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。首先,让我们来看一下基础的消息(Message)相关术语:
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
1.2、YML配置
spring:kafka:bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数consumer: # 消费端反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
1.3、生产者简单生产
@Autowired
private KafkaTemplate kafkaTemplate;@Test
void contextLoads() {ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");System.out.println("发送完成");
}
1.4、消费者简单消费
@Component
public class TopicConsumer {@KafkaListener(topics = "test01-topic")public void readMsg(String msg){System.out.println("msg = " + msg);}
}

2、消费者

2.1、Kafka应答机制

在生产者(producer)往Kafka发送数据的进程中,为了确保数据能够发送到指定的topic中,topic中的每一个partition在收到数据后,都需要向生产者发送 ack(ackacknowledgement)。

假设 producer 在必定的时间内收不到应对,那么producer会再次向Kafka发送此条数据。这就类似于写信,假定我们写一封信给或人,然后我们会在一段时间后收到一封回信,但假设超过了一个月我们还没有收到回信,就会猜想是不是信件丢掉了,会将这封信进行从头发送,直到收到回信中止。

ACK应答级别

一、0
介绍:生产者发送过来的数据,不需要等数据落盘应答
数据可靠性分析:容易丢数据
丢失数据原因:生产者发送完成后,Leader没有接收到数据,但是生产者认为已经发送成功了

二、1
介绍:生产者发送过来的数据,Leader收到数据后应答
数据可靠性分析:容易丢数据
丢失数据原因:应答完成后,还没开始同步副本,Leader挂了,新的Leader不会收到同步的消息,因为生产者已经认为发送成功了

三、-1(all)
介绍:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
数据可靠性分析:可靠

spring:kafka:bootstrap-servers: 192.168.***.***:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数properties:linger.ms: 0 # spring.kafka.producer.properties.linger.ms=0,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafkaacks: 1 # 修改ACK应答级别,默认是1
2.2、Kafka消息消费确认机制

Kafka消费消息确认机制分为两种:自动确认和手动确认。
(1)自动确认:在自动确认模式下,Kafka消费者消费一条消息后,会自动将消息偏移量提交到服务器端,不需要手动进行确认,从而确保消息被有效处理。此种确认机制的优点是操作简单,但是可能会导致消息重复消费,即当消费者处理消息的过程中出现异常,导致偏移量提交失败,下一次启动时就会重新消费之前已经处理过的消息。
(2)手动确认:在手动确认模式下,消费者需要显式地调用commit()方法,将消息的偏移量提交到服务器端,才会被标记为已处理。手动确认模式下,可以避免重复消费的问题,但是需要开发者自己实现确认逻辑,增加了一定的开发复杂度。
总的来说,自动确认适用于对消息的可靠性要求不高、实时性较高的场景;手动确认适用于对消息的可靠性要求较高、不要求实时性的场景

自动提交

这种提交方式有两个很重要的参数:
enable.auto.commit=true(是否开启自动提交,true or false)
auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)
每隔5秒,消费者会自动把从poll方法接收到的最大偏移量提交上去。自动提交是在轮询中进行,消费者每次轮询时都会检查是否提交该偏移量。可是这种情况会发生重复消费和丢失消息的情况。

server:port: 18082
spring:kafka:bootstrap-servers: 192.168.***.***:9092,192.168.***.***:9093,192.168.***.***:9094consumer: # consumer消费者group-id: consumergroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 120000auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

设置enable-auto-commit: true,开启自动提交,也就是偏移量不需要我们手动提交,程序会自己提交。
设置auto.commit.interval.ms=120000,也就是消费后,不会立即提交,会在2分钟后提交,只要在这期间服务异常终止,偏移量就无法提交到Broker,再次启动,会重复消费。

手动提交

手动提交模式可以有效确保消息不丢失以及不重复消费

MANUAL:poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。
我们可以先测试一下MANUAL模式,只需要需改配置application.yml即可:

spring:kafka:bootstrap-servers: 192.168.***.***:9092,192.***.***.130:9093,192.***.***.130:9094consumer: # consumer消费者group-id: consumergroup # 默认的消费组IDenable-auto-commit: false # 是否自动提交offsetauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manual # 手动添加偏移量

消费者代码

@KafkaListener(topics = {"itmentu"},groupId = "itmentuGroup")
public void listener(ConsumerRecord<String,String> record, Acknowledgment ack){//获取消息String message = record.value();//消息偏移量long offset = record.offset();System.out.println("读取的消息:"+message+"\n当前偏移量:"+offset);//手动提交偏移量ack.acknowledge();
}
2.3、指定消费

属性解释:
id:消费者ID
groupId:消费组ID
topics:监听的topic,可监听多个
topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。

监听一个主题,指定分区消费消息
    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord<String, String> record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/135286.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络

🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…

查看apk签名

cmd 命令&#xff1a; keytool -v -list -keystore "E:\xxx\release.jks"

kubernetes集群编排——k8s存储(configmap,secrets)

configmap 字面值创建 kubectl create configmap my-config --from-literalkey1config1 --from-literalkey2config2kubectl get cmkubectl describe cm my-config 通过文件创建 kubectl create configmap my-config-2 --from-file/etc/resolv.confkubectl describe cm my-confi…

Unreal UnLua + Lua Protobuf

Unreal UnLua Lua Protobuf https://protobuf.dev/ protobuf wire format&#xff1a;pb 编译到底层的数据协议 https://github.com/starwing/lua-protobuf/blob/master/README.zh.md buffer 处理 lua string 可以当 buffer 用&#xff0c;# len 不会遇到 0 截断&#xf…

算法leetcode|85. 最大矩形(rust重拳出击)

文章目录 85. 最大矩形&#xff1a;样例 1&#xff1a;样例 2&#xff1a;样例 3&#xff1a;样例 4&#xff1a;样例 5&#xff1a;提示&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 85. 最…

Python算法例8 将整数A转换为B

1. 问题描述 给定整数A和B&#xff0c;求出将整数A转换为B&#xff0c;需要改变bit的位数。 2. 问题示例 把31转换为14&#xff0c;需要改变2个bit位&#xff0c;即&#xff1a;&#xff08;31&#xff09;10&#xff08;11111&#xff09;2&#xff0c;&#xff08;14&…

CAN 协议常见面试题总结

0.讲一下CAN通讯的过程 第一段&#xff1a;需要发送的通讯设备&#xff0c;先发送一个显性电平0&#xff0c;告诉其他通讯设备&#xff0c;需要开始通讯。 第二段&#xff1a;就是发送仲裁段&#xff0c;其中包括ID帧和数据帧类型&#xff0c;告诉其他通讯设备&#xff0c;需…

智慧农业:农林牧数据可视化监控平台

数字农业是一种现代农业方式&#xff0c;它将信息作为农业生产的重要元素&#xff0c;并利用现代信息技术进行农业生产过程的实时可视化、数字化设计和信息化管理。能将信息技术与农业生产的各个环节有机融合&#xff0c;对于改造传统农业和改变农业生产方式具有重要意义。 图扑…

Android Studio(项目收获)

取消按钮默认背景色 像按钮默认背景色为深蓝色&#xff0c;即使使用了background属性指定颜色也不能生效。 参考如下的解决方法&#xff1a; 修改/res/values/themes.xml中的指定内容如下&#xff1a; <style name"Theme.TianziBarbecue" parent"Theme.Mater…

OSCP系列靶场-Esay-Dawn

总结 getwebshell → SMB共享无密码 → SMB存在上传功能 → 存在周期执行任务 → SMB上传反弹shell → 被执行获得webshell 提 权 思 路 → suid发现zsh → -p容器提权 准备工作 启动VPN 获取攻击机IP > 192.168.45.163 启动靶机 获取目标机器IP > 192.168.242.11 信…

51单片机-定时计数器

文章目录 前言1 原理2.编程 前言 1 原理 2.编程 定时计算&#xff1a; 50ms501000us 一个机器周期&#xff1a;1.085us 65535 - 501000/1.08546082 故 40082*1.08549998.97 /*定时器1&#xff0c;定时模式 工作模式1 16位计数器&#xff0c; 定时20秒后使能蜂鸣器*/ #include…

5 Tensorflow图像识别(下)模型构建

上一篇&#xff1a;4 Tensorflow图像识别模型——数据预处理-CSDN博客 1、数据集标签 上一篇介绍了图像识别的数据预处理&#xff0c;下面是完整的代码&#xff1a; import os import tensorflow as tf# 获取训练集和验证集目录 train_dir os.path.join(cats_and_dogs_filter…

AI+BI行业数字化转型研讨会 - 总结精华回顾

带您一起观看研讨会精彩内容回顾&#xff01; || 导语 AIBI行业数字化转型研讨会—引领未来&#xff0c;智慧转型 德昂信息技术(北京)有限公司于2023年10月26日成功举办了AIBI行业数字化转型研讨会。此次盛会汇聚了产业精英、企业领袖以及技术专家&#xff0c;共同探讨在快速…

Python的编码规范:PEP 8介绍及基本遵循原则

文章目录 PEP 8简介基本遵循原则1. 缩进2. 行宽3. 空行4. 导入5. 空格6. 命名约定7. 表达式和语句中的空格8. 注释9. 编码声明10. 文档字符串PEP 8简介 PEP 8,或Python Enhancement Proposal 8,是一个官方文档,发布于2001年。它由Guido van Rossum,Python语言的创始人,以…

前端框架Vue学习 ——(二)Vue常用指令

文章目录 常用指令 常用指令 指令: HTML 标签上带有 “v-” 前缀的特殊属性&#xff0c;不同指令具有不同含义。例如: v-if, v-for… 常用指令&#xff1a; v-bind&#xff1a;为 HTML 标签绑定属性值&#xff0c;如设置 href&#xff0c;css 样式等 <a v-bind:href"…

Spark 新特性+核心回顾

Spark 新特性核心 本文来自 B站 黑马程序员 - Spark教程 &#xff1a;原地址 1. 掌握Spark的Shuffle流程 1.1 Spark Shuffle Map和Reduce 在Shuffle过程中&#xff0c;提供数据的称之为Map端&#xff08;Shuffle Write&#xff09;接收数据的称之为Reduce端&#xff08;Sh…

MybatisPlus之新增操作并返回主键ID

在应用mybatisplus持久层框架的项目中&#xff0c;经常遇到执行新增操作后需要获取主键ID的场景&#xff0c;下面将分析及测试过程记录分享出来。 1、MybatisPlus新增方法 持久层新增方法源码如下&#xff1a; public interface BaseMapper<T> extends Mapper<T> …

js处理赎金信

给你两个字符串&#xff1a;ransomNote 和 magazine &#xff0c;判断 ransomNote 能不能由 magazine 里面的字符构成。 如果可以&#xff0c;返回 true &#xff1b;否则返回 false 。 magazine 中的每个字符只能在 ransomNote 中使用一次。 示例 1&#xff1a; 输入&…

自动控制原理--面试问答题

以下文中的&#xff0c;例如 s_1 为 s下角标1。面试加油&#xff01; 控制系统的三要素&#xff1a;稳准快。稳&#xff0c;系统最后不能震荡、发散&#xff0c;一定要收敛于某一个值&#xff1b;快&#xff0c;能够迅速达到系统的预设值&#xff1b;准&#xff0c;最后稳态值…

一台电脑生成两个ssh,绑定两个GitHub账号

背景 一般一台电脑账号生成一个ssh绑定一个GitHub&#xff0c;即一一对应的关系&#xff01;我之前有一个账号也配置了ssh&#xff0c;但是我想经营两个GitHub账号&#xff0c;当我用https url clone新账号的仓库时&#xff0c;直接超时。所以想起了配置ssh。于是有了今天这篇…