部署kafka并通过python操作

目录

      • 一、安装JDK1.8
        • 1、检查服务器是否已安装JDK
        • 2、若已安装JDK,进行卸载
        • 3、更新yum源
        • 4、搜索JDK1.8安装包
        • 5、安装JDK1.8
        • 6、查看是否安装成功
        • 7、配置环境变量
      • 二、安装Kafka
        • 1、下载并解压kafka部署包至/usr/local/目录
        • 2、修改server.properties
        • 3、修改/etc/profile
        • 4、执行/etc/profile
        • 5、启动kafka
        • 6、topic管理
        • 7、生产者管理
        • 8、消费者管理
        • 9、查看group-id
      • 三、python连接kafka
        • 1、安装kafka-python
        • 2、消费消息
        • 3、生产数据

Kafka的安装需要依赖于jdk和zookeeper。(kafka 2.11-1.1.0版本才与JDK1.7兼容,更高版本需要JDK1.8);
2.8之前版本的Kafka需要单独下载zookeeper,2.8及之后的Kafka已经内置了一个zookeeper环境,无需单独下载;

一、安装JDK1.8

1、检查服务器是否已安装JDK
rpm -qa |grep java
rpm -qa |grep jdk
rpm -qa |grep gcj
2、若已安装JDK,进行卸载
rpm -qa | grep java | xargs rpm -e --nodeps
3、更新yum源
yum update -y
4、搜索JDK1.8安装包
yum list java-1.8*
5、安装JDK1.8
yum install java-1.8.0-openjdk* -y
6、查看是否安装成功
java -version
7、配置环境变量
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
export PATH=${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH

二、安装Kafka

1、下载并解压kafka部署包至/usr/local/目录
tar -zxvf kafka_2.12-3.1.1.tgz -C /usr/local/
2、修改server.properties
vim /usr/local/kafka_2.12-3.1.1/config/server.properties修改以下内容
listeners=PLAINTEXT://192.168.15.128:9092
advertised.listeners=PLAINTEXT://192.168.15.128:9092
log.dirs=/data/kafka/logs
zookeeper.connect=localhost:2181(local改成192.168.15.128会报错[2024-12-03 11:17:06,427] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient))
3、修改/etc/profile
vim /etc/profile新增:
export KAFKA_HOME=/usr/local/kafka_2.12-3.1.1
export PATH=$KAFKA_HOME/bin:$PATH
4、执行/etc/profile
source /etc/profile
5、启动kafka
先启动zookeeper
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties查看是否启动
netstat -tuln | grep 2181再启动kafka
/usr/local/kafka_2.12-3.1.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.1.1/config/server.properties查看是否启动
netstat -tuln | grep 9092
jps #有kafka则为启动后台启动
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/server.properties

在这里插入图片描述

在这里插入图片描述

6、topic管理
1. 创建topic
# replication-factor指定副本因子。注意:指定副本因子的时候,不能大于broker实例个数,否则报错
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest  # 旧版本创建方式,新版本只有--bootstrap-server 一种创建topic的方式
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --create --bootstrap-server 192.168.15.128:9092 --replication-factor 1 --partitions 1 --topic demo2. 查询topic详情
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --describe --bootstrap-server 192.168.15.128:9092 --topic demo3. 查询所有topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --list4. 修改topic参数配置
# 注意:partition个数count,只能增加,不能减少
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --alter --topic mytest --parti-tions count5. 删除topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --delete --topic mytest5.1 如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
输入如下命令查看:
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic此时你若想真正删除它,可以如下操作:
(1)登录zookeeper客户端:命令:./bin/zookeeper-client
(2)找到topic所在的目录:ls /brokers/topics
(3)找到要删除的topic,执行命令:rm -r /brokers/topics/【topic name】即可,此时topic被彻底删除。另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失
7、生产者管理
# 新起一个终端,进入kafka解压目录后,输入如下命令。在执行完毕后会进入的编辑器页面,此时任意编辑一个消息之后,消费者那边的终端可以看到,终端中已经打印出了我们刚才发送的消息
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.128:9092 --topic demo

在这里插入图片描述

8、消费者管理
1.创建消费者(有非必须参数,分区与consumer之间的关系:一个分区不能分给两个consumer,但是两个分区可以分给一个consumer)
# 下面的命令可以创建一个用于消费topic为mytest的消费者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --from-beginning --group testgroup2.从尾部开始取数据,必需要指定分区(指定分区)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 03.从尾部开始取数据,必需要指定分区(取指定个数)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 0 --max-messages 1 
9、查看group-id
kafka-consumer-groups.sh --bootstrap-server 192.168.15.128:9092 --list 

