大数据学习之Flink算子、了解(Source)源算子(基础篇二)

Source源算子(基础篇二)


目录

Source源算子(基础篇二)

二、源算子(source)

1. 准备工作

2.从集合中读取数据

可以使用代码中的fromCollection()方法直接读取列表

也可以使用代码中的fromElements()方法直接列出数据获取

3. 从文件中读取数据

说明:

4. 从Socket读取数据

(1)编写StreamWordCount

(2)在 Linux 环境的主机bigdata1 上,执行下列命令,发送数据进行测试:

(3)启动 StreamWordCount 程序

(4)从 bigdata1 发送数据:

(5)看控制台的输出结果

5.从Kafka读取数据

6.自定义源算子(source)

7.Flink支持的数据类型


二、源算子(source)

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源,而读取数据的算子就是源算子(Source)。所以,Source 就是我们整个处理 程序的输入端。

Flink 代码中通用的添加 Source 的方式,是调用执行环境的 addSource()方法:

//通过调用 addSource()方法可以获取 DataStream 对象
val stream = env.addSource(...)

方法传入一个对象参数,需要实现 SourceFunction 接口,返回一个 DataStream。

1. 准备工作

case class Event(user: String, url: String, timestamp: Long)

2.从集合中读取数据

  • 最简单的读取数据的方式,就是在代码中直接创建一个集合,然后调用执行环境的 fromCollection 方法进行读取。
  • 这相当于将数据临时存储到内存中,形成特殊的数据结构后, 作为数据源使用,一般用于测试。
import org.apache.flink.streaming.api.scala._case class Event(user: String, url: String, timestamp: Long)object SourceCollection {def main(args: Array[String]): Unit = {//获取流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置并行度(并行任务的数量)为1env.setParallelism(1)// 创建包含点击事件的列表// 点击操作中包含两个事件val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))//将列表作为流输出//把clicks作为数据流val stream = env.fromCollection(clicks)//fromElements从给定的元素集合中创建一个DataStreamval stream1 = env.fromElements(Event("zhangsan","/.opt",1000L),Event("lisi","/.opt",2000L))stream.print("stream")stream1.print("stream1")env.execute()}
}

可以使用代码中的fromCollection()方法直接读取列表

也可以使用代码中的fromElements()方法直接列出数据获取

3. 从文件中读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中 获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

val stream = env.readTextFile("input/words.txt")
说明:
  • 参数可以是文件,可以是目录

  • 可以是绝对路径,也可以是相对路径

  • 相对路径是从系统属性 user.dir 获取路径:在 IDEA 下是 project 的根目录, standalone 模式下是集群节点根目录;

    • 系统属性 user.dir:这是一个Java系统属性,它表示用户当前的工作目录。在很多应用中,它通常被用作参考路径。

    • IDEA下是project的根目录:当你在IDEA中打开一个项目时,项目的根目录通常是IDEA的工作目录。相对路径就是基于这个根目录来确定的。

    • standalone模式下是集群节点根目录:如Hadoop分布式计算系统中的独立模式(standalone mode)。在这种模式下,路径可能是相对于集群节点的根目录。

  • 也可以从 HDFS 目录下读取, 使用路径 hdfs://...

    • 前提要在pom文件中添加hadoop相关依赖

4. 从Socket读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无 界的。一个简单的例子,就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差,一般也是用于测试。

//通过主机名和端口号读取socket文本流val linDs = env.socketTextStream("bigdata1",7777)

具体实现案例:

(1)编写StreamWordCount

import org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//通过主机名和端口号读取socket文本流val linDs = env.socketTextStream("bigdata1",7777)//进行转换计算val result = linDs.flatMap(data => data.split(" ")) //用空格切分字符串.map((_,1)) //切分后的字符串转换为一个元组.keyBy(_._1) //使用元组的第一个字段进行分组.sum(1) //分组后的数据的第二个字段进行累加//打印计算结果result.print()env.execute()}
}

