Flink-Source的使用

Data Sources 是什么呢?就字面意思其实就可以知道:数据来源

Flink 做为一款流式计算框架,它可用来做批处理,也可以用来做流处理,这个 Data Sources 就是数据的来源地。

flink在/流处理中常见的source主要有两大类。

预定义Source

基于本地集合的source(Collection-based-source)

基于文件的source(File-based-source)

基于网络套接字(socketTextStream)

自定义Source

预定义Source演示 

Collection [测试]--本地集合Source

在flink最常见的创建DataStream方式有四种:

l 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements("haha", 1);

源码注释中有写:

|使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue

l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了

l 使用env.fromSequence()方法创建基于开始和结束的DataStream

一般用于学习测试时编造数据时使用

1.env.fromElements(可变参数);

2.env.fromColletion(各种集合);

3.env.fromSequence(开始,结束);

package com.bigdata.source;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class _01YuDingYiSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 各种获取数据的SourceDataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");dataStreamSource.print();// 演示一个错误的//DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);//dataStreamSource2.print();DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(Tuple2.of("张三", 18),Tuple2.of("lisi", 18),Tuple2.of("wangwu", 18));elements.print();// 有一个方法,可以直接将数组变为集合  复习一下数组和集合以及一些非常常见的APIString[] arr = {"hello","world"};System.out.println(arr.length);System.out.println(Arrays.toString(arr));List<String> list = Arrays.asList(arr);System.out.println(list);env.fromElements(Arrays.asList(arr),Arrays.asList(arr),Arrays.asList(arr)).print();// 第二种加载数据的方式// Collection 的子接口只有 Set 和 ListArrayList<String> list1 = new ArrayList<>();list1.add("python");list1.add("scala");list1.add("java");DataStreamSource<String> ds1 = env.fromCollection(list1);DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));// 第三种DataStreamSource<Long> ds3 = env.fromSequence(1, 100);ds3.print();// execute 下面的代码不运行,所以,这句话要放在最后。env.execute("获取预定义的Source");}
}

本地文件的案例:

package com.bigdata.source;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class _02YuDingYiSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取并行度System.out.println(env.getParallelism());// 讲第二种Source File类型的// 给了一个相对路径,说路径不对,老闫非要写,我咋办?// 相对路径,转绝对路径File file = new File("datas/wc.txt");File file2 = new File("./");System.out.println(file.getAbsoluteFile());System.out.println(file2.getAbsoluteFile());DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");ds1.print();// 还可以获取hdfs路径上的数据DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");ds2.print();// execute 下面的代码不运行,所以,这句话要放在最后。env.execute("获取预定义的Source");}
}

Socket [测试] 

socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。

提示:

如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。

使用nc 进行数据的发送

yum install -y nc   
nc -lk 8888   --向8888端口发送消息,这个命令先运行,如果先运行java程序,会报错!

如果是windows平台:nc -lp 8888 

代码演示:

//socketTextStream创建的DataStream,不论怎样,并行度永远是1
public class StreamSocketSource {public static void main(String[] args) throws Exception {//local模式默认的并行度是当前机器的逻辑核的数量StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();int parallelism0 = env.getParallelism();System.out.println("执行环境默认的并行度:" + parallelism0);DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);//获取DataStream的并行度int parallelism = lines.getParallelism();System.out.println("SocketSource的并行度:" + parallelism);SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] words = line.split(" ");for (String word : words) {collector.collect(word);}}});int parallelism2 = words.getParallelism();System.out.println("调用完FlatMap后DataStream的并行度:" + parallelism2);words.print();env.execute();}
}

以下用于演示:统计socket中的 单词数量,体会流式计算的魅力!

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SourceDemo02_Socket {public static void main(String[] args) throws Exception {//TODO 1.env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 2.source-加载数据DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);//TODO 3.transformation-数据转换处理//3.1对每一行数据进行分割并压扁DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}});//3.2每个单词记为<单词,1>DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});//3.3分组KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//3.4聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);//TODO 4.sink-数据输出result.print();//TODO 5.execute-执行env.execute();}
}

自定义数据源

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.UUID;/*** 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)* 要求:* - 随机生成订单ID(UUID)* - 随机生成用户ID(0-2)* - 随机生成订单金额(0-100)* - 时间戳为当前系统时间*/@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {boolean flag = true;@Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random = new Random();while(flag){OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublic void cancel() {flag = false;}
}
public class CustomSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 将自定义的数据源放入到env中DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;System.out.println(dataStreamSource.getParallelism());dataStreamSource.print();env.execute();}}

