如果想通过docker安装kafka,可参考
Docker安装Kafka
生产者
import json
import time
import tracebackfrom datetime import datetime
from kafka import KafkaProducer
from kafka.errors import kafka_errorsproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer=lambda k: json.dumps(k).encode(),value_serializer=lambda v: json.dumps(v).encode())while True:i = datetime.strftime(datetime.now(), '%Y%m%d %H:%M:%S')future = producer.send('kafka_demo',key='count_num', # 同一个key值,会被送至同一个分区value=str(i),partition=0) # 向分区1发送消息print("send {}".format(str(i)))time.sleep(3)try:future.get(timeout=10) # 监控是否发送成功except kafka_errors: # 发送失败抛出kafka_errorstraceback.format_exc()
消费者
import jsonfrom kafka import KafkaConsumerconsumer = KafkaConsumer('kafka_demo',bootstrap_servers=':9092',group_id='test'
)
for message in consumer:print("receive, key: {}, value: {}".format(json.loads(message.key.decode()),json.loads(message.value.decode())))
分为两个终端页面运行即可:
生产者打印:
消费者打印: