[flink 实时流基础]源算子和转换算子

文章目录

    • 1. 源算子 Source
        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据
    • 2. 转换算子
        • 基本转换算子(map/ filter/ flatMap)


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));// 2. 直接填元素DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);source.print();env.execute();}
2. 从文件读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/world.txt")).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource").print();env.execute();}
3. 从 socket 读取
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 7777);source.print();env.execute();}

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
5. 从数据生成器读取数据
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}}, 10, // 自动生成的数字序列RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条Types.STRING // 返回类型);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780809.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL】mysql数据库小功能整理,持续更新~

目录 1、把从数据库中查询出的两个字段拼接 2、自定义新字段 1、把从数据库中查询出的两个字段拼接 在ThinkPHP中使用 field 查询数据库字段时&#xff0c;使用数据库自带的CONCAT函数使两个字段拼接成一个新的自定义字段。 示例&#xff1a; 有两个字段 number 和 filenam…

Day55:WEB攻防-XSS跨站CSP策略HttpOnly属性Filter过滤器标签闭合事件触发

目录 XSS跨站-安全防御-CSP XSS跨站-安全防御-HttpOnly XSS跨站-安全防御-XSSFilter(过滤器的意思) 1、无任何过滤 2、实体化 输入框没有 3、全部实体化 利用标签事件 单引号闭合 4、全部实体化 利用标签事件 双引号闭合 5、事件关键字过滤 利用其他标签调用 双引号闭合…

代码随想录训练营第60天 | LeetCode 84.柱状图中最大的矩形、总结

LeetCode 84.柱状图中最大的矩形 文章讲解&#xff1a;代码随想录(programmercarl.com) 视频讲解&#xff1a;单调栈&#xff0c;又一次经典来袭&#xff01; LeetCode&#xff1a;84.柱状图中最大的矩形_哔哩哔哩_bilibili 思路 代码如下&#xff1a; ​​​​​​总结 感…

代码随想录|Day28|贪心03|1005.K次取反后最大化的数组和、134.加油站、135.分发糖果

1005.K次取反后最大化的数组和 思路&#xff1a; 优先取反 绝对值最大的负数如果没有负数&#xff0c;不断取反 绝对值最小的数&#xff0c;直到次数 K 耗尽 取反最小数有一个优化技巧&#xff1a; 如果 K 为偶数&#xff0c;则取反 K 次后&#xff0c;正负不变。如果 K 为奇数…

聊聊java中的CountDownLatch,CyclicBarrier,Semaphore

CountDownLatch&#xff08;倒计时器&#xff09; 是什么&#xff1a; CountDownLatch是Java中的一个同步工具类&#xff0c;它允许一个或多个线程等待其他线程完成操作。 使用场景&#xff1a; 当一个线程需要等待多个其他线程执行完毕后才能继续执行时&#xff0c;可以使用…

ROM-IP

1.原理 通过添加数据文件&#xff0c;使ROM看起来不是易失性存储器&#xff0c; 产生256个数据&#xff0c;每个数据的位宽是8 如果前面为x&#xff0c;后面就是x256-1 2.单端口ROM配置 FPGA内部没有非易失性存储器。调用的ROM和RAM都是用RAM来生成的 3.双端口ROM配置 使用第一…

大学 Python 程序设计实验报告:判断密码是否符合要求

目录&#xff1a; 利用 string 模块判断使用正则表达式判断 密码强度判断&#xff0c;输入一个密码&#xff0c;判断密码是否符合要求。 要求密码长度8-12位&#xff0c;密码中必须包含大写字母、小写字母和数字&#xff0c;不能含有其他符号。 如果符合要求输出"密码符合…

Dubbo 几种方式来传递和使用隐式参数

在 Dubbo 中&#xff0c;隐式参数通常指的是那些不直接作为 RPC 方法参数传递&#xff0c;但需要在 RPC 调用过程中共享或传递的信息。这些信息可能包括用户身份、请求标识、认证令牌等。Dubbo 提供了几种方式来传递和使用这些隐式参数。 以下是使用 Dubbo 隐式参数的一些常见…

马斯克旗下xAI发布Grok-1.5,相比较开源的Grok-1,各项性能大幅提升,接近GPT-4!

本文原文来自DataLearnerAI官方网站&#xff1a;马斯克旗下xAI发布Grok-1.5&#xff0c;相比较开源的Grok-1&#xff0c;各项性能大幅提升&#xff0c;接近GPT-4&#xff01; | 数据学习者官方网站(Datalearner) 继Grok-1开源之后&#xff0c;xAI宣布了Grok-1.5的内测消息&…

C#热门技术应用:跨平台、异步编程与云原生

C#热门技术应用&#xff1a;跨平台、异步编程与云原生 C#&#xff0c;作为微软主导的编程语言&#xff0c;近年来在跨平台应用开发、异步编程以及云原生应用等领域展现出了强大的生命力。这些热门技术的应用&#xff0c;使得C#成为了现代软件开发中不可或缺的一部分。 一、跨平…

手撕算法-跳跃游戏

描述 分析 如果某一个作为 起跳点 的格子可以跳跃的距离是 3&#xff0c;那么表示后面 3 个格子都可以作为 起跳点可以对每一个能作为 起跳点 的格子都尝试跳一次&#xff0c;把 能跳到最远的距离 不断更新如果可以一直跳到最后&#xff0c;就成功了 代码 class Solution {…

07-JavaScript DOM事件

1. 事件 1.1 事件概述 JavaScript 使我们有能力创建动态页面&#xff0c;而事件是可以被 JavaScript 侦测到的行为。 简单理解&#xff1a; 触发--- 响应机制。 网页中的每个元素都可以产生某些可以触发 JavaScript 的事件&#xff0c;例如&#xff0c;我们可以在用户点击某…

【漏洞潜在风险】弹框干扰类风险

弹框干扰风险定义: 游戏过程中&#xff0c;客户端经常会以文字类形式对玩家进行说明和指引&#xff0c;而对于一些更为重要的信息&#xff0c;便会用游戏中的弹框进行强调。由玩家主动触发对其他玩家造成重复弹框进而干扰到正常游戏的都可以称之为弹框干扰类风险。弹框干扰风险…

C++项目——集群聊天服务器项目(六)MySQL模块

Hello&#xff0c;大家好啊&#xff0c;最近比较忙&#xff0c;没来得及更新项目&#xff0c;实在抱歉~今天就恢复更新拉~ 在验证完网络模块与业务模块代码可以正常使用后&#xff0c;需完成的操作是与底层数据库进行交互&#xff0c;为实现各类用户查询、增删业务奠定良好的基…

【笔记】Android U RILJ 中与运营商名称SPN显示相关的日志分析

源码阅读&#xff1a;AOSPXRef 常用日志关键字 Note&#xff1a;">"下发MD&#xff0c;"<"MD上报&#xff0c;[]中的id有请求和返回的对应关系 KEYComment> OPERATOR下发MD&#xff0c;请求运营商信息< OPERATORMD上报运营商注册信息> DA…

Java--队列

无界队列 public class Task implements Runnable, Comparable<Task>{private int priority; // 优先级别public Task(int priority) {this.priority priority;}// 当前对象和其他对象作比较, 当前优先级大就返回-1, 优先级小就返回1, 值越小, 优先级越高.Overridepubl…

【群晖】白群晖如何公网访问

【群晖】白群晖如何公网访问 ——> 点击查看原文 在使用默认配置搭建好的群晖NAS后&#xff0c;我们可以通过内网访问所有的服务。但是&#xff0c;当我们出差或者不在家的时候也想要使用应该怎么办呢&#xff1f; 目前白群提供了两种比较快捷的方式&#xff0c;一种是直接注…

【Python系列】合并配置文件的最佳实践

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

python实现两个Excel表格数据对比、补充、交叉验证

业务背景 业务中需要用到类似企查查一类的数据平台进行数据导出&#xff0c;但企查查数据不一定精准&#xff0c;所以想采用另一个官方数据平台进行数据对比核验&#xff0c;企查查数据缺少的则补充&#xff0c;数据一致的保留企查查数据&#xff0c;不一致的进行颜色标注。 …

脱壳之常用的加固样本特征

梆梆加固样本特征 清单文件入口 android:name“com.SecShell.SecShell.ApplicationWrapper” 特征 免费版 meta-data meta-data总结 assets/secData0.jar lib/armeabi/libSecShell.so lib/armeabi/libSecShell-x86.so 梆梆企业版 assets/classes0.jar lib/armeabi-v7a/libD…