使用Linux部署Kafka教程

目录

一、部署Zookeeper

1 拉取Zookeeper镜像

2 运行Zookeeper

二、部署Kafka

1 拉取Kafka镜像

2 运行Kafka

三、验证是否部署成功

1 进入到kafka容器中

2 创建topic 生产者

3 生产者发送消息

4 消费者消费消息

四、搭建kafka管理平台

五、SpringBoot整合Kafka 

1、导入依赖

2、修改配置

3、生产者

 4、消费者

5、测试发送消息

 6、测试收到消息


一、部署Zookeeper

1 拉取Zookeeper镜像

docker pull wurstmeister/zookeeper
  • 1

2 运行Zookeeper

docker run --restart=always --name zookeeper \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2  \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
-d wurstmeister/zookeeper

二、部署Kafka

1 拉取Kafka镜像

docker pull wurstmeister/kafka

2 运行Kafka

docker run --restart=always --name kafka \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \-p 9092:9092 \-e KAFKA_BROKER_ID=0 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.8.102:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.8.102:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-v /etc/localtime:/etc/localtime \-d wurstmeister/kafka

参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

三、验证是否部署成功

1 进入到kafka容器中

docker exec -it kafka /bin/sh

2 创建topic 生产者

cd opt/kafka_2.13-2.8.1bin/kafka-topics.sh --create --zookeeper 192.168.8.102:2181 --replication-factor 1 --partitions 1 --topic partopic

在这里插入图片描述

3 生产者发送消息

bin/kafka-console-producer.sh --broker-list 192.168.8.102:9092 --topic partopic

在这里插入图片描述

4 消费者消费消息

  • 新打开个ssh窗口
  • 跟前面步骤一样进入到容器
bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.102:9092 --topic partopic --from-beginning

在这里插入图片描述

四、搭建kafka管理平台

 docker search kafdrop

docker run -d --rm  -p 9000:9000 \-e JVM_OPTS="-Xms32M -Xmx64M" \-e KAFKA_BROKERCONNECT=<host:port,host:port> \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop<host:port,host:port> 为 外网集群地址 多个用逗号分隔 例如xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy.yyy:9092 尖角号不留上面的命令是百度的以下是我自己尝试的
docker run -d --name kafdrop -p 9001:9001 \-e JVM_OPTS="-Xms32M -Xmx64M -Dserver.port=9001" \-e KAFKA_BROKERCONNECT=192.168.58.130:9092 \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop因为我docker启动了其他东西占用了9001端口,而这个kafdrop其实就是一个springboot项目,以jar命令的形式启动

访问地址:Kafdrop: Broker List 

五、SpringBoot整合Kafka 

1、导入依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、修改配置

spring:kafka:bootstrap-servers: 192.168.58.130:9092 #部署linux的kafka的ip地址和端口号producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

本次测试:linux地址:192.168.58.130

spring.kafka.bootstrap-servers=192.168.58.130:9092

advertised.listeners=192.168.58.130:9092

3、生产者

import com.alibaba.fastjson.JSON;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 事件的生产者*/
@Slf4j
@Component
public class KafkaProducer {@Autowiredpublic KafkaTemplate kafkaTemplate;/** 主题 */public static final String TOPIC_TEST = "Test";/** 消费者组 */public static final String TOPIC_GROUP = "test-consumer-group";public void send(Object obj){String obj2String = JSON.toJSONString(obj);log.info("准备发送消息为:{}",obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);//回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {//发送失败的处理log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {//成功的处理log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());}});}}

 4、消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;/*** 事件消费者*/
@Component
public class KafkaConsumer {private Logger logger = LoggerFactory.getLogger(org.apache.kafka.clients.consumer.KafkaConsumer.class);@KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP)public void topicTest(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<?> message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}
}

5、测试发送消息

@Testvoid kafkaTest(){kafkaProducer.send("Hello Kafka");}

