Flink - souce算子

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/15207.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 3:玩一下web前端技术(一)

前言 本章内容为VUE前端环境搭建与相关前端技术讨论。 下一篇文章地址&#xff1a; Vue 3&#xff1a;玩一下web前端技术&#xff08;二&#xff09;_Lion King的博客-CSDN博客 一、环境搭建 1. 安装Node.js Vue是基于Node.js的&#xff0c;因此首先需要安装Node.js。官网…

缓存数据同步技术Canal

说明&#xff1a;缓存数据同步&#xff0c;以Redis为例&#xff0c;如何保证从Redis中取出来的数据与MySQL中的一致&#xff1f;在微服务架构下&#xff0c;通常可以用以下两种技术来实现&#xff1a; MQ&#xff1a;在修改数据的同时&#xff0c;发送一个消息修改缓存&#x…

Go Ethereum源码学习笔记 001 Geth Start

Go Ethereum源码学习笔记 前言[Chapter_001] 万物的起点: Geth Start什么是 geth&#xff1f;go-ethereum Codebase 结构 Geth Start前奏: Geth Consolegeth 节点是如何启动的NodeNode的关闭 Ethereum Backend附录 前言 首先读者需要具备Go语言基础&#xff0c;至少要通关菜鸟…

【wsl-windows子系统】安装、启用、禁用以及同时支持docker-desktop和vmware方案

如果你要用docker桌面版&#xff0c;很可能会用到wsl&#xff0c;如果没配置好&#xff0c;很可能wsl镜像会占用C盘很多空间。 前提用管理员身份执行 wsl-windows子系统安装和启用 pushd "%~dp0" dir /b %SystemRoot%\servicing\Packages\*Hyper-V*.mum >hyper…

06. 管理Docker容器数据

目录 1、前言 2、Docker实现数据管理的方式 2.1、数据卷&#xff08;Data Volumes&#xff09; 2.2、数据卷容器&#xff08;Data Volume Containers&#xff09; 3、简单示例 3.1、数据卷示例 3.2、数据卷容器示例 1、前言 在生产环境中使用 Docker&#xff0c;一方面…

211. 添加与搜索单词 - 数据结构设计---------------字典树

211. 添加与搜索单词 - 数据结构设计 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a; 原题链接&#xff1a; 211. 添加与搜索单词 - 数据结构设计 https://leetcode.cn/problems/design-add-and-search-words-data-structure/descriptio…

Exadata磁盘损坏导致磁盘组无法mount恢复(oracle一体机磁盘组异常恢复)---惜分飞

Oracle Exadata客户,在换盘过程中,cell节点又一块磁盘损坏,导致datac1磁盘组&#xff08;该磁盘组是normal方式冗余)无法mount Thu Jul 20 22:01:21 2023 SQL> alter diskgroup datac1 mount force NOTE: cache registered group DATAC1 number1 incarn0x0728ad12 NOTE: ca…

【iOS】Frame与Bounds的区别详解

iOS的坐标系 iOS特有的坐标是&#xff0c;是在iOS坐标系的左上角为坐标原点&#xff0c;往右为X正方向&#xff0c;向下为Y正方向。 bounds和frame都是属于CGRect类型的结构体&#xff0c;系统的定义如下&#xff0c;包含一个CGPoint&#xff08;起点&#xff09;和一个CGSiz…

windows使用多账户Git,多远程仓库版本管理

1 清除全局配置 git config --global --list // 看一下是否配置过user.name 和 user.email git config --global --unset user.name // 清除全局用户名 git config --global --unset user.email // 清除全局邮箱 2 本地仓库&#xff0c;每个远程对应的本地仓库目录下执行 $…

求三个球面交点的高效解法

