kafka-python简单生产消费数据

kafka-python使用手册

kafka-python

1. 生产者同步发送数据

# 生产者同步发送数据from kafka import KafkaProducer
from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=["192.168.1.6:9092"])try:record_metadata = producer.send("predict_task_log", b"202312301505 predict res: success").get(timeout=10)   # 同步方式print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)
except KafkaError:print(f"write data to kafka failed!")
finally:producer.close()

2. 生产则异步发送数据

# 生产者异步发送数据from kafka import KafkaProducer
from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=["192.168.1.6:9092"])def on_send_success(record_metadata):"""发送成功之后的回调函数"""print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)def on_send_error(excp):"""发送失败后的回调函数"""print(f"write data to kafka error: {excp}")try:# 1. 主线程执行,子线程将数据写入缓冲池,不影响主线程做其他操作future = producer.send("predict_task_log", b"202312301505 predict res: success")# 2. 子线程通过回调函数通知主线程future.add_callback(on_send_success).add_errback(on_send_error)
except KafkaError:print(f"write data to kafka failed!")
finally:producer.close()

3. 消费者自动提交offset

# 消费者自动提交offsetfrom kafka import KafkaConsumerconsumer = KafkaConsumer(bootstrap_servers=["192.168.1.6:9092"],group_id='predict_group',enable_auto_commit=True,        # 自动提交auto_commit_interval_ms=1000
)for msg in consumer:topic, partition, offset = msg.topic, msg.partition, msg.offsetkey, value = msg.key, msg.value.decode("utf-8")print(f"从topic为{topic}{partition}分区上,获取偏移量为{offset}的消息为{key}: {value}")

4. 消费者手动提交offset

# 消费者手动提交offsetfrom kafka import KafkaConsumerconsumer = KafkaConsumer(bootstrap_servers=["192.168.1.6:9092"],group_id='predict_group',enable_auto_commit=False        # 手动提交
)for msg in consumer:topic, partition, offset = msg.topic, msg.partition, msg.offsetkey, value = msg.key, msg.value.decode("utf-8")print(f"从topic为{topic}{partition}分区上,获取偏移量为{offset}的消息为{key}: {value}")# 手动提交偏移量consumer.commit()           # 同步commitconsumer.commit_async()     # 异步commit,推荐使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/586743.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java企业网站系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web企业网站系统是一套完善的java web信息管理系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发,数据库为Mysql5.0&…

科技创新实验室数据管理优选:高效企业网盘推荐

科技创新实验室建设是国家加强科技创新基本能力建设的重要措施,企业网盘等高效办公工具的应用是保证科技创新实验室正常运行、提高科研项目团队合作效率的重要手段。 本文将介绍企业网盘Zoho WorkDrive提供的解决方案: 行业痛点1:分散的数据…

听GPT 讲Rust源代码--src/tools(39)

File: rust/src/tools/rustfmt/src/config/config_type.rs 在Rust代码中,rust/src/tools/rustfmt/src/config/config_type.rs文件的作用是定义了与配置相关的数据结构和函数。 Config struct(配置结构体):该结构体用于存储rustfmt…

图形化编程(3)之猜拳的加速度计

今天说我们来学习图形化第三节内容,加速度计。加速度传感器是一种能够测量物体加速度的传感器,在运动过程中,通过测量质量的惯性力和牛顿第二定律得到加速度。 根据传感器敏感元件的不同,常见的加速度传感器有电容式、电感式、应变…

zookeeper之集群搭建

1. 集群角色 zookeeper集群下,有3种角色,分别是领导者(Leader)、跟随着(Follower)、观察者(Observer)。接下来我们分别看一下这三种角色的作用。 领导者(Leader): 事务请求(写操作)的唯一调度者和处理者,保…

音频播放软件Foobar2000 mac特点介绍

Foobar2000 mac是一款高度可定制的音频播放器,适用于Windows平台。它支持各种音频格式,包括MP3、FLAC、AAC、WMA等,同时也支持各种音频插件和效果器,可以提供更好的音质和用户体验。 Foobar2000 mac软件特点 1. 高度可定制&#…

