部署kafka并通过python操作

目录

      • 一、安装JDK1.8
        • 1、检查服务器是否已安装JDK
        • 2、若已安装JDK,进行卸载
        • 3、更新yum源
        • 4、搜索JDK1.8安装包
        • 5、安装JDK1.8
        • 6、查看是否安装成功
        • 7、配置环境变量
      • 二、安装Kafka
        • 1、下载并解压kafka部署包至/usr/local/目录
        • 2、修改server.properties
        • 3、修改/etc/profile
        • 4、执行/etc/profile
        • 5、启动kafka
        • 6、topic管理
        • 7、生产者管理
        • 8、消费者管理
        • 9、查看group-id
      • 三、python连接kafka
        • 1、安装kafka-python
        • 2、消费消息
        • 3、生产数据

Kafka的安装需要依赖于jdk和zookeeper。(kafka 2.11-1.1.0版本才与JDK1.7兼容,更高版本需要JDK1.8);
2.8之前版本的Kafka需要单独下载zookeeper,2.8及之后的Kafka已经内置了一个zookeeper环境,无需单独下载;

一、安装JDK1.8

1、检查服务器是否已安装JDK
rpm -qa |grep java
rpm -qa |grep jdk
rpm -qa |grep gcj
2、若已安装JDK,进行卸载
rpm -qa | grep java | xargs rpm -e --nodeps
3、更新yum源
yum update -y
4、搜索JDK1.8安装包
yum list java-1.8*
5、安装JDK1.8
yum install java-1.8.0-openjdk* -y
6、查看是否安装成功
java -version
7、配置环境变量
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
export PATH=${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH

二、安装Kafka

1、下载并解压kafka部署包至/usr/local/目录
tar -zxvf kafka_2.12-3.1.1.tgz -C /usr/local/
2、修改server.properties
vim /usr/local/kafka_2.12-3.1.1/config/server.properties修改以下内容
listeners=PLAINTEXT://192.168.15.128:9092
advertised.listeners=PLAINTEXT://192.168.15.128:9092
log.dirs=/data/kafka/logs
zookeeper.connect=localhost:2181(local改成192.168.15.128会报错[2024-12-03 11:17:06,427] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient))
3、修改/etc/profile
vim /etc/profile新增:
export KAFKA_HOME=/usr/local/kafka_2.12-3.1.1
export PATH=$KAFKA_HOME/bin:$PATH
4、执行/etc/profile
source /etc/profile
5、启动kafka
先启动zookeeper
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties查看是否启动
netstat -tuln | grep 2181再启动kafka
/usr/local/kafka_2.12-3.1.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.1.1/config/server.properties查看是否启动
netstat -tuln | grep 9092
jps #有kafka则为启动后台启动
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/server.properties

在这里插入图片描述

在这里插入图片描述

6、topic管理
1. 创建topic
# replication-factor指定副本因子。注意:指定副本因子的时候,不能大于broker实例个数,否则报错
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest  # 旧版本创建方式,新版本只有--bootstrap-server 一种创建topic的方式
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --create --bootstrap-server 192.168.15.128:9092 --replication-factor 1 --partitions 1 --topic demo2. 查询topic详情
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --describe --bootstrap-server 192.168.15.128:9092 --topic demo3. 查询所有topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --list4. 修改topic参数配置
# 注意:partition个数count,只能增加,不能减少
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --alter --topic mytest --parti-tions count5. 删除topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --delete --topic mytest5.1 如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
输入如下命令查看:
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic此时你若想真正删除它,可以如下操作:
(1)登录zookeeper客户端:命令:./bin/zookeeper-client
(2)找到topic所在的目录:ls /brokers/topics
(3)找到要删除的topic,执行命令:rm -r /brokers/topics/【topic name】即可,此时topic被彻底删除。另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失
7、生产者管理
# 新起一个终端,进入kafka解压目录后,输入如下命令。在执行完毕后会进入的编辑器页面,此时任意编辑一个消息之后,消费者那边的终端可以看到,终端中已经打印出了我们刚才发送的消息
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.128:9092 --topic demo

在这里插入图片描述

8、消费者管理
1.创建消费者(有非必须参数,分区与consumer之间的关系:一个分区不能分给两个consumer,但是两个分区可以分给一个consumer)
# 下面的命令可以创建一个用于消费topic为mytest的消费者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --from-beginning --group testgroup2.从尾部开始取数据,必需要指定分区(指定分区)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 03.从尾部开始取数据,必需要指定分区(取指定个数)
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 0 --max-messages 1 
9、查看group-id
kafka-consumer-groups.sh --bootstrap-server 192.168.15.128:9092 --list 

