Kafka学习之:mac 上基础使用 python 来使用 kafka 的生产者和消费者进行数据处理

文章目录

  • 前提
  • python 环境配置
  • Kafka 生产消费者模型
    • 生产者 producer
      • 检查当前存在的所有 topic / 是否自动创建 topic
      • 为什么 producer 要通过 key, value 来发布数据
        • 键(Key)
        • 值(Value)
    • 消费者 consumer
      • consumer 得到的 message 有哪些方法?
        • 为什么 consumer 拿到的内容需要 decode

前提

  • 我的配置是 M1 芯片 Macbook pro

  • 你的 kafka 处于启动状态,如果尚未启动,则通过以下命令依次运行 zookeeperkafka,如果有安装问题可以参考上一篇文章

    brew services start zookeeper
    brew services start kafka
    

python 环境配置

  • 首先,确保安装了confluent-kafka

    pip install confluent-kafka
    
  • 使用以下Python脚本创建一个新的Kafka主题:

    from confluent_kafka.admin import AdminClient, NewTopic# Kafka服务器配置
    admin_client = AdminClient({"bootstrap.servers": "localhost:9092"
    })# 创建新主题的配置
    topic_list = [NewTopic("my_new_topic", num_partitions=3, replication_factor=1)]
    # 注意: replication_factor 和 num_partitions 可能需要根据你的Kafka集群配置进行调整; # 创建主题
    fs = admin_client.create_topics(topic_list)# 处理结果
    for topic, f in fs.items():try:f.result()  # The result itself is Noneprint(f"Topic {topic} created")except Exception as e:print(f"Failed to create topic {topic}: {e}")
    • 注意 replication_factor 不能超过 broker 的数量
    • 具体原因可以参考视频
    • 通过 python 只能创建 broker 的主题,而不能控制创建多个 broker,增加或管理brokers的过程需要在集群的配置和部署阶段进行,而不能通过像confluent_kafka这样的客户端库来实现。

Kafka 生产消费者模型

生产者 producer

"""@file: producer.py@Time    : 2024/3/29@Author  : Peinuan qin"""
from confluent_kafka import Producer
import json# Kafka配置
config = {'bootstrap.servers': 'localhost:9092'
}# 创建生产者
producer = Producer(**config)# 模拟的用户活动数据
data = {'user_id': 1234, 'activity': 'page_view', 'page': 'homepage'}# 发送数据
producer.produce('user_activities', key=str(data['user_id']), value=json.dumps(data))
producer.flush()
print("Data sent to Kafka")

Data sent to Kafka

检查当前存在的所有 topic / 是否自动创建 topic

  • 可以用如下命令来检查已经存在的 topics

    kafka-topics --list --bootstrap-server localhost:9092
    
  • 对于上述 producer 中,我的 server.property 中的 auto.create.topics.enable 设置为 True,这意味着如果当前 topics 不存在会自动创建。

  • 检查 auto.create.topics.enable 的方式:

    grep "auto.create.topics.enable" /path/to/your/kafka/config/server.properties
    
  • 一般 /path/to/your/kafka/config/server.properties 在我的 上篇文章 中提到了, m1 芯片的 mac 的地址是在 /opt/homebrew/etc/kafka/server.properties,所以对应的查看命令就是:

    grep "auto.create.topics.enable" /opt/homebrew/etc/kafka/server.properties
    
  • 如果这一行在你的 server.properties 中并不存在,则默认为 true,如果想更改,需要在 server.properties 中加入 auto.create.topics.enable=false 然后保存更改,重新启动 kafka

  • 当你设置成 auto.create.topics.enable=false,再次运行上面的代码,但是 topic 换成一个新的

producer.produce('user_activities1', key=str(data['user_id']), value=json.dumps(data))
  • 你会发现执行结果还是下面内容,并且没有报错:

    Data sent to Kafka

  • 但是当你列出所有的 topic,却发现其实 user_activities1 并没有创建成功

为什么 producer 要通过 key, value 来发布数据

