Flink学习之旅:(三)Flink源算子(数据源)

1.Flink数据源

        Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。

数据集合
数据文件
Socket数据
kafka数据
自定义Source

2.案例

2.1.从集合中获取数据

        创建 FlinkSource_List 类,再创建个 Student 类(姓名、年龄、性别三个属性就行,反正测试用)

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 16:13*/
public class FlinkSource_List {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);ArrayList<Student> clicks = new ArrayList<>();clicks.add(new Student("Mary",25,1));clicks.add(new Student("Bob",26,2));DataStream<Student> stream = env.fromCollection(clicks);stream.print();env.execute();}
}

运行结果:

Student{name='Mary', age=25, sex=1}
Student{name='Bob', age=26, sex=2}

2.2.从文件中读取数据

文件数据:

spark
hello world kafka spark
hadoop spark

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 16:31*/
public class FlinkSource_File {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stream = env.readTextFile("input/words.txt");stream.print();env.execute();}
}

运行结果:(没做任何处理)

spark
hello world kafka spark
hadoop spark

2.3.从Socket中读取数据

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 17:41*/
public class FlinkSource_Socket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",7777);lineDSS.print();env.execute();}
}

运行结果:

服务器上执行:

 nc -lk 7777

疯狂输出

控制台打印结果 

6> hello
7> world

2.4.从Kafka中读取数据

pom.xml 添加Kafka连接依赖

      <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
package com.qiyu;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:01*/
public class FlinkSource_Kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));stream.print("Kafka");env.execute();}
}

启动 zk 和kafka

创建topic

bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 1 --partitions 1 --topic clicks

生产者、消费者命令

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092  --topic clicks
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092  --topic clicks --from-beginning

启动生产者命令后疯狂输入 

运行java类,运行结果:和生产者输入的是一样的

Kafka> flinks
Kafka> hadoop
Kafka> hello
Kafka> nihaop

2.5.从自定义Source中读取数据

        大多数情况下,前面几个数据源已经满足需求了。但是遇到特殊情况我们需要自定义的数据源。实现方式如下:

        1.编辑自定义源Source

package com.qiyu;import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:37*//**** 主要实现2个方法 run() 和 cancel()*/
public class FlinkSource_Custom implements SourceFunction<Student> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {Random random = new Random(); // 在指定的数据集中随机选取数据String[] name = {"Mary", "Alice", "Bob", "Cary"};int[] sex = {1,2};int age = 0;while (running) {sourceContext.collect(new Student(name[random.nextInt(name.length)],sex[random.nextInt(sex.length)],random.nextInt(100)));// 隔 1 秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

        2.编写主程序

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:46*/
public class FlinkSource_Custom2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
//有了自定义的 source function,调用 addSource 方法DataStreamSource<Student> stream = env.addSource(new FlinkSource_Custom());stream.print("SourceCustom");env.execute();}
}

 运行主程序,运行结果:

SourceCustom> Student{name='Mary', age=1, sex=46}
SourceCustom> Student{name='Cary', age=2, sex=52}
SourceCustom> Student{name='Bob', age=1, sex=14}
SourceCustom> Student{name='Alice', age=1, sex=84}
SourceCustom> Student{name='Alice', age=2, sex=82}
SourceCustom> Student{name='Cary', age=1, sex=28}

.............

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/111202.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP 如何查看php函数源码

一、在git找到php对应的版本 找到对应的分支版本可以下载也可以在线直接查看 通过这个地址 https://github.com/php/php-src 二、下面已shuffle函数举例&#xff0c;版本为7.4 找到对应的版本进入 点击ext&#xff0c;这个文件夹里面是存放函数的目录 在文件夹里搜不到stu…

【疯狂Java讲义】Java学习记录(使用jar命令打包)

jar命令 把多个文件打包成一个压缩包——这个压缩包和WinZip的压缩格式是一样的。 区别在于jar压缩的文件默认多一个META-INF的文件夹&#xff0c;该文件夹里包含一个MANIFEST.MF的文件&#xff08;清单&#xff09;。 通常来说&#xff0c;得到的压缩包有3种&#xff08;压缩格…

mac虚拟机安装配置qt遇到的坑

本人的环境大致如下&#xff1a; VMware Workstation 16pro Mac镜像 macOS.Mojave10.14.6 &#xff08;后面在系统中升级到了 Sonoma 14.0&#xff09; Qt5.9.6 Xcode15.0 问题1&#xff1a; 环境都安装好后&#xff0c;qt创建工程一直只有.pro文件&#xff0c;看不到头文件和c…

paddlenlp:社交网络中多模态虚假媒体内容核查(特征篇)

初赛之特征构造 写在前面一、安装paddleOCR二、代码部分三、模型优缺点四、写在最后 写在前面 通过前面两篇文章的介绍&#xff0c;我们可以大致的知道模型用到的特征分为四块&#xff1a;qCap&#xff0c;qImg&#xff0c;captions&#xff0c;imgs。根据这些特征&#xff0c…

springmvc视图格式——模板引擎freemarker输出HTML文本

目录 1. freemarker 介绍创建测试工程2.2.2) 配置文件2.2.3) 创建模型类2.2.4) 创建模板2.2.5) 创建controller2.2.6) 创建启动类2.2.7) 测试 2.3) freemarker基础2.3.1) 基础语法种类2.3.2) 集合指令&#xff08;List和Map&#xff09;2.3.3) if指令2.3.4) 运算符2.3.5) 空值处…

TStor CSP文件存储在大模型训练中的实践