 6、测试收到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/54327.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大彩串口屏使用记录

写在最前面 屏幕型号 DC10600M070 IDE VisualTFT&#xff08;官方&#xff09; VSCode&#xff08;lua编程&#xff09; 用之前看一下官方那个1小时的视频教程就大概懂控件怎么用了&#xff0c;用官方的软件VisualTFT很简单 本文只是简单记录遇到的一些坑 lua编辑器 VisualTF…

内嵌功能强大、低功耗STM32WB55CEU7、STM32WB55CGU7 射频微控制器 - MCU, 48-UFQFN

一、概述&#xff1a; STM32WB55xx多协议无线和超低功耗器件内嵌功能强大的超低功耗无线电模块&#xff08;符合蓝牙 低功耗SIG规范5.0和IEEE 802.15.4-2011标准&#xff09;。该器件内含专用的Arm Cortex -M0&#xff0c;用于执行所有的底层实时操作。这些器件基于高性能Arm …

TensorFlow中slim包的具体用法

TensorFlow中slim包的具体用法 1、训练脚本文件&#xff08;该文件包含数据下载打包、模型训练&#xff0c;模型评估流程&#xff09;3、模型训练1、数据集相关模块&#xff1a;2、设置网络模型模块3、数据预处理模块4、定义损失loss5、定义优化器模块 本次使用的TensorFlow版本…

Redis五大数据类型

Redis五大数据类型 Redis-Key 官网&#xff1a;https://www.redis.net.cn/order/ 序号命令语法描述1DEL key该命令用于在 key 存在时删除 key2DUMP key序列化给定 key &#xff0c;并返回被序列化的值3EXISTS key检查给定 key 是否存在&#xff0c;存在返回1&#xff0c;否则返…

yolov8热力图可视化

安装pytorch_grad_cam pip install grad-cam自动化生成不同层的bash脚本 # 循环10次&#xff0c;将i的值从0到9 for i in $(seq 0 13) doecho "Running iteration $i";python yolov8_heatmap.py $i; done热力图生成python代码 import warnings warnings.filterwarn…

vscode流程图插件使用

vscode流程图插件使用 1.在vscode中点击左下角设置然后选择扩展。 2.在扩展中搜索Draw.io Integration&#xff0c;安装上面第一个插件。 3.安装插件后在工程中创建一个后缀为drawio的文件并且双击打开即可绘制流程图

2023-08-26 LeetCode每日一题(汇总区间)

2023-08-26每日一题 一、题目编号 228. 汇总区间二、题目链接 点击跳转到题目位置 三、题目描述 给定一个 无重复元素 的 有序 整数数组 nums 。 返回 恰好覆盖数组中所有数字 的 最小有序 区间范围列表 。也就是说&#xff0c;nums 的每个元素都恰好被某个区间范围所覆盖…

如何在地图上寻找最密集点的位置?

最近我在工作中遇到了一个小的需求点&#xff0c;大概是需要在地图上展示出一堆点中的点密度最密集的位置。最开始没想到好的方法&#xff0c;就使用了一个非常简单的策略——所有点的坐标求平均值&#xff0c;这个方法大部分的时候好用&#xff0c;因为大部分城市所有点位基本…

深度学习4. 循环神经网络 – Recurrent Neural Network | RNN

目录 循环神经网络 – Recurrent Neural Network | RNN 为什么需要 RNN &#xff1f;独特价值是什么&#xff1f; RNN 的基本原理 RNN 的优化算法 RNN 到 LSTM – 长短期记忆网络 从 LSTM 到 GRU RNN 的应用和使用场景 总结 百度百科维基百科 循环神经网络 – Recurre…

【手写promise——基本功能、链式调用、promise.all、promise.race】

文章目录 前言一、前置知识二、实现基本功能二、实现链式调用三、实现Promise.all四、实现Promise.race总结 前言 关于动机&#xff0c;无论是在工作还是面试中&#xff0c;都会遇到Promise的相关使用和原理&#xff0c;手写Promise也有助于学习设计模式以及代码设计。 本文主…

WPF基础入门-Class5-WPF命令

WPF基础入门 Class5-WPF命令 1、xaml编写一个button&#xff0c;Command绑定一个命令 <Grid><ButtonWidth"100"Height"40" Command"{Binding ShowCommand}"></Button> </Grid>2、编写一个model.cs namespace WPF_Le…

【LeetCode-面试经典150题-day15】

目录 104.二叉树的最大深度 100.相同的树 226.翻转二叉树 101.对称二叉树 105.从前序与中序遍历序列构造二叉树 106.从中序与后序遍历序列构造二叉树 117.填充每个节点的下一个右侧节点指针Ⅱ 104.二叉树的最大深度 题意&#xff1a; 给定一个二叉树 root &#xff0c;返回其…

STM32F103 4G Cat.1模块EC200S使用

一、简介 EC200S-CN 是移远通信最近推出的 LTE Cat 1 无线通信模块&#xff0c;支持最大下行速率 10Mbps 和最大上行速率 5Mbps&#xff0c;具有超高的性价比&#xff1b;同时在封装上兼容移远通信多网络制式 LTE Standard EC2x&#xff08;EC25、EC21、EC20 R2.0、EC20 R2.1&a…

用大白话来讲讲多线程的知识架构

感觉多线程的知识又多又杂&#xff0c;自从接触java&#xff0c;就在一遍一遍捋脉络和深入学习。现在将这次的学习成果展示如下。 什么是多线程&#xff1f; 操作系统运行一个程序&#xff0c;就是一个线程。同时运行多个程序&#xff0c;就是多线程。即在同一时间&#xff0…

基于FPGA的Lorenz混沌系统verilog开发,含testbench和matlab辅助测试程序

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 将vivado的仿真结果导入到matlab显示三维混沌效果&#xff1a; 2.算法运行软件版本 vivado2019.2 matlab2022a 3.部分核心程序 testbench如下所…

npm常用命令 + 前端常用的包管理工具 以及 npm淘宝镜像配置等

npm常用命令 前端常用的包管理工具 以及 npm淘宝镜像配置等 1. 前言1.1 NodeJs的下载安装1.2 windows上1.3 常用包管理工具 2. npm2.1 npm 的安装2.2 npm初始化包2.3 npm 安装、卸载包2.3.1 非全局安装2.3.1.1 单个包的安装2.3.1.1.1 默认版本安装2.3.1.1.2 指定版本安装 2.3.…

解除用户账户控制提醒

解决用户账户控制提醒 1. 前言2. 解决用户账户控制提醒2.1 控制面板2.2 注册表2.3 UAC服务 结束语 1. 前言 当我们使用电脑时&#xff0c;有时进行安装应用或者打开应用时&#xff0c;总会弹出一个提示框&#xff0c;要选择点击是否允许程序运行&#xff1b; 系统经常弹出用户…

【Git】测试持续集成——Git+Gitee+PyCharm

文章目录 概述一、使用Gitee1. 注册账号2. 绑定邮箱3. 新建仓库4. 查看项目地址 二、安装配置Git1. 下载安装包2. 校验是否安装成功。3. 配置Git4. Git命令5. Git实操 三、PyCharmGit1. 配置Git2. Clone项目3. 提交文件到服务器4. 从服务器拉取文件 概述 持续集成&#xff08;…

【javaweb】学习日记Day4 - Maven 依赖管理 Web入门

目录 一、Maven入门 - 管理和构建java项目的工具 1、IDEA如何构建Maven项目 2、Maven 坐标 &#xff08;1&#xff09;定义 &#xff08;2&#xff09;主要组成 3、IDEA如何导入和删除项目 二、Maven - 依赖管理 1、依赖配置 2、依赖传递 &#xff08;1&#xff09;查…

Docker容器学习:Dockerfile制作Web应用系统nginx镜像

目录 编写Dockerfile 1.文件内容需求&#xff1a; 2.编写Dockerfile&#xff1a; 3.开始构建镜像 4.现在我们运行一个容器&#xff0c;查看我们的网页是否可访问 推送镜像到私有仓库 1.把要上传的镜像打上合适的标签 2.登录harbor仓库 3.上传镜像 编写Dockerfile 1.文…