1. 安装pykafka
pip install pykafka
2. 生产者
from pykafka import KafkaClient


def get_kafka_producer(hosts, topics):
    """Create a pykafka producer for a single topic.

    Args:
        hosts: Comma-separated ``host:port`` list of Kafka brokers.
        topics: Name of the topic to produce to (a single topic, despite
            the plural parameter name).

    Returns:
        The producer returned by ``topic.get_producer()`` with pykafka's
        default settings.
    """
    client = KafkaClient(hosts=hosts)
    # Show the topics known to the cluster; handy when checking connectivity.
    print(client.topics)
    topic = client.topics[topics]
    return topic.get_producer()
测试
# Producer smoke test: send ten small UTF-8 messages to the test topic.
hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
producer = get_kafka_producer(hosts, topics)
try:
    for i in range(10):
        # produce() is given bytes, so encode the text payload first.
        producer.produce(f"test message {i}".encode('utf-8'))
finally:
    # Stop the producer so queued messages are sent before the script
    # exits (per the pykafka Producer.stop documentation).
    producer.stop()
3. 消费者
def get_kafka_consumer(
    hosts,
    topics,
    consumer_group='test_kafka_topic',
    zookeeper_connect='192.168.20.201:2181,192.168.20.202:2181,192.168.20.203:2181',
):
    """Create a pykafka balanced consumer for a single topic.

    Args:
        hosts: Comma-separated ``host:port`` list of Kafka brokers.
        topics: Name of the topic to consume from (a single topic, despite
            the plural parameter name).
        consumer_group: Consumer group name passed to
            ``get_balanced_consumer``. Defaults to the value that used to be
            hard-coded here.
        zookeeper_connect: Comma-separated ``host:port`` list passed to
            ``get_balanced_consumer`` as ``zookeeper_connect``. Defaults to
            the value that used to be hard-coded here.

    Returns:
        The consumer returned by ``topic.get_balanced_consumer(...)`` with
        ``auto_commit_enable=True``.
    """
    client = KafkaClient(hosts=hosts)
    topic = client.topics[topics]
    consumer = topic.get_balanced_consumer(
        consumer_group=consumer_group,
        auto_commit_enable=True,
        zookeeper_connect=zookeeper_connect,
    )
    return consumer
测试
# Consumer smoke test: print every message received from the test topic.
hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
consumer = get_kafka_consumer(hosts, topics)
try:
    for msg in consumer:
        print(msg)
        # Guard against None before reading message attributes.
        if msg is not None:
            print(msg.offset)
            print(msg.value)
finally:
    # Stop the consumer when the loop ends or is interrupted (e.g. Ctrl-C)
    # so it shuts down cleanly instead of lingering in the group.
    consumer.stop()