(2)在 Linux 环境的主机bigdata1 上,执行下列命令,发送数据进行测试:

$ nc -lk 7777

(3)启动 StreamWordCount 程序

我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处 理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。

(4)从 bigdata1 发送数据:

hello flink
hello world
hello scala

(5)看控制台的输出结果

5.从Kafka读取数据

Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。

而消息队列的传输方式,恰恰和流处理是完全一致的。

所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。

在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选

调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

创建 FlinkKafkaConsumer 时需要传入三个参数:

  • 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic 列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据 时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条数据流中 去。
  • 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中 53 使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数 组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是 公共接口,所以我们也可以自定义反序列化逻辑。
  • 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
更新中...

6.自定义源算子(source)

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法: run()和 cancel()。

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

7.Flink支持的数据类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/644382.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

北斗短报文DTU 北斗通信DTU无线数传终端

北斗是我国自主建设的卫星导航系统,被广泛应用于全球定位、导航和时间同步等领域。随着物联网的迅猛发展,北斗短报文DTU作为物联网连接的关键技术,成为了各行各业的热门话题。 ** 一、北斗短报文DTU的概念与特点 **   北斗短报文DTU(Data…

二叉树知识

提示:文章 文章目录 前言一、背景二、 2.1 2.2 总结 前言 前期疑问: 本文目标: 一、背景 二、二叉树初始知识 题目 源于做的一道牛课题 若一颗完全二叉树中某节点无左孩子,则该节点是 A、高度为1的节点 B、高度为2的节点…

Redis持久化和集群架构

目录 Redis持久化 RDB快照(snapshot) RDB优点 RDB缺点 RDB的触发机制 AOF持久化 AOF文件重写 AOF触发机制 混合模式 Redis主从架构 Redis哨兵高可用架构 Redis Cluster架构 槽位定位算法 跳转重定位 Redis集群节点间的通信机制 Redis持久化…

在 MATLAB 中注释多行

使用 MATLAB 中的注释块注释多行代码 要注释一行或两行代码,我们可以使用%字符来完成。但是,如果我们必须注释多行代码,则此方法将花费大量时间。我们可以使用注释块来注释多行代码,而不是使用%来注释多行代码。写在该块中的任何…

【数学建模】综合评价方法

文章目录 综合评价的基本理论和数据预处理综合评价的基本概念综合评价体系的构建综合指标的预处理方法评价指标预处理示例 常用的综合评价数学模型线性加权综合评价模型TOPSIS法灰色关联度分析熵值法秩和比(RSR)法综合评价示例 综合评价的基本理论和数据…

【学网攻】 第(3)节 -- 交换机配置聚合端口

文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用 前言 网络已经成为了我们生活中不可或缺的一部分,它连接了世界各地的人们,让信息和资源得以自由流动。随着互联网的发展,我们可以通过网络学习、工作、娱乐…

精品基于Uniapp+springboot自习室预约系统App教室阅览室

《[含文档PPT源码等]精品基于Uniappspringboot自习室预约系统App》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功! 软件开发环境及开发工具: 开发语言:Java 后台框架:springboot、ssm 安…

【数据库连接】连接sqlite3报错:go-sqlite3 requires cgo to work. This is a stub

报错信息 register db Ping default, Binary was compiled with ‘CGO_ENABLED0’, go-sqlite3 requires cgo to work. This is a stubWindows解决办法 新建环境变量 新报错 Failed to build the application: # runtime/cgo cgo: C compiler “gcc” not found: exec: “gc…

IaC基础设施即代码:Terraform 连接 tencentcloud COS 实现多资源管理

目录 一、实验 1.环境 2.Terraform 连接 tencentcloud 腾讯云COS 3.申请VPC专有网络资源 4.申请安全组资源 5.申请CVM资源 6.申请CLB资源 7.申请DNS资源 8.销毁资源 二、问题 1. Terraform申请安全组资源失败 2.Terraform验证云主机资源报错 3. A记录和CNAME的区别 …

阿里云推出第八代企业级实例 g8i:AI 推理性能最高提升 7 倍、可支持 72B 大语言模型

云布道师 1 月 11 日,全球领先的云计算厂商阿里云宣布推出第八代企业级通用计算实例 ECS g8i,这也是国内首款搭载第五代英特尔至强可扩展处理器(代号 EMR)的云计算产品。依托阿里云自研的「飞天CIPU」架构体系,ECS g8…

十一、常用API——爬虫

目录 爬虫本地爬虫和网络爬虫贪婪爬取和非贪婪爬取正则表达式在字符串方法中的使用捕获分组和非捕获分组分组捕获分组非捕获分组 爬虫 本地爬虫和网络爬虫 有如下文本,请按照要求爬取数据。(本地爬虫) Java自从95年问世以来,经历…

记录一些多维数组的方法

文章目录 前言一、获取多维数组的数据二、多维数组自带的方法总结 前言 验证过程中,我们经常会用到多维数组存储数据,本文主要记录一下,如何去获取我们需要的数据,以及多维数组自带的一些方法。 一、获取多维数组的数据 获取多维…

山体滑坡监测预警系统-gnss位移监测站

GNSS山体滑坡位移监测站是一种利用全球导航卫星系统(GNSS)进行山体滑坡位移监测的设备。它通过接收和处理GNSS卫星信号,能够实时监测山体的位移变化,并将数据传输到后端系统进行分析和处理。 GNSS山体滑坡位移监测站具有高精度、…

olap/clickhouse keeper 一致性协调服务

在https://www.yuque.com/treblez/qksu6c/ahgvn94c2nh1y34w?singleDoc# 《Redis集群:分布式的less is more》中我提到,无论是啥服务,想要达到操作视角的强一致性,要么使用类似TSO/原子钟的方案,要么有一套一致性协调服务。 click…

Python对Excel文件中不在指定区间内的数据加以去除的方法

本文介绍基于Python语言,读取Excel表格文件,基于我们给定的规则,对其中的数据加以筛选,将不在指定数据范围内的数据剔除,保留符合我们需要的数据的方法。 首先,我们来明确一下本文的具体需求。现有一个Exc…

中国大模型迎来“95后” 百度奖学金发掘百位“未来AI技术领袖”

在人工智能掀起的科技革命和产业变革浪潮下,大模型成为最受关注的研究领域。1月22日,第十一届百度奖学金颁奖典礼在北京举行,来自全球顶尖高校及科研机构的10位“未来AI技术领袖”脱颖而出,他们平均年龄仅27岁,其中8人…

【江科大】STM32:DMA转运

DMA 直接存储器存取(协助CPU完成数据转运,可以直接访问32位内部存储器,内存SRAM,程序存储器Flash,寄存器等) DMA可以提供外设和存储器或者存储器和存储器之间的高速数据传输,无须CPU干预&#…

【C++】入门(二)

前言: c基础语法(下) 文章目录 五、引用5.1 引用概念5.2 引用使用规则5.3 常引用5.4 引用的使用场景5.5 引用和指针的区别 六、内联函数6.1 概念6.2 内联函数的特性 七、auto关键字(C11)7.1 概念7.2 使用规则7.3 用于f…

社区分享|百果园选择DataEase搭档蜜蜂微搭实现企业数据应用一体化

百果园,全称为深圳百果园实业(集团)股份有限公司,2001年12月成立于深圳,2002年开出中国第一家水果专卖店。截至2022年11月,百果园全国门店数量超过5600家,遍布全国140多个城市,消费会…

5118优惠码vip、svip、专业版和旗舰版使用yhm666

5118大数据平台会员优惠码【yhm666】,结算时勾选“使用优惠码”,然后在优惠码窗口中输入yhm666,然后点确定即可享受特价会员价格。阿腾云atengyun.com分享如下图: 5118会员优惠码【yhm666】 5118会员价格和使用优惠码之后的价格对…