三、python连接kafka

1、安装kafka-python
pip3 install kafka-python-ng

在这里插入图片描述

2、消费消息
from kafka import KafkaConsumerkafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka消费者
consumer = KafkaConsumer('demo', bootstrap_servers=[kafka_broker])for message in consumer:print(message.value)

在这里插入图片描述

3、生产数据
import json
from kafka import KafkaProducer# 指定Kafka代理地址,格式为"host:port"
kafka_broker = '192.168.15.128:9092'  # 替换为虚拟机的IP和端口# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=[kafka_broker])
# 发送10条消息
for i in range(10):# 创建一个字典,然后使用json.dumps()将其转换为JSON格式的字符串,并编码为字节串message = json.dumps({'name': 'kafka', 'index': i}).encode('utf-8')producer.send('demo', message)# 如果你需要打印消息内容,可以解码字节串并打印print(message.decode('utf-8'))# 确保所有消息都已发送
producer.flush()

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63208.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】 进程池 一主多从 管道通信

目录 1.代码介绍 2.channel 类 3.进程池类编写 4.主函数及其他 5. 源码 1.代码介绍 本文代码采用一主多从式(一个主进程(master)多个子进程(worker))通过管道进行通信,实现主进程分发任务&…

Ubuntu环境安装RabbitMQ

1.安装Erlang RabbitMq需要Erlang语⾔的⽀持,在安装rabbitMq之前需要安装erlang # 更新软件包 sudo apt-get update # 安装 erlang sudo apt-get install erlang 查看erlang版本 : erl 退出命令:halt(). 2. 安装RabbitMQ # 更新软件包 sudo apt-get update # 安装 …

力扣--199.二叉树的右视图