信号类型——正交频分复用(OFDM)

系列文章目录 《信号类型(通信)——仿真》 《信号类型(通信)——QAM调制信号》 《信号类型(通信)——QPSK、OQPSK、IJF_OQPSK调制信号》 《信号类型(通信)——最小频移键控&…

【C语言】分支与循环语句

什么是语句? C语句可分为以下五类: 表达式语句函数调用语句控制语句 (本篇重点介绍)复合语句空语句 控制语句用于控制程序的执行流程,以实现程序的各种结构方式。C语言支持三种结构: 顺序结构选择结构循…

AUTOSAR从入门到精通-网络通信(UDPNm)(三)

目录 前言 原理 网络状态 初始化 执行 处理器架构 时间参数

软件设计师——软件工程(三)

📑前言 本文主要是【软件工程】——软件设计师——软件工程的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是听风与他🥇 ☁️博客首页:CSDN主页听风与他 &#x1f304…

使用SecoClient软件连接L2TP

secoclient软件是华为防火墙与友商设备进行微屁恩对接的一款软件,运行在windows下可以替代掉win系统自带的连接功能,因为win系统自带的连接功能总是不可用而且我照着网上查到的各种方法调试了很久都调不好,导致我一度怀疑是我的服务没搭建好,浪费了大把时间去研究其他搭建方案 …

最新Redis7哨兵模式(保姆级教学)

一定一定要把云服务器的防火墙打开一定要!!!!!!!!!否则不成功!!!!!!!!&…

AppWeb认证绕过漏洞(CVE-2018-8715)

一、环境搭建 二、影响版本 三、构造payload Authorization: Digest usernameadmin 四、抓包获取sesion 五、修改数据包、认证头 记得设置用户名 六、漏洞存在特征(Gigest)

基于AM62x的ARM+FPGA+Codesys低成本软PLC解决方案

GPMC并口简介 GPMC(General Purpose Memory Controller)是TI处理器特有的通用存储器控制器接口,支持8/16bit数据位宽,支持128MB访问空间,最高时钟速率133MHz。GPMC是AM62x、AM64x、AM437x、AM335x、AM57x等处理器专用于与外部存储器设备的接口…

大数据应用发展史:从搜索引擎时代到机器学习时代

文章目录 搜索引擎时代数据仓库时代数据挖掘时代机器学习时代小结 大数据技术的使用经历了一个发展过程 从最开始的Google在搜索引擎中开始使用大数据技术,到现在无处不在的各种人工智能应用,伴随着大数据技术的发展,大数据应用也从曲高和寡…

2023年新一代开发者工具 Vue ,正式开源!

以下文章来源于前端充电宝 ,作者CUGGZ 近日,Vue 新一代开发者工具(DevTools)正式开源!Vue DevTools 是一个旨在增强 Vue 开发人员体验的工具,它提供了一些功能来帮助开发者更好地了解 Vue 应用。下面就来看…

小程序入门-登录+首页

正常新建一个登录页面 创建首页和TatBar,实现登录后底部出现两个按钮 代码 "pages": ["pages/login/index","pages/index/index","pages/logs/logs" ],"tabBar": {"list": [{"pagePath"…

html 表格 笔记

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>第二个页面</title><meta name"language" content"cn"> </head> <body><h2 sytle"width:500px;…

Hexo 部署 Github Pages, Github Actions自动部署

想整个静态的博客部署在github pages 历经两天的折磨终于是摸索成功了&#xff0c;官网的文档太简陋了&#xff0c;很多东西没说清楚。 欢迎大家访问我的博客&#xff01; CanyueThis is Canyues blog.https://mobeicanyue.github.io/ 最终实现的效果&#xff0c;一个项目仓库…

51单片机之LED灯

51单片机之LED灯 &#x1f334;前言&#xff1a;&#x1f3ee;点亮LED灯的原理&#x1f498;点亮你的第一个LED灯&#x1f498;点亮你的八个LED灯 &#x1f4cc;让LED灯闪烁的原理&#x1f3bd; LED灯的闪烁&#x1f3d3;错误示范1&#x1f3d3;正确的LED闪烁代码应该是这样&am…