三、python连接kafka

1、安装kafka-python
pip3 install kafka-python-ng

在这里插入图片描述

2、消费消息
from kafka import KafkaConsumerkafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka消费者
consumer = KafkaConsumer('demo', bootstrap_servers=[kafka_broker])for message in consumer:print(message.value)

在这里插入图片描述

3、生产数据
import json
from kafka import KafkaProducer# 指定Kafka代理地址,格式为"host:port"
kafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=[kafka_broker])
# 发送10条消息
for i in range(10):# 创建一个字典,然后使用json.dumps()将其转换为JSON格式的字符串,并编码为字节串message = json.dumps({'name': 'kafka', 'index': i}).encode('utf-8')producer.send('demo', message)# 如果你需要打印消息内容,可以解码字节串并打印print(message.decode('utf-8'))# 确保所有消息都已发送
producer.flush()

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63208.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】 进程池 一主多从 管道通信

目录 1.代码介绍 2.channel 类 3.进程池类编写 4.主函数及其他 5. 源码 1.代码介绍 本文代码采用一主多从式(一个主进程(master)多个子进程(worker))通过管道进行通信,实现主进程分发任务&…

Ubuntu环境安装RabbitMQ

1.安装Erlang RabbitMq需要Erlang语⾔的⽀持,在安装rabbitMq之前需要安装erlang # 更新软件包 sudo apt-get update # 安装 erlang sudo apt-get install erlang 查看erlang版本 : erl 退出命令:halt(). 2. 安装RabbitMQ # 更新软件包 sudo apt-get update # 安装 …

集合框架(3)Map

Map接口 现实生活与开发中,我们常会看到这样的一类集合:用户ID与账户信息、学生姓名与考试成绩、IP地址与主机名等,这种一一对应的关系,就称作映射。Java提供了专门的集合框架用来存储这种映射关系的对象,即java.util…

力扣--199.二叉树的右视图

