Flink(四)【DataStream API - Source算子】

前言

        今天开始学习 DataStream 的 API ,这一块是 Flink 的核心部分,我们不去学习 DataSet 的 API 了,因为从 Flink 12 开始已经实现了流批一体, DataSet 已然是被抛弃了。忘记提了,从这里开始,我开始换用 Flink 17 了。

一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成:

  1.  获取执行环境(execution environment)
  2.  读取数据源(source)
  3.  定义基于数据的转换操作(transformations)
  4.  定义计算结果的输出位置(sink)
  5.  触发程序执行(execute)

其中,获取环境和触发执行,都可以认为是针对执行环境的操作。

1、执行环境(Execution Environment)

不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的 TaskManager 执行。

1.1、创建执行环境

1、getExecutionEnvironment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

这是最简单高效的一种方式了,它可以自己根据环境的信息去判断。

我们也可以给它传递一个 Configuration 对象作为参数,这样我们可以设置运行时的一些配置,比如端口号等。

Configuration conf = new Configuration();conf.set(RestOptions.BIND_PORT,"8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

这里我们设置端口号为 8082 ,这样我们在默认的 8081 端口就无法访问 Web UI 了,只能通过 8082 端口来访问。 

2、createLocalEnvironment

这种方式了解即可,它是用来创建一个本地的模拟集群环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

3、createRemoteEnvironment

这种方式同样了解即可,因为配置起来比较繁琐,我们既然是在集群下运行了,一般都是把代码打包成 jar 去执行,不会把配置信息写死的。

StreamExecutionEnvironment.createRemoteEnvironment("hadoop102",8081,"/opt/module/xxx.jar");

1.2、执行模式(Execution Mode)

默认的执行模式就是 Streaming 模式。

1、batch 模式

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

2、streaming 模式

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

3、自动模式

前两种方式都过于死板,打包后的程序都不能修改,所以我们一般不明确指定执行模式到底是 流处理 还是 批处理,而是执行时通过命令行来配置:

bin/flink run -Dexecution.runtime-mode=BATCH ...

1.3、触发程序执行

默认执行方式

        Flink 是事件驱动型的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)。所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)但是这个返回对象我们一般不怎么用,而且这个返回结果在程序运行完才会返回。

        默认 env.execute() 触发生成一个 Flink Job。

env.execute();

异步执行方式

        极少情况下,可能我们一套代码中有两部分处理逻辑,比如 env.execute() 之后,又进行了一些操作然后再进行 execute() ,但在 main 线程中是会阻塞的,这就需要启动一个异步的 execute() 方法。

        executeAsync() 会触发执行多个 Flink Job。

env.execute();// 其他处理代码...env.executeAsync();

2、源算子(Source)

2.1、准备工作

写一个 Java Bean,注意类的属性序列化问题(这里我们的属性都是一些基本类型,Flink 是支持对它进行序列化的),Flink 会把这样的类作为一种特殊的 POJO 数据类型来对待,方便数据的解析和序列化。

import java.util.Objects;public class WaterSensor {public String id;public Long ts;public Integer vc;public WaterSensor(){}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc = vc;}
}

2.2、从集合中读取

和 Spark 一样,集合类型我们一般只在测试的时候使用。
主要方法就是 fromCollection 或者 fromElements 。
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();WaterSensor sensor1 = new WaterSensor("1",1L,1);WaterSensor sensor2 = new WaterSensor("2",2L,2);// 从集合读取数据DataStreamSource<WaterSensor> source = env
//                .fromElements(sensor1,sensor2); //直接填写元素.fromCollection(Arrays.asList(sensor1,sensor2));   // 从集合读取数据source.print();env.execute();}
}

2.3、从文件中读取

读取文件,需要添加文件连接器依赖:

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>

新的 Source 读取语法:

 env.fromSource(Source的实现类,Watermark,source名称)

示例: 

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从文件中读取FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/words.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),"fileSource").print();env.execute();}
}

2.4、从 Socket 读取数据

