使用python操作kafka

第一步:安装kafka的模块

pip install kafka-python

第二步:编写代码

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import timeclass StationLog:def __init__(self, station_id, call_out, call_in, call_status, timestamp, call_duration):self.station_id = station_idself.call_out = call_outself.call_in = call_inself.call_status = call_statusself.timestamp = timestampself.call_duration = call_durationdef to_string(self):return json.dumps(self.__dict__)def main():# 设置连接kafka集群的ip和端口producer = KafkaProducer(bootstrap_servers='bigdata01:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))arr = ["fail", "busy", "barring", "success", "success", "success", "success", "success", "success", "success", "success", "success"]while True:call_out = "1860000" + str(random.randint(0, 9999)).zfill(4)call_in = "1890000" + str(random.randint(0, 9999)).zfill(4)call_status = random.choice(arr)call_duration = 1000 * (10 + random.randint(0, 9)) if call_status == "success" else 0# 随机产生一条基站日志数据station_log = StationLog("station_" + str(random.randint(0, 9)),call_out,call_in,call_status,int(time.time() * 1000),  # 当前时间戳call_duration)print(station_log.to_string())time.sleep(0.1 + random.randint(0, 99) / 100)try:# 发送数据到Kafkaproducer.send('topicA', station_log.to_string())except KafkaError as e:print(f"Failed to send message: {e}")# 确保所有异步消息都被发送producer.flush()if __name__ == "__main__":main()

以上案例是通过python操作kafka,将一些模拟数据发送到kafka中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CC4学习记录

&#x1f338; CC4 CC4要求的commons-collections的版本是4.0的大版本。 其实后半条链是和cc3一样的&#xff0c;但是前面由于commons-collections进行了大的升级&#xff0c;所以出现了新的前半段链子。 配置文件&#xff1a; <dependency><groupId>org.apach…

自动化报表怎么写

