Flink-源算子-读取数据的几种方式

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
在这里插入图片描述
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:

DataStream<String> stream = env.addSource(...);

方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:

DataStreamSource<String> stream = env.fromSource()

Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<Integer> data = Arrays.asList(1, 22, 3);DataStreamSource<Integer> ds = env.fromCollection(data);stream.print();env.execute();
}

从文件读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();env.execute();
}

说明:

  • 参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://…;

  • 路径可以是相对路径,也可以是绝对路径;

  • 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;

从socket读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。
我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

从数据生成器读取数据

Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

  // 如果有n个并行度, 最大值设为a// 将数值 均分成 n份,  a/n ,比如,最大100,并行度2,每个并行度生成50个// 其中一个是 0-49,另一个50-99env.setParallelism(2);/*** 数据生成器Source,四个参数:*     第一个: GeneratorFunction接口,需要实现, 重写map方法, 输入类型固定是Long*     第二个: long类型, 自动生成的数字序列(从0自增)的最大值(小于),达到这个值就停止了*     第三个: 限速策略, 比如 每秒生成几条数据*     第四个: 返回的类型*/
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource =new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:"+value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}
}

从Kafka读取数据

Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。

public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/205736.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微服务2 Docker学习 P42-P60

Docker学习视频https://www.bilibili.com/video/BV1LQ4y127n4?p42&vd_source8665d6da33d4e2277ca40f03210fe53a 文档资料: 链接&#xff1a;https://pan.baidu.com/s/1P_Ag1BYiPaF52EI19A0YRw?pwdd03r 提取码&#xff1a;d03r Docker 其他笔记 服务器容器化-docker(全…

redis-学习笔记(list)

因为 list 可以头插头删, 尾插尾删, 所以其实更像 C 中的 deque (双端队列) ---- 知道就好, 别乱说, 具体底层编码是啥, 俺也不知道(没注意过) 可以通过组合, 把 list 当作队列 / 栈来用 list 的几种底层编码: ziplist(压缩列表) , linkedlist(链表) , quicklist ziplist 就是将…

12-07 周四 Pytorch 使用Visdom 进行可视化

简介 在完成了龙良曲的Pytroch视频课程之后&#xff0c;楼主对于pytroch有了进一步的理解&#xff0c;比如&#xff0c;比之前更加深刻的了解了BP神经网络的反向传播算法&#xff0c;梯度、损失、优化器这些名词更加熟悉。这个博客简要介绍一下在使用Pytorch进行数据可视化的一…

《使用ThinkPHP6开发项目》 - 安装ThinkPHP框架

1.安装ThinkPHP6框架 这里我们使用的是composer安装的安装方式&#xff0c;请确保电脑已经安装了composer&#xff0c;如未安装可查看Composer 安装与使用 | 菜鸟教程 composer create-project topthink/think tp 上面命令安装的是稳定版的&#xff0c;也是最新的稳定版&…

Jquery easyui异步提交表单的两种方式

这篇文章分享一下easyui常用的两种表单异步提交的方式。 目录 第一种&#xff1a;利用ajax提交 $.post() $.ajax() 第二种&#xff1a;使用easyui提供的表单提交方式 首先&#xff0c;准备一个简单的表单&#xff0c;包含三个输入框&#xff0c;在页面引入easyui的js文件。…

