Python操作Kafka基础教程

01 Python操作Kafka基础教程

创建ZooKeeper容器

docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

创建Kafka容器

语法是:

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=[你的IP地址]:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[你的IP地址]:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

我的虚拟机IP是192.168.31.86,所以我的命令是:

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.86:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.86:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

安装可视化工具

下载UI工具:https://kafkatool.com/download2/offsetexplorer_64bit.exe

下载好以后按照默认进行安装。

在这里插入图片描述

在这里插入图片描述

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

在这里插入图片描述

在这里插入图片描述

连接Kafka

搜索软件并打开:

在这里插入图片描述

在这里插入图片描述

配置zookeeper:

在这里插入图片描述

配置Kafka:

在这里插入图片描述

点击Test测试按钮,测试是否能够连接Kafka:

在这里插入图片描述

点击是,然后就成功的使用客户端连接上Kafka了。

在这里插入图片描述

安装依赖

安装Python3.8

安装:

pip install kafka-python==2.0.2

发布和消费json数据

生产者

from kafka import KafkaProducer
import json# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 要提交的消息
msg_dict = {"operatorId": "test",  # 公交公司ID"terminalId": "123",  # 设备Id"terminalCode": "123",  # 设备编码(使用车辆ID)"terminalNo": "1",  # 同一车辆内terminal序号从1开始
}# 向指定的主题发送消息
producer.send("text1", msg_dict)
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.value.decode())

发布和消费文本数据

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
producer.send("text1", "你好")
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.value.decode())

发布和消费键值对文本数据

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(key_serializer=lambda v: v.encode('utf-8'),value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
producer.send("text1", key="msg", value="你好")
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print("key=", msg.key.decode())print("value=", msg.value.decode())

发布和消费键值对JSON数据

生产者

from kafka import KafkaProducer
import json# 创建生产者
producer = KafkaProducer(key_serializer=lambda v: json.dumps(v).encode('utf-8'),value_serializer=lambda v: json.dumps(v).encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
key = {"a": 1}
value = {"b": 2}
producer.send("text1", key=key, value=value)
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print("key=", msg.key.decode())print("value=", msg.value.decode())

发布和消费压缩文本数据

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092'],compression_type='gzip',  # 通过此参数声明要压缩数据传输
)# 向指定的主题发送消息
producer.send("text1", "你好")
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.value.decode())

同时消费多个主题

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
producer.send("text1", "你好")
producer.send("text2", "你好")producer.send("text1", "你好1")
producer.send("text2", "你好1")producer.send("text1", "你好2")
producer.send("text2", "你好2")producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
consumer.subscribe(["text1", "text2"])
for msg in consumer:print(msg)print(msg.topic)print(msg.value.decode())

获取发布结果

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
feature = producer.send("text1", "你好")# 会阻塞,直到发送成功
print(feature.get(timeout=60))producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer("text1", bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.topic)print(msg.value.decode())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/689881.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

世界顶级名校计算机专业,都在用哪些书当教材?(文末送书)

目录 01《深入理解计算机系统》02《算法导论》03《计算机程序的构造和解释》04《数据库系统概念》05《计算机组成与设计:硬件/软件接口》06《离散数学及其应用》07《组合数学》08《斯坦福算法博弈论二十讲》参与规则 清华、北大、MIT、CMU、斯坦福的学霸们在新学期里…

vue常见问题

文章目录 data为什么是一个函数,而不是一个对象?什么情况下可以使用对象?key的作用,为什么不能用Index?render函数,h函数,和template什么关系?vue 是怎么解析template的? template会…

讨好型人格的职业分析,如何改变讨好型人格

一味讨好他人,忽略自己感受,凡事以人为先,忽视自己需求,这就是讨好型人格。 讨好型人格最典型的表现就是非常注重外界的看法,不管做什么事都会小心翼翼,生怕自己所做的事会引发别人的不满。 如果自己哪方…

MAC电脑系统清理空间免费版软件CleanMyMac X2024

大家好,我是那个总是被苹果电脑“内存已满”提示搞得焦头烂额的专业博主。如果你也像我一样,在使用Mac时经常遭遇卡顿、慢吞吞的情况,那么今天的Mac清理空间妙招分享绝对适合你! CleanMyMac X全新版下载如下: https://wm.makedi…

【Redis快速入门】深入解读哨兵模式

个人名片: 🐼作者简介:一名大三在校生,喜欢AI编程🎋 🐻‍❄️个人主页🥇:落798. 🐼个人WeChat:hmmwx53 🕊️系列专栏:🖼️…

分糖果问题(java实现)

一、题目 描述 一群孩子做游戏,现在请你根据游戏得分来发糖果,要求如下: 每个孩子不管得分多少,起码分到一个糖果。任意两个相邻的孩子之间,得分较多的孩子必须拿多一些糖果。(若相同则无此限制) 给定一个数组 arr …

odoo16实用功能之作业队列(queue.job)

目录 1、官方文档 2、说明 3、简单的开发手册 1、在 Odoo 代码中定义需要异步处理的方法。 2、在需要调用异步方法的位置,使用 with_delay() 调用该方法。 3、注意事项 1、官方文档 Job Queue .. !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! This…

layui-tab加载echarts宽度丢失

