SpringBoot整合Kafka (一)

📑前言

本文主要讲了SpringBoot整合Kafka文章⛺️

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、生产者\n\n
        • 2.1、Kafka生产者消息监听
        • 2.2、生产者写入分区策略
          • 指定写入分区
          • 根据key写入分区
          • 随机选择分区
        • 2.3、带回调的生产者
  • 📑文章末尾![在这里插入图片描述](https://img-blog.csdnimg.cn/e3b1caa28e7a4ce08cb3f1c3b27c4eb6.png)


一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种\nconsumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。首先,让我们来看一下基础的消息(Message)相关术语:
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
1.2、YML配置
spring:kafka:bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时,重试发送的次数consumer: # 消费端反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
1.3、生产者简单生产
@Autowired
private KafkaTemplate kafkaTemplate;@Test
void contextLoads() {ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");System.out.println("发送完成");
}
1.4、消费者简单消费
@Component
public class TopicConsumer {@KafkaListener(topics = "test01-topic")public void readMsg(String msg){System.out.println("msg = " + msg);}
}

2、生产者\n\n

2.1、Kafka生产者消息监听
@Component
public class KafkaSendResultHandler implements ProducerListener {private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {log.info("Message send success : " + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, Exception exception) {log.info("Message send error : " + producerRecord.toString());}
}
2.2、生产者写入分区策略

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。

指定写入分区

100条数据全部写入到2号分区

for (int i = 0; i < 100; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:"+topic+",分区:" + partition);}});Thread.sleep(1000);
}
根据key写入分区
String key = "kafka-key";
System.out.println("根据key计算分区:" + (key.hashCode() % 3));
for (int i = 0; i < 10; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", key, "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:" + topic + ",分区:" + partition);}});Thread.sleep(1000);
}
随机选择分区
 for (int i = 0; i < 10; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:" + topic + ",分区:" + partition);}});Thread.sleep(1000);}
2.3、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理

for (int i = 0; i < 100; i++) {ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");listenableFuture.addCallback(new ListenableFutureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败");}@Overridepublic void onSuccess(Object result) {SendResult sendResult = (SendResult) result;int partition = sendResult.getRecordMetadata().partition();String topic = sendResult.getRecordMetadata().topic();System.out.println("topic:"+topic+",分区:" + partition);}});Thread.sleep(1000);
}

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/219306.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python】计算最少排班人数(2)

接前一文章 部分关键代码及方法&#xff1a; 程序界面代码&#xff1a; winTk() win.config(bg#F2F2D7) win.geometry(550x440) win.title(最优排班计算器)frameFrame(win,width500,height60,bg#F2F2D7) frame.grid(row0,column0,columnspan5) l1Label(frame,text选择文件&a…

JetBrains2023年度报告,编程领域的风向标

前言 JetBrains是一家位于捷克的软件开发公司&#xff0c;有很多知名的开发IDE都是他们家的&#xff0c;比如IntelliJ IDEA、CLion、PyCharm、WebStorm等等&#xff0c;还有Kotlin编程语言也是JetBrains开发的&#xff0c;后来成为Android官方开发语言。 自2017年JetBrains发…

JVM之jmap java内存映射工具

jmap java内存映射工具 1、jmap jdk安装后会自带一些小工具&#xff0c;jmap命令(Memory Map for Java)是其中之一。主要用于打印指定Java进程(或核 心文件、远程调试服务器)的共享对象内存映射或堆内存细节。 jmap命令可以获得运行中的jvm的堆的快照&#xff0c;从而可以离…

使用 Timm 库替换 YOLOv8 主干网络 | 1000+ 主干融合YOLOv8

文章目录 前言版本差异说明替换方法parse_moedl( ) 方法_predict_once( ) 方法修改 yaml ,加载主干论文引用timm 是一个包含最先进计算机视觉模型、层、工具、优化器、调度器、数据加载器、数据增强和训练/评估脚本的库。 该库内置了 700 多个预训练模型,并且设计灵活易用。…

经典策略筛选-20231213

策略1&#xff1a; 龙头战法只做最强&#xff1a;国企改革 ----四川金顶 1、十日交易内出现 涨停或 &#xff08;涨幅大于7个点且量比大于3&#xff09; 2、JDK MACD RSI OBV LWR MTM 六指标共振 3、均线多头 4、 筹码峰 &#xff08;锁仓&#xff09; 5、现价> 五日均…

OkHttp: 拦截器和事件监听器

文章目录 1. 拦截器1. 拦截器链2. 实际案例1. 注册为应用拦截器2. 注册为网络拦截器 3. 如何选择用哪种拦截器1. 应用拦截器2. 网络层拦截器3. 重写请求4. 重写响应 4. 可用性 2. 事件监听器1. 请求的生命周期2. EventListener使用案例3. EventListener.Factory4. 调用失败的请…

【LeetCode】28. 找出字符串中第一个匹配项的下标 【字符串单模匹配:KMP算法】

题目链接 Python3 直觉解法 class Solution:def strStr(self, haystack: str, needle: str) -> int:pn, ph 0, 0n len(needle) h len(haystack)while ph < h:if haystack[ph] needle[pn]:if pn n-1: # 1234 123return ph - len(needle) 1else: pn 1ph 1else:…

