Kafka Producer/Consumer 关系解释及测试demo

文章目录

  • Producer/Consumer
    • 1. 餐厅的故事
    • 2. Kafka的工作方式
    • 3. 生动的场景
    • 4. 测试Demo
      • 4.1 KafkaProducer
      • 4.2 KafkaConsumer

Producer/Consumer

Kafka的生产者(Producer)和消费者(Consumer)的关系,可以通过一个餐厅的例子来形象地说明。

1. 餐厅的故事

想象一个忙碌的餐厅,这里有:

  • 厨师(Producers):负责准备美味的菜肴。
  • 服务台(Kafka Topic):菜肴准备好后,厨师会将它们放到服务台上,服务台有多个部分,每部分代表一个不同类型的菜(即Kafka中的不同Partition)。
  • 服务员(Consumers):负责从服务台上取走菜肴,并将它们送到顾客手中。

在这个餐厅中,有时候会有特别多的订单,厨师需要快速高效地准备菜肴。每当一道菜准备好,他们就会把它放到对应的部分在服务台上。服务台非常长,可以容纳很多菜肴,让不同的服务员能够同时服务多个顾客,提高效率。

2. Kafka的工作方式

  • Producers(厨师):在Kafka中,生产者的角色是发布消息到Topic中。就像厨师准备好菜肴后,会将它们放到服务台的对应部分。
  • Kafka Topic(服务台):Topic是消息的分类,可以细分为多个Partitions(服务台的多个部分),这样可以提高并行处理的能力。每个Partition都是一个独立的队列。
  • Consumers(服务员):消费者从Topic中读取消息。如果有多个消费者在同一个Consumer Group中,它们可以像一队服务员那样协作,每个人负责从服务台的一部分取菜,这样可以更快地服务所有顾客。每个消费者负责读取特定Partition中的消息,确保每条消息都能被及时处理。

3. 生动的场景

假设一天晚上,餐厅接到了一个大型宴会的预订,需要同时准备多道菜。这时,厨师们(Producers)开始忙碌起来,每准备好一道菜,就会放到服务台(Topic)的指定位置(Partition)。服务员们(Consumers)各自负责一部分服务台,快速地将菜肴送到顾客手中。

在这个过程中,如果某一部分的菜准备得特别快,服务台上的这一部分就会堆积更多的菜肴。负责这一部分的服务员需要加快速度,以确保所有的菜肴都能及时送出。这就像在Kafka中,如果某个Partition的消息积压,负责这个Partition的消费者就需要更快地处理消息,以防止延迟。

通过这个例子,我们可以看到,Kafka的Producer和Consumer之间是如何通过Topic(服务台)和Partition(服务台的不同部分)协作的,以实现高效、可靠的消息处理。

4. 测试Demo

4.1 KafkaProducer

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
import timeimport logginglogging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')def process():# Kafka配置,需自行修改bootstrap_servers = ['ip:port']producer_topic = 'XXX_topic'# Kafka生产者producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda m: json.dumps(m).encode('utf-8'),api_version=(1,0,0))data = {"task_id": 1,"image_path": "XXX","video_path": "XXX","guidence_text": "XXX",}# Kafka请求监听try:res = data# 发送结果到Kafkaproducer.send(producer_topic, res)logging.info(f"send data to {producer_topic}")time.sleep(3)except Exception as e:# 记录错误日志logging.error(f"Error processing kafka request: {e}")if __name__ == "__main__":process()

4.2 KafkaConsumer

from kafka import KafkaConsumer
import json
import logginglogging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')def consume_messages():# Kafka配置bootstrap_servers = ['ip:port']consumer_topic = 'XXX'consumer_group = 'XXX'# Kafka消费者consumer = KafkaConsumer(consumer_topic,bootstrap_servers=bootstrap_servers,group_id=consumer_group,# auto_offset_reset='earliest',  # 从最早的消息开始读取auto_offset_reset= "latest",value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # 解码JSON格式的消息)logging.info(f"Started consuming messages from {consumer_topic}")# 消费消息try:for message in consumer:msg = message.valuelogging.info(f"Received message: {msg}")print(f"msg:{msg}")except KeyboardInterrupt:logging.info("Stopping consumer...")except Exception as e:logging.error(f"Error while consuming messages: {e}")finally:consumer.close()if __name__ == "__main__":consume_messages()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/685302.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql运维篇(四) Xtarbackup--备份与恢复练习

一路走来,所有遇到的人,帮助过我的、伤害过我的都是朋友,没有一个是敌人。如有侵权,请留言,我及时删除! 前言 xtrabackup是Percona公司CTO Vadim参与开发的一款基于InnoDB的在线热备工具,具有…

Compose自定义动画API指南

很多动画API都可以自定义其参数达到不同的效果,Compose也提供了相应的API供开发者进行自定义动画规范。 AnimationSpec 主要用存储动画规格,可以自定义动画的行为,在animate*AsState和updateTransition函数中,此函数默认参数为s…

【防网盘在线解压】Peazip 豌豆压缩 v9.7.0

软件介绍 Peazip 是一个免费的文件归档应用程序, 支持跨平台,是和WinRar、WinZip类似软件的开源免费替代品;支持压缩/ 存档到 7Z, ARC、Brotli BR、BZip2、GZip、 PAQ、PEA、RAR、自解压档案、TAR、WIM、XZ、Zstandard ZST、打开…

数据检索:倒排索引加速、top-k和k最邻近

之前在https://www.yuque.com/treblez/qksu6c/wbaggl2t24wxwqb8?singleDoc# 《Elasticsearch: 非结构化的数据搜索》我们看了ES的设计,主要侧重于它分布式的设计以及LSM-Tree,今天我们来关注算法部分:如何进行检索算法的设计以及如何加速倒排…