主要是设置div的样式为&#xff1a;width:100%;height:100%;display: block; <style>.layui-tab-item{width:100%;height:480px;} </style><div class"layui-tab layui-tab-brief" lay-filter"component-tabs-brief"><ul class"…

C语言系列12——多线程与并发编程

目录 写在开头1 多线程的基本概念与应用场景1.1 什么是多线程&#xff1f;1.2 多线程的优势1.3 应用场景详解1.3.1 并行计算1.3.2 高响应性应用程序1.3.3 网络编程1.3.4 实时处理 1.4 多线程编程的挑战 2 POSIX线程库的使用与实际案例2.1 基础概念2.2 创建和管理线程2.3 线程同…

HTML 入门指南

简述 参考&#xff1a;HTML 教程- (HTML5 标准) HTML 语言的介绍、特点 HTML&#xff1a;超级文本标记语言&#xff08;HyperText Markup Language&#xff09; “超文本” 就是指页面内可以包含图片、链接等非文字内容。“标记” 就是使用标签的方法将需要的内容包括起来。…

116 C++ 可变参数函数,initializer_list (初始化列表), 省略号形参

一 可变参数函数 有时候我们传递的参数是不固定的。 这种能接受非固定个数参数的函数就是可变参数函数 怎么实现呢&#xff1f;就要用到 initializer_list 标准库类型 该类型能够使用的前提条件是&#xff1a;所有的实参类型相同。 二&#xff0c;initializer_list(初始化列…

电阻(二):希尔伯特(Hilbert)曲线

1、Hilbert简介 希尔伯特曲线是一种能在 2D平面完美填充正方形的曲线&#xff0c;连续且稳定&#xff08;当细分足够小时&#xff0c;线构成面&#xff09;而又不可导的曲线。只要恰当选择函数&#xff0c;画出一条连续的参数曲线&#xff0c;当参数 t 在 [0、1 ] 区间取值时&a…

ESP32-Cam学习(2)——PC实时显示摄像头画面

具体代码和操作过程见&#xff1a; 3. 实时显示摄像头画面 (itprojects.cn)https://doc.itprojects.cn/0006.zhishi.esp32/02.doc/index.html#/e03.showvideo我主要记录一下我在复现的过程中&#xff0c;遇到的问题以及解决方法。 1.安装第三方库 首先电脑端的代码需要用pych…

备战蓝桥杯---动态规划(入门3之子串问题)

本专题再介绍几种经典的字串问题。 这是一个两个不重叠字串和的问题&#xff0c;我们只要去枚举分界点c即可&#xff0c;我们不妨让c作为右区间的左边界&#xff0c;然后求[1,c)上的单个字串和并用max数组维护。对于右边&#xff0c;我们只要反向求单个字串和然后选左边界为c的…

到底什么是哈希值,哈希值到底是怎么生成的,有什么用?

哈希值&#xff0c;即HASH值&#xff0c;通常用一个短的随机字母和数位组成的字串来代表&#xff0c;是一组任意长度的输入信息通过哈希算法得到的“数据指纹”&#xff0c;即进行加密运算得到的一组二进制值。因为电脑在底层机器码是采用二进位的模式&#xff0c;因此通过哈希…

java中x++和++x的区别,执行后x的值是多少

在Java和C等编程语言中&#xff0c;x 和 x 都是用来对变量 x 进行自增操作的表达式&#xff0c;它们的主要区别在于自增操作发生的时机以及返回值&#xff1a; 后置递增运算符 x&#xff1a; 先使用当前 x 的值进行表达式计算&#xff0c;然后将 x 的值加 1。 执行后的 x 值为…

django连接本地数据库并执行增删改查

1&#xff0c;首先需要将本地数据库的表同步到django的models.py文件 py manage.py inspectdb tb_books tb_heros > demo001/models.py 2&#xff0c;同步成功后models.py会根据每张表映射出不同的类 models.py文件根据数据库表映射出对应的类 3&#xff0c;然后根据不同…

初识 Rust 语言

目录 前言一、Rust 的背景二、Rust的特性三、部署开发环境&#xff0c;编写一个简单demo1、在ubuntu 20.04部署环境2、编写demo测试 四、如何看待Linux内核引入Rust 前言 自Linux 6.1起&#xff0c;初始的Rust基础设施被添加到Linux内核中。此后为了使内核驱动程序能够用Rust编…

应如何看待用AI写论文一事? AI写论文有助科研还是助长作弊?

自大语言模型问世后&#xff0c;许多高校学生都在悄悄利用ChatGPT等AI&#xff08;人工智能&#xff09;写作软件代写论文&#xff0c;或者用AI辅助论文写作&#xff0c;如罗列提纲、润色语言、降低重复率等。 国内类似ChatGPT的AI写作软件并不少见。在各大等网站上&#xff0…

SpringBoot 打成jar包后如何获取jar包Resouces下的文件

获取resouces下的文件使用以下代码即可读取&#xff0c;如果需要变成file传入其他的方法中&#xff0c;需要创建临时文件将输入流文件 复制到 临时文件中&#xff0c;并传入相关方法&#xff0c;最后删除临时文件即可。不能通过ClassPathResouce对象直接获取 文件File ClassPa…