目录
- 一:需求背景
- 二:相关文档
- 三:验证TDMQ广播消息
一:需求背景
- 目前公司需要将决策引擎处理的结果, 一部分数据交给下游分析/入黑/通知等功能。因此就需要决策引擎生产结果让多方下游去消费。 而我需要实现下游的一部分功能。
二:相关文档
- TDMQ官方文档(TCP的SDK): https://www.tencentcloud.com/zh/document/product/1110/42950
三:验证TDMQ广播消息
-
将TDMQ使用配置写入到一个配置文件中: config.py
topic = '' tdmq_url = '' tdmq_secret_key = ''
-
编写生产者发送消息:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2023/7/24 16:28 # 模拟TDMQ生产者发送消息 import jsonimport pulsarfrom conf.config import tdmq_secret_key, tdmq_url, topicdef produser_send_msg():"""生产者发送消息"""tdmq_client = pulsar.Client(authentication=pulsar.AuthenticationToken(tdmq_secret_key),service_url=tdmq_url)producer = tdmq_client.create_producer(topic=topic)send_data = json.dumps({"uniq_id": "1234567890", "project_id": 2})producer.send(send_data.encode('utf-8'),properties={},partition_key='')if __name__ == '__main__':produser_send_msg()
-
编写两个消费者, 消费消息(第二个只需要将subscription_name改为sub_topic2)
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2023/7/24 18:55 # @Author : shanwen.ren import pulsarfrom conf.config import tdmq_secret_key, tdmq_url, post_fund_topictdmq_client = pulsar.Client(authentication=pulsar.AuthenticationToken(tdmq_secret_key),service_url=tdmq_url)consumer = tdmq_client.subscribe(# topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制topic=topic,# 订阅名称subscription_name='sub_topic1' )def produser_send_msg():"""消费者消费消息"""# 获取消息msg = consumer.receive()try:# 模拟业务print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))# 消费成功,回复ackconsumer.acknowledge(msg)except:# 消费失败,消息将会重新投递consumer.negative_acknowledge(msg)if __name__ == '__main__':while True:produser_send_msg()
-
开启两个消费者进程。
-
启动生产者发送消息