Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门

参考文章

【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客

Docker搭建kafka+zookeeper

打开我们的docker的镜像源配置

vim /etc/docker/daemon.json

配置

 {
  "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}

 下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像

然后开始拉取我们的zookeeper镜像和我们的kafka镜像

这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本

docker pull zookeeper

kafka镜像 

docker pull wurstmeister/kafka

因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络

让不同容器都加入这个网络

docker network create创建我们的网络

然后那个zookeeper_network是我们自定义的网络名称

docker network create --driver bridge zookeeper_network

kafka是依赖于zookeeper的所以我们要先安装zookeeper

我们先用run来创建一个zookeeper容器

 docker run -d --name zookeeper1  --network zookeeper_network -p 2181:2181   zookeeper

-d 是后台运行

--name 是我们自定义容器的名字  我定义的名字是zookeeper1

--network 

是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络

-p 是指定我们的容器暴露给外部的端口  2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射

最后面的那个zookeeper 是我们的使用的镜像源的名称

一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本

查看我们创建的网络环境的地址

docker inspect zookeeper_network

那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址

我的是12.21.0.2,这个ip地址是要记住方便后面使用的
 

创建一个kafka容器

这段代码有点长,根子自己改吧

 # 启动kafka
docker run -d --name kafka1  --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka

解释 

KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址

不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了


SpringBoot集成kafka

首先就是springboot和kafka的版本兼容了

Spring for Apache Kafka

然后我们引入两个kafka的依赖 

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version>
</dependency>

自己对着自己的版本来

自己看b站视频,9分钟就搞定了

63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili

然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析

其实和普通mq差不多

也就是配置生产者和消费者和一些过期时间超时时间

重点在于那个missing-topics-fatal

主题不存在的话,我们是否还要成功启动

我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题

所以我启动的时候,报错然后失败了 

spring:kafka:bootstrap-servers: 192.168.88.130:9092  #Kafka 集群的地址和端口号producer:acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入batch-size: 16384  #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。buffer-memory: 33554432  #生产者用于缓存消息的内存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer  #定义了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消费者组ID#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,表示从最早的消息开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliest  #当消费者没有提交过 offset 时, 从何处开始消费消息enable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  #定义了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式max-poll-records: 2  #一次 poll 操作最多返回的消息数量properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}#消费者与 Kafka 服务端的会话超时时间session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡#消费者调用 poll 方法的最大间隔时间max.poll.interval.ms: 300000#消费者发送请求到 Kafka 服务端的超时时间#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#消费者向协调器发送心跳的间隔时间。#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#每个分区最多拉取的消息字节数。#max.partition.fetch.bytes=1048576  #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate   手动 ACK 的方式。#如果监听的主题不存在, 是否启动失败。missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略#消费方式, single 表示单条消费, batch 表示批量消费#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batch#并发消费的线程数concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲#默认的主题名称template:default-topic: "test"#springboot启动的端口号
server:port: 9999 #这个是java项目启动的端口

基本案例

这是常量类

指定了一个topic和group

主题和分组id

groupid是消费者组的唯一标识

这个视频9分钟看懂kafka

小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili

生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

 我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

 @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}

我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic

以及指定消费者的唯一标识GROUP_ID

这些其实都是自己在常量类里面自己写好的



@Header(KafkaHeaders.RECEIVED_TOPIC) String topic

 这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了


kafka的图形化工具

这里介绍一个免费的开源项目KafkaKing

Releases · Bronya0/Kafka-King (github.com)

里面还能指定中文

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42783.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue父子组件通信实现模糊搜索功能

我遇到的问题&#xff1a; 我的搜索框在父页面&#xff0c;静态数据都在子页面。怎么实现模糊查询数据&#xff1f; 昨天的尝试&#xff1a;先把搜索的内容数据存到session里&#xff0c;然后从session里拿&#xff0c; 结果&#xff1a;存是存进去了&#xff0c;却拿不到。应…

Django学习收尾