自动化报表设计 标题 日期 筛选器 具体字段自由字段 迷你图 同环比 条件格式 步骤 填充数值 1、先筛选战区日期sumifs(纯数值-注册人数&#xff0c;纯数值-战区列&#xff0c;周报-战区单元格&#xff0c;纯数值-日期&#xff0c;周报-日期单元格) 需要注意⚠️纯数值里的单元格…

mysql中mvcc如何处理纯读事务的?

在 MySQL 的 InnoDB 中&#xff0c;即使事务尚未分配事务 ID&#xff0c;读视图&#xff08;Read View&#xff09; 的生成仍然依赖于系统中的活跃事务列表。这是通过 MVCC 的机制来实现的&#xff0c;以下是具体的原理和可见性判断方法。 MVCC 的核心概念 在 MVCC 中&#xf…

魔改log4j2的JsonLayout,支持自定义json格式日志

小伙伴们&#xff0c;你们好&#xff0c;我是老寇&#xff0c;我又回来辣&#xff0c;1个多月不见甚是想念啊&#xff01;&#xff01;&#xff01;跟我一起魔改源码吧 1.自定义json格式【PatternLayout】 大部分教程都是这个&#xff0c;因此&#xff0c;我就简单给个配置&a…

笔记分享: 西安交通大学COMP551705数据仓库与数据挖掘——02. 关联规则挖掘

文章目录 1. \textbf{1. } 1. 基本概念 2. \textbf{2. } 2. 布尔关联规则 2.1. \textbf{2.1. } 2.1. 一些基本概念 2.2. \textbf{2.2.} 2.2. Apriori \textbf{Apriori} Apriori算法 2.3. \textbf{2.3.} 2.3. Apriori \textbf{Apriori} Apriori算法示例 3. \textbf{3. } 3. 多…

基于标签相关性的多标签学习

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

网站小程序app怎么查有没有备案?

网站小程序app怎么查有没有备案&#xff1f;只需要官方一个网址就可以&#xff0c;工信部备案查询官网地址有且只有一个&#xff0c;百度搜索 "ICP备案查询" 找到官方gov.cn网站即可查询&#xff01; 注&#xff1a;网站小程序app备案查询&#xff0c;可通过输入单位…

手撸 chatgpt 大模型:简述 LLM 的架构,算法和训练流程

本节我们自顶向下看看大模型的相关概念&#xff0c;了解其基本架构以及从零手撸大模型的基本流程。自从 openai 释放了 chatgpt 后&#xff0c;人工智能就立马进入了大模型时代&#xff0c;我还记得在此之前 NLP 的处理主要依赖于深度学习的 LSTM&#xff0c;GRU 等模型架构。这…

爬虫——JSON数据处理

第三节&#xff1a;JSON数据处理 在爬虫开发中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;是最常见的数据格式之一&#xff0c;特别是在从API或动态网页中抓取数据时。JSON格式因其结构简单、可读性强、易于与其他系统交互而广泛应用于前端与后端的数…

AutoUpdater.NET 实现 dotNET应用自动更新

AutoUpdater.NET 是一款用于WPF、Winform软件版本更新的框架&#xff0c;类似框架还有Squirrel、WinSparkle、NetSparkle、Google Omaha。 一、安装AutoUpdater.NET 首先&#xff0c;您需要在项目中安装AutoUpdater.NET库。您可以通过NuGet包管理器来安装它。在Visual Studio中…

鸿蒙实现 web 传值

前言&#xff1a;安卓和 IOS 加载 H5 的时候&#xff0c;都有传值给到 H5 或者接收 H5 值&#xff0c;鸿蒙也可传值和接收 H5 的内容&#xff0c;以下是鸿蒙传值给 H5 端的具体操作 一: 定义好 H5 和鸿蒙传值的方法名&#xff0c;两端必须保持方法名一致 // xxx.ets import …

SpringBoot集成itext导出PDF

添加依赖 <!-- PDF导出 --><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.11</version></dependency><dependency><groupId>com.itextpdf</groupId>&l…

【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录 一、怎么寻找我们关心的主题在崩溃之前消费到了哪里&#xff1f; 1、一个问题&#xff1a; 2、查看消费者消费主题__consumer_offsets 3、一个重要前提&#xff1a;消费时要提交offset 二、指定 Offset 消费 假如遇到kafka崩了&#xff0c;你重启kafka之后&#xff0…

Kotlin深度面试题:协程、密封类和高阶函数

文章目录 知识回顾前言源码分析1.面试题目1&#xff1a;Kotlin中的协程与线程的区别是什么&#xff1f;如何在Android中使用协程进行异步编程&#xff1f;2.面试题目2&#xff1a;Kotlin中的扩展函数和扩展属性是什么&#xff1f;如何在Android开发中使用它们&#xff1f;3. 面…

查询DBA_FREE_SPACE缓慢问题

这个是一个常见的问题&#xff0c;理论上应该也算是一个bug&#xff0c;在oracle10g&#xff0c;到19c&#xff0c;我都曾经遇到过&#xff1b;今天在给两套新建的19C RAC添加监控脚本时&#xff0c;又发现了这个问题&#xff0c;在这里记录一下。 Symptoms 环境&#xff1a;…

【网络安全】网络安全防护体系

1.网络安全防护体系概述 1.1 网络安全的重要性 网络安全是保护网络空间不受恶意攻击、数据泄露和其他安全威胁的关键。随着数字化转型的加速&#xff0c;网络安全的重要性日益凸显&#xff0c;它不仅关系到个人隐私和企业机密的保护&#xff0c;还涉及到国家安全和社会稳定。…

从零开始学习 sg200x 多核开发之 TF 存储卡根文件系统扩容

入式 Linux 镜像制作时&#xff0c;考虑体积等因素&#xff0c;会把根文件系统做的比较小&#xff0c;镜像包较小&#xff0c;方便量产&#xff1b;有时&#xff0c;我们的 tf 或 emmc 的容量较大&#xff0c;烧写镜像后&#xff0c;有较大的空余空间未使用&#xff0c;现尝试把…

【Apache Paimon】-- 1 -- Apache Paimon 是什么?

目录 1、简介 2、概览 3、哪些场景可以使用 Paimon 4、周边生态 5、小结 6、参考 1、简介 我们听说过数据仓库、数据湖、数据湖仓,那你听说过流式数据仓库(Stream warehouse,简称:Streamhouse)吗?那我们今天就来解锁看看他们之中的新秀: Apache paimon 到底是什么…

[实战]SpringBoot使用MongoTemplate存储Float精度丢失问题

问题&#xff1a;使用SpringBoot2.x版本进行MongoDB的存储操作&#xff0c;Float类型数据出现精度丢失问题 解决方案如下&#xff1a; 情况一、字段类型为JSONObject进行存储时 设置值时采用Decimal128类型 Decimal128 value new Decimal128(new BigDecimal(declaredField.g…

Oracle 单机及 RAC 环境 归档模式及路径修改

Oracle 数据库的使用过程中经常会根据需求的不同而调整归档模式&#xff0c;也经常会修改归档文件存放路径。 下面分别演示单机及 RAC 环境下修改归档模式及路径的操作步骤。 一、单机环境 1.查询当前归档模式及路径 SQL> archive log list Database log mode …