探索 HTML 语义化:让你的网页更有意义(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

【Linux】进程通信之命名管道mkfifo

1.认识命名管道 匿名管道应用的一个限制就是只能在具有共同祖先&#xff08;具有亲缘关系&#xff09;的进程间通信。如果我们想在不相关的进程之间交换数据&#xff0c;可以使用FIFO文件来做这项工作&#xff0c;它经常被称为命名管道。命名管道是一种特殊类型的文件 2.在命…

时间序列预测实战(二十四)PyTorch实现RNN进行多元和单元预测(附代码+数据集+完整解析)

一、本文介绍 本篇文章给大家带来的是利用我个人编写的架构进行RNN时间序列卷积进行时间序列建模&#xff08;专门为了时间序列领域新人编写的架构&#xff0c;简单且不同于市面上大家用GPT写的代码&#xff09;&#xff0c;包括结果可视化、支持单元预测、多元预测、模型拟合…

【Java代码接口自动化测试】REST Assured接口测试 HTTPClient接口测试

近几年接口自动化变得越来越热门&#xff0c;相对比于UI自动化&#xff0c;接口自动化有一些优势 1.运行比UI更稳定&#xff0c;让BUG更容易定位 2.UI自动化维护成本太高&#xff0c;接口相对低一些 接口测试其实有很多方式&#xff0c;主要有两种&#xff0c;一个是工具&am…

JM中ref_pic_list_modification bug记录

问题描述 今天在用JM对YUV420p编码时,发现编出的码流用ffplay播放花屏,报如下错误: JM的版本时19.1,没有使能B帧,PicOrderCntType设置为2,其它都是encoder.cfg中的默认配置。我用一些码流分析工具播放H264码流正常,用一些播放器播放也都存在花屏,不过大多数播放器都是…

k8s集群部分使用gpu资源的pod出现UnexpectedAdmissionError问题

记录一次排查UnexpectedAdmissionError问题的过程 1. 问题 环境 3master节点N个GPU节点 kubelet版本&#xff1a;v1.19.4 kubernetes版本&#xff1a;v1.19.4 生产环境K8S集群&#xff0c;莫名其妙的出现大量UnexpectedAdmissionError状态的Pod&#xff0c;导致部分任务执…

12.07

#include "mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent) {//窗口设置//去掉表头this->setWindowFlags(Qt::FramelessWindowHint);//重新设置大小this->resize(800,420);//设置背景颜色this->setStyleSheet("background-color:whi…

【推荐系统】了解推荐系统的生态(重点:推荐算法的主要分类)

【大家好&#xff0c;我是爱干饭的猿&#xff0c;本文重点介绍推荐系统的关键元素和思维模式、推荐算法的主要分类、推荐系统常见的问题、推荐系统效果评测。 后续会继续分享其他重要知识点总结&#xff0c;如果喜欢这篇文章&#xff0c;点个赞&#x1f44d;&#xff0c;关注一…

javaee实验:Spring Boot 整合 Mybatis

目录 MybatisMyBatis 框架简介Mybatis 框架执行流程图映射器 实验目的实验内容实验过程数据库准备项目结构代码实现 实验结果 Mybatis MyBatis 框架简介 Mybatis 的前身是 Apache 的开源框架 iBatis&#xff0c;与 Hibernate 一样是一个 Java 持久层的框 架。Mybatis 的优势在…

使用Python实现的Excel像素画

简介&#xff1a;本项目主要使用python语言&#xff0c;将图片转为 Excel&#xff0c;图片中的每一个像素转化为 Excel 中的每一个单元格。主要使用pillow和xlsxwriter这两个模块。项目使用一个python文件即可。 一&#xff1a;项目功能和流程介绍 项目的主要功能&#xff1a…

Python-封装配置文件

Code [url] baidu http://www.baidu.com[value] send_value 百度[server] ip 220.181.111.188封装的格式可以套用 # 封装,类似函数调用 import configparserclass ReadConfigIni():def __init__(self,filename):self.cf configparser.ConfigParser()self.cf.read(filenam…

Pr项目标准化ProjectNormalizer插件|解决PR剪辑视频在Windows和Mac电脑切换打开pr项目工程文件需要重新链接媒体问题

当我们在 Windows 中打开在 Mac 上剪辑视频的 Premiere Pro 项目文件时&#xff0c;需要重新链接媒体。通常&#xff0c;如果选中“自动重新链接其他人”复选框&#xff0c;媒体将在某种程度上链接在一起。但是&#xff0c;有时这是行不通的&#xff0c;并且可能是一个非常困难…

java获取ip的工具类

java获取ip的工具类 直接上代码 package com.loit.park.common.utils;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import javax.servlet.http.HttpServletRequest; import java.net.InetAddress; import java.net.UnknownHostException;/*** author hanjinqun*…

百面嵌入式专栏(岗位分析)大疆嵌入式工程师【通信/流媒体】

文章目录 一、岗位简介二、解析2.1、网络协议2.2、音视频传输算法2.3、大规模音视频会议或直播系统 三、简历 沉淀、分享、成长&#xff0c;让自己和他人都能有所收获&#xff01;&#x1f604; &#x1f4e2;本篇我们将对大疆嵌入式工程师【通信/流媒体】岗位进行分析 。 一、…

视频封面提取:精准截图,如何从指定时长中提取某一帧图片

在视频制作和分享过程中&#xff0c;一个有吸引力的封面或截图往往能吸引更多的观众点击观看。有时候要在特定的时间段内从视频中提取一帧作为封面或截图。如果每个视频都手动提取的话就会耗费很长时间&#xff0c;那么如何智化能批量提取呢&#xff1f;现在一起来看下云炫AI智…