【API篇】二、源算子API

文章目录

  • 0、demo数据
  • 1、源算子Source
  • 2、从集合中读取数据
  • 3、从文件中读取
  • 4、从Socket读取
  • 5、从Kafka读取
  • 6、从数据生成器读取数据
  • 7、Flink支持的数据类型
  • 8、Flink的类型提示(Type Hints)

0、demo数据

准备一个实体类WaterSensor:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class WaterSensor{private String id;   //水位传感器类型private Long ts;     //传感器记录时间戳private Integer vc;  //水位记录
}
//注意所有属性的类型都是可序列化的,如果属性类型是自定义类,那要实现Serializable接口

模块下准备个文件words.txt,内容:

hello flink
hello world
hello java

1、源算子Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。

在这里插入图片描述
Flink1.12以前,添加数据源的方式是,调用执行环境对象的addSource方法

DataStream<String> stream = env.addSource(...);
//方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口

Flink1.12开始的流批统一的Source框架下,则是:

DataStreamSource<String> stream = env.fromSource()

2、从集合中读取数据

调用执行环境对象的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<Integer> data = Arrays.asList(1, 22, 33);DataStreamSource<Integer> ds = env.fromCollection(data);stream.print();   //直接打印env.execute();
}

还可以直接fromElements方法:

DataStreamSource<Integer> ds = env.fromElements(1,22,33);

在这里插入图片描述

3、从文件中读取

从文件中读是批处理中最常见的读取方式,比如读取某个日志文件。首先需要引入文件连接器依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//第三个参数为自定义的sourceNameFileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();env.execute();
}
  • FileSource数据源对象的创建,传参可以是目录,也可以是文件,可以相对、绝对路径,也可从HDFS目录下读,开头格式hdfs://…
  • 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录
  • 之前的env.readTextFile方法被标记为过时是因为底层调用了addSource

4、从Socket读取

前面的文件和集合,都是有界流,而Socket常用于调试阶段模拟无界流:

DataStream<String> stream = env.socketTextStream("localhost", 9527);
# 对应的主机执行
nc -lk 9527

5、从Kafka读取

数据源是外部系统,常需要导入对应的连接器的依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>

实例:

public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop01:9092,hadoop02:9092,hadoop03:9092")   //指定Kafka节点的端口和地址.setTopics("topic_1")  //消费的Topic.setGroupId("code9527") //消费者组id//Flink程序做为Kafka的消费者,要进行对象的反序列化,setDeserializer对key和value都生效.setStartingOffsets(OffsetsInitializer.latest())  //指定Flink消费Kafka的策略.setValueOnlyDeserializer(new SimpleStringSchema())   //反序列化Value的反序列化器.build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
}//很多传参Ctrl+P看源码类型、Ctrl+H实现类自行分析

Kafaka的消费者参数:

  • earliest:有offset,就从offset继续消费,没offset,就从最早开始消费
  • latest:有offset,就从offset继续消费,没offset,就从最新开始消费

Flink下的KafkaSource,offset消费策略有个初始化器OffsetInitializer,默认是earliest:

  • earliest:一定从最早消费
  • latest:一定从最新消费

注意和Kafka自身的区别。

6、从数据生成器读取数据

Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数来调试。1.17版本提供了新写法,导入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version>
</dependency>
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource =new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:"+value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}
}

new数据生成器源对象,有四个参数:

  • 第一个为GeneratorFunction接口,key为Long型,value为需要map转换后的类型。需要实现map方法,输入方法固定是Long类型
  • 第二个为自动生成数字的最大值,long型,到这个值就停止生成
  • 第三个为限速策略,比如每秒生成几个
  • 第四个为返回的数据类型,Types.xx,Types类是Flink包下的

在这里插入图片描述

嘶,并行度默认为CPU核心数了,输出算子6个子任务,且是每个并行度上是各自自增的(先按总数/并行度划分,再各自执行,比如最大值100,并行度2,那一个从0开始,另一个从50到99)。数字打印出来看着有点乱了,改下并行度

env.setParallelism(1);

在这里插入图片描述

可以看到程序结束了,相当于有界流了,想模拟无界流,可以第二个参数传Long.MAX_VALUE,这就一直输出了

7、Flink支持的数据类型