业务背景 大模型作为人工智能领域的重要发展趋势&#xff0c;正在逐渐改变人们的生活和工作方式。随着近年来大模型领域技术的突破&#xff0c;各类语言模型、图像模型、视频模型快速演进&#xff0c;国内外市场也不断涌现出优秀的大模型研究及商业化平台&#xff0c;预期通过…

【LeetCode】48. 旋转图像

1 问题 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。 你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8…

记录Bug:VScode中无法识别万能头文件#include<bits/stdc++.h>

问题&#xff1a; 在VScode中使用万能头文件#include<bits/stdc.h>编写程序时报错&#xff1a;“检测到 #include 错误。请更新 includePath。已为此翻译单元(D:\Code_C\desC。。。。”。但是普通的c语言头文件#include <stdio.h>等可以正常运行。 原因&#xff1…

JS加密/解密那些必须知道的事儿

一直以来&#xff0c;字符串的编码问题对于新手程序员来说&#xff0c;或者平常不太涉猎这方面的程序员来说&#xff0c;是犹如灵异学一样的存在。经常会遇到莫名其妙的编码问题&#xff0c;导致的各种的无法理解的错误。 ​ 今天&#xff0c;本问就来介绍一下作者所知晓的一切…

springboot+html实现密码重置功能

目录 登录注册&#xff1a; 前端&#xff1a; chnangePssword.html 后端&#xff1a; controller: Mapper层&#xff1a; 逻辑&#xff1a; 登录注册&#xff1a; https://blog.csdn.net/m0_67930426/article/details/133849132 前端&#xff1a; 通过点击忘记密码跳转…

VMware——VMware17安装WindowServer2012R2环境(图解版)

目录 一、WindowServer2012R2镜像百度云下载二、安装 一、WindowServer2012R2镜像百度云下载 下载链接&#xff1a;https://pan.baidu.com/s/1TWnSRJTk0ruGNn4YinzIgA 提取码&#xff1a;e7u0 二、安装 打开虚拟机&#xff0c;点击【创建新的虚拟机】&#xff0c;如下图&…

基于SpringCloud实现房产销售平台的设计与实现项目【项目源码+论文说明】

摘要 信息技术的发展推动了管理系统的进步&#xff0c;目前各种行业都积极参与管理系统的建设工作。特别是疫情带来的影响&#xff0c;让传统行业逐渐认识到只有通过在线管理才能继续的发展。房产销售平台是为求租者提供房源必备的平台&#xff0c;如何找到一个好的房源是生活…

如何借助边缘智能网关打造智慧城市便民驿站

智慧城市驿站是一类提供多样化便利服务的新型智能公共设施&#xff0c;通过融合物联网技术、边缘智能技术、新能源技术等&#xff0c;为城市居民整合提供休闲、购物、卫生、广告、安全等公共服务&#xff0c;进一步提升日常生活体验。本篇就为大家介绍如何基于边缘智能网关&…

WebSocket: 实时通信的新维度

介绍&#xff1a; 在现代Web应用程序中&#xff0c;实时通信对于提供即时更新和交互性至关重要。传统的HTTP协议虽然适合请求-响应模式&#xff0c;但对于需要频繁数据交换的场景并不理想。而WebSocket技术的出现填补了这个空白&#xff0c;为Web开发者们带来了一种高效、实时的…

C# Winform编程(6)高级控件

C# Winform编程&#xff08;6&#xff09;高级控件 RadioButton&#xff08;单选框&#xff09;PictureBox&#xff08;图像框&#xff09;TabControl&#xff08;选项卡&#xff09;ProgressBar(进度条)TrackBar(滑动条)ImageList&#xff08;图像列表控件&#xff09;ToolBar…

https证书配置(nginx)

HTTPS 是什么 HTTPS 是一种应用层协议&#xff0c;是一种透过计算机网络进行安全通信的传输协议&#xff0c;HTTPS 经由 HTTP 进行通信&#xff0c;但是在 HTTP 的基础上引入了一个加密层&#xff0c;使用 SSL/TLS 来加密数据包&#xff0c;HTTPS 开发的主要目的&#xff0c;是…

层序中序还原二叉树

题目&#xff1a; 样例&#xff1a; 输入 6 0 2 5 1 4 3 1 2 4 0 5 3 输出 0 2 1 4 5 3 思路&#xff1a; 这道题&#xff0c;核心思想就是 结合 层序遍历的性质&#xff0c;根据 中序来判断左右孩子是否存在。 前中后序的遍历实现&#xff0c;主要都是 递归的形式实现遍历…

【Linux】在Ubuntu下安装Zotero

【Linux】在Ubuntu下安装Zotero 文章目录 【Linux】在Ubuntu下安装Zotero1. Debian InstallationReference 1. Debian Installation 直接使用下面三条语句进行安装即可 wget -qO- https://raw.githubusercontent.com/retorquere/zotero-deb/master/install.sh | sudo bash su…

Fiddler之Replay功能详解

今天就先来看看Fiddler的功能。 Fiddler&#xff0c;最容易看到的就是快捷工具栏中的 Replay 按钮 解释下&#xff1a; Reissue the selected requests. 重发选中的请求 Hold CTRL to reissue unconditionallly. 选中请求按住 CTRL 键&#xff0c;点击Replay时无条件重发选中…

TP5.1 导出excel文件

在 ThinkPHP 5.1 中引入 PHPExcel&#xff08;现在已被官方弃用&#xff0c;推荐使用 PhpSpreadsheet&#xff09;时&#xff0c;可以按照以下步骤进行操作&#xff1a; 在 composer.json 文件中添加 PHPExcel&#xff08;PhpSpreadsheet&#xff09;的依赖项。找到 require 部…