这种方式同样常用于模拟流数据,稳定性较差,通常用来测试。

DataStream<String> stream = env.socketTextStream("localhost", 9999);

2.5、从 Kafka 读取数据

实际开发也是用 Kafka 来读取的,我们的实时流数据都是由 Kafka 来做收集和传输的。

导入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>

案例

package com.lyh.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从 Kafka 读取KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")    //指定kafka地址和端口.setGroupId("lyh")  // 指定消费者组id.setTopics("like")  // 指定消费的topic,可以是多个用List<String>.setValueOnlyDeserializer(new SimpleStringSchema()) // 指定反序列化器 因为kafka是生产者 flink作为消费者要反序列化.setStartingOffsets(OffsetsInitializer.latest())    // 指定flink消费kafka的策略.build();env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"kafkaSource").print();env.execute();}/**  kafka 消费者的参数: *      auto.reset.offsets:*          earliest: 如果有offset,从offset继续消费;如果没有 就从 最早 消费*          latest  : 如果有offset,从offset继续消费;如果没有 就从 最新 消费* flink 的 kafkaSource offset消费者策略: offsetsInitializer,默认是 earliest*      earliest: 一定从 最早 消费 (不管有没有offset) *      latest  : 一定从 最新 消费 (不管有没有offset)*/
}

启动 Kafka 集群(需要先启动 zookeeper)

使用命令行生产者生产消息:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic like

2.6、从数据生成器读取数据

导入依赖:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>

 案例

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 数据生成器参数说明:*  1. GeneratorFunction接口,需要重写 map 方法,输入类型必须是Long*  2. Long类型, 自动生成的数字序列(从0自增)的最大值*  3. 限速策略, 比如每秒生成几条数据*  4. 返回的数据类型*/DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "number: " + value;}},10L,RateLimiterStrategy.perSecond(1),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(),"dataGenerator").print();env.execute();}
}

运行效果:

number: 0
number: 1
number: 2
number: 3
number: 4
number: 5
number: 6
number: 7
number: 8
number: 9Process finished with exit code 0

如果想达到无界流的效果,直接给数据生成器的第二个参数传一个 Long.MAX_VALUE。

假如我们的第二个参数设置为100(意味着从0自增到99)。如果并行度为3,那么第二个线程将从100的1/3处(即34)开始累加,第三个线程将从100的2/3(即67)开始累加。

Flink 支持的数据类型

        这里主要说泛型类型和类型提示,别的类型比如我们基本的数据类型及其包装类型和String(引用类型)、基本类型数组、对象数组、复合数据类型(Flink 内置的 Tuple0~Tuple25),辅助类型Option、Either、List、Map等,还有 POJO 类型,Flink 的 TypeInfomation 类型都已经为我们封装好了,它为每个数据类型生成了特定的序列化、反序列化器和比较器。

泛型

Flink 支持所有的 Java 类和 Scala 类。但如果没有按照 POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。
在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO 还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。
Flink 对 POJO 类型的要求如下:
⚫ 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);
⚫ 类有一个公共的无参构造方法;
⚫ 类中的所有字段是 public 且非 final 的;或者有一个公共的 getter 和 setter 方法,这些方法需要符合 Java bean 的命名规范。所以我们上面的 WaterSensor,就是我们创建的符合 Flink POJO 定义的数据类型。

类型提示(Type Hints)

        Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,它是不可靠的;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

        为了解决这类问题,Java API 提供了专门的“类型提示”(type hints)。回忆一下之前的 word count 流处理程序,我们在将 String 类型的每个词转换成(word,count)二元组后,就明确地用 returns 指定了返回的类型。因为对于 map 里传入的 Lambda 表达式,系统只能推断出返回的是 Tuple2 类型,而无法得到 Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

下面给出两种写法:

DataStreamSource<String> lineDS = env.socketTextStream("hadoop102",9999);// 3. flatMap 打散数据 返回元组SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}})//.returns(Types.TUPLE(Types.STRING, Types.LONG));.returns(new TypeHint<Tuple2<String, Long>>() {});  //也可以这样写