 通过ParallelSourceFunction创建可并行Source

/*** 自定义多并行度Source*/
public class CustomerSourceWithParallelDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);mySource.print();env.execute();}public static class MySource implements ParallelSourceFunction<String> {@Overridepublic void run(SourceContext<String> ctx) throws Exception {ctx.collect(UUID.randomUUID().toString());/*如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据*/}@Overridepublic void cancel() {}}
}

如果代码换成ParallelSourceFunction,每次生成12个数据,假如是12核数的话。

总结:Rich富函数总结 ctrl + o

 Rich 类型的Source可以比非Rich的多出有:
    - open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
    - close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
    - getRuntimeContext 方法可以获得当前的Runtime对象(底层API)

Kafka Source --从kafka中读取数据

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version>
</dependency>

创建一个topic1 这个主题:

cd /opt/installs/kafka3/bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1通过控制台向topic1发送消息:
bin/kafka-console-producer.sh  --bootstrap-server bigdata01:9092 --topic topic1
package com.bigdata.day02;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已// 返回true 的数据就保留下来,返回false 直接丢弃dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String word) throws Exception {// 查看单词中是否包含success 字样return word.contains("success");}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60595.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python开发之Linux

文章目录 1. 基础2. 进阶链接压缩/解压缩 文件权限用户远程操作编辑文件软件安装 1. 基础 # 查看当前目录下文件 ls# 查看当前目录 pwd# 清除界面内容 clear# 切换目录 cd# 创建目录 mkdir# 创建文件 touch 文件 vi 文件# 强制删除 rm -rf # 复制文件 cp 复制文件 复制文件路径…

OceanBase 驱动类获取数据库精确类型 “Oracle|MySQL”

当我们需要获取 DataSource 对象所连接的数据库的数据库名称时&#xff0c;正常是通过如下代码来获取&#xff1a; String databaseName dataSource.getConnection().getMetaData().getDatabaseProductName();我们知道 OceanBase 数据库企业版的租户是区分 Oracle 和 MySQL 两…

element-plus入门教程:Button

一、Button组件概述 Element Plus的Button组件是一个常用的操作按钮&#xff0c;提供了多种类型、尺寸、状态等配置选项&#xff0c;以满足不同的交互需求。 二、安装Element Plus 在Vue 3项目中&#xff0c;可以通过npm或yarn来安装Element Plus。 npm install element-pl…

深入解析 Cron 表达式高级用法:Spring 与 Linux Crontab 的全面对比与实践20241120

深入解析 Cron 表达式高级用法&#xff1a;Spring 与 Linux Crontab 的全面对比与实践 任务调度是后台服务中的重要组成部分&#xff0c;无论是定期数据备份、日志归档还是周期性报表生成&#xff0c;Cron 表达式始终是描述这些任务规则的核心工具。本文将聚焦 Spring Cron 表…

python中的map、split、join函数的作用 => ACM输入输出流

map(func,iter) lst_str ["1", "2", "3"] # 得到lst_num为[1, 2, 3] lst_num list(map(int, lst_str))如果想把一个列表里的所有元素批量地调用某一个函数&#xff0c;并映射得到一个新的列表&#xff08;原列表中元素相对位置不变&#xff0…

【数据结构】七种常用排序总结

一、七种排序及其讲解 以下为七种排序的讲解&#xff1a; 【数据结构】插入排序——直接插入排序 和 希尔排序 【数据结构】选择排序——选择排序 和 堆排序 【数据结构】交换排序——冒泡排序 和 快速排序 【数据结构】归并排序 —— 递归及非递归解决归并排序 二、排序的…

【AI日记】24.11.23 学习谷歌数据分析初级课程-第4课

【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】 核心工作 内容&#xff1a;学习谷歌数据分析初级课程地址&#xff1a;第四课《从脏数据到干净数据的处理》时间&#xff1a;4 小时评估&#xff1a;不错&#xff0c;完成 读书 书名&#xff1a;權力與進步时…

计算机网络(14)ip地址超详解

先看图&#xff1a; 注意看第三列蓝色标注的点不会改变&#xff0c;A类地址第一个比特只会是0&#xff0c;B类是10&#xff0c;C类是110&#xff0c;D类是1110&#xff0c;E类是1111. IPv4地址根据其用途和网络规模的不同&#xff0c;分为五个主要类别&#xff08;A、B、C、D、…

HashMap底层原理