系统调用的概念

在嵌入式开发、操作系统开发以及一般的系统编程中,系统调用是一个核心概念。它允许用户空间程序请求内核执行某些操作,如打开文件、读写数据、创建进程等。这些操作通常需要特殊的权限或访问硬件资源,因此不能直接在用户模式下执行。 系统调…

挑战杯 wifi指纹室内定位系统

简介 今天来介绍一下室内定位相关的原理以及实现方法; WIFI全称WirelessFidelity,在中文里又称作“行动热点”,是Wi-Fi联盟制造商的商标做为产品的品牌认证,是一个创建于IEEE 802.11标准的无线局域网技术。基于两套系统的密切相关&#xff…

【数据结构】LRU Cache

文章目录 LRUCache LRUCache 1. LRUCache是一种缓存的替换技术,在CPU和main memory之间根据计算机的局部性原理,往往会采用SRAM技术来构建CPU和主存之间的高速缓存,DRAM(dynamic random access memory)用于构建主存,LRUCache这种…

命令行参数和环境变量

命令行参数 命令行参数是在用户在命令行中输入命令时,跟随命令一起输入的一些附加信息。这些参数可以用来配置命令的行为或传递一些数据给命令。 让同样的程序在不同的命令行参数下运行出不同的结果! 将这些命令和参数可以传给 main 函数生&#xff0…

[ABC329B] Next

链接:[ABC329B] Next - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 题意翻译 给定一个序列,求其严格次大值。 输入输出样例 输入 5 2 1 3 3 2 输出 2 输入 4 4 3 2 1 输出 3 输入 8 22 22 18 16 22 18 18 22 输出 18 说明/提示 制約 2 ≤ &am…

【教程】MySQL数据库学习笔记(一)——认识与环境搭建(持续更新)

写在前面: 如果文章对你有帮助,记得点赞关注加收藏一波,利于以后需要的时候复习,多谢支持! 【MySQL数据库学习】系列文章 第一章 《认识与环境搭建》 第二章 《数据类型》 文章目录 【MySQL数据库学习】系列文章一、认…

图像识别基础之模板匹配

principle 图像匹配 本质:图像的相似度很高(矩阵的相似度很高) code /*\brief 我的图像匹配函数,获取差方和均值最小的矩阵作为结果\param srcPicFile:用以匹配的图像文件\param templatePicFile:模板图像文件\param destPicFile:输出的检测结果文件…

汇报工作时,你的工作会让领导满意吗?

当前你正在做的事 众所周知,跟领导汇报,第一件事需着重汇报你正在做的事,否则领导会感觉你无所事事。 举个例子: 完成了某某项目,在这项目中我负责:协调不同科室之间的纠纷,并把问题集中上报给…

阿里云幻兽帕鲁服务器配置4核16G10M带宽够8个人玩吗?玩起来流畅度怎么样?

阿里云幻兽帕鲁服务器配置4核16G10M带宽这个,个人实测下来,五六个人玩是比较流畅的,不过8个人的话,估计会有点卡。如果是8个人的话,我建议选择8核32G那个配置,更加适合一些。 阿里云一键部署幻兽帕鲁详细教…

Leetcode 583 两个字符串的删除操作

题意理解: 给定两个单词 word1 和 word2 ,返回使得 word1 和 word2 相同所需的最小步数。 每步 可以删除任意一个字符串中的一个字符。 该题的要求是:当前有两个单词,要是两个单词剩余的部分相同,最少需要删除多少个字…

解惑:工业物联网 modbus 地址到底从0开始还是从1开始

困惑了好久,终于搞明白了,直接上结果: modbus本身是从0开始,这可以从指令数据中直接看出来PLC分区的地址从1开始,这是PLC的习惯,但是对应的modbus指令的地址要-1 简单说就是,PLC4个区的表达方式…

打印最小公倍数

打印最小公倍数 题目描述: 输入2个整数m和n,计算m和n的最小公倍数,并打印出结果 测试1: 输入:18 24 输出:72 测试2: 输入:18 6 输出:18解法思路: 最小公倍数是指两个…

分布式锁redisson

文章目录 1. 分布式锁1.1 基本原理和实现方式对比synchronized锁在集群模式下的问题多jvm使用同一个锁监视器分布式锁概念分布式锁须满足的条件分布式锁的实现 1.2 基于Redis的分布式锁获取锁&释放锁操作示例 基于Redis实现分布式锁初级版本ILock接口SimpleRedisLock使用示…

原型模式-Prototype Pattern

原文地址:https://jaune162.blog/design-pattern/prototype-pattern/ 引言 在Java中如果我们想要拷贝一个对象应该怎么做?第一种方法是使用 getter和setter方法一个字段一个字段设置。或者使用 BeanUtils.copyProperties() 方法。这种方式不仅能实现相同类型之间对象的拷贝,…

CMake进行C/C++与汇编混合编程

1. 前提 这篇文章记录一下怎么用CMake进行项目管理, 并用C/C和汇编进行混合编程, 为了使用这项技术, 必须在VS的环境中安装好cmake组件 由于大部分人不会使用C/C与汇编进行混合编程的情况。所以这篇文章并不适用于绝大部分人不会对其中具体细节进行过多叙述。只是做一些简单的…

【C++前缀和】野牛与狼

题目描述 位于欧洲中部的赫希费尔登狩猎保护区生活着许多动物,雄壮的野牛和凶狠的狼群之间保持着一种平衡,当野牛的数量和狼群一样多时,它们彼此互不侵犯相安无事,但只要狼的数量多于野牛狼群就会攻击牛群,而野牛数量…