Python 版分布式消息队列 Kafka 实现图片数据传输

1、Kafka 介绍

在使用 Kafka 之前,通常需要先安装和配置 ZooKeeper。ZooKeeper 是 Kafka 的依赖项之一,它用于协调和管理 Kafka 集群的状态。

ZooKeeper 是一个开源的分布式协调服务,它提供了可靠的数据存储和协调机制,用于协调分布式系统中的各个节点。Kafka 使用 ZooKeeper 来存储和管理集群的元数据、配置信息和状态。

2、Kafka 环境搭建

环境:

  • Windows11
  • Java 1.8 及以上
  • Anaconda
  • Python10
  • Kafka 2.0.2 (kafka-python)
2.1、安装 Python 版本 Kafka
pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

至此,Windows 环境下还不能运行 Kafka,一般情况下,程序会提示超时(60ms)等报错。原因是,还需要启动 Kafka 服务。

2.2、启动 Kafka 服务

从 Kafka 官网下下载对应的文件:Apache Kafka 官网下载地址

在这里插入图片描述

下载红色箭头所指向的文件到本地并解压。

在这里插入图片描述

注意:

从 Kafka 官网上下载的 kafka_2.12-3.2.1 文件需要放置在路径较浅文件夹下解压,一旦放置的路径较深,会报错:

在这里插入图片描述

输入行太长。
命令语法不正确。

本案例放在 E 盘下。

2.2.1、启动 Zookeeper 服务

在上图路径下打开 cmd 命令窗口,执行如下命令:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

在这里插入图片描述

出现如下信息,表示 Zookeeper 服务启动成功:

在这里插入图片描述

2.2.2、启动 Kafka 服务

在上图路径下打开 cmd 命令窗口,执行如下命令:

.\bin\windows\kafka-server-start.bat .\config\server.properties

出现如下信息,表示 Kafka 服务启动成功:

在这里插入图片描述

3、构建图片传输队列

在这里插入图片描述

3.1、配置文件

Properties/config.yaml:

kafka:host: "127.0.0.1"port: 9092parameter:bootstrap_servers: '127.0.0.1:9092'api_version: "2.5.0"log_path: "KafkaLog/log.txt"
workspace:path: "E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00"
input:images_path: "DataSource/Images"
output:output_path: "DataSource/Output"
3.2、Kafka 创建分区

KafkaModule/ProducerConsumer/KafkaClient.py:

from kafka.admin import KafkaAdminClient, NewPartitionsclient = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")# 在已有的 topic 中创建分区
new_partitions = NewPartitions(3)
client.create_partitions({"kafka_demo": new_partitions})
3.3、生产者、消费者(单线程版)

生产者:

KafkaModule/ProducerConsumer/KafkaDemoProducer.py:

# -*- coding: utf-8 -*-
import json
import yaml
import base64
import os.path
import logging
import random
import traceback
from kafka.errors import kafka_errors
from kafka import KafkaProducerdef producer_demo(cfg):""":param cfg::return:"""# 假设生产的消息为键值对(不是一定要键值对),且序列化方式为jsonproducer = KafkaProducer(bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],key_serializer=lambda k: json.dumps(k).encode(),value_serializer=lambda v: json.dumps(v).encode())logging.info("Kafka Producer Starting")images_path = cfg['input']['images_path']workspace_path = cfg['workspace']['path']for i, img in enumerate(os.listdir(os.path.join(workspace_path, images_path))):print(f"img: {img}")workspace_path = cfg['workspace']['path']image_path = os.path.join(workspace_path, images_path, img)with open(image_path, "rb") as image_file:image_data = image_file.read()encode_image = base64.b64encode(image_data)json_data = encode_image.decode("utf-8")json_string = json.dumps(json_data)future = producer.send('kafka_demo',key=str(i),  # 同一个key值,会被送至同一个分区value=json_string,partition=random.randint(0, 2))  # 向分区1发送消息producer.flush()logging.info("Send {}".format(str(i)))try:future.get(timeout=10)  # 监控是否发送成功except kafka_errors:  # 发送失败抛出 kafka_errorstraceback.format_exc()def process():with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/""Properties/config.yaml"), "r") as config:cfg = yaml.safe_load(config)logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',level=logging.INFO)producer_demo(cfg)if __name__ == '__main__':process()

