java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信

进入

DoubleCloud
https://www.double.cloud
创建一个kafka
1 选择语言
2 运行curl 的url命令启动一个topic
3 生成对应语言的token
4 复制3中的配置文件到本地,命名为client.properties
5 复制客户端代码
对python和java客户端代码进行了重写,java改成了kotlin:

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
import timefrom confluent_kafka import Producer, Consumer
import asyncio
import threadingclass KafkaClient:def __init__(self, config_file):self.config = self.read_config(config_file)def read_config(self, config_file):config = {}with open(config_file) as fh:for line in fh:line = line.strip()if len(line) != 0 and line[0] != "#":parameter, value = line.strip().split('=', 1)config[parameter] = value.strip()return configdef produce(self, topic, key, value):# Creates a new producer instanceproducer = Producer(self.config)# Produces a sample messageproducer.produce(topic, key=key, value=value)print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")# Send any outstanding or buffered messages to the Kafka brokerproducer.flush()def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):# Sets the consumer group ID and offsetself.config["group.id"] = group_idself.config["auto.offset.reset"] = auto_offset_resetconsumer = Consumer(self.config)consumer.subscribe([topic])loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)if callback is not None:loop.run_until_complete(callback(consumer))def consume(self, topic, callback=None):thread = threading.Thread(target=self.consume_async, args=(topic, callback,))thread.start()return threadasync def consume_async(consumer):try:while True:msg = consumer.poll(1.0)if msg is not None:breakif not msg.error():key = msg.key().decode("utf-8")value = msg.value().decode("utf-8")print(f"Consumed message: key = {key:12} value = {value:12}")except KeyboardInterrupt:passfinally:consumer.close()config_file_path = ".\\client.properties"
topic = "test"
key = "key"
value = "value"kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2Ddnw+izhKx3+7wFxZKMLGEqTOnneTKrlQQ';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
# Best practice for Kafka producer to prevent data loss
acks=all

java(kotiln)


