Python 编写 Flink 应用程序经验记录(Flink1.17.1)

目录

官方API文档

提交作业到集群运行

官方示例

环境

实例处理Kafka后入库到Mysql

下载依赖

读取kafka数据

写入mysql数据


官方API文档

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/

提交作业到集群运行

#! /usr/bin/env python
# -*- coding: utf-8 -*-# /opt/test_flink.py
if __name__ == "__main__":print("这是一个简单的测试用例")

flink 安装目录下的 examples 目录里面已经提供了一些测试案例,我们也可以直接拿它来做实验。

提交至集群

./bin/flink run -py 代码文件

通过 flink run 即可运行应用程序,由于 flink 既可运行 Java 程序、也可以运行 Python 程序,所以这里我们需要指定 -py 参数,表示运行的是 py 文件。但默认情况下解释器使用的 python2,当然如果你终端输入 python 进入的就是 python3 的话则当我没说,要是我们想指定 flink 使用 python3 解释器的话,则需要配置一个环境变量。

export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3

下面来测试一下:

./bin/flink run -py /opt/test_flink.py

很明显结果是成功的,当然这里面没有涉及到任何与 Flink 有关的内容,只是演示如何提交一个 Python 应用程序。当然 flink run 是同时支持 Java、Python 等语言的。

不管使用哪种 API 进行编程,最终客户端都会生成 JobGraph 提交到 JM 上。但毕竟 Flink 的内核是采用 Java 语言编写的,如果 Python 应用程序变成 JobGraph 对象被提交到 Flink 集群上运行的话,那么 Python 虚拟机和 Java 虚拟机之间一定有某种方式,使得 Python 可以直接动态访问 Java 中的对象、Java 也可以回调 Python 中的对象。没错,实现这一点的便是 py4j。

提交单个 py 文件知道怎么做了,但如果该文件还导入了其它文件该怎么办呢?一个项目中还会涉及到包的存在。其实不管项目里的文件有多少,启动文件只有一个,只需要把这个启动文件提交上去即可。举例说明,当然这里仍不涉及具体和 Flink 相关的内容,先把如何提交程序这一步给走通。因为不管编写的程序多复杂,提交这一步骤是不会变的。

先来看看编写的程序:

flink_test 就是主目录,里面有一个 apps 子目录和一个 main.py 文件,apps 目录里面有三个 py 文件,对应的内容分别如图所示。然后将其提交到 Flink Standalone 集群上运行,命令和提交单个文件是一样的

即使是多文件,提交方式也是相似的,输出结果表明提交成功了。

官方示例

环境

  • Java 11
  • Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

接下来,我们将介绍如何创建源表和结果表。

t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())
tab = t_env.from_path('source')t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())

你也可以使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表:

my_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')
""".format(input_path)my_sink_ddl = """create table sink (word STRING,`count` BIGINT) with ('connector' = 'filesystem','format' = 'canal-json','path' = '{}')
""".format(output_path)t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

上面的程序展示了如何创建及注册表名分别为 source 和 sink 的表。 其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词; 结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。

接下来,我们介绍如何创建一个作业:该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink

最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 计算 word count
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()

该教程的完整代码如下:

import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().set("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

接下来,可以在命令行中运行作业(假设作业名为 word_count.py):

python word_count.py

上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。

最后,你可以得到如下运行结果:

实例处理Kafka后入库到Mysql

下载依赖

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

读取kafka数据

#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import loggingfrom pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializerfrom pyflink.common import Row
from pyflink.datastream import FlatMapFunctiondef read_kafka():env = StreamExecutionEnvironment.get_execution_environment()env.add_jars("file:///D:/安技汇/运营平台/DataManage/flink-sql-connector-kafka-1.17.1.jar")source = KafkaSource.builder() \.set_bootstrap_servers("172.16.12.128:9092") \.set_topics("test") \.set_group_id("my-group") \.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \.set_value_only_deserializer(SimpleStringSchema()) \.build()# 从消费组提交的位点开始消费,不指定位点重置策略#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \# 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \# 从时间戳大于等于指定时间戳(毫秒)的数据开始消费#.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \# 从最早位点开始消费#.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \# 从最末尾位点开始消费#.set_starting_offsets(KafkaOffsetsInitializer.latest()) \#.set_property("partition.discovery.interval.ms", "10000")  # 每 10 秒检查一次新分区#.set_property("security.protocol", "SASL_PLAINTEXT") \#.set_property("sasl.mechanism", "PLAIN") \#.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")kafka_stream.print()env.execute("Source")if __name__ == "__main__":logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")read_kafka()

写入mysql数据

没通,待补充。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120375.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

汇编学习(1)

汇编、CPU架构、指令集、硬编码之间的关系 ● 汇编语言:这是一种低级语言,用于与硬件直接交互。它是由人类可读的机器码或指令组成的,这些指令告诉CPU如何执行特定的任务。每条汇编指令都有一个对应的机器码指令,CPU可以理解和执…

css属性clip-path的使用说明

前言 当ui设计上的图片、div等的形状不是长方形,而是多边形的时候,就可以借助clip-path这个css属性来实现。 clip-path CSS 属性使用裁剪方式创建元素的可显示区域。区域内的部分显示,区域外的隐藏。【from: MDN】 clip-path可以理解为一把剪…

Spring Cloud学习:二【详细】

目录 Nacos的配置 Nacos的单机启动 服务注册 Nacos服务分级存储模型 优先访问同集群的服务 根据权重负载均衡 环境隔离Namespace Nacos调用流程 Nacos与Eureka注册对比 Nacos与Eureka的共同点 Nacos与Eureka的区别 Nacos配置管理 统一配置 配置自动刷新 多环境配…

python自动化测试平台开发:自动化测试平台简介

一.测试平台简介 为什么需要测试平台 已有的开源测试平台不能满足需要,不要轻易造轮子 需要公司级别的定制 需要整合公司内部的多套平台 例子:DevOps平台、精准化测试平台、质量监控平台等等 常见的测试平台开发模式 大一统模式(适合简单的…

从瀑布模式到水母模式:ChatGPT如何赋能软件研发全流程【文末送书五本】

从瀑布模式到水母模式:ChatGPT如何赋能软件研发全流程 前言内容简介购买链接作者简介专家推荐读者对象参与方式往期赠书回 🏘️🏘️个人简介:以山河作礼。 🎖️🎖️:Python领域新星创作者,CSDN实…

【C】关于动态内存的试题及解析

目录 第1题: 第2题: 第3题: 第4题: 第5题: 在学习了关于动态开辟内存的相关知识后,下面是一些涉及到动态开辟内存程序的试题及解析(试题部分来自《高质量的C/C编程》、笔试题)。 第1…

(1)(1.9) HC-SR04声纳

文章目录 前言 1 连接到自动驾驶仪 2 参数说明 前言 HC-SR04 声纳是一种价格低廉但量程很短(最远只有 2m)的测距仪,主要设计用于室内,但也成功地在室外的 Copter 上使用过。极短的测距范围使其用途有限。 !Warning…

移远通信5G RedCap模组拿下首个中国移动5G物联网开放实验室5G及轻量化产品能力认证

10月21日,在2023世界物联网博览会期间,中国移动举办了以“智融万物 创见未来”为主题的物联网开发者大会暨物联网产业论坛。作为中国移动在物联网领域重要的合作伙伴,移远通信应邀参加论坛。 随着千行百业数智化进程的不断加速,5G…

酷开科技依托酷开系统推动家庭智能化加速发展

为什么越来越多的人会选择智能家居?因为智能家居的出现,大大方便了我们的生活,为生活提供便利舒适的体验;就如同洗衣机与洗碗机解放了我们的双手是一样的道理,智能家居是在生活的方方面面为我们提供更加便利化的可能性…

java代码审计-换行符CRLF注入

CRLF 的缩写是指回车和换行操作,其中 CR 为 ASCII 中的第 13 个字符,也 写作 \r , LF 是 ASCII 中的第 10 个字符,也写作 \n ,因此 CRLF 一般翻译为回车换行注入漏洞。 什么是CRLF注入漏洞? CRLF 即【回车\r换行\n】的简…

找游戏外包开发游戏,有哪些好处呢?

游戏外包开发是将游戏开发的一部分或全部工作交给专业的外部开发团队或公司完成的做法。这种方法有许多潜在的好处,包括: 降低成本:游戏外包通常可以降低游戏开发成本,因为外包开发公司通常可以提供更具竞争力的价格。这是因为它…

【2023CANN训练营第二季】——通过一份入门级算子开发代码了解Ascend C算子开发流程

本次博客讲解的代码是Gitee代码仓的Ascend C加法算子开发代码,代码地址为: quick-start 打开Add文件,可以看到文件结构如下: 其中add_custom.cpp是算子开发的核心文件,包括了核函数的实现,展示了如何在Asc…

Spring Cloud之负载均衡与服务调用(Ribbon)

目录 Ribbon 简介 负载均衡 简介 负载均衡方式 服务端负载均衡 工作原理 特点 客户端负载均衡 工作原理 特点 对比 实现 负载均衡策略 切换负载均衡策略 定制负载均衡策略 超时与重试 单个服务配置 全局配置 服务调用 示例 Ribbon 简介 Ribbon 是 Netfli…

WEB使用百度地图展示某地地址

第一步 进入百度地图开发平台 百度地图开放平台 | 百度地图API SDK | 地图开发 第二步注册 获取AK秘钥,点击【创建应用】进入AK申请页面,填写应用名称,务必选择AK类型为“浏览器端”,JS API只支持浏览器端AK进行请求与访问 下面…

最小二乘法,可视化UI界面

import tkinter as tk import numpy as np import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from pylab import mplmpl.rcParams[font.sans-serif] [FangSong] # 指定默认字体 mpl.rcParams[axes.unicode_minus] False …

雪糕冰淇淋经营配送小程序商城效果如何

雪糕冰淇淋是很多年轻人喜欢的食品之一,市面上的雪糕品牌类型众多,销售模式主要为厂家批发、经销商零售等,由于雪糕冰淇淋的易化性,很多用户会选择就近购买,但制作技术升级和长途冷藏技术下,网购成为另一种…

VB.NET 三层登录系统实战:从设计到部署全流程详解

目录 前言: 什么是三层 为什么要用到三层: 饭店→软件 理解: 过程: 1.三层包图: 2.数据库 3.三层项目 4.用户界面 5.添加引用 代码实现: Entity层 BLL层 DAL层 UI层 总结: 前言: 什么是三层 三层就是把各个功能模块划分为表示层&#…

【从瀑布模式到水母模式】ChatGPT如何赋能软件研发全流程

文章目录 🎄前言🍔本书概要🌺内容简介🌺作者简介🌺专家推荐🛸读者对象🍔彩蛋 🎄前言 计算机技术的发展和互联网的普及,使信息处理和传输变得更加高效,极大地…

安全和便捷:如何将运营商二要素API应用于实名制管理中

引言 随着互联网的快速发展,数字化身份验证和实名制管理变得越来越重要。在金融、电子商务、社交媒体等领域,确保用户身份的安全和准确性至关重要。运营商二要素核验API成为了实名制管理的有力工具,它不仅能够提供高水平的安全性&#xff0c…

JVM进阶(2)

一)方法区: java虚拟机中有一个方法区,该区域被所有的java线程都是共享,虚拟机一启动,运行时数据区就被开辟好了,官网上说了方法区可以不压缩还可以不进行GC,JAVA虚拟机就相当于是接口,具体的HotSpot就是虚…