启动项目命令 python manage.py runserver 文件上传功能实现 title "Form上传"if request.method "GET":form UpForm()return render(request, upload_form.html, {"form": form, "title": title})form UpForm(datarequest.POS…

Java对象创建究竟是在栈上还是堆上??

在 Java 中&#xff0c;对象的创建通常情况下是在堆上。 基本数据类型&#xff08;如 byte、short、int、long、float、double、char&#xff09;在方法内声明时&#xff0c;其值会存储在栈上。除了基本数据类型之外的所有对象&#xff0c;都是由 Java 虚拟机&#xff08;JVM&…

python入门基础知识·二

""" # Python介绍 # Python注释 # 单行注释&#xff1a; # # 多行注释&#xff1a; r """""" # Python输出和输入 # print: 输出 # input: 输入 ①会让程序暂停&#xff0c;②得到的是字符串内容 int(&…

Linux Mac 安装Higress 平替 Spring Cloud Gateway

Linux Mac 安装Higress 平替 Spring Cloud Gateway Higress是什么?传统网关分类Higress定位下载安装包执行安装命令执行脚本 安装成功打开管理界面使用方法configure.shreset.shstartup.shshutdown.shstatus.shlogs.sh Higress官网 Higress是什么? Higress是基于阿里内部的…

Vue指令详解与实操运用 - 编程魔法

在Vue.js的世界里&#xff0c;指令就像是一位魔法师&#xff0c;它们能够赋予HTML元素以生命&#xff0c;让网页与用户互动起来。今天&#xff0c;我们就来揭开这些指令的神秘面纱&#xff0c;看看它们是如何在我们的日常开发中发挥作用的。 1. v-text 和 v-html - 文字与内容的…

思考:Java内存模型和硬件内存模型

前言 前一阵在看volatile的原理&#xff0c;看到内存屏障和缓存一致性&#xff0c;发现再往底层挖就挖到了硬件和Java内存模型。这一块是自己似懂非懂的知识区&#xff0c;我一般称之为知识混沌区。因此整理这一篇文章。 什么是内存模型&#xff08;Memory Model&#xff09;…

CentOS6用文件配置IP模板

CentOS6用文件配置IP模板 到 CentOS6.9 , 默认还不能用 systemctl , 能用 service chkconfig sshd on 对应 systemctl enable sshd 启用,开机启动该服务 ### chkconfig sshd on 对应 systemctl enable sshd 启用,开机启动该服务 sudo chkconfig sshd onservice sshd start …

未羽研发测试管理平台

突然有一些觉悟&#xff0c;程序猿不能只会吭哧吭哧的低头做事&#xff0c;应该学会怎么去展示自己&#xff0c;怎么去宣传自己&#xff0c;怎么把自己想做的事表述清楚。 于是&#xff0c;这两天一直在整理自己的作品&#xff0c;也为接下来的找工作多做点准备。接下来…

LT7911UX 国产原装 一拖三 edp 转LVDS 可旋转 可缩放

2.一般说明 该LT7911UX是一种高性能Type-C/DP1.4a到MIPI或LVDS芯片的VR/显示应用。HDCP RX作为HDCP转发器的上游&#xff0c;可以与其他芯片的HDCP TX配合实现转发器功能。 对于DP1.4a输入&#xff0c;LT7911UX可配置为1/2/4通道。自适应均衡使其适用于长电缆应用&#xff0c;最…

Junior.Crypt.2024 CTF Web方向 题解WirteUp 全

Buy a cat 题目描述&#xff1a;Buy a cat 开题 第一思路是抓包改包 Very Secure App 题目描述&#xff1a;All secrets become clear 开题 乱输一个密码就登陆成功了&#xff08;不是弱口令&#xff09; 但是回显Your role is: user 但是有jwt&#xff01;&#xff01;&a…

深入理解基本数据结构:链表详解

引言 在计算机科学中&#xff0c;数据结构是存储、组织和管理数据的方式。链表是一种重要的线性数据结构&#xff0c;广泛应用于各种编程场景。在这篇博客中&#xff0c;我们将详细探讨链表的定义、特点、操作及其在不同编程语言中的实现。 什么是链表&#xff1f; 链表是一种…

Mobile ALOHA前传之VINN, Diffusion Policy和ACT对比

VINNDiffusion PolicyACT核心思想1.从离线数据中自监督学习获得一个视觉编码器&#xff1b;2.基于视觉编码器&#xff0c;从采集的示例操作数据中检索与当前观测图像最相似的N张图像以及对应的动作&#xff1b;3.基于图像编码器的距离对各个动作进行加权平均&#xff0c;获得最…

Open3D loss函数优化的ICP配准算法(精配准)

目录 一、概述 1.1ICP的基本步骤 1.2损失函数的设计 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 3.1原始点云 3.2配准后点云 3.3计算数据 一、概述 ICP(Iterative Closest Point)配准算法是一种用于对齐两个点云的经典算法。其目标是通过迭代优化…

Istio实战教程:Service Mesh部署与流量管理

引言 Istio是一个开源的服务网格&#xff0c;它提供了一种统一的方法来连接、保护、控制和观察服务。本教程将指导你从零开始部署Istio&#xff0c;并展示如何使用Istio进行基本的流量管理。 环境准备 Kubernetes集群&#xff1a;Istio运行在Kubernetes之上&#xff0c;确保…

W25Q64 Flash存储器与STM32:硬件与软件的完美结合案例

摘要 在嵌入式系统中&#xff0c;数据存储是关键组成部分之一。W25Q64 Flash存储器因其高容量、低功耗和高可靠性&#xff0c;成为STM32微控制器项目中优选的存储解决方案。本文将展示W25Q64与STM32微控制器集成的案例&#xff0c;包括硬件设计、SPI通信协议实现和软件编程策略…

记录在Windows上安装Docker

在Windows上安装Docker时&#xff0c;可以选择使用不同的后端。 其中两个常见的选择是&#xff1a;WSL 2&#xff08;Windows Subsystem for Linux 2&#xff09;和 Hyper-V 后端。此外&#xff0c;还可以选择使用Windows容器。 三者的区别了解即可&#xff0c;推荐用WSL 2&…

我们公司落地大模型的路径、方法和坑

我们公司落地大模型的路径、方法和坑 李木子 AI大模型实验室 2024年07月02日 18:35 北京 最近一年&#xff0c;LLM&#xff08;大型语言模型&#xff09;已经成熟到可以投入实际应用中了。预计到 2025 年&#xff0c;AI 领域的投资会飙升到 2000 亿美元。现在&#xff0c;不只…

Thinking--在应用中添加动态水印,且不可删除

Thinking系列&#xff0c;旨在利用10分钟的时间传达一种可落地的编程思想。 水印是一种用于保护版权和识别内容的技术&#xff0c;通常用于图像、视频或文档中。它可以是文本、图像或两者的组合&#xff0c;通常半透明或以某种方式嵌入到内容中&#xff0c;使其不易被移除或篡改…

【Linux】多线程_2

文章目录 九、多线程2. 线程的控制 未完待续 九、多线程 2. 线程的控制 主线程退出 等同于 进程退出 等同于 所有线程都退出。为了避免主线程退出&#xff0c;但是新线程并没有执行完自己的任务的问题&#xff0c;主线程同样要跟进程一样等待新线程返回。 pthread_join 函数…