import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*class KafkaClient<T, V> : Closeable {private var producer: KafkaProducer<T, V>? = nullprivate var fileConfig: Properties? = nullval TOPIC = "topic"private val DURATION = 100Lprivate val POOLSIZE = 10private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")private val SCOPE = CoroutineScope(DISPATCHER)constructor(configPath: String? = null, config: Properties? = null) {if (config == null && configPath == null) throw Exception("don't have any config")var config1 = Properties()if (configPath != null) {fileConfig = readConfig(configPath)fileConfig?.let { config1.putAll(it) }}if (config != null) {config1.putAll(config)}producer = KafkaProducer(config1)}fun produce(key: T, value: V, topic: String? = null) {producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))}fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))SCOPE.launch {while (true) {val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))func(records)delay(DURATION)}}}@Throws(IOException::class)fun readConfig(configFile: String): Properties {if (!Files.exists(Paths.get(configFile))) {throw IOException("$configFile not found.")}val config = Properties()FileInputStream(configFile).use { inputStream -> config.load(inputStream) }return config}override fun close() {producer?.close()}
}fun main() {val cli =KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")cli.consume {println("test beg")for (record in it) {println(String.format("Consumed message from topic %s: key = %s value = %s", cli.TOPIC, record.key(), record.value()))}println("test end")}// Give some time for the consumer to startThread.sleep(2000)cli.produce("key1", "test")// Give some time for the consumer to consume the messageThread.sleep(5000)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/24442.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

websocket php workerman 服务器nginx配置wss协议

首先 Nginx的版本要高&#xff0c;尽量用当前最新稳定版本。 其次 WSS协议&#xff0c;是在HTTPS协议的基础上&#xff0c;进行协议升级&#xff0c;进行通讯的&#xff0c;所以先要保证你有一个 HTTPS正常的WEB站点。 所以&#xff0c;通过Nginx -V 请保证 一定有 --with-ht…

【TensorFlow深度学习】使用TensorFlow构建马尔可夫决策过程模型

使用TensorFlow构建马尔可夫决策过程模型 使用TensorFlow构建马尔可夫决策过程模型&#xff1a;决策分析的深度实践一、马尔可夫决策过程简介二、TensorFlow准备三、定义MDP模型参数四、构建状态值函数模型五、迭代更新值函数六、策略提取与决策结语 使用TensorFlow构建马尔可夫…

【ArcGIS微课1000例】0119:TIFF与grid格式互相转换

文章目录 一、任务描述二、tiff转grid三、grid转tif四、注意事项一、任务描述 地理栅格数据常用TIFF格式和GRID格式进行存储。TIFF格式的栅格数据常以单文件形式存储,不仅存储有R、G、B三波段的像素值,还保存有地理坐标信息。GRID格式的栅格数据常以多文件的形式进行存储,且…

国产操作系统上给麒麟虚拟机安装virtualbox增强工具 _ 统信 _ 麒麟 _ 中科方德

原文链接&#xff1a;国产操作系统上给麒麟虚拟机安装virtualbox增强工具 | 统信 | 麒麟 | 中科方德 Hello&#xff0c;大家好啊&#xff01;昨天给大家带来了一篇在国产操作系统上给VirtualBox中的Win7虚拟机安装增强工具的文章&#xff0c;今天我们将继续深入&#xff0c;介绍…

ORA-12541:TNS:没有监听器

"ORA-12541: TNS: 没有监听器" 是 Oracle 数据库连接时可能遇到的错误。这个错误通常表示客户端尝试连接到的数据库监听器未在目标主机上运行或未配置正确。解决这个问题的方法通常涉及以下步骤&#xff1a; 1.确保监听器在运行 确保数据库服务器上的 Oracle 监听器…

如何在 C# 中轻松从 HTML 中提取纯文本

一.介绍 处理 HTML 内容通常需要提取纯文本以进行处理、分析或显示&#xff0c;而不会产生 HTML 标记的杂乱。在本博客中&#xff0c;我们将探索一种简单而有效的方法&#xff0c;即使用 C# 中的正则表达式 (Regex) 来剥离 HTML 标记并将 HTML 实体解码为纯文本。此技术在读取…

ForceMode应用力的不同模式

1. ForceMode.Force 解释: 这种模式下&#xff0c;力被持续应用&#xff0c;类似于施加一个恒定的加速度。应用场景: 适用于需要不断施加力的情况&#xff0c;如推动物体。公式: F m * a&#xff08;质量*加速度&#xff09;效果: 施加的力会被乘以 Time.deltaTime&#xff0…

Plotly : 超好用的Python可视化工具

文章目录 安装&#xff1a;开始你的 Plotly 之旅基本折线图&#xff1a;简单却强大的起点带颜色的散点图&#xff1a;数据的多彩世界三维曲面图&#xff1a;探索数据的深度气泡图&#xff1a;让世界看到你的数据小提琴图&#xff1a;数据分布的优雅展现旭日图&#xff1a;分层数…

虚拟机与windows文件同步

如果上图中不能设置&#xff0c;则在虚拟机mnt文件夹执行以下命令&#xff1a;

项目质量保证措施(Word原件)

一、 质量保障措施 二、 项目质量管理保障措施 &#xff08;一&#xff09; 资深的质量经理与质保组 &#xff08;二&#xff09; 全程参与的质量经理 &#xff08;三&#xff09; 合理的质量控制流程 1&#xff0e; 质量管理规范&#xff1a; 2&#xff0e; 加强协调管理&…

超详解——Python模块文档——小白篇

目录 1. Unix起始行 示例&#xff1a; 2. 对象和类型 示例&#xff1a; 3. 一切都是对象 示例&#xff1a; 4. 理解对象和引用 示例&#xff1a; 5. 理解对象和类型 示例&#xff1a; 6. 标准类型 示例&#xff1a; 7. 其他内建类型 示例&#xff1a; 8. 类型的类…

【乐吾乐2D可视化组态编辑器】在线使用,快速入门

一、在线使用 乐吾乐2D可视化组态编辑器地址&#xff1a;https://2d.le5le.com/ 二、步骤 本教程将带领你快速体验2D可视化编辑器的全流程开发。 1.创建图纸 进入2d编辑器主界面后&#xff0c;主界面最中心为图纸面板&#xff0c;默认为空图纸&#xff0c;在界面左侧为组…

数仓SQL如何做code review?

第一步应该是先明确需求&#xff0c;明确完需求以后在进行开发&#xff0c;接着code review 在明确HiveSQL、SparkSQL的编写需求后&#xff0c;接下来将详细介绍代码审查&#xff08;Code Review&#xff09;时的一些关键注意点&#xff1a; 1. 关联关系 left join 和 join …

AI魔法相机:实时3D重建与场景魔法化

一、产品概述 AI魔法相机是一款创新的硬件产品,它结合了AI技术和3D重建扫描技术,能够实时捕捉并重建3D场景和物理世界。用户只需通过简单的点击操作,即可捕捉现实物体或环境,并将其无缝融合到任何场景中,创造出全新的想象现实。 二、核心功能 实时捕捉:一键式操作,迅速…

用例与用例之间的三种关系:泛化、包含、扩展

UML用例图&#xff08;Use Case Diagrame)&#xff0c;是UML图的一种&#xff0c;主要用来描述角色及角色与用例之间的连接关系。 1.泛化 当多个用例共有一种类似的结构和行为时。能够将他们的共性抽象成为父用例&#xff0c;其它的用例作为泛化关系的子用例。箭头指向父用例…

优先队列的实现:基于最小堆的 Java 实现

优先队列是一种重要的数据结构&#xff0c;与普通队列不同&#xff0c;它每次从队列中取出的是具有最高优先级的元素。本文将介绍如何使用最小堆来实现优先队列&#xff0c;并提供详细的 Java 代码示例和解释。 什么是优先队列&#xff1f; 优先队列是一种抽象数据类型&#…

使用Aspose技术将Excel/Word转换为PDF

简介&#xff1a;本文将介绍如何使用Aspose技术将Excel文件转换为PDF格式。我们将使用Aspose-Cells-8.5.2.jar包&#xff0c;并演示Java代码以及进行测试。 一、Aspose技术概述 Aspose是一款强大的文档处理库&#xff0c;支持多种编程语言&#xff0c;如Java、C#、Python等。…

关于 spring boot 的 目录详解 和 配置文件 以及 日志

目录 配置文件 spring boot 的配置文件有两种格式&#xff0c;分别是 properties 和 yml&#xff08;yaml&#xff09;。这两种格式的配置文件是可以同时存在的&#xff0c;此时会以 properties 的文件为主&#xff0c;但一般都是使用同一种格式的。 格式 properties 语法格…

小程序中用于跳转页面的5个api是什么?区别

小程序中用于跳转页面的5个API及其区别如下&#xff1a; wx.navigateTo(options) 功能&#xff1a;保留当前页面&#xff0c;跳转到应用内的某个页面&#xff0c;使用wx.navigateBack可以返回到原页面。特性&#xff1a;可以打开新的页面&#xff0c;新页面可以是tabBar页面&a…

【Python】selenium 点击某个按钮 click() 出现的报错问题--ElementClickInterceptedException(全!)

写在前面&#xff1a; 我们在使用selenium 点击某个元素时或者获取find_element的某个网页元素时&#xff0c;总会遇到一些问题。本人经验是&#xff0c;最直接的方法是用try_except 报错&#xff0c;直接绕过问题&#xff0c;可以直接看第一条。如果有兴趣具体解决&#xff0c…