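1. Kafka Introduction
Before using Kafka, you usually need to install and configure ZooKeeper first. In the ZooKeeper-based deployment used in this article, ZooKeeper is one of Kafka's dependencies: it coordinates and manages the state of the Kafka cluster.
ZooKeeper is an open-source distributed coordination service. It provides reliable data storage and coordination primitives for the nodes of a distributed system. Kafka uses it to store and manage the cluster's metadata, configuration, and state.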
2. Setting Up the Kafka Environment
Environment:
- Windows 11
- Java 1.8 or later
- Anaconda
- Python 3.10
- kafka-python 2.0.2 (the Python client for Kafka)
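2.1 Installing the Python Kafka Client
pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
At this point only the client library is installed, so Kafka programs still cannot run on Windows. A program will typically fail with a timeout (for example, after about 60 s with the default settings) or a similar error, because no Kafka broker is running yet. The Kafka service has to be started first.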
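A quick way to confirm that the pip command above installed the client into the active Anaconda environment is to query the package metadata. The snippet below is a minimal sketch using only the standard library; the helper name installed_version is made up for this illustration.

```python
from importlib import metadata


def installed_version(package: str):
    """Return the installed version string of a distribution, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "kafka-python" is the distribution name used in the pip command above.
    version = installed_version("kafka-python")
    if version is None:
        print("kafka-python is not installed in this environment")
    else:
        print(f"kafka-python {version} is installed")
```

If it reports that the package is missing, the pip command most likely ran against a different Python environment than the one running the script.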
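2.2 Starting the Kafka Service
Download the Kafka release from the official Apache Kafka download page.
Download the file marked by the red arrow in the screenshot to your machine and extract it.
Note:
Extract the downloaded kafka_2.12-3.2.1 archive into a folder with a short path. If the path is too deep, the startup scripts fail with:
The input line is too long.
The syntax of the command is incorrect.
In this example the archive is extracted on the E: drive.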
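2.2.1 Starting the ZooKeeper Service
Open a cmd window in the extracted Kafka folder (the path described above) and run:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
If the startup log appears without errors and the process keeps running, the ZooKeeper service has started successfully.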
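2.2.2 Starting the Kafka Broker
Open a second cmd window in the same folder, leaving ZooKeeper running, and run:
.\bin\windows\kafka-server-start.bat .\config\server.properties
If the startup log appears without errors and the process keeps running, the Kafka service has started successfully.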
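Besides reading the console logs, you can check from Python whether both services are accepting connections. The sketch below assumes the ports from the stock configuration files (2181 for ZooKeeper, 9092 for Kafka); the helper name port_is_open is hypothetical, and an open port only shows that something is listening, not that the service is fully healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumed defaults: clientPort=2181 in zookeeper.properties,
    # port 9092 for the broker (the same port used in config.yaml later on).
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        state = "reachable" if port_is_open("127.0.0.1", port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```

Running this before the producer or consumer gives a faster and clearer failure than waiting for a client-side timeout.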
3. Building an Image Transfer Queue
3.1 Configuration File
Properties/config.yaml:
kafka:
  host: "127.0.0.1"
  port: 9092
  parameter:
    bootstrap_servers: '127.0.0.1:9092'
    api_version: "2.5.0"
    log_path: "KafkaLog/log.txt"
workspace:
  path: "E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00"
input:
  images_path: "DataSource/Images"
output:
  output_path: "DataSource/Output"
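The producer and consumer scripts index straight into this configuration, so a missing or misindented key only surfaces as a KeyError at runtime. A small check like the one below can catch that earlier. It is a sketch, not part of the original project: the names REQUIRED_KEYS and missing_keys are hypothetical, and the dict in the usage part stands in for what yaml.safe_load() would return.

```python
# Key paths that the producer/consumer scripts read from the config.
REQUIRED_KEYS = (
    ("kafka", "parameter", "bootstrap_servers"),
    ("kafka", "parameter", "api_version"),
    ("kafka", "parameter", "log_path"),
    ("workspace", "path"),
    ("input", "images_path"),
    ("output", "output_path"),
)


def missing_keys(cfg: dict) -> list:
    """Return the dotted paths of required keys that are absent from cfg."""
    missing = []
    for path in REQUIRED_KEYS:
        node = cfg
        for part in path:
            if not isinstance(node, dict) or part not in node:
                missing.append(".".join(path))
                break
            node = node[part]
    return missing


if __name__ == "__main__":
    # Stand-in for the dict loaded from Properties/config.yaml.
    cfg = {
        "kafka": {
            "host": "127.0.0.1",
            "port": 9092,
            "parameter": {
                "bootstrap_servers": "127.0.0.1:9092",
                "api_version": "2.5.0",
                "log_path": "KafkaLog/log.txt",
            },
        },
        "workspace": {"path": "E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00"},
        "input": {"images_path": "DataSource/Images"},
        "output": {"output_path": "DataSource/Output"},
    }
    print(missing_keys(cfg))  # → []
```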
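3.2 Creating Kafka Partitions
KafkaModule/ProducerConsumer/KafkaClient.py:
from kafka.admin import KafkaAdminClient, NewPartitions

client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")

# Increase the partition count of the existing topic "kafka_demo" to 3.
# The topic must already exist and currently have fewer than 3 partitions.
new_partitions = NewPartitions(total_count=3)
client.create_partitions({"kafka_demo": new_partitions})
client.close()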
3.3 Producer and Consumer (Single-Threaded Version)
Producer:
KafkaModule/ProducerConsumer/KafkaDemoProducer.py:
# -*- coding: utf-8 -*-
import base64
import json
import logging
import os.path
import random

import yaml
from kafka import KafkaProducer
from kafka.errors import KafkaError

CONFIG_PATH = ("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
               "Properties/config.yaml")


def producer_demo(cfg):
    """Send every image in the input folder to the 'kafka_demo' topic.

    :param cfg: configuration dict loaded from config.yaml
    :return: None
    """
    # Keys and values are serialized as JSON (messages do not have to be key/value pairs).
    producer = KafkaProducer(
        bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    logging.info("Kafka Producer Starting")
    images_dir = os.path.join(cfg['workspace']['path'], cfg['input']['images_path'])
    for i, img in enumerate(os.listdir(images_dir)):
        print(f"img: {img}")
        with open(os.path.join(images_dir, img), "rb") as image_file:
            image_data = image_file.read()
        # Base64 turns the raw bytes into text; value_serializer handles the JSON step,
        # so the value is not passed through json.dumps a second time here.
        encoded_image = base64.b64encode(image_data).decode("utf-8")
        # An explicit partition overrides key-based routing, so each image goes to a
        # random partition (0-2). Without `partition`, equal keys go to the same partition.
        future = producer.send('kafka_demo',
                               key=str(i),
                               value=encoded_image,
                               partition=random.randint(0, 2))
        producer.flush()
        logging.info("Send {}".format(i))
        try:
            future.get(timeout=10)  # block until the send is acknowledged
        except KafkaError:  # raised when the send fails
            logging.exception("Failed to send image %s", img)
    producer.close()


def process():
    with open(CONFIG_PATH, "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(
        filename=cfg['kafka']['parameter']['log_path'],
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
        level=logging.INFO)
    producer_demo(cfg)


if __name__ == '__main__':
    process()
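Because each image travels as a single Base64 text message, it is worth estimating the encoded size before sending. Base64 grows the data by roughly one third, and kafka-python documents a default max_request_size of 1048576 bytes for the producer. The sketch below is a rough pre-check only: it ignores the key and protocol overhead, the broker may enforce its own limit, and the function names are made up for this illustration.

```python
import math

# Assumption: kafka-python's documented default for KafkaProducer(max_request_size=...).
DEFAULT_MAX_REQUEST_SIZE = 1048576


def encoded_size(raw_len: int) -> int:
    """Bytes occupied by raw_len bytes after Base64 encoding and JSON string quoting."""
    base64_len = 4 * math.ceil(raw_len / 3)  # Base64 emits 4 characters per 3 input bytes
    return base64_len + 2                    # plus the two surrounding JSON quotes


def fits_in_request(raw_len: int, limit: int = DEFAULT_MAX_REQUEST_SIZE) -> bool:
    """Rough check: does the encoded value alone stay under the request size limit?"""
    return encoded_size(raw_len) < limit


if __name__ == "__main__":
    for size in (100_000, 700_000, 800_000):
        print(size, encoded_size(size), fits_in_request(size))
```

Images that do not fit would need a larger limit on both the producer and the broker, or a different transfer strategy such as sending a file path instead of the bytes.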
Consumer:
KafkaModule/ProducerConsumer/KafkaDemoConsumer.py:
import base64
import json
import logging
import os.path

import yaml
from kafka import KafkaConsumer

CONFIG_PATH = ("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
               "Properties/config.yaml")


def consumer_demo0(cfg):
    """Receive images from the 'kafka_demo' topic and save them as .jpg files.

    :param cfg: configuration dict loaded from config.yaml
    :return: None (loops until the process is stopped)
    """
    # Convert the "2.5.0" string from the config into the tuple form (2, 5, 0).
    api_version = tuple(int(x) for x in cfg['kafka']['parameter']['api_version'].split('.'))
    consumer = KafkaConsumer('kafka_demo',
                             bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             api_version=api_version,
                             group_id='test')
    logging.info("consumer_demo0 starting")
    output_dir = os.path.join(cfg['workspace']['path'], cfg['output']['output_path'])
    os.makedirs(output_dir, exist_ok=True)  # make sure the output folder exists
    for message in consumer:
        key = json.loads(message.key.decode())
        value = json.loads(message.value.decode())
        name_data = "test0" + key + ".jpg"
        image_data = base64.b64decode(value)
        logging.info(f"Receiving {name_data} data.")
        with open(os.path.join(output_dir, name_data), 'wb') as jpg_file:
            jpg_file.write(image_data)
        logging.info(f"Save {name_data} data finished.")


def process():
    with open(CONFIG_PATH, "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(
        filename=cfg['kafka']['parameter']['log_path'],
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
        level=logging.INFO)
    consumer_demo0(cfg)


if __name__ == '__main__':
    process()
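The producer and consumer agree on a simple wire format: the key is the image index as a JSON string, and the value is the image bytes, Base64-encoded and then JSON-encoded. The round trip can be tried without a broker. The sketch below uses a single JSON encoding step for the value and hypothetical helper names (encode_message, decode_message); the byte sequence stands in for a real .jpg file.

```python
import base64
import json


def encode_message(index: int, image_bytes: bytes):
    """Build the (key, value) byte pair the way the producer's serializers would."""
    key = json.dumps(str(index)).encode()
    value = json.dumps(base64.b64encode(image_bytes).decode("utf-8")).encode()
    return key, value


def decode_message(key: bytes, value: bytes):
    """Recover the index string and the raw image bytes on the consumer side."""
    index = json.loads(key.decode())
    image_bytes = base64.b64decode(json.loads(value.decode()))
    return index, image_bytes


if __name__ == "__main__":
    fake_image = bytes(range(256))  # stand-in for the contents of a .jpg file
    key, value = encode_message(7, fake_image)
    index, restored = decode_message(key, value)
    print(index, restored == fake_image)  # → 7 True
```

Testing the encoding in isolation like this makes it easier to tell serialization bugs apart from broker or network problems.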
3.4 Producer and Consumer (Thread Pool Version)
Producer:
The producer in this version is still single-threaded; only the consumer uses a thread pool.
KafkaModule/ProducerConsumer/KafkaDemoProducerMultiThread.py:
# -*- coding: utf-8 -*-
import base64
import json
import logging
import os.path
import random

import yaml
from kafka import KafkaProducer
from kafka.errors import KafkaError

CONFIG_PATH = ("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
               "Properties/config.yaml")


def producer_demo(cfg):
    """Send every image in the input folder to the 'kafka_demo' topic.

    :param cfg: configuration dict loaded from config.yaml
    :return: None
    """
    # Keys and values are serialized as JSON (messages do not have to be key/value pairs).
    producer = KafkaProducer(
        bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    logging.info("Kafka Producer Starting")
    images_dir = os.path.join(cfg['workspace']['path'], cfg['input']['images_path'])
    for i, img in enumerate(os.listdir(images_dir)):
        print(f"img: {img}")
        with open(os.path.join(images_dir, img), "rb") as image_file:
            image_data = image_file.read()
        # Base64 turns the raw bytes into text; value_serializer handles the JSON step,
        # so the value is not passed through json.dumps a second time here.
        encoded_image = base64.b64encode(image_data).decode("utf-8")
        # An explicit partition overrides key-based routing, so each image goes to a
        # random partition (0-2). Without `partition`, equal keys go to the same partition.
        future = producer.send('kafka_demo',
                               key=str(i),
                               value=encoded_image,
                               partition=random.randint(0, 2))
        producer.flush()
        logging.info("Send {}".format(i))
        try:
            future.get(timeout=10)  # block until the send is acknowledged
        except KafkaError:  # raised when the send fails
            logging.exception("Failed to send image %s", img)
    producer.close()


def process():
    with open(CONFIG_PATH, "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(
        filename=cfg['kafka']['parameter']['log_path'],
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
        level=logging.INFO)
    producer_demo(cfg)


if __name__ == '__main__':
    process()
Consumer:
KafkaModule/ProducerConsumer/KafkaDemoConsumerMultiThread.py:
import base64
import json
import logging
import os.path
from concurrent.futures import ThreadPoolExecutor, as_completed

import yaml
from kafka import KafkaConsumer

CONFIG_PATH = ("E:/harrytsz-workspace/harrytsz-python/DistributedSystemDemo00/"
               "Properties/config.yaml")


def consumer_demo0(cfg, thread_id):
    """Thread pool version of the consumer.

    :param cfg: configuration dict loaded from config.yaml
    :param thread_id: index of the worker, used in the output file name
    :return: None (loops until the process is stopped)
    """
    # Convert the "2.5.0" string from the config into the tuple form (2, 5, 0).
    api_version = tuple(int(x) for x in cfg['kafka']['parameter']['api_version'].split('.'))
    # Each thread creates its own consumer; a consumer is not shared between threads.
    consumer = KafkaConsumer('kafka_demo',
                             bootstrap_servers=cfg['kafka']['parameter']['bootstrap_servers'],
                             api_version=api_version,
                             group_id='test')
    logging.info("consumer_demo0 starting")
    output_dir = os.path.join(cfg['workspace']['path'], cfg['output']['output_path'])
    os.makedirs(output_dir, exist_ok=True)  # make sure the output folder exists
    for message in consumer:
        key = json.loads(message.key.decode())
        value = json.loads(message.value.decode())
        name_data = f"test_{thread_id}_" + key + ".jpg"
        image_data = base64.b64decode(value)
        logging.info(f"Receiving {name_data} data.")
        with open(os.path.join(output_dir, name_data), 'wb') as jpg_file:
            jpg_file.write(image_data)
        logging.info(f"Save {name_data} data finished.")


def process():
    with open(CONFIG_PATH, "r") as config:
        cfg = yaml.safe_load(config)
    logging.basicConfig(
        filename=cfg['kafka']['parameter']['log_path'],
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(funcName)s',
        level=logging.INFO)
    # Each consumer loop runs forever, so submit exactly one task per worker thread;
    # extra tasks would wait in the queue and never start.
    # With 3 partitions and one consumer group, at most 3 consumers receive messages.
    num_workers = 5
    thread_pool_executor = ThreadPoolExecutor(max_workers=num_workers,
                                              thread_name_prefix="thread_test_")
    all_task = [thread_pool_executor.submit(consumer_demo0, cfg, i)
                for i in range(num_workers)]
    for future in as_completed(all_task):
        res = future.result()  # only reached if a consumer stops or raises
        print("res", str(res))
    thread_pool_executor.shutdown(wait=True)


if __name__ == '__main__':
    process()
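One detail of the thread pool version deserves a closer look: every consumer task loops forever, so a pool can only ever run as many of them as it has worker threads. Any task submitted beyond max_workers stays queued and never starts. The sketch below demonstrates this without Kafka, using a threading.Event to stand in for the endless message loop; the function name count_started and the 0.5 s settle time are arbitrary choices for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def count_started(max_workers: int, num_tasks: int, settle: float = 0.5) -> int:
    """Submit num_tasks blocking tasks and report how many actually began running."""
    started = []
    lock = threading.Lock()
    release = threading.Event()

    def blocking_task(task_id):
        with lock:
            started.append(task_id)
        release.wait()  # stands in for the endless `for message in consumer` loop

    pool = ThreadPoolExecutor(max_workers=max_workers)
    for i in range(num_tasks):
        pool.submit(blocking_task, i)
    time.sleep(settle)  # give the pool time to start every task it can
    with lock:
        running = len(started)
    release.set()  # unblock the tasks so the pool can shut down cleanly
    pool.shutdown(wait=True)
    return running


if __name__ == "__main__":
    print(count_started(max_workers=5, num_tasks=10))  # → 5
```

In practice this means the number of submitted consumer tasks should match max_workers, and there is little benefit in running more consumers in one group than the topic has partitions.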
Run order:
- First run KafkaDemoConsumer.py or KafkaDemoConsumerMultiThread.py
- Then run KafkaDemoProducer.py or KafkaDemoProducerMultiThread.py
The images sent by the producer are written to DataSource/Output, and run logs are written under ProducerConsumer/KafkaLog.