【极数系列】Flink集成DataSource读取集合数据(07)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于集合读取数据
    • 3.1 集合创建数据流
    • 3.2 迭代器创建数据流
    • 3.3 给定对象创建数据流
    • 3.4 迭代并行器创建数据流
    • 3.5 基于时间间隔创建数据流
    • 3.6 自定义数据流
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建集合数据流作业
    • 4.3 运行结果日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkListSourceJob(集合)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于集合读取数据

3.1 集合创建数据流

fromCollection(Collection)函数
从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型

3.2 迭代器创建数据流

fromCollection(Iterator, Class) 
从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

3.3 给定对象创建数据流

fromElements(T ...)
从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

3.4 迭代并行器创建数据流

注意!使用迭代器的时候对象必须是实现持久化的,否则报错,详情可以看我的另外一篇文章、

错误:org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable

fromParallelCollection(SplittableIterator, Class) 
从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型

3.5 基于时间间隔创建数据流

generateSequence 
基于给定间隔内的数字序列并行生成数据流。

3.6 自定义数据流

addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

4.2 创建集合数据流作业

注意:Flink根据集群撇嘴可能会启动多个并行度运行,可能导致数据重复处理,可以通过.setParallelism(1)设置为一个平行度运行即可

package com.aurora.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.sql.DataSource;
import java.util.*;/*** @description flink的list集合source应用* @author 浅夏的猫* @datetime 23:03 2024/1/28
*/
public class FlinkListSourceJob {private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);public static void main(String[] args) throws Exception {//1.创建Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.设置Flink运行模式://STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);List<String> list = Arrays.asList("测试", "开发", "运维");// 01 从集合创建数据流DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);// 02 从迭代器创建数据流,这里直接使用list的迭代器会报错,因为没有ArrayList没有进行持久化,需要深入了解的,可以看我的另外一篇文章
//        DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(),String.class);// 03 从给定的对象序列中创建数据流DataStreamSource<String> dataStreamSource_03 = env.fromElements("测试", "开发", "运维");// 04 从迭代器并行创建数据流NumberSequenceIterator splittableIterator = new NumberSequenceIterator(1,10);DataStreamSource dataStreamSource_04=env.fromParallelCollection(splittableIterator,Long.TYPE);// 05 基于给定间隔内的数字序列并行生成数据流DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);//自定义数据流DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {//自定义你自己的数据来源for (int i = 0; i < 10; i++) {sourceContext.collect("测试数据" + i);}}@Overridepublic void cancel() {}});//5.输出打印dataStreamSource_01.print();
//        dataStreamSource_02.print();dataStreamSource_03.print();dataStreamSource_04.print();dataStreamSource_05.print();dataStreamSource_06.print();//6.启动运行env.execute();}}

4.3 运行结果日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/653374.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言-指针的基本知识(下)

四、指针的分类 按指针指向的数据的类型来分 1:字符指针 字符型数据的地址 char *p;//定义了一个字符指针变量&#xff0c;只能存放字符型数据的地址编号 char ch; p &ch; 2&#xff1a;短整型指针 short int *p;//定义了一个短整型的指针变量p&#xff0c…

怎样做好Code Review

Code Review方案 定义 Code Review代码评审是指在软件开发过程中&#xff0c;通过对源代码进行系统性检查的过程。通常的目的是查找各种缺陷&#xff0c;包括代码缺陷、功能实现问题、编码合理性、性能优化等&#xff1b;保证软件总体质量和提高开发者自身水平 code review …

【机器学习】正则化

正则化是防止模型过拟合的方法&#xff0c;它通过对模型的权重进行约束来控制模型的复杂度。 正则化在损失函数中引入模型复杂度指标&#xff0c;利用给W加权值&#xff0c;弱化了数据的噪声&#xff0c;一般不正则化b。 loss(y^,y)&#xff1a;模型中所有参数的损失函数&…

[论文阅读] |RAG评估_Retrieval-Augmented Generation Benchmark

写在前面 检索增强能够有效缓解大模型存在幻觉和知识时效性不足的问题&#xff0c;RAG通常包括文本切分、向量化入库、检索召回和答案生成等基本步骤。近期组里正在探索如何对RAG完整链路进行评估&#xff0c;辅助阶段性优化工作。上周先对评估综述进行了初步的扫描&#xff0…

mkcert的安装和使用,5分学会在本地开启localhost的https访问方式

mkcert官方仓库地址&#xff1a;https://github.com/FiloSottile/mkcert#installation mkcert 是一个简单的工具&#xff0c;用于制作本地信任的开发证书。它不需要配置。 简化我们在本地搭建 https 环境的复杂性&#xff0c;无需操作繁杂的 openssl 实现自签证书了&#xff…

在WebSocket中使用Redis出现空指针异常解决方案

文章目录 在WebSocket中使用Redis1.问题描述2.原因3.解决步骤1.新建一个SpringUtil.java类&#xff0c;通过getBean的方法主动获取实例2.在WebSocketSingleServer.java中导入 在WebSocket中使用Redis 1.问题描述 在controller 和 service中都可以正常使用Redis&#xff0c;在…

xcode安装visionOS Simulator模拟器报错解决方法手动安装方法

手动安装方法&#xff1a; 手动下载visionOS Simulator模拟器地址&#xff1a; https://developer.apple.com/download/all/ 选择 Xcode 版本 sudo xcode-select -s /Applications/Xcode.app # 用 Xcode-beta 的话是&#xff1a; # xcode-select -s /Applications/Xcode-beta…

openssl3.2 - 测试程序的学习

文章目录 openssl3.2 - 测试程序的学习概述笔记openssl3.2 - 测试程序的学习 - 准备openssl测试专用工程的模板openssl3.2 - 测试程序的学习 - test\aborttest.copenssl3.2 - 测试程序的学习 - test\sanitytest.copenssl3.2 - 测试程序的学习 - test\acvp_test.copenssl3.2 - 测…

邻接矩阵、关联矩阵

邻接矩阵&#xff1a; 邻接矩阵是一种用来表示图中顶点间相互连接关系的矩阵。在邻接矩阵中&#xff0c;矩阵的行和列都代表图中的顶点。 对于无权图&#xff0c;如果顶点 i 和顶点 j 之间有一条边&#xff0c;则矩阵中的元素 Aij​&#xff08;位于第 i 行和第 j 列&#xff…

编译Opencv3.3 版本遇到的Cuda版本变更导致:CUDA_nppicom_LIBRARY (ADVANCED)链接找不到的问题根本解法:

前言&#xff1a; Opencv 开源库的使用是必须的&#xff0c;但是&#xff0c;开源项目的特性&#xff0c;造成&#xff0c;版本的依赖性比较复杂&#xff0c; 尤其是针对某一款老硬件的SDK&#xff0c;往往随着某个开源库的使用&#xff0c;导致&#xff0c;无法编译的问题&am…

(bean的创建图)学习Spring的第十天(很重要)

大致框架按如下 第一次细分 bean对象还未创建 操作第一个map 引入BeanFactoryPostProcessor , 即Bean工厂后处理器 , 为Spring很重要的扩展点 BeanFactoryPostProcessor内部的方法 可以对BeaDefinition进行修改 , 也可进行BeanDefinition的注册 ( 原有在xml文件配置的bean…

从零学习Linux操作系统 第二十部分 mariadb数据库的管理

一、对于数据库的基本介绍 1.什么是数据库 数据库就是个高级的表格软件 2.常见数据库 Mysql Oracle mongodb db2 sqlite sqlserver … 3.Mysql (SUN -----> Oracle) 4.mariadb (Mysql的一种&#xff09; 数据库中的常用名词 1.字段 &#xff1a;表格中的表头 2.表 &…

Day01_变量和数据类型(注释,关键字,标识符,数据类型,字面量,变量,常量,进制,计算机存储单位,Java的基本数据类型的存储范围,计算机如何表示数据)

文章目录 JavaSE_Day01 变量和数据类型学习目标1.1 注释&#xff08;*comment*&#xff09;&#xff08;掌握&#xff09;1.2 关键字&#xff08;*keyword*&#xff09;&#xff08;掌握&#xff09;1.3 标识符( identifier)&#xff08;掌握&#xff09;1.3.1 标识符的命名规则…

构造器模式

构造器模式 意图 将一个复杂对象的构建和表示分离&#xff0c;使得相同的构建能创建不同的表示。 解释 案例&#xff1a;想象一个角色扮演游戏的特征生成器。最简单的选择是让计算机为你创建角色。如果你想手动选择特征的细节像职业、性别、头发的颜色等。特征的产生是一个循…

js实现动漫拼图1.0版

文章目录 1 实现效果视频2 功能实现思路3代码实现 1 实现效果视频 拼图1.0 2 功能实现思路 布局忽略&#xff08;小白学前端&#xff0c;不献丑了&#xff09; 左侧拼图格 左侧4*4的拼图小格子 利用表格实现&#xff0c;规划好td的大小&#xff0c;给每个格子加上背景图片&…

Flink问题解决及性能调优-【Flink不同并行度引起sink2es报错问题】

最近需求&#xff0c;仅想提高sink2es的qps&#xff0c;所以仅调节了sink2es的并行度&#xff0c;但在调节不同算子并行度时遇到一些问题&#xff0c;找出问题的根本原因解决问题&#xff0c;并分析整理。 实例代码 --SET table.exec.state.ttl86400s; --24 hour,默认: 0 ms …

MySQL查询—联合查询、子查询

关于表格的创建&#xff0c;请看上一篇文章——MySQL查询—连接查询 1、联合查询&#xff1a;把多次查询的结果合并&#xff0c;形成一共新的查询集。 UNION&#xff0c;UNION ALL 语法&#xff1a; SELECT 字段列表 FROM 表&#xff21;&#xff0e;&#xff0e;&#…

qt-C++笔记之使用信号和槽实现跨类成员变量同步响应

qt-C笔记之使用信号和槽实现跨类成员变量同步响应 —— 杭州 2024-01-24 code review! 文章目录 qt-C笔记之使用信号和槽实现跨类成员变量同步响应1.运行2.main.cpp3.test.pro4.编译 1.运行 2.main.cpp 代码 #include <QCoreApplication> #include <QObject> #…

Linux下Docker搭建部署Typecho博客【详细版】

Linux下Docker搭建部署Typecho博客【详细版】 一、环境准备1.1.准备阿里云服务器【新用户免费使用三个月】1.2.准备远程工具【FinalShell】1.3.系统信息1.4.安装所需软件包1.5.设置docker镜像源1.6.更新yum软件包索引1.7.确认停用selinux 二、安装Docker2.1.安装Docker-Ce2.2.查…

【RTP】webrtc 学习3: webrtc对h264的rtp解包

rtp_rtcp\source\video_rtp_depacketizer_h264.cc【RTP】webrtc 学习2: webrtc对h264的rtp打包 中分析了打包过程的代码,这样再来看解析过程的源码就容易多了:本代码主要基于m79,m98类似。这里注明了jitterbuffer 会再次 做 解析stap-a 变为NAL units解析ParseFuaNalu 第一…