使用python操作kafka

第一步:安装kafka的模块

pip install kafka-python

第二步:编写代码

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import timeclass StationLog:def __init__(self, station_id, call_out, call_in, call_status, timestamp, call_duration):self.station_id = station_idself.call_out = call_outself.call_in = call_inself.call_status = call_statusself.timestamp = timestampself.call_duration = call_durationdef to_string(self):return json.dumps(self.__dict__)def main():# 设置连接kafka集群的ip和端口producer = KafkaProducer(bootstrap_servers='bigdata01:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))arr = ["fail", "busy", "barring", "success", "success", "success", "success", "success", "success", "success", "success", "success"]while True:call_out = "1860000" + str(random.randint(0, 9999)).zfill(4)call_in = "1890000" + str(random.randint(0, 9999)).zfill(4)call_status = random.choice(arr)call_duration = 1000 * (10 + random.randint(0, 9)) if call_status == "success" else 0# 随机产生一条基站日志数据station_log = StationLog("station_" + str(random.randint(0, 9)),call_out,call_in,call_status,int(time.time() * 1000),  # 当前时间戳call_duration)print(station_log.to_string())time.sleep(0.1 + random.randint(0, 99) / 100)try:# 发送数据到Kafkaproducer.send('topicA', station_log.to_string())except KafkaError as e:print(f"Failed to send message: {e}")# 确保所有异步消息都被发送producer.flush()if __name__ == "__main__":main()

以上案例是通过python操作kafka,将一些模拟数据发送到kafka中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CC4学习记录

&#x1f338; CC4 CC4要求的commons-collections的版本是4.0的大版本。 其实后半条链是和cc3一样的&#xff0c;但是前面由于commons-collections进行了大的升级&#xff0c;所以出现了新的前半段链子。 配置文件&#xff1a; <dependency><groupId>org.apach…

自动化报表怎么写

