kafka-python使用手册
kafka-python
1. 生产者同步发送数据
# 生产者同步发送数据from kafka import KafkaProducer
from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=["192.168.1.6:9092"])try:record_metadata = producer.send("predict_task_log", b"202312301505 predict res: success").get(timeout=10) # 同步方式print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)
except KafkaError:print(f"write data to kafka failed!")
finally:producer.close()
2. 生产则异步发送数据
# 生产者异步发送数据from kafka import KafkaProducer
from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=["192.168.1.6:9092"])def on_send_success(record_metadata):"""发送成功之后的回调函数"""print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)def on_send_error(excp):"""发送失败后的回调函数"""print(f"write data to kafka error: {excp}")try:# 1. 主线程执行,子线程将数据写入缓冲池,不影响主线程做其他操作future = producer.send("predict_task_log", b"202312301505 predict res: success")# 2. 子线程通过回调函数通知主线程future.add_callback(on_send_success).add_errback(on_send_error)
except KafkaError:print(f"write data to kafka failed!")
finally:producer.close()
3. 消费者自动提交offset
# 消费者自动提交offsetfrom kafka import KafkaConsumerconsumer = KafkaConsumer(bootstrap_servers=["192.168.1.6:9092"],group_id='predict_group',enable_auto_commit=True, # 自动提交auto_commit_interval_ms=1000
)for msg in consumer:topic, partition, offset = msg.topic, msg.partition, msg.offsetkey, value = msg.key, msg.value.decode("utf-8")print(f"从topic为{topic}的{partition}分区上,获取偏移量为{offset}的消息为{key}: {value}")
4. 消费者手动提交offset
# 消费者手动提交offsetfrom kafka import KafkaConsumerconsumer = KafkaConsumer(bootstrap_servers=["192.168.1.6:9092"],group_id='predict_group',enable_auto_commit=False # 手动提交
)for msg in consumer:topic, partition, offset = msg.topic, msg.partition, msg.offsetkey, value = msg.key, msg.value.decode("utf-8")print(f"从topic为{topic}的{partition}分区上,获取偏移量为{offset}的消息为{key}: {value}")# 手动提交偏移量consumer.commit() # 同步commitconsumer.commit_async() # 异步commit,推荐使用