[flink 实时流基础]源算子和转换算子

文章目录

    • 1. 源算子 Source
        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据
    • 2. 转换算子
        • 基本转换算子(map/ filter/ flatMap)


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));// 2. 直接填元素DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);source.print();env.execute();}
2. 从文件读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/world.txt")).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource").print();env.execute();}
3. 从 socket 读取
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 7777);source.print();env.execute();}

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
5. 从数据生成器读取数据
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}}, 10, // 自动生成的数字序列RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条Types.STRING // 返回类型);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780809.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Day55:WEB攻防-XSS跨站CSP策略HttpOnly属性Filter过滤器标签闭合事件触发

目录 XSS跨站-安全防御-CSP XSS跨站-安全防御-HttpOnly XSS跨站-安全防御-XSSFilter(过滤器的意思) 1、无任何过滤 2、实体化 输入框没有 3、全部实体化 利用标签事件 单引号闭合 4、全部实体化 利用标签事件 双引号闭合 5、事件关键字过滤 利用其他标签调用 双引号闭合…

代码随想录训练营第60天 | LeetCode 84.柱状图中最大的矩形、总结

LeetCode 84.柱状图中最大的矩形 文章讲解&#xff1a;代码随想录(programmercarl.com) 视频讲解&#xff1a;单调栈&#xff0c;又一次经典来袭&#xff01; LeetCode&#xff1a;84.柱状图中最大的矩形_哔哩哔哩_bilibili 思路 代码如下&#xff1a; ​​​​​​总结 感…

代码随想录|Day28|贪心03|1005.K次取反后最大化的数组和、134.加油站、135.分发糖果

1005.K次取反后最大化的数组和 思路&#xff1a; 优先取反 绝对值最大的负数如果没有负数&#xff0c;不断取反 绝对值最小的数&#xff0c;直到次数 K 耗尽 取反最小数有一个优化技巧&#xff1a; 如果 K 为偶数&#xff0c;则取反 K 次后&#xff0c;正负不变。如果 K 为奇数…

ROM-IP

1.原理 通过添加数据文件&#xff0c;使ROM看起来不是易失性存储器&#xff0c; 产生256个数据&#xff0c;每个数据的位宽是8 如果前面为x&#xff0c;后面就是x256-1 2.单端口ROM配置 FPGA内部没有非易失性存储器。调用的ROM和RAM都是用RAM来生成的 3.双端口ROM配置 使用第一…

马斯克旗下xAI发布Grok-1.5,相比较开源的Grok-1,各项性能大幅提升,接近GPT-4!

本文原文来自DataLearnerAI官方网站&#xff1a;马斯克旗下xAI发布Grok-1.5&#xff0c;相比较开源的Grok-1&#xff0c;各项性能大幅提升&#xff0c;接近GPT-4&#xff01; | 数据学习者官方网站(Datalearner) 继Grok-1开源之后&#xff0c;xAI宣布了Grok-1.5的内测消息&…

手撕算法-跳跃游戏

描述 分析 如果某一个作为 起跳点 的格子可以跳跃的距离是 3&#xff0c;那么表示后面 3 个格子都可以作为 起跳点可以对每一个能作为 起跳点 的格子都尝试跳一次&#xff0c;把 能跳到最远的距离 不断更新如果可以一直跳到最后&#xff0c;就成功了 代码 class Solution {…

07-JavaScript DOM事件

1. 事件 1.1 事件概述 JavaScript 使我们有能力创建动态页面&#xff0c;而事件是可以被 JavaScript 侦测到的行为。 简单理解&#xff1a; 触发--- 响应机制。 网页中的每个元素都可以产生某些可以触发 JavaScript 的事件&#xff0c;例如&#xff0c;我们可以在用户点击某…

【漏洞潜在风险】弹框干扰类风险

弹框干扰风险定义: 游戏过程中&#xff0c;客户端经常会以文字类形式对玩家进行说明和指引&#xff0c;而对于一些更为重要的信息&#xff0c;便会用游戏中的弹框进行强调。由玩家主动触发对其他玩家造成重复弹框进而干扰到正常游戏的都可以称之为弹框干扰类风险。弹框干扰风险…

C++项目——集群聊天服务器项目(六)MySQL模块