Flink使用类型信息(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器

在这里插入图片描述

对于Java和Scale常见的数据类型,Flink都支持,在Types工具类中可以看到:

在这里插入图片描述

Flink支持所有自定义的Java类和Scala类,但要符合以下要求:

  • 类是公有(public)的
  • 有一个无参的构造方法
  • 所有属性都是可访问的,即公有public或private+getter、setter
  • 类中所有属性的类型都是可以序列化的

不满足以上要求的类,会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性,它们也不是由Flink本身序列化的,而是由Kryo序列化的。

8、Flink的类型提示(Type Hints)

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的,需要我们手动显示提供类型信息。

之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

//....
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/105228.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【入门】.Net Core 6 WebApi 项目搭建

一、创建项目 1.1.创建新项目&#xff1a;打开开发工具>创建新项目>搜索API>选择C#语言的ASP.NET Core Web API 1.2.配置新项目&#xff1a;**自定义项目信息以及存储路径 1.3.其他信息&#xff1a;这里框架必须选择.NET 6.0,其他配置默认勾选即可&#xff0c;也可以根…

逐字稿 | 对比学习论文综述【论文精读】

对比学习在计算机视觉领域的发展历程&#xff0c;4个阶段&#xff1a; 百花齐放&#xff1a;方法、模型、目标函数、代理任务都还没有统一。CV双雄&#xff1a;MOCOv1、SimCLRv1、MOCOv2、SimCLRv2、CPC和CMC的延伸工作、SwaV&#xff0c;这个阶段发展非常迅速&#xff0c;以上…

云上攻防-云原生篇Docker安全系统内核版本漏洞CDK自动利用容器逃逸

文章目录 云原生-Docker安全-容器逃逸&内核漏洞云原生-Docker安全-容器逃逸&版本漏洞-CVE-2019-5736 runC容器逃逸-CVE-2020-15257 containerd逃逸 云原生-Docker安全-容器逃逸&CDK自动化 云原生-Docker安全-容器逃逸&内核漏洞 细节部分在权限提升章节会详解&…

SQLite4Unity3d安卓 在手机上创建sqlite失败解决

总结 要在Unity上运行一次出现库&#xff0c;再打包进APK内 问题 使用示例代码的创建库 var dbPath string.Format("Assets/StreamingAssets/{0}", DatabaseName); #else// check if file exists in Application.persistentDataPathvar filepath string.Format…

idea插件开发javax.net.ssl.SSLException: No PSK available. Unable to resume.

idea插件开发,编译出错 javax.net.ssl.SSLException: No PSK available. Unable to resume.at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:129)at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:117)at java.base/sun.security.ssl.…

通讯网关软件024——利用CommGate X2Access实现Modbus TCP数据转储Access

本文介绍利用CommGate X2ACCESS实现从Modbus TCP设备读取数据并转储至ACCESS数据库。CommGate X2ACCESS是宁波科安网信开发的网关软件&#xff0c;软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示&#xff0c;实现从Modbus TCP设备读取数据并转储…

Ubuntu 上传项目到 GitHub

一、前言 GitHub 作为时下最大的开源代码管理项目&#xff0c;广泛被工程和科研人员使用&#xff0c;本文主要介绍如何如何将自己的项目程序上传到 GitHub 上。 要上传本地项目到 GitHub 上&#xff0c;主要分为两步&#xff0c;第一步是 二、创建 SSH keys 首先登录 GitHu…

Jenkins+Gitlab+Docker(Dockerfile)部署

Docker部署运行 ​ 上一篇内容中使用Jenkins(运行服务器)Gitlab(代码存储库)Webhook(网络钩子)的方式部署运行我们的项目。需要我们在服务器上做好很多相关的环境配置及依赖。 ​ 那么假如有这样一个场景&#xff1a;需要把不同技术栈的项目部署到同一台服务器上运行。比如PH…

如何开始使用 Kubernetes RBAC

基于角色的访问控制 (RBAC) 是一种用于定义用户帐户可以在 Kubernetes 集群中执行的操作的机制。启用 RBAC 可以降低与凭证盗窃和帐户接管相关的风险。向每个用户授予他们所需的最低权限集可以防止帐户拥有过多的特权。 大多数流行的 Kubernetes 发行版都从单个用户帐户开始,…

【MySQL × SpringBoot 突发奇想】全面实现流程 · 数据库导出Excel表格文件的接口