jdk1.8之后hashmap底层结构 jdk1.8之后&#xff0c;是哈希表数据结构&#xff0c;也可以说是数组链表或红黑树&#xff0c;下图是没有添加数据的一个hashmap。 现在开始添加数据&#xff0c;看下面具体步骤 put操作 如下&#xff0c;我们来简单看看hashmap的put源码&#xff…

【GD32】(三) ISP基本使用

0 前言 有一块GD32的板子不知道为啥用着用着就下载不了程序了&#xff0c;没办法&#xff0c;只能另寻他法。作为STM32的平替&#xff0c;GD32的功能和STM32基本是一致的&#xff0c;所以也可以使用ISP来下载程序。于是就开始复活这块板子。 1 BOOT模式 对于熟悉STM32开发的人…

《下载别人的python项目,如何安装 requirements.txt 中的包》

在开发过程中&#xff0c;我们经常会下载别人的开源项目进行学习或二次开发。然而&#xff0c;下载项目后&#xff0c;如何正确安装项目所需的依赖包是一个常见的问题。本文将详细介绍如何使用 requirements.txt 文件来安装项目依赖包&#xff0c;并确保项目在本地环境中正常运…

摄像机视频分析软件下载LiteAIServer视频智能分析平台玩手机打电话检测算法技术的实现

随着科技的不断进步&#xff0c;摄像机视频分析软件的发展已经为我们的生活带来了许多便捷。其中&#xff0c;LiteAIServer视频智能分析平台的玩手机打电话检测算法技术尤为突出&#xff0c;它利用先进的图像处理和人工智能技术&#xff0c;能够自动识别并监控视频中的玩手机或…

鸿蒙多线程开发——sendable共享模块

1、前 言 本篇文章是基于鸿蒙多线程开发——线程间数据通信对象03(sendable)的接续讨论。 2、共享模块 共享模块是进程内只会加载一次的模块&#xff0c;使用"use shared"这一指令来标记一个模块是否为共享模块。 非共享模块在同一线程内只加载一次&#xff0c;在…

推荐几个 VSCode 流程图工具

Visual Studio Code&#xff08;简称VSCode&#xff09;是一个由微软开发的免费、开源的代码编辑器。 VSCode 发布于 2015 年&#xff0c;而且很快就成为开发者社区中广受欢迎的开发工具。 VSCode 可用于 Windows、macOS 和 Linux 等操作系统。 VSCode 拥有一个庞大的扩展市…

学习Prompt Turning

传统的微调因为代价很高&#xff0c;而且一旦权重很大&#xff0c;这种fine 微微的意思是调不动模型的&#xff0c;所以需要这种提示词调 mindnlp直接有 peft config peft_config PromptTuningConfig(task_type“SEQ_CLS”, num_virtual_tokens10) 方便我们进行prompt tunin…

C++ 中的模板特化和偏特化 如何进行模板特化和偏特化

模板特化和偏特化的概念 模板特化&#xff08;Template Specialization&#xff09; 概念&#xff1a;模板特化是指为特定的模板参数&#xff08;或参数组合&#xff09;提供一个特殊的实现。当编译器在实例化模板时&#xff0c;如果遇到与特化版本匹配的参数类型&#xff0c;就…

React中 setState 是同步的还是异步的?调和阶段 setState 干了什么?

React中 setState 是同步的还是异步的 1. React 的 setState 是异步的2. 为什么 setState 在合成事件和生命周期函数中是异步的3. 为什么 setState 在原生事件和定时器中是同步的4. 为什么要这样设计&#xff1f;调和阶段是什么setState在调和阶段干了什么&#xff1f;总结&…

2024 APMCM亚太数学建模C题 - 宠物行业及相关产业的发展分析和策略(详细解题思路)

在当下&#xff0c; 日益发展的时代&#xff0c;宠物的数量应该均为稳步上升&#xff0c;在美国出现了下降的趋势&#xff0c; 中国 2019-2020 年也下降&#xff0c;这部分变化可能与疫情相关。需要对该部分进行必要的解释说明。 问题 1: 基于附件 1 中的数据及您的团队收集的额…

若依-一个请求中返回多个表的信息

背景 主表是点位表关联子表 需要知道对应 合作商的信息关联子表 需要直到对应 区域的信息关联子表 需要直到对应 设备数量 实现的方案 关联实体相关的标签

速盾:海外服务器使用CDN加速有什么优势?

CDN&#xff08;Content Delivery Network&#xff09;是指一种分布式网络架构&#xff0c;将内容分发到全球多个节点服务器上&#xff0c;使用户能够就近获取所需内容。海外服务器使用CDN加速&#xff0c;具有以下几个优势&#xff1a; 提高访问速度&#xff1a;CDN将内容复制…