Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门

参考文章

【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客

Docker搭建kafka+zookeeper

打开我们的docker的镜像源配置

vim /etc/docker/daemon.json

配置

 {
  "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}

 下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像

然后开始拉取我们的zookeeper镜像和我们的kafka镜像

这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本

docker pull zookeeper

kafka镜像 

docker pull wurstmeister/kafka

因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络

让不同容器都加入这个网络

docker network create创建我们的网络

然后那个zookeeper_network是我们自定义的网络名称

docker network create --driver bridge zookeeper_network

kafka是依赖于zookeeper的所以我们要先安装zookeeper

我们先用run来创建一个zookeeper容器

 docker run -d --name zookeeper1  --network zookeeper_network -p 2181:2181   zookeeper

-d 是后台运行

--name 是我们自定义容器的名字  我定义的名字是zookeeper1

--network 

是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络

-p 是指定我们的容器暴露给外部的端口  2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射

最后面的那个zookeeper 是我们的使用的镜像源的名称

一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本

查看我们创建的网络环境的地址

docker inspect zookeeper_network

那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址

我的是12.21.0.2,这个ip地址是要记住方便后面使用的
 

创建一个kafka容器

这段代码有点长,根子自己改吧

 # 启动kafka
docker run -d --name kafka1  --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka

解释 

KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址

不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了


SpringBoot集成kafka

首先就是springboot和kafka的版本兼容了

Spring for Apache Kafka

然后我们引入两个kafka的依赖 

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version>
</dependency>

自己对着自己的版本来

自己看b站视频,9分钟就搞定了

63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili

然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析

其实和普通mq差不多

也就是配置生产者和消费者和一些过期时间超时时间

重点在于那个missing-topics-fatal

主题不存在的话,我们是否还要成功启动

我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题

所以我启动的时候,报错然后失败了 

spring:kafka:bootstrap-servers: 192.168.88.130:9092  #Kafka 集群的地址和端口号producer:acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入batch-size: 16384  #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。buffer-memory: 33554432  #生产者用于缓存消息的内存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer  #定义了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消费者组ID#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,表示从最早的消息开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliest  #当消费者没有提交过 offset 时, 从何处开始消费消息enable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  #定义了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式max-poll-records: 2  #一次 poll 操作最多返回的消息数量properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}#消费者与 Kafka 服务端的会话超时时间session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡#消费者调用 poll 方法的最大间隔时间max.poll.interval.ms: 300000#消费者发送请求到 Kafka 服务端的超时时间#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#消费者向协调器发送心跳的间隔时间。#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#每个分区最多拉取的消息字节数。#max.partition.fetch.bytes=1048576  #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate   手动 ACK 的方式。#如果监听的主题不存在, 是否启动失败。missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略#消费方式, single 表示单条消费, batch 表示批量消费#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batch#并发消费的线程数concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲#默认的主题名称template:default-topic: "test"#springboot启动的端口号
server:port: 9999 #这个是java项目启动的端口

基本案例

这是常量类

指定了一个topic和group

主题和分组id

groupid是消费者组的唯一标识

这个视频9分钟看懂kafka

小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili

生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

 我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

 @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}

我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic

以及指定消费者的唯一标识GROUP_ID

这些其实都是自己在常量类里面自己写好的



@Header(KafkaHeaders.RECEIVED_TOPIC) String topic

 这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了


kafka的图形化工具

这里介绍一个免费的开源项目KafkaKing

Releases · Bronya0/Kafka-King (github.com)

里面还能指定中文

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42783.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux Mac 安装Higress 平替 Spring Cloud Gateway

Linux Mac 安装Higress 平替 Spring Cloud Gateway Higress是什么?传统网关分类Higress定位下载安装包执行安装命令执行脚本 安装成功打开管理界面使用方法configure.shreset.shstartup.shshutdown.shstatus.shlogs.sh Higress官网 Higress是什么? Higress是基于阿里内部的…

思考:Java内存模型和硬件内存模型

前言 前一阵在看volatile的原理&#xff0c;看到内存屏障和缓存一致性&#xff0c;发现再往底层挖就挖到了硬件和Java内存模型。这一块是自己似懂非懂的知识区&#xff0c;我一般称之为知识混沌区。因此整理这一篇文章。 什么是内存模型&#xff08;Memory Model&#xff09;…

CentOS6用文件配置IP模板

CentOS6用文件配置IP模板 到 CentOS6.9 , 默认还不能用 systemctl , 能用 service chkconfig sshd on 对应 systemctl enable sshd 启用,开机启动该服务 ### chkconfig sshd on 对应 systemctl enable sshd 启用,开机启动该服务 sudo chkconfig sshd onservice sshd start …

未羽研发测试管理平台

突然有一些觉悟&#xff0c;程序猿不能只会吭哧吭哧的低头做事&#xff0c;应该学会怎么去展示自己&#xff0c;怎么去宣传自己&#xff0c;怎么把自己想做的事表述清楚。 于是&#xff0c;这两天一直在整理自己的作品&#xff0c;也为接下来的找工作多做点准备。接下来…

LT7911UX 国产原装 一拖三 edp 转LVDS 可旋转 可缩放

2.一般说明 该LT7911UX是一种高性能Type-C/DP1.4a到MIPI或LVDS芯片的VR/显示应用。HDCP RX作为HDCP转发器的上游&#xff0c;可以与其他芯片的HDCP TX配合实现转发器功能。 对于DP1.4a输入&#xff0c;LT7911UX可配置为1/2/4通道。自适应均衡使其适用于长电缆应用&#xff0c;最…

Junior.Crypt.2024 CTF Web方向 题解WirteUp 全

