【Apache Flink】Flink DataStream API的基本使用

Flink DataStream API的基本使用

文章目录

  • 前言
  • 1. 基本使用方法
  • 2. 核心示例代码
  • 3. 完成工程代码
    • pom.xml
    • WordCountExample
    • 测试验证
  • 4. Stream 执行环境
  • 5. 参考文档

前言

Flink DataStream API主要用于处理无界和有界数据流 。
无界数据流是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。

有界数据流是一个具有明确开始和结束点的数据集,例如一个文件或数据库表。这种类型的数据流通常在批处理场景中使用,其中所有数据都已经可用,并可以一次性处理。

Flink的DataStream API提供了一套丰富的操作符,如map、filter、reduce、aggregations、windowing、join等,以支持各种复杂的数据处理和分析需求。此外,DataStream API还提供了容错保证,能确保在发生故障时,应用程序能从最近的检查点(checkpoint)恢复,从而实现精确一次(exactly-once)的处理语义。

1. 基本使用方法

  1. 创建执行环境:

    每一个Flink程序都需要创建一个StreamExecutionEnvironment(执行环境),它可以被用来设置参数和创建从外部系统读取数据的流。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. 创建数据流:

    你可以从各种数据源中创建数据流,如本地集合,文件,socket等。下面的代码是从本地集合创建数据流的示例:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. 转换操作:

    Flink提供了丰富的转换操作,如mapfilterreduce等。以下代码首先将每个字符串映射为其长度,然后过滤出长度大于5的元素:

    DataStream<Integer> transformedStream = dataStream.map(s -> s.length()).filter(l -> l > 5);
    
  4. 数据输出:

    Flink支持将数据流输出到各种存储系统,如文件,socket,数据库等。下面的代码将数据流输出到标准输出:

    transformedStream.print();
    
  5. 执行程序:

    将上述所有步骤放在main函数中,并在最后调用env.execute()方法来启动程序。Flink程序是懒加载的,只有在调用execute方法时才会真正开始执行。

    env.execute("Flink Basic API Usage");
    

2. 核心示例代码

使用Flink DataStream API构建一个实时Word Count程序,它会从一个socket端口读取文本数据,统计每个单词的出现次数,并将结果输出到标准输出。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter()) // 将文本行切分为单词.keyBy(0) // 按单词分组.sum(1); // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

3. 完成工程代码

下面是一个基于Apache Flink的实时单词计数应用程序的完整工程代码,包括Pom.xml文件和所有Java类。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flink-wordcount-example</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><properties><flink.version>1.13.2</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter())  // 将文本行切分为单词.keyBy(0)  // 按单词分组.sum(1);  // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

现在,你可以使用Maven编译并运行这个程序。在启动程序之前,你需要在本地启动一个端口为9000的Socket服务器。这可以通过使用Netcat工具 (nc -lk 9000) 或者其他任何能打开端口的工具实现。然后,你可以输入文本行,Flink程序会统计每个单词出现的次数,并实时打印结果。

测试验证

用py在本地启动一个socket服务器,监听9000端口,

python比较简单实现一个socket通信 。写一个Python来验证上面写的例子。

import socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)while True:data = input("Enter text: ")client_socket.sendall(data.encode())

运行Flink程序和Python socket服务器,然后在Python程序中输入文本, 会看到Flink程序实时统计每个单词出现的次数并输出到控制台。

4. Stream 执行环境

开发学习过程中,不需要关注。每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。
在这里插入图片描述

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink runtime: client, job manager, task managers
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

5. 参考文档

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/126886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于MFC的串口通信(Mscomm)

1、串口通信的概述&#xff1a; 串口是一种重要的通信资源&#xff0c;例如鼠标口、USB接口都是串口。串行端口是CPU和串行设备间的编码转换器。当数据从CPU经过端口发送出去的时候&#xff0c;字节数据会被转为串行的位&#xff0c;在接收数据时&#xff0c;串行的位被转换为…

机器人仿真-gazebo学习笔记(4)xacro和传感器添加

1.xacro简介 URDF文件不具备代码复用的特性&#xff08;在上一篇文章也能发现&#xff0c;其实左右轮是极其相似的但还是要单独描述&#xff09;&#xff0c;一个复杂的机器人模型会拥有大量了的传感器和关节组件&#xff0c;这时候使用URDF文件就太难阅读了。精简化、可复用、…

学习视频剪辑:如何从指定时段快速抽出视频图片!高效技巧分享

随着数字媒体的普及&#xff0c;越来越多的人开始接触视频剪辑。在视频剪辑过程中&#xff0c;有时候我们需要从指定时段快速抽出视频图片。这不仅可以帮助我们提高剪辑效率&#xff0c;还可以让我们的视频更加丰富多彩。本文将分享一些高效技巧&#xff0c;帮助你轻松实现从指…

Vue路由(router)的安装和使用

Vue路由&#xff08;router&#xff09;的安装和使用 安装vue-router插件 第一步&#xff1a;在CMD窗口中&#xff0c;使用命令跳转到vue的安装路径下第二步&#xff1a;输入命令&#xff1a;npm i vue-router3 vue2 要安装 vue-router3 npm i vue-router3 vu3 要安装 vue-ro…

【Python微信机器人】第三篇:使用ctypes调用进程函数和读取内存结构体

目录修整 目前的系列目录(后面会根据实际情况变动): 在windows11上编译python将python注入到其他进程并运行注入Python并使用ctypes主动调用进程内的函数和读取内存结构体使用汇编引擎调用进程内的任意函数利用beaengine反汇编引擎的c接口写一个pyd库&#xff0c;用于实现inl…