文章目录 【MySQL SpringBoot 小点子】全面实现流程 数据库导出Excel表格文件的接口1. 什么是VO&#xff08;View Object&#xff09;对象2. BeanCopyUtils进行两个对象的数据转移3. mapper层实现4. service层实现5. vo对象创建6. 保存路径配置7. controller层核心代码实现8.…

JavaFX: 使用本地openjfx包

JavaFX: 使用本地openjfx包 1、注释配置2、下载openjfx包3、导入openjfx的jar包 1、注释配置 build.gradle配置注释&#xff1a; 2、下载openjfx包 下载javaFx地址&#xff1a;https://gluonhq.com/products/javafx/ 3、导入openjfx的jar包

elasticsearch安装

安装elasticsearch 1.部署单点es 1.1.创建网络 因为我们还需要部署kibana容器&#xff0c;因此需要让es和kibana容器互联。这里先创建一个网络&#xff1a; docker network create es-net1.2.加载镜像 elasticsearch的镜像的tar包&#xff1a;点击下载 将其上传到虚拟机中…

电源集成INN3270C-H215-TL、INN3278C-H114-TL、INN3278C-H215-TL简化了反激式电源转换器的设计和制造。

一、概述 InnoSwitch™3-CP系列IC极大地简化了反激式电源转换器的设计和制造&#xff0c;特别是那些需要高效率和/或紧凑尺寸的产品。InnoSwitch3-CP系列将初级和次级控制器以及安全额定反馈集成到单个IC中。 InnoSwitch3-CP系列器件集成了多种保护功能&#xff0c;包括线路过…

gRPC之gRPC Gateway

1、gRPC Gateway etcd3 API全面升级为gRPC后&#xff0c;同时要提供REST API服务&#xff0c;维护两个版本的服务显然不太合理&#xff0c;所以 grpc-gateway 诞生了。通过protobuf的自定义option实现了一个网关&#xff0c;服务端同时开启gRPC和HTTP服务&#xff0c; HTTP服…

京东数据接口:京东数据分析怎么做?

电商运营中数据分析的重要性不言而喻&#xff0c;而想要做数据分析&#xff0c;就要先找到数据&#xff0c;利用数据接口我们能够更轻松的获得比较全面的数据。因此&#xff0c;目前不少品牌商家都选择使用一些数据接口来获取相关电商数据、以更好地做好数据分析。 鲸参谋电商…

2023年中国云计算软件市场规模、市场结构及市场份额情况分析[图]

云计算是分布式计算的一种&#xff0c;指的是通过网络“云”将巨大的数据计算处理程序分解成无数个小程序&#xff0c;然后&#xff0c;通过多部服务器组成的系统进行处理和分析这些小程序得到结果并返回给用户。云计算软件类型分为三类&#xff0c;即基础设施即服务、平台即服…

Python算法练习 10.14

leetcode 2095 删除链表的中间节点 给你一个链表的头节点 head 。删除 链表的 中间节点 &#xff0c;并返回修改后的链表的头节点 head 。 长度为 n 链表的中间节点是从头数起第 ⌊n / 2⌋ 个节点&#xff08;下标从 0 开始&#xff09;&#xff0c;其中 ⌊x⌋ 表示小于或等于…

非类型模板参数+模板的特化

目录 一、非类型模板参数 二、模板的特化 &#xff08;一&#xff09;函数模板特化 &#xff08;二&#xff09;类模板举例 1. 全特化 2. 偏特化 一、非类型模板参数 模板参数分类&#xff1a;类型形参与非类型形参。类型形参即&#xff1a;出现在模板参数列表中&#x…

使用docker搭建kafka集群、可视化操作台

单机搭建 1 拉取zookeeper镜像 docker pull wurstmeister/zookeeper 2 启动zookeeper容器 docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper 3 拉取kafka镜像 docker pull wurstmeister/kafka 4 启动kafka镜像 docker…

【玩机】如何修改iPhone充电提示音!最详细简单保姆级教程~ 学费了可替换任意音频做你的专属充电提示音!——后厂村路灯

其实方法很简单&#xff0c;利用快捷指令&#xff0c;获得base64 位的音频文本&#xff0c;然后再充电时播放即可。 视频教程 【玩机】如何修改iPhone充电提示音&#xff01;最详细简单保姆级教程 具体操作如下&#xff1a; 1.首先&#xff0c;网上找到需要设定的音频&#xf…