自动化报表设计 标题 日期 筛选器 具体字段自由字段 迷你图 同环比 条件格式 步骤 填充数值 1、先筛选战区日期sumifs(纯数值-注册人数&#xff0c;纯数值-战区列&#xff0c;周报-战区单元格&#xff0c;纯数值-日期&#xff0c;周报-日期单元格) 需要注意⚠️纯数值里的单元格…

魔改log4j2的JsonLayout,支持自定义json格式日志

小伙伴们&#xff0c;你们好&#xff0c;我是老寇&#xff0c;我又回来辣&#xff0c;1个多月不见甚是想念啊&#xff01;&#xff01;&#xff01;跟我一起魔改源码吧 1.自定义json格式【PatternLayout】 大部分教程都是这个&#xff0c;因此&#xff0c;我就简单给个配置&a…

笔记分享: 西安交通大学COMP551705数据仓库与数据挖掘——02. 关联规则挖掘

文章目录 1. \textbf{1. } 1. 基本概念 2. \textbf{2. } 2. 布尔关联规则 2.1. \textbf{2.1. } 2.1. 一些基本概念 2.2. \textbf{2.2.} 2.2. Apriori \textbf{Apriori} Apriori算法 2.3. \textbf{2.3.} 2.3. Apriori \textbf{Apriori} Apriori算法示例 3. \textbf{3. } 3. 多…

基于标签相关性的多标签学习

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

网站小程序app怎么查有没有备案?

网站小程序app怎么查有没有备案&#xff1f;只需要官方一个网址就可以&#xff0c;工信部备案查询官网地址有且只有一个&#xff0c;百度搜索 "ICP备案查询" 找到官方gov.cn网站即可查询&#xff01; 注&#xff1a;网站小程序app备案查询&#xff0c;可通过输入单位…

手撸 chatgpt 大模型:简述 LLM 的架构,算法和训练流程

本节我们自顶向下看看大模型的相关概念&#xff0c;了解其基本架构以及从零手撸大模型的基本流程。自从 openai 释放了 chatgpt 后&#xff0c;人工智能就立马进入了大模型时代&#xff0c;我还记得在此之前 NLP 的处理主要依赖于深度学习的 LSTM&#xff0c;GRU 等模型架构。这…

爬虫——JSON数据处理

第三节&#xff1a;JSON数据处理 在爬虫开发中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;是最常见的数据格式之一&#xff0c;特别是在从API或动态网页中抓取数据时。JSON格式因其结构简单、可读性强、易于与其他系统交互而广泛应用于前端与后端的数…

SpringBoot集成itext导出PDF

添加依赖 <!-- PDF导出 --><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.11</version></dependency><dependency><groupId>com.itextpdf</groupId>&l…

【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录 一、怎么寻找我们关心的主题在崩溃之前消费到了哪里&#xff1f; 1、一个问题&#xff1a; 2、查看消费者消费主题__consumer_offsets 3、一个重要前提&#xff1a;消费时要提交offset 二、指定 Offset 消费 假如遇到kafka崩了&#xff0c;你重启kafka之后&#xff0…

查询DBA_FREE_SPACE缓慢问题

这个是一个常见的问题&#xff0c;理论上应该也算是一个bug&#xff0c;在oracle10g&#xff0c;到19c&#xff0c;我都曾经遇到过&#xff1b;今天在给两套新建的19C RAC添加监控脚本时&#xff0c;又发现了这个问题&#xff0c;在这里记录一下。 Symptoms 环境&#xff1a;…

用OMS进行 OceanBase 租户间数据迁移的测评

基本概念 OceanBase迁移服务&#xff08;&#xff0c;简称OMS&#xff09;&#xff0c;可以让用户在同构或异构 RDBMS 与OceanBase 数据库之间进行数据交互&#xff0c;支持数据的在线迁移&#xff0c;以及实时增量同步的复制功能。 OMS 提供了可视化的集中管控平台&#xff…

IDEA一键部署SpringBoot项目到服务器

安装Alibaba Cloud Toolkit插件 配置部署环境 1&#xff1a;设置服务名称 2&#xff1a;选择文件上传的类型 3:选择打包之后的jar文件 4: 添加需要上传的服务器信息 5:需要上传到服务器的地址 输入绝对路径 6: 选择上传文件后执行的脚本 可以参考另一篇文章 Linux启…

渗透测试之信息收集 DNS主机发现探测方式NetBIOS 协议发现主机 以及相关PorCheck scanline工具的使用哟

目录 主机发现 利用NetBIOS 协议发现主机 利用TCP/UDP发现主机 PorCheck scanline 利用DNS协议发现主机 主机发现 信息收集中的一项重要工作是发现内网中的主机、数据库、IP段网络设备、安全设备等资产&#xff0c;以便于更快地获取更多权限和密码&#xff0c;更加接近红…

打造专业问答社区:Windows部署Apache Answer结合cpolar实现公网访问

文章目录 前言1. 本地安装Docker2. 本地部署Apache Answer2.1 设置语言选择简体中文2.2 配置数据库2.3 创建配置文件2.4 填写基本信息 3. 如何使用Apache Answer3.1 后台管理3.2 提问与回答3.3 查看主页回答情况 4. 公网远程访问本地 Apache Answer4.1 内网穿透工具安装4.2 创建…

基于java的医院门诊信息管理系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…

67页PDF |埃森哲_XX集团信息发展规划IT治理优化方案(限免下载)

一、前言 这份报告是埃森哲_XX集团信息发展规划IT治理优化方案&#xff0c;报告中详细阐述了XX集团如何优化IT治理结构以适应新的要求。报告还分析了集团管控模式的变化&#xff0c;提出了六大业务中心的差异化管控策略&#xff0c;并探讨了这些变化对IT治理模式的影响。报告进…

【AI大模型】ELMo模型介绍:深度理解语言模型的嵌入艺术

学习目标 了解什么是ELMo.掌握ELMo的架构.掌握ELMo的预训练任务.了解ELMo的效果和成绩.了解ELMo的优缺点. 目录 &#x1f354; ELMo简介 &#x1f354; ELMo的架构 2.1 总体架构 2.2 Embedding模块 2.3 两部分的双层LSTM模块 2.4 词向量表征模块 &#x1f354; ELMo的预…

【srm,招标询价】采购电子化全流程,供应商准入审核,在线询价流程管理(JAVA+Vue+mysql)

前言&#xff1a; 随着互联网和数字技术的不断发展&#xff0c;企业采购管理逐渐走向数字化和智能化。数字化采购平台作为企业采购管理的新模式&#xff0c;能够提高采购效率、降低采购成本、优化供应商合作效率&#xff0c;已成为企业实现效益提升的关键手段。系统获取在文末…

优选算法 - 4 ( 链表 哈希表 字符串 9000 字详解 )

一&#xff1a;链表 1.1 链表常用技巧和操作总结 1.2 两数相加 题目链接&#xff1a;两数相加 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* …