这是一种比较简单的场景,二元组的两个元素都是基本数据类型。那如果元组中的一个元素又有泛型,该怎么处理呢?
Flink 专门提供了 TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的 DataStream 里元素的类型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/143210.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【论文精读2】R-MVSNet

R-MVSNet【递归多视图立体网络】&#xff0c;论文全名&#xff1a;“Recurrent MVSNet for High-resolution Multi-view Stereo Depth Inference”&#xff0c;CVPR 2019(CCF A) 在MVSNet的基础上做了一些改进&#xff0c;主要解决的问题是代价体正则化&#xff08;Cost Volume…

三、Eureka注册中心

目录 一、作用及调用方式 二、搭建eureka注册中心 三、注册user-service和order-service 四、新增实例 五、服务拉取 六、总结 一、作用及调用方式 在服务提供者启动时&#xff0c;它会向eureka注册中心提供自己的信息&#xff0c;并每30秒进行一次刷新eureka注册中心保存…

ping: www.baidu.com: Name or service not known解决办法

解决服务器无法ping通外网问题 1、问题描述&#xff1a; 配置了网卡信息&#xff0c;发现还是无法访问外网&#xff0c;并报ping: www.baidu.com: Name or service not known信息 2、问题原因&#xff1a; 这就是外网没开通好 3、解决方法&#xff1a; 修改网卡文件&#xff…

易货:一种古老而有效的商业模式

在当今的商业世界中&#xff0c;我们常常听到关于电子商务、互联网和社交媒体等新技术的讨论。然而&#xff0c;尽管这些新技术为我们的日常生活带来了许多便利&#xff0c;但它们并没有完全取代传统的商业模式。其中&#xff0c;易货模式是一种古老而有效的商业模式&#xff0…

Python爬虫程序网络请求及内容解析

以下是一个简单的Python爬虫程序&#xff0c;用于爬取商户的内容。这个程序使用了requests和BeautifulSoup库来进行网络请求和内容解析。 import requests from bs4 import BeautifulSoup# 爬虫爬虫IP信息 proxy_host duoip proxy_port 8000# 请求URL url 目标网站# 创建一个…

优测云测试平台 | 有效的单元测试(下)

接着上一篇内容&#xff0c;我们继续~ 四、测试的目标之三&#xff1a;快速反馈 测试的快速反馈有两个方面的含义: 1.测试运行要快速出结果。 2.当测试失败时&#xff0c;要能快速定位失败原因。 测试运行效率决定了开发的工作周期运转的快慢。在理想的 TDD 模型中&#x…

多线程压缩ZIP文件

工作过程中&#xff0c;可能会遇到有需要生成压缩包的需求&#xff0c;而生成压缩包&#xff0c;一般速度不快&#xff0c;比较影响效率&#xff0c;所以一般会考虑使用多线程进行压缩。本文就多线程压缩方式进行以下介绍 多线程压缩一般分为两种方式 多线程读源文件&#xff…

docker-compose 部署 MySQL 8

目录 前言MySQL 配置文件(my.cnf)docker-compose.yml安装卸载 前言 Windows/Linux 系统通过 docker-compose 部署 MySQL8.0。 MySQL 配置文件(my.cnf) # 服务端参数配置 [mysqld] usermysql # MySQL启动用户 default-storage-engineINNODB # 创建新表时…

HTTP代理与SOCKS5代理,有什么区别?

在数字通信领域&#xff0c;数据安全和匿名性都是非常重要的指标。互联网的不断发展催生了几种协议&#xff0c;每种协议都有独特的优势和挑战。其中&#xff0c;SOCKS5 代理、HTTP代理最为广泛使用&#xff0c;下面给大家一起讨论&#xff0c;HTTP代理与SOCKS5代理&#xff0c…

POE也收费了

一直通过POE在用chatgpt&#xff0c;今天下午发现要收费了…

开发知识点-Vue-Electron

