kafka集群搭建-使用zookeeper

1.环境准备:

使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。
172.2.1.69:8093
172.2.1.70:8093
172.2.1.71:8093

2.下载地址

去官网下载,或者使用如下仓库地址下载,本次使用的时kafka_2.13-3.6.1.tgz ,即3.6.1版本,前面的2.13是scala版本,该版本是较新的版本,可以使用zookeeper,也可以不使用zookeeper搭建集群,本次记录使用了zk,zk集群的部署可以参考上一篇记录。

# 软件包下载地址,可以切到/kafka/路径,选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz

3.软件包下载解压

在上面3台服务器上分别执行wget下载,或者本地下载后上传,本次使用的环境为堡垒机接入,如果使用的是宿主机账密登陆,可以下载配置一台,其余使用SCP命令拷贝过去即可。

cd  /usr/local/wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgztar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafka

4.修改配置

需要修改logs路径的话,可以在/kafka路径下新建logs路径,并配置到server.properties中,这里使用默认的/tmp/kafka-logs路径。

cd kafka
vim conf/server.properties

修改内容如下,

# 每个节点唯一的id,这里.69、.70、.71服务器分别设置为了1、2、3
broker.id=1# 默认为9092,云服务器开放端口问题,改为了8093
port=8093# 上一篇记录博客搭建的zk集群地址
zookeeper.connect=172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092# 配置监听访问、绑定地址,这里都是PLAINTEXT协议,不需要认证(相当于内网访问)
listeners=PLAINTEXT://0.0.0.0:8093                                                                                                 
advertised.listeners=PLAINTEXT://172.2.1.71:8093# 日志路径
log.dirs=/tmp/kafka-logs

5.启动kafka集群

分别在每个节点的bin路径下执行启动脚本

# 在3个节点分别执行如下命令,-daemon表示后台启动,不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties

使用jps命令,或者去kafka启动日志,查看kafka是否启动成功。