Buy a cat 题目描述&#xff1a;Buy a cat 开题 第一思路是抓包改包 Very Secure App 题目描述&#xff1a;All secrets become clear 开题 乱输一个密码就登陆成功了&#xff08;不是弱口令&#xff09; 但是回显Your role is: user 但是有jwt&#xff01;&#xff01;&a…

记录在Windows上安装Docker

在Windows上安装Docker时&#xff0c;可以选择使用不同的后端。 其中两个常见的选择是&#xff1a;WSL 2&#xff08;Windows Subsystem for Linux 2&#xff09;和 Hyper-V 后端。此外&#xff0c;还可以选择使用Windows容器。 三者的区别了解即可&#xff0c;推荐用WSL 2&…

我们公司落地大模型的路径、方法和坑

我们公司落地大模型的路径、方法和坑 李木子 AI大模型实验室 2024年07月02日 18:35 北京 最近一年&#xff0c;LLM&#xff08;大型语言模型&#xff09;已经成熟到可以投入实际应用中了。预计到 2025 年&#xff0c;AI 领域的投资会飙升到 2000 亿美元。现在&#xff0c;不只…

Thinking--在应用中添加动态水印,且不可删除

Thinking系列&#xff0c;旨在利用10分钟的时间传达一种可落地的编程思想。 水印是一种用于保护版权和识别内容的技术&#xff0c;通常用于图像、视频或文档中。它可以是文本、图像或两者的组合&#xff0c;通常半透明或以某种方式嵌入到内容中&#xff0c;使其不易被移除或篡改…

【Linux】多线程_2

文章目录 九、多线程2. 线程的控制 未完待续 九、多线程 2. 线程的控制 主线程退出 等同于 进程退出 等同于 所有线程都退出。为了避免主线程退出&#xff0c;但是新线程并没有执行完自己的任务的问题&#xff0c;主线程同样要跟进程一样等待新线程返回。 pthread_join 函数…

算法学习笔记(8.2)-动态规划入门进阶

目录 问题判断: 问题求解步骤&#xff1a; 图例&#xff1a; 解析&#xff1a; 方法一&#xff1a;暴力搜索 实现代码如下所示&#xff1a; 解析&#xff1a; 方法二&#xff1a;记忆化搜索 代码示例&#xff1a; 解析&#xff1a; 方法三&#xff1a;动态规划 空间…

Qt入门(二):Qt的基本组件

目录 Designer程序面板 1、布局Layout 打破布局 贴合窗口 2、QWidget的属性 3、Qlabel标签 显示图片 4、QAbstractButton 按钮类 按钮组 5、QLineEdit 单行文本输入框 6、ComboBox 组合框 7、若干与数字相关的组件 Designer程序面板 Qt包含了一个Designer程序 &…

Django 更新数据 save()方法

1&#xff0c;添加模型 Test/app11/models.py from django.db import modelsclass Post(models.Model):title models.CharField(max_length200)content models.TextField()pub_date models.DateTimeField(date published)class Book(models.Model):title models.CharFie…

Spring Boot集成grpc快速入门demo

1.什么是GRPC&#xff1f; gRPC 是一个高性能、开源、通用的RPC框架&#xff0c;由Google推出&#xff0c;基于HTTP2协议标准设计开发&#xff0c;默认采用Protocol Buffers数据序列化协议&#xff0c;支持多种开发语言。gRPC提供了一种简单的方法来精确的定义服务&#xff0c…

UE5.3-基础蓝图类整理一

常用蓝图类整理&#xff1a; 1、获取当前关卡名&#xff1a;Get Current LevelName 2、通过关卡名打开关卡&#xff1a;Open Level(by name) 3、碰撞检测事件&#xff1a;Event ActorBeginOverlap 4、获取当前player&#xff1a;Get Player Pawn 5、判断是否相等&#xff1…

WEB安全基础:网络安全常用术语

一、攻击类别 漏洞&#xff1a;硬件、软件、协议&#xff0c;代码层次的缺陷。 后⻔&#xff1a;方便后续进行系统留下的隐蔽后⻔程序。 病毒&#xff1a;一种可以自我复制并传播&#xff0c;感染计算机和网络系统的恶意软件(Malware)&#xff0c;它能损害数据、系统功能或拦…

C++语言学习精简笔记(包含C++20特性)

目录 1 C新语法C与CC编译运行String编程范式C基础类型**自动类型推导**统一对象初始化&#xff1a;Uniform Initialization 控制结构if语句for语句switch语句namespace 2 函数函数声明形式参数函数参数传递的选择函数返回值的选择 函数重载 Lambda表达式函数的定义和申明生存期…

【一】m2芯片的mac中安装ubuntu24虚拟机集群

文章目录 1. 虚拟机配置2. 复制虚拟机2.1 修改主机名2.2 修改网络 1. 虚拟机配置 在官方网站下载好ubuntu24-arm版镜像开始安装&#xff0c;安装使用VMWare Fusion的社区免费授权版,使用一台m2芯片的mac电脑作为物理机平台。 为什么选择ubuntu24&#xff1f;因为centOS7目前已…

Proteus + Keil单片机仿真教程(五)多位LED数码管的静态显示

Proteus + Keil单片机仿真教程(五)多位LED数码管 上一章节讲解了单个数码管的静态和动态显示,这一章节将对多个数码管的静态显示进行学习,本章节主要难点: 1.锁存器的理解和使用; 2.多个数码管的接线封装方式; 3.Proteus 快速接头的使用。 第一个多位数码管示例 元件…

『C + ⒈』‘\‘

&#x1f942;在反斜杠(\)有⒉种最常用的功能如下所示&#x1f44b; #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h> int main(void) {int a 10;int b 20;int c 30;if (a 10 &&\b 20 &&\c 30){printf("Your print\n");}else{prin…