第一步:安装 Python 的 Kafka 客户端模块 kafka-python
pip install kafka-python
第二步:编写代码
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import time


class StationLog:
    """One simulated base-station call record.

    The instance attributes mirror the constructor arguments one-to-one,
    and ``to_string`` serializes exactly those six fields.
    """

    def __init__(self, station_id, call_out, call_in, call_status, timestamp, call_duration):
        self.station_id = station_id
        self.call_out = call_out
        self.call_in = call_in
        self.call_status = call_status
        self.timestamp = timestamp
        self.call_duration = call_duration

    def to_string(self):
        """Return the record as a JSON object string (fields in constructor order)."""
        return json.dumps(self.__dict__)


# Candidate call statuses; "success" appears 9 times out of 12 so that
# random.choice picks it most of the time.
_CALL_STATUSES = [
    "fail", "busy", "barring",
    "success", "success", "success", "success", "success",
    "success", "success", "success", "success",
]


def _random_station_log():
    """Build one StationLog whose fields are randomly generated."""
    call_out = "1860000" + str(random.randint(0, 9999)).zfill(4)
    call_in = "1890000" + str(random.randint(0, 9999)).zfill(4)
    call_status = random.choice(_CALL_STATUSES)
    # Only successful calls get a non-zero duration (10000-19000, in steps of 1000).
    call_duration = 1000 * (10 + random.randint(0, 9)) if call_status == "success" else 0
    return StationLog(
        "station_" + str(random.randint(0, 9)),
        call_out,
        call_in,
        call_status,
        int(time.time() * 1000),  # current time in milliseconds
        call_duration,
    )


def _report_send_failure(exc):
    """Errback for asynchronous delivery failures reported by the producer."""
    print(f"Failed to send message: {exc}")


def main(bootstrap_servers='bigdata01:9092', topic='topicA'):
    """Continuously generate random station logs and publish them to Kafka.

    Each record is printed, then sent after a random pause of 0.10-1.09
    seconds. The loop runs until interrupted (Ctrl+C); pending messages are
    flushed and the producer is closed on the way out.

    Args:
        bootstrap_servers: Kafka broker address(es) passed to KafkaProducer.
        topic: Name of the topic the records are sent to.
    """
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        # Records are already JSON text (StationLog.to_string), so only
        # encode them here. Running json.dumps a second time would wrap the
        # payload in an extra layer of quoting/escaping on the wire.
        value_serializer=lambda v: v.encode('utf-8'),
    )
    try:
        while True:
            # Randomly generate one base-station log record.
            message = _random_station_log().to_string()
            print(message)
            time.sleep(0.1 + random.randint(0, 99) / 100)
            try:
                # send() is asynchronous: delivery errors arrive on the
                # returned future, so register an errback for them. The
                # except clause covers errors raised synchronously by send().
                producer.send(topic, message).add_errback(_report_send_failure)
            except KafkaError as e:
                print(f"Failed to send message: {e}")
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop this endless generator.
        pass
    finally:
        # Make sure all buffered asynchronous messages are sent, then
        # release the producer's network resources.
        producer.flush()
        producer.close()


if __name__ == "__main__":
    main()
以上案例通过 Python 操作 Kafka,将模拟的基站通话日志数据发送到 Kafka 的 topicA 主题中。