消费者:

KafkaModule/ProducerConsumer/KafkaDemoConsumer.py:

import json
import yaml
import base64
import logging
import os.path
from kafka import KafkaConsumerdef consumer_demo0(cfg):""":param cfg::return:"""consumer = KafkaConsumer('kafka_demo',bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],api_version=cfg['kafka']['parameter']['api_version'],group_id='test')logging.info("consumer_demo0 starting")for message in consumer:key_json_string = json.loads(message.key.decode())value_json_string = json.loads(message.value.decode())name_data = "test0" + key_json_string + ".jpg"image_data = base64.b64decode(value_json_string)logging.info(f"Receiving {name_data} data.")workspace_path = cfg['workspace']['path']output_path = cfg['output']['output_path']image_path = os.path.join(workspace_path, output_path, name_data)with open(image_path, 'wb') as jpg_file:jpg_file.write(image_data)logging.info(f"Save {name_data} data finished.")def process():""":return:"""with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/""Properties/config.yaml"), "r") as config:cfg = yaml.safe_load(config)logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',level=logging.INFO)consumer_demo0(cfg)if __name__ == '__main__':process()
3.4、生产者、消费者(线程池版)

生产者:

KafkaModule/ProducerConsumer/KafkaDemoProducerMultiThread.py:

# -*- coding: utf-8 -*-
import json
import yaml
import base64
import os.path
import logging
import random
import traceback
from kafka.errors import kafka_errors
from kafka import KafkaProducerdef producer_demo(cfg):""":param cfg::return:"""# 假设生产的消息为键值对(不是一定要键值对),且序列化方式为jsonproducer = KafkaProducer(bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],key_serializer=lambda k: json.dumps(k).encode(),value_serializer=lambda v: json.dumps(v).encode())logging.info("Kafka Producer Starting")images_path = cfg['input']['images_path']workspace_path = cfg['workspace']['path']for i, img in enumerate(os.listdir(os.path.join(workspace_path, images_path))):print(f"img: {img}")workspace_path = cfg['workspace']['path']image_path = os.path.join(workspace_path, images_path, img)with open(image_path, "rb") as image_file:image_data = image_file.read()encode_image = base64.b64encode(image_data)json_data = encode_image.decode("utf-8")json_string = json.dumps(json_data)future = producer.send('kafka_demo',key=str(i),  # 同一个key值,会被送至同一个分区value=json_string,partition=random.randint(0, 2))  # 向分区1发送消息producer.flush()logging.info("Send {}".format(str(i)))try:future.get(timeout=10)  # 监控是否发送成功except kafka_errors:  # 发送失败抛出 kafka_errorstraceback.format_exc()def process():with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/""Properties/config.yaml"), "r") as config:cfg = yaml.safe_load(config)logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',level=logging.INFO)producer_demo(cfg)if __name__ == '__main__':process()

消费者:

KafkaModule/ProducerConsumer/KafkaDemoConsumerMultiThread.py:

import json
import yaml
import base64
import logging
import os.path
from kafka import KafkaConsumer
from concurrent.futures import ThreadPoolExecutor, as_completeddef consumer_demo0(cfg, thread_id):""" 线程池版的消费者:param cfg: 配置文件:param thread_id: 线程序号:return:"""consumer = KafkaConsumer('kafka_demo',bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],api_version=cfg['kafka']['parameter']['api_version'],group_id='test')logging.info("consumer_demo0 starting")for message in consumer:key_json_string = json.loads(message.key.decode())value_json_string = json.loads(message.value.decode())name_data = f"test_{thread_id}_" + key_json_string + ".jpg"image_data = base64.b64decode(value_json_string)logging.info(f"Receiving {name_data} data.")workspace_path = cfg['workspace']['path']output_path = cfg['output']['output_path']image_path = os.path.join(workspace_path, output_path, name_data)with open(image_path, 'wb') as jpg_file:jpg_file.write(image_data)logging.info(f"Save {name_data} data finished.")def process():""":return:"""with open(os.path.expanduser("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/""Properties/config.yaml"), "r") as config:cfg = yaml.safe_load(config)logging.basicConfig(filename=cfg['kafka']['parameter']['log_path'],format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',level=logging.INFO)# 线程池thread_pool_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="thread_test_")all_task = [thread_pool_executor.submit(consumer_demo0, cfg, i) for i in range(10)]for future in as_completed(all_task):res = future.result()print("res", str(res))thread_pool_executor.shutdown(wait=True)if __name__ == '__main__':process()

运行顺序:

  • 首先运行 KafkaDemoConsumer.py 或者 KafkaDemoConsumerMultiThread.py
  • 然后运行 KafkaDemoProducer.py 或者 KafkaDemoProducerMultiThread.py
  • DataSource/Output 中会接受生产者发送的图片数据,ProducerConsumer/KafkaLog 路径也会产生运行日志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/811758.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

清远某国企IBM服务器Board故障上门维修

接到一台来自广东清远市清城区某水利大坝国企单位报修一台IBM System x3650 M4服务器无法开机的故障,分享给大家,同时也方便有需要的朋友能及时找到我们快速解决服务器问题。 故障服务器型号:IBM System x3650 M4 服务器使用单位:…

docker、ctr、crictl命令对比

命令dockerctr(containerd)crictl(kubernetes)查看运行的容器docker psctr task ls/ctr container lscrictl ps查看镜像docker imagesctr image lscrictl images查看容器日志docker logs无crictl logs查看容器数据信息docker insp…

厂区3D全景线上漫游体验突破现实时空阻碍

智慧园区,作为现代城市发展的重要引擎,其管理效率和安全监控的需求日益凸显。而720VR全景展示技术的引入,无疑为智慧园区的建设注入了新的活力。通过360全空间无死角的视觉展示,用户可以身临其境地感受园区的每一个角落&#xff0…

记账本|基于SSM的家庭记账本小程序设计与实现(源码+数据库+文档)

家庭记账本小程序目录 基于SSM的家庭记账本小程序设计与实现 一、前言 二、系统设计 三、系统功能设计 1、小程序端: 2、后台 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取: 博主介绍:✌️大…

【opencv】示例-imagelist_reader.cpp 读取YAML格式文件中的图片列表,并逐个显示这些图片的灰度图...

这段代码的功能是使用OpenCV库读取一个YAML或XML格式文件中的图片列表,并且逐个地在窗口中以灰度图像的形式显示这些图片。用户可以按任意键来查看下一张图片。程序提供了帮助信息输出,指导用户如何使用该程序。此外,它使用命令行参数解析器来…

8266 Ubuntu下 arduino开发

8266 NodeMCU arduino开发 直接用usb连接8266的usb接口即可,设备中会出现/dev/ttyUSB0, 需要将其权限设置下sudo usermod -a -G dialout $USER. logout后生效. 下载arduino IDE,做以下设置: file/preferences Additional boards manager URLS设置为http://arduino.esp8266.co…

NCCL集合通信算子DEMO及性能测试

NCCL集合通信算子DEMO及性能测试 一.复现代码 以下代码用于测试NCCL算子的性能及正确性 一.复现代码 tee ccl_benchmark.py <<-EOF import os import torch import argparse import torch.distributed as dist from torch.distributed import ReduceOp from datetime im…

(文章复现)考虑网络动态重构的分布式电源选址定容优化方法

参考文献&#xff1a; [1]朱俊澎,顾伟,张韩旦,等.考虑网络动态重构的分布式电源选址定容优化方法[J].电力系统自动化,2018,42(05):111-119. 1.摘要 以投资周期经济收益最高为目标&#xff0c;基于二阶锥规划提出了一种考虑网络动态重构的分布式电源选址定容优化方法。首先&am…

毅四捕Go设计模式笔记——责任链模式

责任链模式&#xff08;Chain of Responsibility Pattern&#xff09; 为了解决什么问题&#xff1f; 责任链模式的目的是为了将请求的发送者和接收者解耦。它允许多个处理器都有机会处理请求&#xff0c;将这些处理器连接成一条链&#xff0c;并沿着这条链传递请求&#xff…

Linux磁盘空间问题排查记录

问题 pip install时总提示OSError(28, ‘No space left on device’)或者ERROR: Could not install packages due to an OSError: [Errno 28] No space left on device 分析 很明显&#xff0c;磁盘空间不足。尝试了以下方法&#xff0c;没有解决问题&#xff1a; 清理pip缓…

给现有rabbitmq集群添加rabbitmq节点

现有的&#xff1a;10.2.59.216 rabbit-node1 10.2.59.217 rabbit-node2 新增 10.2.59.199 rabbit-node3 1、分别到官网下载erlang、rabbitmq安装包&#xff0c;我得版本跟现有集群保持一致。 erlang安装包&#xff1a;otp_src_22.0.tar.gz rabbitmq安装包&#xff1…

详解App Inventor 2 中的文件作用域(作用范围):App、程序包、缓存、兼容、私有、共享

本文内容来自中文网文档“文件管理器”组件部分&#xff0c;详细介绍了每一种文件作用域的特点及用法。 下面是每种作用域类型的简述&#xff1a; App [推荐] &#xff1a;Android 2.2及更高版本上文件将从应用程序特定存储中读取和写入&#xff0c;在 Android 早期版本上&…

STM32—DMA直接存储器访问详解

DMA——直接存储器访问 DMA&#xff1a;Data Memory Access, 直接存储器访问。 DMA和我们之前学过的串口、GPIO都是类似的&#xff0c;都是STM32中的一个外设。串口是用来发送通信数据的&#xff0c;而DMA则是用来把数据从一个地方搬到另一个地方&#xff0c;而且不占用CPU。…

Arthas排查工具

简介 | arthas (aliyun.com) 在线安装 #下载jar包 curl -O https://arthas.aliyun.com/arthas-boot.jar#启动会先检测虚拟机进程&#xff0c;如果没有启动失败(idea) java -jar arthas-boot.jar linux安装与window一样

利用SOCKS5代理和代理IP提升网络安全与匿名性

一、引言 随着网络技术的迅猛发展&#xff0c;数据安全和隐私保护已成为业界关注的热点。企业和个人用户越来越依赖于各种网络技术来保护敏感信息免受未授权访问。本文将探讨SOCKS5代理、代理IP以及HTTP协议在提升网络安全和匿名性方面的作用和实践应用。 二、基础技术概述 2.…

flask毕业设计选题管理系统python+django_96r19

本系统选择编程语言。Pymysql是封装了MySQL驱动的Python驱动一个能使Python连接到MySQL的库。Python语言官方规范访问数据库的统一接口规范(Python DB-API)&#xff0c;防止在使用不同数据库时&#xff0c;由于底层数据库技术不同造成接口程序紊乱的问题。通过本次系统设计可以…

【Spring高级】Spring Boot启动过程

目录 SpringApplication new 分析源码分析步骤演示primarySources和Sources应用类型webApplicationTypesetInitializers设置容器初始化器setListeners设置监听器主类推断 SpringApplication run 分析主要步骤步骤演示事件发布容器相关执行 runner准备EnvironmentEnvironmentPos…

时间序列分析 #ARMA模型的识别与参数估计 #R语言

掌握ARMA模型的识别和参数估计。 原始数据在文末&#xff01;&#xff01;&#xff01; 练习1、 根据某1915-2004年澳大利亚每年与枪支有关的凶杀案死亡率&#xff08;每10万人&#xff09;数据&#xff08;题目1数据.txt&#xff09;&#xff0c;求&#xff1a; 第1小题&…

C# Solidworks二次开发:模型中实体Entity相关操作API详解

大家好&#xff0c;今天要讲的一些API是关于实体的相关API。 在开发的过程&#xff0c;很多地方会涉及到实体的相关操作&#xff0c;比如通过实体选中节点。下面就直接开始介绍API&#xff1a; &#xff08;1&#xff09;第一个API为Select4&#xff0c;这个API的含义为选中一…

微信小程序中调取小程序实现报错:提示 开发版小程序已过期,请在开发者工具中重新扫码的 解决方案

出现的问题&#xff1a; 解决方法&#xff1a; 将envVersion: develop,开发版切换为正式版 envVersion: release,wx.navigateToMiniProgram({appId:res.data.appId,path: res.data.prePayTn,extraData: {foo: bar,miniProgramOrgId:res.data.miniProgramOrgId,orderId: res.d…