键(Key)
  • 分区选择: 键主要用于决定消息被发送到主题的哪个分区。如果为消息指定了键,Kafka会对键进行哈希处理,根据哈希值将消息均匀分配到不同的分区。这种方式确保了相同键的所有消息都会被发送到同一个分区中,保证了消息的顺序性。如果没有指定键,消息会以轮询的方式分配到所有分区,这可能不会保证相同键的消息顺序。

  • 日志压缩: 键还用于日志压缩(log compaction)功能。在这个模式下,Kafka保证每个键在分区日志中只保留最后一次更新的值。这对于维护长期运行的聚合状态非常有用。

值(Value)
  • 消息内容: 值部分承载了消息的实际内容。这是生产者想要发送给消费者的数据。值可以是任何格式的数据,比如字符串、JSON对象、序列化后的字节码等。

  • 使用场景示例

    • 订单系统:在一个订单系统中,订单ID可以作为键,而订单的详细信息(如客户信息、订单项、价格等)作为值。使用订单ID作为键确保了相同订单的更新会被顺序地发送到同一个分区,并且通过日志压缩,Kafka可以只保留订单的最新状态。
    • 用户行为跟踪:在用户行为跟踪应用中,用户ID 可以作为键,用户的行为(如点击、浏览等)作为值。这样,相同用户的所有行为都会被顺序地记录在同一个分区中,便于后续进行用户行为分析。

消费者 consumer

"""@file: consumer.py@Time    : 2024/3/29@Author  : Peinuan qin"""from confluent_kafka import Consumer, KafkaError# Kafka配置
config = {'bootstrap.servers': 'localhost:9092','group.id': 'user-activity-group','auto.offset.reset': 'earliest'
}# 创建消费者
consumer = Consumer(**config)
consumer.subscribe(['user_activities'])# 读取数据
try:while True:msg = consumer.poll(timeout=1.0)  # 1秒超时if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())break# 成功接收消息print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:pass
finally:# 清理操作consumer.close()
  • 可以看到我们订阅了 'user_activities' 这个 topic,从其中源源不断地取数据来进行消费(处理)

  • 但是同时在 consumer 的代码中定义了一个 group.id,而这个是 producer 中没有的,这样做的原因是:

    • 负载均衡:在同一个消费者组中,每个消费者可以负责消费特定的分区中的消息,这样可以在消费者之间分摊负载。如果一个消费者组中有多个消费者实例,Kafka会尽量平衡地将分区分配给每个消费者,确保每个分区只被组内的一个消费者消费。这意味着增加消费者可以提高消费的并行度,加快处理速度。

    • 容错和高可用性: 如果某个消费者失败,它负责的分区会被重新分配给同一消费者组内的其他消费者,这样可以确保消息的持续消费,提高了系统的容错能力。

    • 消息广播: 通过使用不同的消费者组,可以实现消息的广播模式,即相同的消息可以被多个消费者组独立消费。

  • 如果不是认为还不是很清楚,可以 参考视频

consumer 得到的 message 有哪些方法?

