- Concept
待后续填坑…

- Push Data
import logging

from kafka import KafkaConsumer, KafkaProducer
import json


def push_kafka(sqlstring, valuelist):
    """Publish a SQL statement and its parameters to the ``xx3_ccccc`` topic.

    The payload is the UTF-8 encoded JSON document
    ``{"data": {"sqlstring": ..., "paramslist": ...}}``.

    Args:
        sqlstring: SQL text to ship to the consumer.
        valuelist: Parameters for ``sqlstring``; must be JSON-serializable
            because it is passed through ``json.dumps``.

    Errors raised while serializing, sending or flushing are logged and
    swallowed (best-effort delivery). Errors raised while constructing the
    producer propagate to the caller.
    """
    # logging.info("kafka string ----- [%s]" % (sqlstring % valuelist))
    producer = KafkaProducer(
        bootstrap_servers=["ip1:9092", "ip2:9092", "ip3:9092"],
    )
    try:
        msg_dict = {
            "data": {
                'sqlstring': sqlstring,
                'paramslist': valuelist,
            }
        }
        msg = json.dumps(msg_dict)
        producer.send("xx3_ccccc", value=msg.encode("utf-8"))
        # send() only queues the record; flush() blocks until it has actually
        # been handed to the brokers, so the message is not lost when this
        # short-lived producer goes away.
        producer.flush()
    except Exception as e:
        logging.exception('push_kafka Exception[%s]', str(e))
    finally:
        # A producer is created per call, so release its sockets and
        # background sender thread every time.
        producer.close()
- Consumer
def consumer_session():
    """Consume SQL messages from ``xx3_ccccc`` and execute them forever.

    Each message value is expected to be the UTF-8 JSON document produced by
    the pusher: ``{"data": {"sqlstring": ..., "paramslist": ...}}``. The
    statement is executed and committed one message at a time.

    NOTE(review): ``cur`` and ``conn`` are not defined in this snippet; they
    are assumed to be a module-level DB-API cursor and connection created
    elsewhere -- confirm before use.

    A message that cannot be decoded, parsed or executed is logged and
    skipped so that one bad record does not stop the consumer. The function
    only returns by way of an exception (for example ``KeyboardInterrupt``);
    the consumer, cursor and connection are closed on the way out.
    """
    consumer = KafkaConsumer(
        "xx3_ccccc",
        bootstrap_servers=["ip1:9092", "ip2:9092", "ip3:9092"],
    )
    try:
        while True:
            # poll() defaults to timeout_ms=0 and returns immediately, which
            # turns this loop into a busy spin when the topic is idle. Wait
            # up to one second for records instead.
            c_msgs = consumer.poll(timeout_ms=1000, max_records=300).values()
            for m_bo in c_msgs:
                for msg in m_bo:
                    try:
                        c_msg = msg.value.decode("utf-8")
                        d_msg = json.loads(c_msg)
                        cur.execute(
                            d_msg['data']["sqlstring"],
                            d_msg['data']["paramslist"],
                        )
                        conn.commit()
                    except Exception as e:
                        logging.error("insert error:%s", e)
                        # Discard the failed transaction so the next message
                        # starts clean (some drivers reject every statement
                        # until the aborted transaction is rolled back).
                        conn.rollback()
    finally:
        # Close the consumer, cursor and connection. In the original this
        # cleanup sat after the infinite loop and could never run.
        consumer.close()
        cur.close()
        conn.close()
未完待续…