文章目录 一、问题描述二、推导步骤代数法几何法 三、MATLAB代码 一、问题描述 如图&#xff0c;已知三个球面的球心坐标分别为 P 1 ( x 1 , y 1 , z 1 ) , P 2 ( x 2 , y 2 , z 2 ) , P 3 ( x 3 , y 3 , z 3 ) P_1(x_1,y_1,z_1),P_2(x_2,y_2,z_2),P_3(x_3,y_3,z_3) P1​(x1​,…

idea项目依赖全部找不到

目录 1&#xff0c;出错现象2&#xff0c;解决3&#xff0c;其他尝试 1&#xff0c;出错现象 很久没打开的Java项目&#xff0c;打开之后大部分依赖都找不到&#xff0c;出现了所有的含有import语句的文件都会报错和一些注解报红报错&#xff0c;但pom文件中改依赖是确实被引入…

深度学习实践——循环神经网络实践

系列实验 深度学习实践——卷积神经网络实践&#xff1a;裂缝识别 深度学习实践——循环神经网络实践 深度学习实践——模型部署优化实践 深度学习实践——模型推理优化练习 代码可见于&#xff1a; 深度学习实践——循环神经网络实践 0 概况1 架构实现1.1 RNN架构1.1.1 RNN架…

管理类联考——写作——论说文——实战篇——标题篇

角度3——4种材料类型、4个立意对象、5种写作态度 老吕的“1342”&#xff0c;一个标题&#xff0c;三句开头&#xff0c;四层结构&#xff0c;两句结尾。 经过审题立意后&#xff0c;我们要根据我们的立意&#xff0c;确定一个主题&#xff0c;这个主题必须通过文章的标题直接…

【手撕】list

系列文章目录 文章目录 系列文章目录前言list_node<T>&#xff08;节点&#xff09;_list_iterator<T, Ref, Ptr>&#xff08;迭代器&#xff09;成员变量构造函数运算符重载 ReverseIterator<Iterator, Ref, Ptr>&#xff08;反向迭代器&#xff09;List<…

python+django+mysql项目实践一(环境准备)

python项目实践 环境说明: Pycharm 开发环境 Django 前端 MySQL 数据库 Navicat 数据库管理 创建Pycharm项目 安装Django 在pycharm文件—设置进行安装 新建Django项目 注意项目创建目录 项目默认目录文件说明: __init__.py asgi.py 【异步接受网络…

机器学习--课后作业--hw1

机器学习(课后作业–hw1) 本篇文章全文参考这篇blog 网上找了很多教程&#xff0c;这个是相对来说清楚的&#xff0c;代码可能是一模一样&#xff0c;只是进行了一些微调&#xff0c;但是一定要理解这个模型具体的处理方法&#xff0c;这个模型我认为最巧妙的它对于数据的处理…

Linux新手小程序——进度条

前言 目录 前言 需要先了解 1.\r和\n 2.缓冲区 一.理解字符的含义&#xff1a; 学习c语言时&#xff0c;我们可以粗略把字符分为可显字符和控制字符. 在按回车换到下一行开始的操作时&#xff0c;实际上是进行了两个操作&#xff1a;1.让光标跳到下一行&#xff08;只…

Spring注解开发,bean的作用范围及生命周期、Spring注解开发依赖注入

&#x1f40c;个人主页&#xff1a; &#x1f40c; 叶落闲庭 &#x1f4a8;我的专栏&#xff1a;&#x1f4a8; c语言 数据结构 javaweb 石可破也&#xff0c;而不可夺坚&#xff1b;丹可磨也&#xff0c;而不可夺赤。 Spring注解开发 一、注解开发定义Bean二、纯注解开发Bean三…

常见的几种排序

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C &#x1f525;座右铭&#xff1a;“不要等到什么都没有了&#xff0c;才下…

线程属性——线程分离应用

文章目录 相关函数初始化释放线程属性的资源获取线程分离的状态属性设置线程分离的状态属性获取线程的栈的大小线程分离应用 相关函数 可以通过man pthread_attr_然后按两次table键查询和属性相关的函数 初始化 释放线程属性的资源 获取线程分离的状态属性 设置线程分离的状…