print("msg dict:", dir(msg))
msg dict: ['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'error', 'headers', 'key', 'latency', 'leader_epoch', 'offset', 'partition', 'set_headers', 'set_key', 'set_value', 'timestamp', 'topic', 'value']
  1. Key: 消息的键(如果有)。键用于消息的分区内排序和日志压缩。

  2. Value: 消息的实际内容或负载。

  3. Topic: 消息所属的主题。

  4. Partition: 消息所在的分区号。Kafka中的每个主题可以被分割成多个分区,分区号从0开始。

  5. Offset: 消息在其分区中的偏移量。偏移量是一个递增的序列号,用于唯一标识分区中的每条消息。

  6. Timestamp: 消息的时间戳。它可以是消息创建时的时间戳(生产者发送消息的时间)或者是消息被追加到日志的时间戳。时间戳的具体含义取决于Kafka生产者的配置。

  7. Headers: 消息头部,是键值对的集合,可以用来存储与消息相关的附加信息。生产者可以添加任意多的键值对作为消息的一部分,消费者可以读取这些信息进行相应的处理。

  8. Serialized Key Size: 键的序列化后的大小(以字节为单位)。如果消息没有键,通常这个值是-1。

  9. Serialized Value Size: 值的序列化后的大小(以字节为单位)。

  10. Leader Epoch (Kafka 0.11.0及以上版本): 分区领导者的纪元号。这是一个内部使用的字段,用于Kafka的复制机制,以确保数据一致性。

为什么 consumer 拿到的内容需要 decode
  • 消费者在接收到Kafka中的消息后需要进行解码(decoding),原因在于消息的生产者在发送消息到Kafka之前通常会对消息的键(key)和值(value)进行编码(encoding)。编码和解码是为了在网络上传输数据时确保数据的一致性和完整性,同时也支持消息的有效存储。

  • 一般使用 utf-8 进行编解码

  • 上述的 producer 中没有显式调用 encoder 是因为 json.dumps 本身就是序列化的过程,也就是编码的过程。 但好习惯应该是对 keyvalue 都进行 encode

    producer.produce('user_activities1', key=str(data['user_id']).encode("utf-8"), value=json.dumps(data).encode("utf-8"))
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780929.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MES系统怎么解决车间生产调度难的问题?

MES系统三个层次 1、MES决定了生产什么,何时生产,也就是说它使公司保证按照订单规定日期交付准确的产品; 2、MES决定谁通过什么方式(流程)生产,即通过优化资源配置,最有效运用资源; …

关于SVG格式图片实现室内地图

SVG格式图片 可缩放矢量图形(Scalable Vector Graphics,SVG)基于 XML 标记语言,用于描述二维的矢量图形。 作为一个基于文本的开放网络标准,SVG 能够优雅而简洁地渲染不同大小的图形,并和 CSS、DOM、JavaScript 和 SMIL 等其他网络标准无缝衔接。本质上,SVG 相对于图像…

react学习总结(二)之案例分享

一.项目框架的搭建 1./src/pages下建不同的页面Header.jsx,About.jsx,Home.jsx, Message.jsx,News.jsx,Detail.jsx Header.jsx import React from react import { useNavigate } from react-router-domexport defau…

C++多线程:线程的创建、join、detach、joinable方法(二)

1、线程的开始与结束 程序运行起来,生成一个进程,该进程所持有的主线程开始自动运行,main主线程运行完所有的代码从main函数中返回表示整个进程运行完毕,标志着主线程和进程的死亡,等待操作系统回收资源,因…

nginx如何清理页面缓存

在 Nginx 中,清理页面缓存通常涉及配置缓存头以控制缓存行为,或者使用外部工具或机制来清除缓存。以下是一些建议来管理和清理 Nginx 的页面缓存: 配置缓存头: Nginx 本身不直接提供缓存机制,但可以通过配置 proxy_cac…

安全算法 - 国密算法

国密算法是中国自主研发的密码算法体系,包括对称加密算法、非对称加密算法和哈希算法。其中,国密算法采用SM4作为对称加密算法,SM2作为非对称加密算法,以及SM3作为哈希算法。国密算法在信息安全领域具有重要意义和广泛应用&#x…

Cocos Creator 常见问题记录

目录 问题1、精灵图九宫格,角度不拉伸 问题2、BlockInputEvents 防止透屏 问题1、精灵图九宫格,角度不拉伸 点击编辑,拖拽到可变区域 问题2、BlockInputEvents 防止透屏

【独立开发前线】Vol.26 【独立开发产品】吉光卡片-让你的文字变得酷炫起来

今天给大家分享一下 独立开发前线 社区成员张小吉 的作品 吉光卡片; 这是一款iOS的APP,下载:吉光卡片,主要功能是帮你制作酷炫的文字卡片,用精美的卡片让你的文字生动起来。 展示效果如下: 你可以用它制作…

【公示】2023年度青岛市级科技企业孵化器拟认定名单

根据《青岛市科技企业孵化器管理办法》(青科规〔2023〕1号)(以下简称《管理办法》)、《关于开展2023年度市级科技企业孵化器认定申报工作的通知》,经申报受理、区市推荐、形式审查、专家评审及现场核查等程序&#xff…

为何keil编译信息显示data使用量不是整数

在使用Keil软件进行嵌入式系统开发时,编译后显示的数据使用量(Data Usage)可能会以小数形式显示。这种情况通常是由以下几个原因造成的: 1.内存对齐:为了提高内存访问效率,编译器会对数据进行对齐处理。例…

【笔记】动⼿学深度学习(花书)|| Aston Zhang Mu Li Zachary C. LiptonAlexander J. Smola

系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 前言 第一章 深度学习简介 第二章 P 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 系列文章目录前言本书…

BasicVSR++模型转JIT并用c++libtorch推理

BasicVSR模型转JIT并用clibtorch推理 文章目录 BasicVSR模型转JIT并用clibtorch推理安装BasicVSR 环境1.下载源码2. 新建一个conda环境3. 安装pytorch4. 安装 mim 和 mmcv-full5. 安装 mmedit6. 下载模型文件7. 测试一下能否正常运行 转换为JIT模型用c libtorch推理效果 安装Ba…

使用docker 安装oracle 11g 挂载数据目录并修改SID centos-7

建议:建议使用其它系统去装ubuntu或Rocky(因为centos已经停止维护) 1、安装docker 这里就不细写了,可以查看清华镜像源或者阿里镜像源 清华:https://mirrors.tuna.tsinghua.edu.cn/help/docker-ce/ 阿里:ht…

2434. 使用机器人打印字典序最小的字符串

点击跳转题目 本题学到两点: 1.初始化数组,全部为0的简单写法。之前都是 int arr[26]; memset(arr,0,sizeof(arr));2.if条件中的&&部分左右顺序不能颠倒。颠倒报错,之前一直没重视。 思路: 遍历s,push当前字…

[c++]类和对象常见题目详解

本专栏内容为:C学习专栏,分为初阶和进阶两部分。 通过本专栏的深入学习,你可以了解并掌握C。 💓博主csdn个人主页:小小unicorn ⏩专栏分类:C 🚚代码仓库:小小unicorn的代码仓库&…

2024.03.19 校招 实习 内推 面经

绿*泡*泡VX: neituijunsir 交流*裙 ,内推/实习/校招汇总表格 1、校招 | RoboSense 速腾聚创2024届春招启动(内推) 校招 | RoboSense 速腾聚创2024届春招启动(内推) 2、实习 | 百度智能驾驶事业群组 202…

kanzi 3d知识点

整理学习资料 名字链接Kanzi视频合集中科创达-智能座舱视频专辑-中科创达-智能座舱视频合集-哔哩哔哩视频 (bilibili.com)Kanzi在线文档Working with … - Kanzi framework 3.9.7 documentationThe Book of ShadersThe Book of Shaders着色器语言Shader_着色语言Shading Langua…

Vim - 文本编辑器 Vi vs Vim

你是否应该在学习 Vim 之前先学习 Vi,这完全取决于您自己、您的要求以及您的具体目标和需求。Vim 是 Vi 的扩展、增强和改进版本,它包括 Vi 的所有功能以及许多附加功能。 简约: Vi 设计简约。先学习 Vi 可以让你对基础知识有扎实的了解&…

malloc是如何分配内存|malloc(1)分配多大内存|free释放内存,会还给操作系统吗?

前言 大家好, 我jiantaoyab,这篇文章给大家介绍mallo和free面试中常问到的问题。 malloc是如何分配内存的? 如果用户分配的内存小于128KB,则通过brk()申请内存 如果用户分配的内存大于128KB,则通过mmap()申请内存 简…

数据分析之POWER Piovt的KPI设置

内容总结: 1.两个表格关联不上:需要添加辅助列,建立关联 2.添加辅助列后还关联不上:将虚线变为实线 3.根据需求要增加一些度量值 4.设置KPI后,绝对值选1后设定百分比 5.在透视表里面加入KPI状态 导入所关联的数据后建立…