Electron ElectronVue打包.exe桌面程序 ElectronVue打包.exe桌面程序 为了不报错 卸载以前的脚手架 npm uninstall -g vue-cli安装最新版脚手架 cnpm install -g vue/cli创建一个 vue 随便起个名 vue create electron-vue-example (随便起个名字electron-vue-example)进入 创建…

ai语音电销机器人电销行业要怎么降低封号率?

工信部对电话营销电话的管控越来越严格&#xff0c;企业电销行业的发展受到了很多限制&#xff0c;因为电话销售人员在进行销售工作的时候&#xff0c;经常会因为各种原因触发封号机制&#xff0c;导致手机卡号被封&#xff0c;那企业电销行业要怎么降低封号率&#xff1f; 很多…

vue+iView实现下载zip文件导出多个excel表格

1&#xff0c;需求&#xff1a;在vue项目中&#xff0c;实现分月份导出多个Excel表格。 点击导出&#xff0c;下载zip文件&#xff0c;解压出多张表数据。 2&#xff0c;关键代码&#xff1a; <Button class"export button-style button-space" click"ex…

ssm823基于ssm的心理预约咨询管理系统的设计与实现+vue

ssm823基于ssm的心理预约咨询管理系统的设计与实现vue 交流学习&#xff1a; 更多项目&#xff1a; 全网最全的Java成品项目列表 https://docs.qq.com/doc/DUXdsVlhIdVlsemdX 演示 项目功能演示&#xff1a; ————————————————

【QT HTTP】使用QtNetwork模块制作基于HTTP请求的C/S架构

目录 0 引言1 HTTP基本知识1.1 请求类型1.2 HTTP请求报文格式1.3 HTTP响应报文格式1.4 拓展&#xff1a;GET vs POST 请求方法GET请求请求报文&#xff1a;响应报文 POST请求请求报文响应报文 其他注意事项示例&#xff1a;GET请求示例POST请求示例 2 实战2.1 QtNetwork模块介绍…

电脑指示灯闪烁,但是无法开机的解决方案

【便携机开机故障】电脑指示灯闪烁&#xff0c;但是无法开机的解决方案 问题描述 设备型号&#xff1a;联想 ThinkPad T14s 故障详情&#xff1a;电脑使用后未关机锁屏合盖后&#xff0c;再次使用时开关机指示灯一直闪烁&#xff0c;但是无法正常开机。 其他尝试方法&#xf…

Kafka 的应用场景

Kafka 是一个开源的分布式流式平台&#xff0c;它可以处理大量的实时数据&#xff0c;并提供高吞吐量&#xff0c;低延迟&#xff0c;高可靠性和高可扩展性。 Kafka 最初是为分布式系统中海量日志处理而设计的。它可以通过持久化功能将消息保存到磁盘&#xff0c;并让消费者按…

Express基本接口开发-入门学习与后续进阶

前提推荐 任何一个新的知识都是从文档看起&#xff0c;因此express官方文档示例有必要去学习一遍。 推荐看&#xff1a; 推荐入门指南-路由指南-中间件 看完这几个内容之后心里大概知道express有些什么东西了&#xff0c;然后现在就可以去练习了 注意&#xff1a;更多示例-代…

安全区域边界(设备和技术注解)

网络安全等级保护相关标准参考《GB/T 22239-2019 网络安全等级保护基本要求》和《GB/T 28448-2019 网络安全等级保护测评要求》 密码应用安全性相关标准参考《GB/T 39786-2021 信息系统密码应用基本要求》和《GM/T 0115-2021 信息系统密码应用测评要求》 1边界防护 1.1应保证跨…

03-CSS基础选择器

3.1 CSS基础认知&#x1f34e; 3.1.1 &#x1f441;️‍&#x1f5e8;️CSS概念 CSS&#xff1a;层叠样式表&#xff08;Cascading style sheets)&#xff0c;为网页标签增加样式表现的 语法格式&#xff1a; 选择器{<!-- 属性设置 -->属性名:属性值; <!--每一个…