Message Middleware --- Kafka Quick Start
Message middleware: https://blog.51cto.com/u_9291927/category33
GitHub: scorpiostudio/HelloKafka
- Kafka Quick Start (1) --- Introduction to Kafka: https://blog.51cto.com/9291927/2493953
- Kafka Quick Start (2) --- Kafka Architecture: https://blog.51cto.com/9291927/2497814
- Kafka Quick Start (3) --- Kafka Core Technologies: https://blog.51cto.com/9291927/2497820
- Kafka Quick Start (4) --- Kafka Advanced Features: https://blog.51cto.com/9291927/2497828
- Kafka Quick Start (5) --- Kafka Administration: https://blog.51cto.com/9291927/2497842
- Kafka Quick Start (6) --- Kafka Cluster Deployment: https://blog.51cto.com/9291927/2498428
- Kafka Quick Start (7) --- Kafka Monitoring: https://blog.51cto.com/9291927/2498434
- Kafka Quick Start (8) --- Introduction to Confluent Kafka: https://blog.51cto.com/9291927/2499090
- Kafka Quick Start (9) --- C Client: https://blog.51cto.com/9291927/2502001
- Kafka Quick Start (10) --- C++ Client: https://blog.51cto.com/9291927/2502063
- Kafka Quick Start (11) --- RdKafka Source Code Analysis: https://blog.51cto.com/9291927/2504489
- Kafka Quick Start (12) --- Python Client: https://blog.51cto.com/9291927/2504495
Python3 Learning (54): Using the confluent-kafka Module
From: https://blog.csdn.net/liao392781/article/details/90487438
confluent-kafka is a Python module that provides a lightweight wrapper around librdkafka. Because librdkafka is itself a C/C++ Kafka library, performance is a strong point, and in practice the module compares favorably with kafka-python. confluent-kafka-python is Confluent's Python client for Apache Kafka and the Confluent Platform.
Features:
- High performance: confluent-kafka-python is a lightweight wrapper around librdkafka ( https://github.com/edenhill/librdkafka ), a carefully tuned C client.
- Reliability: there are many details to get right when writing an Apache Kafka client. They are handled in one place (librdkafka), and that work is reused across all Confluent clients, including confluent-kafka-go ( https://github.com/confluentinc/confluent-kafka-go ) and confluent-kafka-dotnet (confluentinc/confluent-kafka-dotnet on GitHub).
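Since the client is a wrapper around librdkafka, it can be useful to check which versions of both are actually installed. The following is a minimal sketch, not part of the original article; the helper name `describe_client` is hypothetical, and it falls back to a plain message when the package is missing.

```python
def describe_client():
    """Return a one-line description of the installed client and its librdkafka."""
    try:
        import confluent_kafka
    except ImportError:
        # Assumption: the package is installed under the PyPI name confluent-kafka.
        return "confluent-kafka is not installed (try: pip install confluent-kafka)"
    # version() and libversion() each return a (version_string, version_int) tuple.
    client_version, _ = confluent_kafka.version()
    lib_version, _ = confluent_kafka.libversion()
    return "confluent-kafka {} wrapping librdkafka {}".format(client_version, lib_version)


if __name__ == "__main__":
    print(describe_client())
```

The string that is printed depends on the local installation, so no fixed output is shown here.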
Example code:
# -*- coding: utf-8 -*-
# @Author :
# @Date :
# @File : kafka_operate.py
# @description : XXX

import time
import datetime

# Note: confluent_kafka.avro is the legacy Avro API; recent releases of
# confluent-kafka deprecate it in favor of confluent_kafka.schema_registry.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import Producer, Consumer


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.

    Triggered by poll() or flush().
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def kafka_producer():
    p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
    try:
        while True:
            # Payload: the current timestamp, truncated to whole seconds
            data = str(datetime.datetime.now().replace(microsecond=0))
            # Trigger any available delivery report callbacks from previous produce() calls
            p.poll(0)
            # Asynchronously produce a message. The delivery report callback
            # will be triggered from poll() above, or flush() below, when the
            # message has been successfully delivered or failed permanently.
            p.produce('my_topic', data.encode('utf-8'), callback=delivery_report)
            time.sleep(1)
    except KeyboardInterrupt:
        print('Interrupted, stopping producer')
    except Exception as e:
        print(e)
    finally:
        # Wait for any outstanding messages to be delivered and delivery report
        # callbacks to be triggered.
        p.flush()


def kafka_consumer():
    c = Consumer({
        'bootstrap.servers': 'mybroker',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    })
    c.subscribe(['my_topic'])
    try:
        while True:
            msg = c.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            print('Received message: {}'.format(msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Always close the consumer so it leaves the group cleanly
        c.close()


def kafka_avro_producer():
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"}
        ]
    }
    """
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"}
        ]
    }
    """
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}

    # The schema registry URL is a placeholder; replace host and port with real values
    avro_producer = AvroProducer({
        'bootstrap.servers': 'mybroker,mybroker2',
        'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)
    avro_producer.produce(topic='my_topic', value=value, key=key)
    avro_producer.flush()


def kafka_avro_consumer():
    c = AvroConsumer({
        'bootstrap.servers': 'mybroker,mybroker2',
        'group.id': 'groupid',
        'schema.registry.url': 'http://127.0.0.1:8081'
    })
    c.subscribe(['my_topic'])
    while True:
        try:
            msg = c.poll(10)
        except SerializerError as e:
            # msg is not assigned when poll() raises, so only the error is reported
            print("Message deserialization failed: {}".format(e))
            break
        if msg is None:
            continue
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
        print(msg.value())
    c.close()


if __name__ == '__main__':
    pass
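The consumer loops above run until they are interrupted, which makes them awkward to try out without a broker. As a rough sketch (not from the original article), the same poll-and-check pattern can be written as a bounded helper that accepts any object with a `poll(timeout)` method. The names `consume_batch`, `FakeMessage`, and `FakeConsumer` are hypothetical and exist only for this dry run; with a real `confluent_kafka.Consumer` the caller would still need to call `subscribe()` beforehand and `close()` afterwards.

```python
def consume_batch(consumer, max_messages, poll_timeout=1.0, max_idle_polls=3):
    """Collect up to max_messages decoded values, then return them.

    Stops early after max_idle_polls consecutive polls that yield no usable
    message (either None or a message carrying an error).
    """
    values = []
    idle_polls = 0
    while len(values) < max_messages and idle_polls < max_idle_polls:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            idle_polls += 1
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            idle_polls += 1
            continue
        idle_polls = 0
        values.append(msg.value().decode("utf-8"))
    return values


class FakeMessage:
    """Hypothetical stand-in for a Kafka message, for dry runs only."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def error(self):
        return self._error

    def value(self):
        return self._value


class FakeConsumer:
    """Hypothetical stand-in that replays a fixed list of poll() results."""

    def __init__(self, results):
        self._results = list(results)

    def poll(self, timeout=None):
        # Return the next scripted result, or None once the script is exhausted
        return self._results.pop(0) if self._results else None


if __name__ == "__main__":
    fake = FakeConsumer([
        FakeMessage(b"first"),
        None,
        FakeMessage(error="broker unavailable"),
        FakeMessage(b"second"),
    ])
    print(consume_batch(fake, max_messages=5))
```

Passing the consumer in as a parameter keeps the loop logic separate from connection setup, so the error handling can be exercised without a running cluster.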