6.创建topic`

# 创建topic,在任一节点执行都可以。./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions=3 --replication-factor=3
# 查看topic是否创建成功,在任一节点执行
./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list

7.模拟生产消费消息

需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的,如:
sh ./bin/kafka-topics.sh --zookeeper=zk集群地址,可能出现命令无法识别:zookeeper is not a recognized option
需要替换为:sh ./bin/kafka-topics.sh --bootstrap-server=kafka集群地址,注意–bootstrap-server后面跟的是kafka集群地址,不是zookeeper地址。

# 在2个节点启动消费者模拟客户端接收消息,在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
# 在第3个节点启动生产者客户端模拟发送消息:hello
./bin/kafka-console-producer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo

生产者客户端发送hello
在这里插入图片描述
此时可以看到2个消费者模拟客户端都受到了消息:hello
在这里插入图片描述

8.集成springboot

坐标如下:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

kafka配置类:

package com.example.kafka.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
//@RefreshScope
public class KafkaConfig {@Value("${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093}")private String kafkaServers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);//props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}
}

消费者客户端:

    @KafkaListener(topics = {"topic-demo"},groupId = "test1",properties = {"auto-offset-reset:latest", "enable.auto.commit:true"})public void listen(ConsumerRecord<String, String> consumerRecord) {log.info("consumer Received: " + consumerRecord);}

生产者发送消息:

@RestController
@RequiredArgsConstructor
@RequestMapping("producer")
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@PostMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {ListenableFuture<SendResult<String, String>> hello_kafka = this.kafkaTemplate.send(topic, "hello kafka");SendResult<String, String> sendResult = hello_kafka.completable().join();System.out.println(sendResult);return "send topic: " + topic + ", msg: " + msg;}
}

发送测试:
在这里插入图片描述

消费者可以接收到消息:

consumer Received: ConsumerRecord(topic = topic-demo, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1721720091071, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)

9.IDEA客户端工具

可以使用kafkalytic工具本地开发环境可视化操作kafka服务器,如查看topic,创建topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/48248.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql的主从复制(重要)和读写分离(理论重要实验不重要)

一、主从复制&#xff1a;架构一般是一主两从。 1.主从复制的模式&#xff1a; mysql默认模式为异步模式&#xff1a;主库在更新完事务之后会立即把结果返回给从服务器&#xff0c;并不关心从库是否接收到以及从库是否处理成功。缺点&#xff1a;网络问题没有同步、防火墙的等…

vue3-video-play 导入 以及解决报错

npm install vue3-video-play --save # 或者 yarn add vue3-video-play import Vue3VideoPlay from vue3-video-play; import vue3-video-play/dist/style.css; app.use(Vue3VideoPlay) <template><div id"main-container-part"><div class"al…

Meta发布最强AI模型,扎克伯格公开信解释为何支持开源?

凤凰网科技讯 北京时间7月24日&#xff0c;脸书母公司Meta周二发布了最新大语言模型Llama 3.1&#xff0c;这是该公司目前为止推出的最强大开源模型&#xff0c;号称能够比肩OpenAI等公司的私有大模型。与此同时&#xff0c;Meta CEO马克扎克伯格(Mark Zuckerberg)发表公开信&a…

opencv grabCut前景后景分割去除背景

参考&#xff1a; https://zhuanlan.zhihu.com/p/523954762 https://docs.opencv.org/3.4/d8/d83/tutorial_py_grabcut.html 环境本次&#xff1a; python 3.10 提取前景&#xff1a; 1、需要先把前景物体框出来 需要坐标信息&#xff0c;可以用windows自带的画图简单提取像素…

Concat() Function-SQL-字符串拼接函数

Concat() Function-SQL 在SQL中&#xff0c;CONCAT() 函数用于将两个或多个字符串连接在一起。 不同数据库管理系统可能有些许差异&#xff0c;但基本用法和语法通常是相似的。 语法 CONCAT(string1, string2, ...)string1, string2, …: 这些是需要连接的字符串参数。可以…

【时序约束】读懂用好Timing_report

一、静态时序分析&#xff1a; 静态时序分析&#xff08;Static Timing Analysis&#xff09;简称 STA&#xff0c;采用穷尽的分析方法来提取出整个电路存在的所有时序路径&#xff0c;计算信号在这些路径上的传播延时&#xff0c;检查信号的建立和保持时间是否满足时序要求&a…

定时器+外部中断实现NEC红外线协议解码

一、前言 1.1 功能介绍 随着科技的进步和人们生活水平的提高&#xff0c;红外遥控器已经成为了日常生活中不可或缺的电子设备之一&#xff0c;广泛应用于电视、空调、音响等多种家电产品中。 传统的红外遥控器通常只能实现预设的有限功能&#xff0c;无法满足用户对设备更加智…

TCP客户端connect断线重连

文章目录 TCP客户端connect断线重连1、为什么要断线重连2、实现代码 TCP客户端connect断线重连 1、为什么要断线重连 客户端会面临服务器崩溃的情况&#xff0c;我们可以试着写一个客户端重连的代码&#xff0c;模拟并理解一些客户端行为&#xff0c;比如游戏客户端等. 考虑到…

实战篇(十二):如何使用 Processing 创建一个多功能的简易吃豆人游戏

如何使用 Processing 创建一个多功能的简易吃豆人游戏 文章目录 如何使用 Processing 创建一个多功能的==简易==吃豆人游戏引言准备工作第一步:设置基本框架第二步:创建 Pacman 类第三步:创建 Obstacle 类第四步:添加分数系统第五步:运行游戏完整代码结论参考资料引言 吃…

STL常用算法——常用查找算法

自定义类型都要用仿函数判断 1.find() class Person { public:Person(string name,int age){this->m_Name name;this->m_Age age;}bool operator(const Person &p)//重载operator{if (this->m_Name p.m_Name && this->m_Age p.m_Age){return true;…

NVIDIA 全面转向开源 GPU 内核模块

NVIDIA 全面转向开源 GPU 内核模块 文章目录 NVIDIA 全面转向开源 GPU 内核模块支持的 GPU安装程序更改使用带有 CUDA 元包的包管理器 使用运行文件使用安装帮助脚本包管理器详细信息dnf&#xff1a;Red Hat Enterprise Linux、Fedora、Kylin、Amazon Linux 或 Rocky Linuxzypp…

网络安全等级保护:什么是网络安全等级保护?(非常详细)零基础入门到精通,收藏这一篇就够了

关键词&#xff1a; 网络安全等级保护 等级保护 网络 信息系统 旧话重提&#xff0c;一直以来&#xff0c;我们不断强调“等级保护”制度是我国的网络安全领域的基本制度、基本策略和基本方法&#xff0c;是促进信息化健康发展&#xff0c;维护国家安全、社会秩序和公共利益的…

数字图像处理中的常用特殊矩阵及MATLAB应用

一、前言 Matlab的名称来源于“矩阵实验室&#xff08;Matrix Laboratory&#xff09;”&#xff0c;其对矩阵的操作具有先天性的优势&#xff08;特别是相对于C语言的数组来说&#xff09;。在数字图像处理中&#xff0c;为了提高编程效率&#xff0c;我们可以使用多种方式来创…

Mysql数据库和Sql语句

数据库管理&#xff1a; sql语句&#xff1a;数据库用来增删改查的语句&#xff08;重要&#xff09; 备份&#xff1a;数据库的数据进行备份 主从复制、读写分离、高可用&#xff08;重要&#xff09; Mysql数据库和Sql语句 一、Mysql数据库 1、数据库&#xff1a;组织、…

Java基础(四) 内部类详解

Java 内部类详解 一. 内部类概述 内部类是嵌套在类内部进行定义的类&#xff0c;其外部的类则被称为外部类&#xff1b;按照内部类的定义位置&#xff0c;内部类可进一步划分为成员内部类、静态内部类、局部内部类和匿名内部类四种类型。内部类的出现实际上是进一步丰富了类的…

Modbus转BACnet/IP网关的技术实现与应用

引言 随着智能建筑和工业自动化的快速发展&#xff0c;不同通信协议之间的数据交换也变得日益重要。Modbus和BACnet/IP是两种广泛应用于自动化领域的通信协议&#xff0c;Modbus以其简单性和灵活性被广泛用于工业自动化&#xff0c;而BACnet/IP则在楼宇自动化系统中占据主导地…

Android APP 音视频(03)CameraX预览与MediaCodec编码

说明&#xff1a; 此CameraX预览和编码实操主要针对Android12.0系统。通过CameraX预览获取yuv格式数据&#xff0c;将yuv格式数据通过mediacodec编码输出H264码流&#xff08;使用ffmpeg播放&#xff09;&#xff0c;存储到sd卡上。 1 CameraX 和 MediaCodec简介 1.1 CameraX…

“微软蓝屏”事件,给IT行业带来的宝贵经验和教训

“微软蓝屏”事件是指2024年7月19日发生的一次全球性技术故障&#xff0c;主要涉及微软视窗&#xff08;Windows&#xff09;操作系统及其相关应用和服务。 以下是对该事件的详细解析&#xff1a; 一、事件概述 发生时间&#xff1a;2024年7月19日事件影响&#xff1a;全球多个…

【科学文献计量】中国知网(CNKI) 文献素材库生成软件详细使用说明

CNKI 文献素材库生成软件制作 1 背景2 使用步骤2.1 文献检索2.2 文献导出2.3 软件生成1 背景 在进行中文文献的综述时,往往是要借助中国知网(CNKI)文献检索平台,写作插入文献时会用Endnote软件进行辅助。因此就有需求:对于CNKI检索的结果直接导出到本地,第一是方便快速阅…

基于STM32的农业大棚温湿度采集控制系统的设计

目录 1、设计要求 2、系统功能 3、演示视频和实物 4、系统设计框图 5、软件设计流程图 6、原理图 7、主程序 8、总结 &#x1f91e;大家好&#xff0c;这里是5132单片机毕设设计项目分享&#xff0c;今天给大家分享的是智能教室。 设备的详细功能见网盘中的文章《8、基…