pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境

JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20

2、执行python-flink脚本

从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值

from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 创建流处理环境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 设置 Kafka 连接器 JAR 文件的路径# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定义源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT  -- 如果 b 字段不重要,可以考虑从源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定义目标表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 执行 DDL 语句创建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result  = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 执行 SQL 查询并将结果插入到目标表# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait()  # 考虑是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py

3、启动kafka生产者

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}

可以看到sum(b)值已输出
在这里插入图片描述

4、启动kafka消费者

查看往test_kafka_topic插入的a字段数据已被消费

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092  --from-beginning --topic test_kafka_topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/63102.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构速成

1. 数据结构与算法 2. 顺序表 3. 链表 4. 栈与队列 5. 串 6. 树与二叉树(1) 7. 树与二叉树(2) 8. 图 9. 图的应用 10. 查找 11. 排序(1) 12. 排序(2)

k8s的污点与容忍度

污点(Taint)针对节点来说,和节点亲和性正好相对,节点亲和性使Pod被吸引到一类特定的节点,而污点则使节点能够排斥一类特定的Pod。 容忍度(Toleration)应用于Pod上,它用来允许调度器…

how to write 述职pptx as a tech manager

As a technical manager, crafting an effective 述职 (performance review) PPT requires you to highlight your leadership, team accomplishments, technical contributions, challenges faced, and future plans. Heres a structured approach to design your PPT: 1. Cov…

从源码层级深入探索 Spring AMQP 如何在 Spring Boot 中实现 RabbitMQ 集成——消费者如何进行消费

本章节主要从底层源码探索Spring Boot中RabbitMQ如何进行消费,至于RabbitMQ是如何使用如何生产消息,本章不做过多介绍,感兴趣的小伙伴可以参考:从源码层级深入探索 Spring AMQP 如何在 Spring Boot 中实现 RabbitMQ 集成——生产者…

计算机视觉中的边缘检测算法

摘要: 本文全面深入地探讨了计算机视觉中的边缘检测算法。首先阐述了边缘检测的重要性及其在计算机视觉领域的基础地位,随后详细介绍了经典的边缘检测算法,包括基于梯度的 Sobel 算子算法、Canny 边缘检测算法等,深入剖析了它们的…

Unix 和 Windows 的有趣比较

Unix 和 Windows NT 比较 来源于这两本书,把两本书对照来读,发现很多有意思的地方: 《Unix 传奇》 https://book.douban.com/subject/35292726/ 《观止 微软创建NT和未来的夺命狂奔 》 Showstopper!: The Breakneck Race to Create Windows…

SSM 垃圾分类系统——高效分类的科技保障

第五章 系统功能实现 5.1管理员登录 管理员登录,通过填写用户名、密码、角色等信息,输入完成后选择登录即可进入垃圾分类系统,如图5-1所示。 图5-1管理员登录界面图 5.2管理员功能实现 5.2.1 用户管理 管理员对用户管理进行填写账号、姓名、…

系列1:基于Centos-8.6部署Kubernetes (1.24-1.30)

每日禅语 “木末芙蓉花,山中发红萼,涧户寂无人,纷纷开自落。​”这是王维的一首诗,名叫《辛夷坞》​。这首诗写的是在辛夷坞这个幽深的山谷里,辛夷花自开自落,平淡得很,既没有生的喜悦&#xff…

Y20030004基于asp.net+Sql的环保网站的设计与实现(附源码 调试 文档)

环保网站的设计与实现 1.摘要要2. 系统功能3.功能结构图4.界面展示5.源码获取 1.摘要要 近几年国家对于环境管理是高度重视,尤其是对于环境生态的破坏与环境污染,已经严重影响到人类的生存和发展。为了使生态环境能够得到保护和改善,持续发展…

安全计算环境-(一)路由器-1

