为了向 Kafka 集群生产和消费消息,我们可以使用 confluent-kafka 库,它是 Confluent 为 Python 提供的官方 Kafka 客户端。以下是一个简化的示例,展示如何将 Kafka 的生产者和消费者操作封装到一个类中:
首先,确保你已经安装了所需的库:
pip install confluent-kafka
然后,你可以使用以下代码:
from confluent_kafka import Producer, Consumer, KafkaError


class KafkaManager:
    """Thin wrapper around confluent-kafka producer and consumer calls."""

    def __init__(self, bootstrap_servers):
        """Remember the broker list used for every producer and consumer.

        Args:
            bootstrap_servers: Value passed as the ``bootstrap.servers``
                client setting, e.g. ``'localhost:9092'``.
        """
        self.bootstrap_servers = bootstrap_servers

    def produce(self, topic, key, value):
        """Produce a single message to Kafka and wait for it to be delivered.

        A new ``Producer`` is created for each call; the delivery result is
        printed by the callback once ``flush()`` has served it.

        Args:
            topic: Name of the topic to produce to.
            key: Message key.
            value: Message payload.
        """
        p = Producer({'bootstrap.servers': self.bootstrap_servers})

        def delivery_report(err, msg):
            """Called once for each message produced to indicate delivery result."""
            if err is not None:
                print('Message delivery failed: {}'.format(err))
            else:
                print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

        p.produce(topic, key=key, value=value, callback=delivery_report)
        # flush() blocks until outstanding messages are delivered and their
        # callbacks have run.
        p.flush()

    def consume(self, topic, group_id, timeout=1.0):
        """Consume messages from Kafka and print them until interrupted.

        The loop has no normal exit; it ends only when an exception (for
        example ``KeyboardInterrupt``) propagates out of it. The consumer is
        always closed on the way out.

        Args:
            topic: Name of the topic to subscribe to.
            group_id: Consumer group id (``group.id``).
            timeout: Value passed to each ``poll()`` call.
        """
        c = Consumer({
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
        })
        try:
            c.subscribe([topic])
            while True:
                msg = c.poll(timeout)
                if msg is None:
                    # Nothing arrived within the poll timeout; keep waiting.
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print('Reached end of partition')
                    else:
                        print('Error while consuming message: {}'.format(msg.error()))
                    continue
                payload = msg.value()
                # A record may carry no payload (value is None); do not try
                # to decode it in that case.
                text = payload.decode('utf-8') if payload is not None else None
                print('Received message: {}'.format(text))
        finally:
            # Always close the consumer, even when the loop is left through
            # an exception.
            c.close()


# Usage example
if __name__ == "__main__":
    # Build a manager that talks to a broker on the local machine.
    manager = KafkaManager('localhost:9092')

    # Produce a message.
    manager.produce(topic='test_topic', key='key1', value='value1')

    # Consume messages.
    manager.consume(topic='test_topic', group_id='test_group')
除了 confluent-kafka,也可以使用 kafka-python 库实现同样的封装。首先安装该库:
pip install kafka-python
然后,你可以使用以下代码:
from kafka import KafkaProducer, KafkaConsumer


def _encode_or_none(text):
    """Encode a ``str`` to bytes, passing ``None`` through unchanged."""
    return None if text is None else text.encode()


def _decode_or_none(raw):
    """Decode bytes to ``str``, passing ``None`` through unchanged."""
    return None if raw is None else raw.decode()


class KafkaManager:
    """Thin wrapper around kafka-python producer and consumer calls."""

    def __init__(self, bootstrap_servers):
        """Remember the broker list used for every producer and consumer.

        Args:
            bootstrap_servers: Value passed as ``bootstrap_servers`` to the
                kafka-python clients, e.g. ``'localhost:9092'``.
        """
        self.bootstrap_servers = bootstrap_servers

    def produce(self, topic, key, value):
        """Produce a single message to Kafka and flush it.

        Args:
            topic: Name of the topic to send to.
            key: Message key as ``str`` (or ``None`` for no key).
            value: Message payload as ``str`` (or ``None``).
        """
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            # None-safe serializers: plain ``str.encode`` raises TypeError
            # when the key or value is None.
            key_serializer=_encode_or_none,
            value_serializer=_encode_or_none,
        )
        try:
            producer.send(topic, key=key, value=value)
            producer.flush()
        finally:
            # Release the producer even if send/flush raised.
            producer.close()

    def consume(self, topic, group_id, timeout=10):
        """Consume messages from Kafka and print each decoded value.

        Args:
            topic: Name of the topic to subscribe to.
            group_id: Consumer group id.
            timeout: Idle time, interpreted as seconds, after which iteration
                stops when no new message arrives. ``None`` blocks forever.
        """
        # kafka-python expects milliseconds; its own default is to block
        # forever, which float('inf') reproduces.
        idle_ms = float('inf') if timeout is None else int(timeout * 1000)
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            # None-safe deserializers: records may have no key or no value.
            key_deserializer=_decode_or_none,
            value_deserializer=_decode_or_none,
            consumer_timeout_ms=idle_ms,
        )
        try:
            for message in consumer:
                print(f"Received message: {message.value}")
        finally:
            # Always close the consumer, whether iteration timed out or an
            # exception interrupted it.
            consumer.close()


# Usage example
if __name__ == "__main__":
    # Build a manager that talks to a broker on the local machine.
    manager = KafkaManager('localhost:9092')

    # Produce a message.
    manager.produce(topic='test_topic', key='key1', value='value1')

    # Consume messages.
    manager.consume(topic='test_topic', group_id='test_group')