python 读写kafka

    1. 安装pykafka

pip install pykafka

    2. 生产者

from pykafka import KafkaClientdef get_kafka_producer(hosts, topics):client = KafkaClient(hosts=hosts)print(client.topics)topic = client.topics[topics]producer = topic.get_producer()return producer

测试

hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
producer = get_kafka_producer(hosts, topics)   
for i in range(10):msg = "test message " + str(i)msg = bytes(msg, encoding='utf-8')    producer.produce(msg)

3. 消费者

def get_kafka_consumer(hosts, topics):client = KafkaClient(hosts=hosts)topic=client.topics[topics]consumer = topic.get_balanced_consumer(consumer_group='test_kafka_topic', auto_commit_enable=True,
zookeeper_connect='192.168.20.201:2181,192.168.20.202:2181,192.168.20.203:2181')return consumer

测试

hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
consumer = get_kafka_consumer(hosts, topics)   
for msg in consumer:print(msg)if msg is not None:print(msg.offset)print(msg.value)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/624821.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python基础语法看一篇就够了,全网最全python语法笔记汇总

前言 Python 是一种代表简单思想的语言,其语法相对简单,很容易上手。不过,如果就此小视 Python 语法的精妙和深邃,那就大错特错了。 如能在实战中融会贯通、灵活使用,必将使代码更为精炼、高效,同时也会极…

提前避坑Anzo Capital总结浮动差价的3个缺点

在交易中很多投资者倾向于选择浮动差价模式,这种模式的便利性就不言而喻了,但Anzo Capital需要提醒各位投资者,一定要知道浮动差价的3个缺点,在交易中提前避坑,下面Anzo Capital就和各位投资者一起总结浮动差价的这3…

leetcode 24两两交换链表中的节点

题目 给你一个链表,两两交换其中相邻的节点,并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题(即,只能进行节点交换)。 思想 对于操作链表节点的时候,首先需要就是创建一个虚拟的…

前端动画特效分享(附在线预览)

分享几款不错的动画特效源码 其中有CSS动画、canvas动画、js小游戏等等 下面我会给出特效样式图或演示效果图 但你也可以点击在线预览查看源码的最终展示效果及下载源码资源 canvas爱心代码动画 爱心代码动画特效 由里向外不断的产生的小爱心形成一个巨大的爱心动画 以下图片…

前端工程记录:用RecordRTC实现对<video>标签的录像功能

项目需求:后端给一个mp4视频的链接,在前端播放,同时支持用户的录制视频操作。 一、技术选择 1. 毫无关系的getUserMedia 官方介绍文档:MediaDevices.getUserMedia() - Web API 接口参考 | MDN 在网上搜索“前端如何录像”&…

Maxwell数据同步(增量)

1. Maxwell简介 1.1 Maxwell概述 Maxwell 是由美国Zendesk公司开源,用Java编写的MySQL变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流…

全链路压力测试:现代软件工程中的重要性

全链路压力测试不仅可以确保系统在高负载下的性能和稳定性,还能帮助企业进行有效的风险管理和性能优化。在快速发展的互联网时代,全链路压力测试已成为确保软件产品质量的关键步骤。 1、测试环境搭建 测试应在与生产环境尽可能相似的环境中进行&#xff…

windows使用redis-安装和配置

windows使用redis 安装和配置 下载安装方式一-使用压缩包安装解压到指定的文件Redis安装为Windows服务安装成功 方式二-MSI安装包安装完成 Redis配置远程访问1.修改配置文件redis.windows.conf2.修改完redis配置文件,必须重启redis 下载 先下载Redis for windows 的…

Java lambda表达式如何自定义一个toList Collector

匿名类: package l8;import java.util.*; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; import java.util.s…

PyTorch中的FX图

一.FX 图介绍 FX 图是 PyTorch 中的一个主要数据结构,用于在 FX 中间表示(Intermediate Representation,IR)中表示程序。FX 图由一系列节点组成,每个节点代表调用站点(比如运算符、方法和模块)。…

高级编程JavaScript。Notifications消息通知

- Notifications Notifications API 用于向用户显示通知。无论从哪个角度看,这里的通知都很类似 alert()对话框: 都使用 JavaScript API 触发页面外部的浏览器行为,而且都允许页面处理用户与对话框或通知弹层的交 互。不过,通知…

React 18 中的并发性

并发性是我们在 React 18 发布后获得的重大成就之一。由于此功能是完全选择加入的,并且 React 18 向后兼容以前的版本,因此您甚至可能没有注意到新功能。那么并发性是什么、它是如何工作的以及它如何改进您的应用程序的呢? 什么是并发 并发…

360度评估的应用场景和评估内容

360度评估是一种多元化的评估工具,它从多个角度获取对个体绩效、能力和行为的全面反馈。这种评估方法不仅涵盖了传统的上级评价,还包括同级、下级、自我评价以及客户或外部利益相关者的反馈。 一、360度评估的应用场景 员工绩效评估:360度评…

【PostgreSQL内核学习(二十二)—— 执行器(ExecutePlan)】

执行器(InitPlan) 概述ExecutePlan 函数ExecProcNode 函数 总结 声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来…

超详细!4小时开发一个SpringBoot+vue前后端分离博客项目!!

超详细!4小时开发一个SpringBootvue前后端分离博客项目!! 前后端分离项目 文章总体分为2大部分,Java后端接口和vue前端页面,比较长,因为不想分开发布,真正想你4小时学会,哈哈。 先…

【自学笔记】01Java基础-07面向对象基础-04接口与内部类详解

记录学习Java基础中有关接口类和内部类的知识。 1 接口 interface 关键字用于定义接口类,接口类是一系列方法的声明,一般只有方法的特征没有方法的实现,因此可以被不同的类接入实现,而这些实现可以具有不同的行为(功…

Graham扫描凸包算法

凸包(Convex Hull)是包含给定点集合的最小凸多边形。凸包算法有多种实现方法,其中包括基于递增极角排序、Graham扫描、Jarvis步进法等。下面,我将提供一个简单的凸包算法实现,基于Graham扫描算法。 Graham扫描算法是一…

多级缓存架构(一)项目初始化

文章目录 一、项目克隆二、数据库准备三、项目工程准备 一、项目克隆 克隆此项目到本地 https://github.com/Xiamu-ssr/MultiCache 来到start目录下,分别有以下文件夹 docker:docker相关文件item-service:springboot项目 二、数据库准备 …

Ncast盈可视高清智能录播系统busiFacade RCE漏洞(CVE-2024-0305)

产品介绍 Ncast盈可视高清智能录播系统是一套新进的音视频录制和播放系统,旨在提供高质量,高清定制的录播功能。 漏洞描述 广州盈可视电子科技有限公司的高清智能录播系统存在信息泄露漏洞(CVE-2024-0305),攻击者可通过该漏洞,…

Sectigo的DV通配符https

Sectigo是近些年发展比较快速的CA认证机构,为了提升审核效率,在全国成立了审核机构,亚太审核中心的成立加快了Sectigo旗下的https证书的审核速度。Sectigo的https证书可以为网站安全提供有力支持,从而保护网站信息安全。今天就随S…