安全计算环境-网络设备 安全管理中心针对整个系统提出了安全管理方面的技术控制要求,通过技术手段实现集中管理;涉及的安全控制点包括系统管理、审计管理、安全管理和集中管控。以下以三级等级保护对象为例,描述安全管理中心各个控制要求项的…

D9741是一块脉宽调制方三用于也收路像机和笔记本电的等设备上的直流转换器。在便携式的仪器设备上。

概述: D9741是一块脉宽调制方三用于也收路像机和笔记本电的等设备上的直流转换器。在便携式的仪器设备上。 主要特点: ● 高精度基准电路 ● 定时闩锁、短路保护电路 ● 低电压输入时误操作保护电路 ● 输出基准电压(2.5V) ● 超过工作范围能进行自动校…

数据挖掘之聚类分析

聚类分析(Clustering Analysis) 是数据挖掘中的一项重要技术,旨在根据对象间的相似性或差异性,将对象分为若干组(簇)。同一簇内的对象相似性较高,而不同簇间的对象差异性较大。聚类分析广泛应用…

Qt 图形框架下图形拖动后位置跳动问题

在使用Qt 的图形框架QGraphicsScene,QGraphicsView实现图形显示时。遇到一个很棘手的BUG。 使用的图形是自定义的QGraphicsObject的子类。 现象是将图形添加到画布上之后,用鼠标拖动图形,图形能正常改变位置,当再次用鼠标点击图…

Vue技术中参数传递:Props与事件的实践指南

在Vue.js中,组件间的参数传递是构建动态和交互式应用的核心。本文将深入探讨如何通过Props和事件($emit)在Vue组件间进行参数传递,并提供代码示例。 Props传递数据 Props是Vue中组件间传递数据的一种方式,它允许父组…

一、LRU缓存

LRU缓存 1.LRU缓存介绍2.LRU缓存实现3.LRU缓存总结3.1 LRU 缓存的应用3.2 LRU 缓存的优缺点 1.LRU缓存介绍 LRU是Least Recently Used 的缩写,意为“最近最少使用”。它是一种常见的缓存淘汰策略,用于在缓存容量有限时,决定哪些数据需要被删…

LabVIEW光栅衍射虚拟仿真系统

随着现代教育技术的快速发展,虚拟仿真实验平台逐渐成为物理实验教学的重要辅助工具。基于LabVIEW的平面透射光栅虚拟仿真系统帮助学生更好地理解和分析光栅衍射现象,提高教学质量和学生的学习兴趣。 项目背景 在波动光学的教学中,光栅衍射实…

241211 selenium问题记录

The process started from chrome location /usr/bin/chromedriver is no longer running, so ChromeDriver is assuming that Chrome has crashed. 声明option类 chrome_option.add_argument(--headless) 后台启动webdriver NoSuchDriverException(msg) from err selenium.c…

前端核心知识总结

‌前端架构知识总结‌主要包括以下几个方面&#xff1a; ‌HTML‌&#xff1a;HTML是构建网页的基础&#xff0c;使用各种标签定义网页的结构&#xff0c;如<html>、<head>、<body>等。HTML5引入了新的语义化标签&#xff0c;如<article>、<section…

libcublas.so.11: cannot open shared object file: no such file or di

问题&#xff1a;在linux系统安装tensorrt后import tensorrt时出现 libcublas.so.11: cannot open shared object file: no such file or directory 或者 libcublasLt.so.11&#xff1a;cannot open shared object file: no such file or directory 或者 libcudnn.so.8&…

Linux编译Kernel时的文件zImage、文件dtb(dtbs)、核心模块分别是什么东西?

zImage文件的介绍 在编译Linux内核时&#xff0c;zImage 是一种内核映像文件&#xff0c;它是内核的压缩版本&#xff0c;通常用于引导嵌入式设备或其他资源有限的环境。 zImage 的具体含义 zImage 是 “Compressed Kernel Image” 的缩写。它是通过压缩原始的内核映像&…