Hello&#xff0c;大家好啊&#xff0c;最近比较忙&#xff0c;没来得及更新项目&#xff0c;实在抱歉~今天就恢复更新拉~ 在验证完网络模块与业务模块代码可以正常使用后&#xff0c;需完成的操作是与底层数据库进行交互&#xff0c;为实现各类用户查询、增删业务奠定良好的基…

【群晖】白群晖如何公网访问

【群晖】白群晖如何公网访问 ——> 点击查看原文 在使用默认配置搭建好的群晖NAS后&#xff0c;我们可以通过内网访问所有的服务。但是&#xff0c;当我们出差或者不在家的时候也想要使用应该怎么办呢&#xff1f; 目前白群提供了两种比较快捷的方式&#xff0c;一种是直接注…

【Python系列】合并配置文件的最佳实践

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

python实现两个Excel表格数据对比、补充、交叉验证

业务背景 业务中需要用到类似企查查一类的数据平台进行数据导出&#xff0c;但企查查数据不一定精准&#xff0c;所以想采用另一个官方数据平台进行数据对比核验&#xff0c;企查查数据缺少的则补充&#xff0c;数据一致的保留企查查数据&#xff0c;不一致的进行颜色标注。 …

脱壳之常用的加固样本特征

梆梆加固样本特征 清单文件入口 android:name“com.SecShell.SecShell.ApplicationWrapper” 特征 免费版 meta-data meta-data总结 assets/secData0.jar lib/armeabi/libSecShell.so lib/armeabi/libSecShell-x86.so 梆梆企业版 assets/classes0.jar lib/armeabi-v7a/libD…

第一次运行 Python 项目,使用 python-pptx 提取 ppt 中的文字和图片

人工智能时代&#xff0c;最需要学习的编程语言是&#xff1a;python 。笔者是个 python 小白&#xff0c;昨天花了两个小时&#xff0c;第一次成功运行起来 python 项目 。 项目是 powerpoint-extractor &#xff0c;可以将 ppt 文件中的图片提取出来&#xff0c;并输出到固定…

Windows安装tomcat,以服务的方式管理,如何设置虚拟内存

之前工作中&#xff0c;部署tomcat都是使用Linux服务器&#xff0c;最近遇到个客户&#xff0c;提供的服务器是Windows server&#xff0c;并且需要通过服务的方式管理tomcat&#xff1b;以自己多年的码农经验&#xff0c;感觉应该没有问题&#xff0c;结果啪啪打脸了&#xf…

双向BFS

P1032 [NOIP2002 提高组] 字串变换 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 方法学自于B19 双向BFS 字串变换_哔哩哔哩_bilibili #include<iostream> #include<algorithm> #include<cstdio> #include<queue> #include<map> using namesp…

Adaboost集成学习 | Matlab实现基于BiLSTM-Adaboost双向长短期记忆神经网络结合Adaboost集成学习时间序列预测(股票价格预测)

目录 效果一览基本介绍模型设计程序设计参考资料效果一览 基本介绍 Matlab实现基于BiLSTM-Adaboost双向长短期记忆神经网络结合Adaboost集成学习时间序列预测(股票价格预测) 模型设计 股票价格预测是一个具有挑战性的时间序列预测问题,可以使用深度学习模型如双向长短期记忆…

python爬虫之selenium4使用(万字讲解)

文章目录 一、前言二、selenium的介绍1、优点&#xff1a;2、缺点&#xff1a; 三、selenium环境搭建1、安装python模块2、selenium4新特性3、安装驱动WebDriver驱动选择驱动安装和测试 基础操作1、属性和方法2、单个元素定位通过id定位通过class_name定位一个元素通过xpath定位…

【OJ】动归练习五之子组串

个人主页 &#xff1a; zxctscl 如有转载请先通知 题目 1. 53. 最大子数组和1.1 分析1.2 代码 2. 918. 环形子数组的最大和2.1 分析2.2 代码 3. 152. 乘积最大子数组3.1 分析3.2 代码 4. 1567. 乘积为正数的最长子数组长度4.1 分析4.2 代码 1. 53. 最大子数组和 1.1 分析 一、…

密码学基础-对称密码/公钥密码/混合密码系统 详解

密码学基础-对称密码/公钥密码 加解密说明1.加密解密必要因素加密安全性说明 什么是对称密码图示说明对称密码详解什么是DES?举例说明 什么是3DES什么是AES? 公钥密码什么是RSA? 对称密钥和公钥密码优缺点对比对称密码对称密码算法总结对称密码存在的问题? 公钥密码公钥密码…