【LeetCode刷题】-- 163.缺失的区间

163.缺失的区间 class Solution {public List<List<Integer>> findMissingRanges(int[] nums, int lower, int upper) {List<List<Integer>> res new ArrayList<>();for(int num : nums){if(lower < num){res.add(Arrays.asList(lower,num -…

c语言函数与指针

//本文有待补充。 一、函数 1.函数的定义 如果程序的逻辑比较复杂、代码量比较大&#xff0c;或者重复性功能比较多&#xff0c;那么全部写在主函数里就会显得十分冗长和杂乱。为了使代码更加简洁、思路更加清晰&#xff0c;C语言提供了”函数“。函数是一个实现一定功能的语…

dockerfile创建镜像 lNMP+wordpress

dockerfile创建镜像 lNMPwordpress nginx dockernginx mysql dockermysql php dockerphp nginx vim nginx.conf vim Dockerfile docker network create --subnet172.17.0.0/16 --opt "com.docker.network.bridge.name""docker1" mynetwork docker buil…

最新Applestore建立其他地区账号简单快捷一看就会

1、首先打开创建appleid网站 2、点击创建你的Apple ID开始创建&#xff08;这里以美国为例&#xff09; 电话号码可以填大陆手机号即可 这两个选项建议不要勾选 3、更改付款方式 3.1点击付款与配送 3.2添加付款方式&#xff0c;这里是最重要的一步&#xff0c;传统方法已经无法…

Mac电脑投屏AirServer 2024怎么下载安装激活许可期限

对于那些想要将 iPhone、iPad 或其他 iOS 设备上的小屏幕镜像到计算机上的大屏幕的人来说&#xff0c;AirPlay 是一个很好的工具。 基于此&#xff0c;AirServer 非常需要将您的 Mac 或 PC 变成 AirPlay 设备。 但是如何使用计算机上的设置对 iPhone 等 iOS 设备进行屏幕镜像&a…

Ubuntu 22安装PHP环境

参考博客为《练习 0&#xff08;2/2&#xff09;&#xff1a;Ubuntu 环境下安装PHP&#xff08;PHP-FPM&#xff09;》和《原生态Ubuntu部署LAMP环境 PHP8.1MySQLApache》 sudo apt-get install -y php7.4想要安装php7.4&#xff0c;发现安装的是php8.1。 完成如下图&#xf…

超声波测距HC-SR04模块的简单应用

文章目录 一、HC-SR04HC-SR04是什么&#xff1f;HC-SR04测距的原理 二、使用步骤1.硬件最远探测距离调节硬件连接 2.软件1.初始化配置代码如下&#xff08;示例&#xff09;&#xff1a;引脚初始化定时器初始化 2.引脚输入输出配置代码如下&#xff08;示例&#xff09;&#x…

【linux系统】用户功能与权限详细总结

前言 菜某的笔记总结&#xff0c;有错误还请指正。 linux用户的概念与root用户 这么理解&#xff1a;一台电脑有多个操作者&#xff0c;每个操作者只能无限制操作自己文件夹中的东西&#xff0c;其他地方的操作需要给与相应权限才能操作。 root用户&#xff1a;就是最高级的…

【Java系列】详解多线程(二)——Thread类及常见方法(上篇)

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习Java的一点学习心得&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 一…

Proxmox创建CentOS虚拟机

文章目录 下载ISO安装文件上传创建虚拟机启动虚拟机设置DNS CentOS配置国内安装源备份原有安装源下载更新国内源清理yum缓存制作新配置文件缓存 下载ISO安装文件 下载地址&#xff1a;https://www.xitongzhijia.net/ 也可去官网进行下载 上传 下面介绍直接通过页面上传&…

【论文阅读笔记】M3Care: Learning with Missing Modalities in Multimodal Healthcare Data

本文介绍了一种名为“MCare”的模型&#xff0c;旨在处理多模态医疗保健数据中的缺失模态问题。这个模型是端到端的&#xff0c;能够补偿病人缺失模态的信息&#xff0c;以执行临床分析。MCare不是生成原始缺失数据&#xff0c;而是在潜在空间中估计缺失模态的任务相关信息&…

idea一些报错

java: 非法字符: \ufeff 使用IDEA修改文件编码 在IDEA右下角&#xff0c;将编码改为GBK&#xff0c;再转为UTF-8&#xff0c;重新启动项目。具体步骤如下&#xff1a; 在IDEA右下角找到UTF-8字样的编码格式设计项&#xff0c;点击选择第一项GBK&#xff0c;然后Convert&#xf…

JVM虚拟机系统性学习-对象的创建流程及对象的访问定位

对象的创建流程与内存分配 对象创建流程如下&#xff1a; Java 中新创建的对象如何分配空间呢&#xff1f; new 的对象先放 Eden 区&#xff08;如果是大对象&#xff0c;直接放入老年代&#xff09;当 Eden 区满了之后&#xff0c;程序还需要创建对象&#xff0c;则垃圾回收…