题目 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 提示: 二叉树的节点个数的范围是 [0,100] -100 < Node.val < 100 代码 class Solution { public List rightSideV…

Ubuntu Server 22.04.5 LTS重启后IP被重置问题

Ubuntu Server 22.04.5 LTS重启后IP被重置问题 最近在使用Ubuntu Server 22.04做项目开发测试时发现每次重启和关机后&#xff0c;所设置的静态IP地址都会回复到安装系统时所设置的ip Ubuntu Server 22.04 官网下载地址&#xff1a;Ubuntu官方下载地址 对虚拟机下安装Ubuntu感…

kube-proxy的iptables工作模式分析

系列文章目录 iptables基础知识 文章目录 系列文章目录前言一、kube-proxy介绍1、kube-proxy三种工作模式2、iptables中k8s相关的链 二、kube-proxy的iptables模式剖析1.集群内部通过clusterIP访问到pod的流程1.1.流程分析 2.从外部访问内部service clusterIP后端pod的流程2.1…

华为网络设备配置文件备份与恢复(上传、下载、导出,导入)

在日常运维工作中&#xff0c;会经常存在网络割接的情况&#xff0c;为了保证网络割接失败时能重新回退至原有配置&#xff0c;从而不影响原有的办公环境&#xff0c;在网络割接前的备份工作就非常有必要了。 备份方式&#xff1a;FTP 备份技术&#xff1a;PC客户端<---&g…

清风数学建模学习笔记——Topsis法

数模评价类&#xff08;2&#xff09;——Topsis法 概述 Topsis:Technique for Order Preference by Similarity to Ideal Solution 也称优劣解距离法&#xff0c;该方法的基本思想是&#xff0c;通过计算每个备选方案与理想解和负理想解之间的距离&#xff0c;从而评估每个…

[软件工程]九.可依赖系统(Dependable Systems)

9.1什么是系统的可靠性&#xff08;reliability&#xff09; 系统的可靠性反映了用户对系统的信任程度。它反映了用户对其能够按照预期运行且正常使用中不会失效的信心程度。 9.2什么是可依赖性&#xff08;dependablity&#xff09;的目的 其目的是覆盖系统的可用性&#x…

减少30%人工处理时间,AI OCR与表格识别助力医疗化验单快速处理

在医疗行业&#xff0c;化验单作为重要的诊断依据和数据来源&#xff0c;涉及大量的文字和表格信息&#xff0c;传统的手工输入和数据处理方式不仅繁琐&#xff0c;而且容易出错&#xff0c;给医院的运营效率和数据准确性带来较大挑战。随着人工智能技术的快速发展&#xff0c;…

汽车一键启动开关 、一键启动按键 、一键启动按钮

‌汽车一键启动按钮是智能汽车的重要部分&#xff0c;通常用于启动和关闭引擎‌。 ‌具体功能‌&#xff1a; ‌启动引擎‌&#xff1a;在许多现代汽车中&#xff0c;一键启动按键取代了传统的钥匙启动方式。只需轻轻按下一键启动按钮&#xff0c;车辆电源即被接通&#xff0c…

.NET用C#导入Excel数据到数据库

将Excel文件中的数据导入到数据库中不仅能够提升数据处理的效率和准确性&#xff0c;还能极大地促进数据分析和决策制定的过程。尤其在企业级应用中&#xff0c;Excel作为数据输入和初步整理的工具非常普遍&#xff0c;但其功能对于复杂查询、大规模数据管理和跨部门的数据共享…

python中数组怎么转换为字符串

1、数组转字符串 #方法1 arr [a,b] str1 .join(arr)#方法2 arr [1,2,3] #str .join(str(i) for i in arr)#此处str命名与str函数冲突&#xff01; str2 .join(str(i) for i in arr) 2、字符串转数组 #方法一 str_x avfg st_list list(str_x) #使用list()#方法二 list_s…

国内管理咨询公司哪家落地辅导做的好?

在当今快速变化的市场环境中&#xff0c;企业面临着前所未有的竞争压力与转型挑战。为了在这场没有硝烟的战争中脱颖而出&#xff0c;许多企业开始寻求外部专业力量的帮助&#xff0c;以期通过科学的管理咨询实现战略升级和业绩突破。而在众多的管理咨询公司中&#xff0c;思博…

Cursor vs VSCode:主要区别与优势分析

Cursor - The AI Code Editor 1. AI 集成能力 Cursor的优势 原生AI集成&#xff1a; # Cursor可以直接通过快捷键调用AI # 例如&#xff1a;按下 Ctrl K 可以直接获取代码建议 def complex_function():# 在这里&#xff0c;你可以直接询问AI如何实现功能# AI会直接在编辑器中…

python+selenium的八大定位方式

1.id定位 元素的id属 driver.find_element_by_id(By.ID,"username")2.name定位 driver.find_element_by_id(By.NAME,"username")#一个login_btn_list webdriver.find_elements(By.CLASS_NAME,)#多个元素组成的列表&#xff0c; login_btn_list[1].click…

RTCMultiConnection 跨域问题解决

js套件地址 https://github.com/muaz-khan/RTCMultiConnection server套件地址 https://github.com/muaz-khan/RTCMultiConnection-Server 要解决的就是server代码的跨域问题 原装写法&#xff1a; 解决写法&#xff1a; // 喜欢组合语法的自己组 const io new ioServer.S…

【JavaEE】多线程(6)

一、用户态与内核态 【概念】 用户态是指用户程序运行时的状态&#xff0c;在这种状态下&#xff0c;CPU只能执行用户态下的指令&#xff0c;并且只能访问受限的内存空间 内核态是操作系统内核运行时的状态&#xff0c;内核是计算机系统的核心部分&#xff0c;CPU可以执行所有…

Ajax:回忆与节点

一点回忆 面对我的Ajax学习&#xff0c;实现前后端交互&#xff0c;最开始我采用的使用网络寻找intellij IDEA Ultimate破解方法&#xff0c;然后最终成功&#xff0c;然后按照相关教程配置java ee项目&#xff0c;然后中间又去配置了Tomcat服务器&#xff0c;然后又去学习了一…

1688:开启跨境电商新篇章

引言 在全球化贸易不断深化的今天&#xff0c;跨境电商已成为中小企业拓展国际市场的重要渠道。1688&#xff0c;作为阿里巴巴集团旗下领先的内贸平台&#xff0c;近年来也逐渐发力跨境电商领域&#xff0c;为全球买家提供了一个直通中国工厂的贸易平台。本文将带您深入了解16…

2024-12-06 Unity Addressables3——资源加载

文章目录 1 引用加载1.1 Addressables 的资源引用类1.2 加载资源1.3 加载场景1.4 释放资源 2 Label 介绍3 动态加载3.1 加载单个资源3.2 加载多个资源 Unity 版本&#xff1a;6000.0.26f1c1Addressables 版本&#xff1a;2.3.1 1 引用加载 1.1 Addressables 的资源引用类 Ass…