Docker安装部署[8.x]版本Elasticsearch+Kibana+IK分词器

文章目录 Docker安装部署elasticsearch拉取镜像创建数据卷创建网络elasticsearch容器&#xff0c;启动&#xff01;踩坑&#xff1a;虚拟机磁盘扩容 Docker安装部署Kibana拉取镜像Kibana容器&#xff0c;启动&#xff01; 安装IK分词器安装方式一&#xff1a;直接从github上下载…

IDEA初步入门

1 安装 现在的系统更迭很快&#xff0c;很多软件都只支持win10 和 11了&#xff0c;但我们过时党还在用win7. 所以就必须找到合适的版本。在windows 7 64位系统下&#xff0c;可以使用IDEA 2020.1.4版本。 在Jetbrain官方下&#xff0c;找到历史版本&#xff0c;找到windows版…

iOS开发-CoreNFC实现NFC标签Tag读取功能

iOS开发-CoreNFC实现NFC标签Tag读取功能 一、NFC近场通信 近场通信&#xff08;NFC&#xff09;是一种无线通信技术&#xff0c;它使设备能够在不使用互联网的情况下相互通信。它首先识别附近配备NFC的设备。NFC常用于智能手机和平板电脑。 二、实现NFC标签Tag读取功能 在…

1985-2022年全国各地级市绿色专利申请和授权数据

1985-2022年全国各地级市绿色专利申请和授权数据 1、时间&#xff1a;1985-2022年 2、指标&#xff1a;年份、地区、行政区划代码、所属省份、所属地域、绿色专利申请总量、绿色专利申请_发明专利、绿色专利申请_实用新型专利、绿色专利授权总量、绿色专利授权_发明专利、绿色…

一种FSK信号符号同步的思想

FSK原理 FSK利用频率传输信息,即将信息流调制到频率上。以最简单的2FSK通俗来讲,用2个不同的频率 f 1 f_1 f1

Unity 报警告warning CS0649: Field ‘...‘ is never assigned to,...解决办法

文章目录 1. 现象2. 警告出现原因3. 解决方法 1. 现象 2. 警告出现原因 该警告应仅出现在私有成员变量中。那些不能从外部设置&#xff0c;这就是为什么编译器可以确定这些变量没有在任何地方蛇者其值。在C&#xff03;中&#xff0c;没有访问修饰符的变量&#xff08;private…

el-date-picker如何选择规定范围内的时间(十天以内的时间)

这个需求是可以选择之后来计算,选择当前日期之后自动计算当前日期前后的十天以内的日期 如下图 就是19号前面十天的日期 以及后面十天的日期(包含当天) 需要用到elementUI el-date-picker是Element UI库中的一个组件&#xff0c;用于日期选择 <el-date-picker v-model&q…

AI:46-基于深度学习的垃圾邮件识别

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…

第20期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练 Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大型语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以…

Tigger绕过激活锁/屏幕锁隐藏工具,支持登入iCloud有消息通知,支持iOS12.0-14.8.1。

绕过激活锁工具Tigger可以用来帮助因为忘记自己的ID或者密码而导致iPhone/iPad无法激活的工具来绕过自己的iPhone/iPad。工具支持Windows和Mac。 工具支持的功能&#xff1a; 1.Hello界面两网/三网/无基带/乱码绕过&#xff0c;可以完美重启&#xff0c;支持iCloud登录、有消…

【Linux】第九站:make和makefile

文章目录 一、 Linux项目自动化构建工具make/Makefile1.make/makefile工作现象2.依赖关系与依赖方法3.如何清理4.为什么这里我们需要带上clean5.连续的make6.特殊符号 二、Linux下实现一个简单的进度条1.回车换行2.缓冲区3.倒计时的实现 一、 Linux项目自动化构建工具make/Make…

图解刘润2023年度演讲--进化的力量思维导图精华

大家好&#xff0c;我是老原。 周末&#xff0c;商业顾问刘润发表了年度演讲&#xff1a;《进化的力量&#xff1a;寒武纪大爆发》。 这两天出差期间&#xff0c;陆陆续续看完了这个长达4小时的演讲&#xff0c;梳理了2023年到底发生了些什么&#xff0c;现在的环境如何…… …

如何使用 Docker 搭建 Jenkins 环境?从安装到精通

不少兄弟搭 jenkins 环境有问题&#xff0c;有的同学用 window, 有的同学用 mac&#xff0c; 有的同学用 linux。 还有的同学公司用 window, 家里用 mac&#xff0c;搭个环境头发掉了一地。。。 这回我们用 docker 去搭建 jenkins 环境&#xff0c;不管你是用的是什么系统&…

opencv在linux上调用usb摄像头进行拍照

功能 1.按照指定的文件名创建文件夹&#xff0c;创建之前判断该文件夹是否存在 2.调用摄像头按可调整窗口大小的方式显示 3.按esc退出摄像头画面 4.按p保存当前摄像头的画面&#xff0c;并按当前时间为照片的名字进行保存打开终端查看是否有摄像头 ls /dev/video*一般video1就…

面试了字节、美团、腾讯等30几家公司后,才知道软件测试面试全是这个套路......

一、Linux系统应用和环境配置&#xff1a; 1、Linux系统的操作命令给我说10个&#xff0c;一般用什么工具远程连接Linux服务器&#xff1f; 2、Linux中的日志存储在哪里&#xff1f;怎么查看日志内容&#xff1f; 3、Linux中top和ps命令的区别&#xff1f; 4、Linux命令运行…