Python 编写 Flink 应用程序经验记录(Flink1.17.1)

目录

官方API文档

提交作业到集群运行

官方示例

环境

实例处理Kafka后入库到Mysql

下载依赖

读取kafka数据

写入mysql数据


官方API文档

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/

提交作业到集群运行

#! /usr/bin/env python
# -*- coding: utf-8 -*-# /opt/test_flink.py
if __name__ == "__main__":print("这是一个简单的测试用例")

flink 安装目录下的 examples 目录里面已经提供了一些测试案例,我们也可以直接拿它来做实验。

提交至集群

./bin/flink run -py 代码文件

通过 flink run 即可运行应用程序,由于 flink 既可运行 Java 程序、也可以运行 Python 程序,所以这里我们需要指定 -py 参数,表示运行的是 py 文件。但默认情况下解释器使用的 python2,当然如果你终端输入 python 进入的就是 python3 的话则当我没说,要是我们想指定 flink 使用 python3 解释器的话,则需要配置一个环境变量。

export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3

下面来测试一下:

./bin/flink run -py /opt/test_flink.py

很明显结果是成功的,当然这里面没有涉及到任何与 Flink 有关的内容,只是演示如何提交一个 Python 应用程序。当然 flink run 是同时支持 Java、Python 等语言的。

不管使用哪种 API 进行编程,最终客户端都会生成 JobGraph 提交到 JM 上。但毕竟 Flink 的内核是采用 Java 语言编写的,如果 Python 应用程序变成 JobGraph 对象被提交到 Flink 集群上运行的话,那么 Python 虚拟机和 Java 虚拟机之间一定有某种方式,使得 Python 可以直接动态访问 Java 中的对象、Java 也可以回调 Python 中的对象。没错,实现这一点的便是 py4j。

提交单个 py 文件知道怎么做了,但如果该文件还导入了其它文件该怎么办呢?一个项目中还会涉及到包的存在。其实不管项目里的文件有多少,启动文件只有一个,只需要把这个启动文件提交上去即可。举例说明,当然这里仍不涉及具体和 Flink 相关的内容,先把如何提交程序这一步给走通。因为不管编写的程序多复杂,提交这一步骤是不会变的。

先来看看编写的程序:

flink_test 就是主目录,里面有一个 apps 子目录和一个 main.py 文件,apps 目录里面有三个 py 文件,对应的内容分别如图所示。然后将其提交到 Flink Standalone 集群上运行,命令和提交单个文件是一样的

即使是多文件,提交方式也是相似的,输出结果表明提交成功了。

官方示例

环境

  • Java 11
  • Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

接下来,我们将介绍如何创建源表和结果表。

t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())
tab = t_env.from_path('source')t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())

你也可以使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表:

my_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')
""".format(input_path)my_sink_ddl = """create table sink (word STRING,`count` BIGINT) with ('connector' = 'filesystem','format' = 'canal-json','path' = '{}')
""".format(output_path)t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

上面的程序展示了如何创建及注册表名分别为 source 和 sink 的表。 其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词; 结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。

接下来,我们介绍如何创建一个作业:该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink

最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 计算 word count
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()

该教程的完整代码如下:

import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().set("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

接下来,可以在命令行中运行作业(假设作业名为 word_count.py):

python word_count.py

上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。

最后,你可以得到如下运行结果:

实例处理Kafka后入库到Mysql

下载依赖

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

读取kafka数据

#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import loggingfrom pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializerfrom pyflink.common import Row
from pyflink.datastream import FlatMapFunctiondef read_kafka():env = StreamExecutionEnvironment.get_execution_environment()env.add_jars("file:///D:/安技汇/运营平台/DataManage/flink-sql-connector-kafka-1.17.1.jar")source = KafkaSource.builder() \.set_bootstrap_servers("172.16.12.128:9092") \.set_topics("test") \.set_group_id("my-group") \.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \.set_value_only_deserializer(SimpleStringSchema()) \.build()# 从消费组提交的位点开始消费,不指定位点重置策略#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \# 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \# 从时间戳大于等于指定时间戳(毫秒)的数据开始消费#.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \# 从最早位点开始消费#.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \# 从最末尾位点开始消费#.set_starting_offsets(KafkaOffsetsInitializer.latest()) \#.set_property("partition.discovery.interval.ms", "10000")  # 每 10 秒检查一次新分区#.set_property("security.protocol", "SASL_PLAINTEXT") \#.set_property("sasl.mechanism", "PLAIN") \#.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")kafka_stream.print()env.execute("Source")if __name__ == "__main__":logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")read_kafka()

写入mysql数据

没通,待补充。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120375.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot Actuator 介绍

Spring Boot Actuator是什么 Spring Boot Actuator 模块提供了生产级别的功能,比如健康检查,审计,指标收集,HTTP 跟踪等,帮助我们监控和管理Spring Boot 应用。 这个模块是一个采集应用内部信息暴露给外部的模块&…

汇编学习(1)

汇编、CPU架构、指令集、硬编码之间的关系 ● 汇编语言:这是一种低级语言,用于与硬件直接交互。它是由人类可读的机器码或指令组成的,这些指令告诉CPU如何执行特定的任务。每条汇编指令都有一个对应的机器码指令,CPU可以理解和执…

css属性clip-path的使用说明

前言 当ui设计上的图片、div等的形状不是长方形,而是多边形的时候,就可以借助clip-path这个css属性来实现。 clip-path CSS 属性使用裁剪方式创建元素的可显示区域。区域内的部分显示,区域外的隐藏。【from: MDN】 clip-path可以理解为一把剪…

decapoda-research/llama-7b-hf 的踩坑记录

使用transformers加载decapoda-research/llama-7b-hf的踩坑记录。 ValueError: Tokenizer class LLaMATokenizer does not exist or is not currently imported. 解决办法: https://github.com/huggingface/transformers/issues/22222 将tokenizer_config.json中LLa…

Spring Cloud学习:二【详细】

目录 Nacos的配置 Nacos的单机启动 服务注册 Nacos服务分级存储模型 优先访问同集群的服务 根据权重负载均衡 环境隔离Namespace Nacos调用流程 Nacos与Eureka注册对比 Nacos与Eureka的共同点 Nacos与Eureka的区别 Nacos配置管理 统一配置 配置自动刷新 多环境配…

javascript中的继承

基本术语 本文中,proto [[Prototype]] 原型链 基本思想: 构造函数生成的对象有一个指针(proto)指向构造函数的原型。如果将构造函数1的原型指向另一个构造函数2的实例,则构造函数1的实例__proto__.proto 指向了构…

线性表——设计一个高效算法,将顺序表L的所有元素逆置,要求算法的空间复杂度为O(1)。

题目&#xff1a;设计一个高效算法&#xff0c;将顺序表L的所有元素逆置&#xff0c;要求算法的空间复杂度为O(1)。 算法思想&#xff1a;扫描顺序表的L的前半部分元素&#xff0c;对于元素L.data[i](0<i<L.length/2)&#xff0c;将其与后半部分的对应元素L.data[L.lengt…

python自动化测试平台开发:自动化测试平台简介

一.测试平台简介 为什么需要测试平台 已有的开源测试平台不能满足需要&#xff0c;不要轻易造轮子 需要公司级别的定制 需要整合公司内部的多套平台 例子&#xff1a;DevOps平台、精准化测试平台、质量监控平台等等 常见的测试平台开发模式 大一统模式&#xff08;适合简单的…

linux find命令搜索日志内容

linux find命令搜索日志内容 查询服务器log日志 find /opt/logs/ -name "filename.log" | xargs grep -a "这里是要查询的字符"加上-a 是为了不报查出 binary 的错 服务器会返回 包含所查字符的整行日志信息

从瀑布模式到水母模式:ChatGPT如何赋能软件研发全流程【文末送书五本】

从瀑布模式到水母模式&#xff1a;ChatGPT如何赋能软件研发全流程 前言内容简介购买链接作者简介专家推荐读者对象参与方式往期赠书回 &#x1f3d8;️&#x1f3d8;️个人简介&#xff1a;以山河作礼。 &#x1f396;️&#x1f396;️:Python领域新星创作者&#xff0c;CSDN实…

【C】关于动态内存的试题及解析

目录 第1题&#xff1a; 第2题&#xff1a; 第3题&#xff1a; 第4题&#xff1a; 第5题&#xff1a; 在学习了关于动态开辟内存的相关知识后&#xff0c;下面是一些涉及到动态开辟内存程序的试题及解析&#xff08;试题部分来自《高质量的C/C编程》、笔试题&#xff09;。 第1…

(1)(1.9) HC-SR04声纳

文章目录 前言 1 连接到自动驾驶仪 2 参数说明 前言 HC-SR04 声纳是一种价格低廉但量程很短&#xff08;最远只有 2m&#xff09;的测距仪&#xff0c;主要设计用于室内&#xff0c;但也成功地在室外的 Copter 上使用过。极短的测距范围使其用途有限。 &#xff01;Warning…

移远通信5G RedCap模组拿下首个中国移动5G物联网开放实验室5G及轻量化产品能力认证

10月21日&#xff0c;在2023世界物联网博览会期间&#xff0c;中国移动举办了以“智融万物 创见未来”为主题的物联网开发者大会暨物联网产业论坛。作为中国移动在物联网领域重要的合作伙伴&#xff0c;移远通信应邀参加论坛。 随着千行百业数智化进程的不断加速&#xff0c;5G…

酷开科技依托酷开系统推动家庭智能化加速发展

为什么越来越多的人会选择智能家居&#xff1f;因为智能家居的出现&#xff0c;大大方便了我们的生活&#xff0c;为生活提供便利舒适的体验&#xff1b;就如同洗衣机与洗碗机解放了我们的双手是一样的道理&#xff0c;智能家居是在生活的方方面面为我们提供更加便利化的可能性…

java代码审计-换行符CRLF注入

CRLF 的缩写是指回车和换行操作&#xff0c;其中 CR 为 ASCII 中的第 13 个字符&#xff0c;也 写作 \r , LF 是 ASCII 中的第 10 个字符&#xff0c;也写作 \n &#xff0c;因此 CRLF 一般翻译为回车换行注入漏洞。 什么是CRLF注入漏洞? CRLF 即【回车\r换行\n】的简…

找游戏外包开发游戏,有哪些好处呢?

游戏外包开发是将游戏开发的一部分或全部工作交给专业的外部开发团队或公司完成的做法。这种方法有许多潜在的好处&#xff0c;包括&#xff1a; 降低成本&#xff1a;游戏外包通常可以降低游戏开发成本&#xff0c;因为外包开发公司通常可以提供更具竞争力的价格。这是因为它…

「译文」深入了解Kubernetes和Nomad

&#x1f449;️原文链接: https://www.cncf.io/blog/2023/10/23/introduction-a-closer-look-at-kubernetes-and-nomad/ ✍️作者: Rob Newsome &#x1f4dd;Description: stack.io 产品管理主管 Rob Newsome 的特邀文章 在容器编排领域&#xff0c;Kubernetes 和 Nomad 都是…

SpringMVC(下)

1、拦截器&#xff1a; 1、拦截器的配置: SpringMVC中的拦截器用于拦截控制器方法的执行 SpringMVC中的拦截器需要实现HandlerInterceptor <!--配置拦截器--><mvc:interceptors><!--对所有的请求进行拦截--><!--<bean class"com.songzhishu.m…

【2023CANN训练营第二季】——通过一份入门级算子开发代码了解Ascend C算子开发流程

本次博客讲解的代码是Gitee代码仓的Ascend C加法算子开发代码&#xff0c;代码地址为&#xff1a; quick-start 打开Add文件&#xff0c;可以看到文件结构如下&#xff1a; 其中add_custom.cpp是算子开发的核心文件&#xff0c;包括了核函数的实现&#xff0c;展示了如何在Asc…

#力扣:LCP 01. 猜数字@FDDLC

LCP 01. 猜数字 - 力扣&#xff08;LeetCode&#xff09; 一、Java class Solution {public int game(int[] guess, int[] answer) {int cnt0;for(int i0;i<3;i){if(guess[i]answer[i])cnt;}return cnt;} }