题目 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 提示: 二叉树的节点个数的范围是 [0,100] -100 < Node.val < 100 代码 class Solution { public List rightSideV…

Ubuntu Server 22.04.5 LTS重启后IP被重置问题

Ubuntu Server 22.04.5 LTS重启后IP被重置问题 最近在使用Ubuntu Server 22.04做项目开发测试时发现每次重启和关机后&#xff0c;所设置的静态IP地址都会回复到安装系统时所设置的ip Ubuntu Server 22.04 官网下载地址&#xff1a;Ubuntu官方下载地址 对虚拟机下安装Ubuntu感…

Python+OpenCV系列:Python和OpenCV的结合和发展

PythonOpenCV系列&#xff1a;Python和OpenCV的结合和发展 **引言****Python语言的发展****1.1 Python的诞生与发展****1.2 Python的核心特性与优势****1.3 Python的应用领域** **OpenCV的发展****2.1 OpenCV的起源与发展****2.2 OpenCV的功能特性****2.3 OpenCV的应用场景** *…

kube-proxy的iptables工作模式分析

系列文章目录 iptables基础知识 文章目录 系列文章目录前言一、kube-proxy介绍1、kube-proxy三种工作模式2、iptables中k8s相关的链 二、kube-proxy的iptables模式剖析1.集群内部通过clusterIP访问到pod的流程1.1.流程分析 2.从外部访问内部service clusterIP后端pod的流程2.1…

CSMM 软件能力成熟度评估认证补贴政策汇总!

CSMM认证&#xff0c;全称为“软件能力成熟度评估”&#xff0c;也被称作“中国版CMMI认证”。这是中国自主制定的软件能力成熟度评估标准&#xff0c;于2021年6月8日发布。该标准由中国电子技术标准化研究院联合多家产学研用相关单位制定&#xff0c;旨在适合中国国情以及中国…

华为网络设备配置文件备份与恢复(上传、下载、导出,导入)

在日常运维工作中&#xff0c;会经常存在网络割接的情况&#xff0c;为了保证网络割接失败时能重新回退至原有配置&#xff0c;从而不影响原有的办公环境&#xff0c;在网络割接前的备份工作就非常有必要了。 备份方式&#xff1a;FTP 备份技术&#xff1a;PC客户端<---&g…

Linux HTTP代理Squid 基本变更配置及目标白名单方式限制转发

1、文件管理转发白名单 sudo touch /etc/squid/whitelistip sudo touch /etc/squid/whitelistdomain # 目的地ip地址 acl whitelistip dst "/etc/squid/whitelistip" http_access allow whitelistip# 目的地域名限制&#xff0c;可使用.xxx.com 放开整个子域名 acl…

清风数学建模学习笔记——Topsis法

数模评价类&#xff08;2&#xff09;——Topsis法 概述 Topsis:Technique for Order Preference by Similarity to Ideal Solution 也称优劣解距离法&#xff0c;该方法的基本思想是&#xff0c;通过计算每个备选方案与理想解和负理想解之间的距离&#xff0c;从而评估每个…

[软件工程]九.可依赖系统(Dependable Systems)

9.1什么是系统的可靠性&#xff08;reliability&#xff09; 系统的可靠性反映了用户对系统的信任程度。它反映了用户对其能够按照预期运行且正常使用中不会失效的信心程度。 9.2什么是可依赖性&#xff08;dependablity&#xff09;的目的 其目的是覆盖系统的可用性&#x…

减少30%人工处理时间,AI OCR与表格识别助力医疗化验单快速处理

在医疗行业&#xff0c;化验单作为重要的诊断依据和数据来源&#xff0c;涉及大量的文字和表格信息&#xff0c;传统的手工输入和数据处理方式不仅繁琐&#xff0c;而且容易出错&#xff0c;给医院的运营效率和数据准确性带来较大挑战。随着人工智能技术的快速发展&#xff0c;…

Jackson使用实例:将后端返回的 JSON 字段名转换为大写(多种方案详细实例实现)

目录 将返回 JSON 字段名转换为大写背景解决方案1. **局部字段名转换为大写** — 使用 JsonNaming 注解方案概述步骤 2. **全局字段名转换为大写** — 配置 ObjectMapper方案概述步骤 3. **手动指定字段名称** — 使用 JsonProperty 注解方案概述步骤 4. **总结**推荐方案 将返…

汽车一键启动开关 、一键启动按键 、一键启动按钮

‌汽车一键启动按钮是智能汽车的重要部分&#xff0c;通常用于启动和关闭引擎‌。 ‌具体功能‌&#xff1a; ‌启动引擎‌&#xff1a;在许多现代汽车中&#xff0c;一键启动按键取代了传统的钥匙启动方式。只需轻轻按下一键启动按钮&#xff0c;车辆电源即被接通&#xff0c…

.NET用C#导入Excel数据到数据库

将Excel文件中的数据导入到数据库中不仅能够提升数据处理的效率和准确性&#xff0c;还能极大地促进数据分析和决策制定的过程。尤其在企业级应用中&#xff0c;Excel作为数据输入和初步整理的工具非常普遍&#xff0c;但其功能对于复杂查询、大规模数据管理和跨部门的数据共享…

python中数组怎么转换为字符串

1、数组转字符串 #方法1 arr [a,b] str1 .join(arr)#方法2 arr [1,2,3] #str .join(str(i) for i in arr)#此处str命名与str函数冲突&#xff01; str2 .join(str(i) for i in arr) 2、字符串转数组 #方法一 str_x avfg st_list list(str_x) #使用list()#方法二 list_s…

国内管理咨询公司哪家落地辅导做的好?

在当今快速变化的市场环境中&#xff0c;企业面临着前所未有的竞争压力与转型挑战。为了在这场没有硝烟的战争中脱颖而出&#xff0c;许多企业开始寻求外部专业力量的帮助&#xff0c;以期通过科学的管理咨询实现战略升级和业绩突破。而在众多的管理咨询公司中&#xff0c;思博…

前端进阶指南:详解 Source Map 的作用与工作原理,解析.map文件

前言 在前端开发中&#xff0c;代码的压缩与混淆是提升网页性能的常见做法。然而&#xff0c;这种优化措施也带来了调试难度的增加&#xff0c;因为压缩后的代码往往难以阅读和理解。这时&#xff0c;Source Map 技术应运而生&#xff0c;作为连接源代码和构建后代码的桥梁&am…

Cursor vs VSCode:主要区别与优势分析

Cursor - The AI Code Editor 1. AI 集成能力 Cursor的优势 原生AI集成&#xff1a; # Cursor可以直接通过快捷键调用AI # 例如&#xff1a;按下 Ctrl K 可以直接获取代码建议 def complex_function():# 在这里&#xff0c;你可以直接询问AI如何实现功能# AI会直接在编辑器中…