Flink1.17 DataStream API

目录

一.执行环境(Execution Environment)

1.1 创建执行环境

1.2 执行模式

1.3 触发程序执行

二.源算子(Source)

2.1 从集合中读取数据

2.2 从文件读取数据

2.3 从 RabbitMQ 中读取数据

2.4 从数据生成器读取数据 

2.5 Flink支持的数据类型

2.5.1 Flink的类型系统

2.5.2 Flink支持的数据类型

2.5.3 类型提示(Type Hints)

三.转换算子(Transformation)

3.1 基本转换算子(map/ filter/ flatMap)

3.1.1 映射(map)

3.1.2 过滤(filter)

3.1.2 扁平映射(flatMap)

3.2 聚合算子(Aggregation)

3.2.1 按键分区(keyBy)

 3.2.2 简单聚合(sum/min/max/minBy/maxBy)

3.2.3 归约聚合(reduce)

3.3 用户自定义函数(UDF)

3.3.1 函数类(Function Classes)

3.3.2 富函数类(Rich Function Classes)

3.4 物理分区算子(Physical Partitioning)

3.4.1 随机分区(shuffle)

3.4.2 轮询分区(Round-Robin)

3.4.3 重缩放分区(rescale)

 3.4.4 广播(broadcast)

3.4.5 全局分区(global)

3.4.6 自定义分区(Custom) 

3.5 分流

3.5.1 Filter 实现分流

 3.5.2 使用侧输出流

3.6 基本合流操作

3.6.1 联合(Union)

3.6.2 连接(Connect)

3.6.2.1 连接流(ConnectedStreams)

3.6.2.2 CoProcessFunction

四.输出算子(Sink)

 4.1 连接到外部系统

4.2 输出到文件 

4.3 输出到RabbitMQ

4.4 输出到MySQL(JDBC) 

 4.5 自定义Sink输出


DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

一.执行环境(Execution Environment)

        Flink程序可以在各种上下文环境中运行:既可以可以在本地JVM中执行程序,也可以提交到远程集群上运行。

1.1 创建执行环境

获取的执行环境是StreamExecutionEnvironment类的对象(流处理,批处理已经标记为过时),创建执行环境一般有以下三种方式:

// 创建一个本地执行环境并返回,可传入并行度,默认是本地CPU核心数
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 返回远程集群执行环境,需传入远程IobManager的主机名与端口、及在集群中需运行的Jar包
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("JobManager ip", "JobManager port","提交给JobManager的JAR包");// (推荐)根据当前环境自动选择执行环境,无脑选这个即可
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 当使用 getExecutionEnvironment() 创建环境时,可以传入 org.apache.flink.configuration.Configuration 类来手动指定默认的参数,例如端口等。

Configuration conf = new Configuration();
conf.set(RestOptions.PORT,8088);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

1.2 执行模式

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API(已过时)。

通过代码指定:

// 流 执行模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 批 执行模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 自动模式,根据数据源是否有界自动选择执行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

提交任务时命令行指定(推荐):

bin/flink run -Dexecution.runtime-mode=BATCH ...

同一套代码/API,既可以指定流处理也可以指定批处理,这就是“流批一体”的其中一个解释。

1.3 触发程序执行

// 程序执行
env.execute();

写完输出(sink)操作并不代表程序已经结束。因为当 main() 方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”

默认 main 方法的一个 env.execute() 会触发一个 Flink Job,并且一个 main 方法可以调用多个 env.execute() ,但无意义,因为第一个会阻塞住。可使用 env.executeAsync() 可以异步触发,而且不会产生阻塞。

在application模式下,代码中有多少个 env.executeAsync() ,就会有多少个Job,对应就会有多少个 JboManager。

二.源算子(Source)

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

从Flink1.12开始,主要使用流批统一的新Source架构:

DataStreamSource<String> stream = env.fromSource(…)

Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

2.1 从集合中读取数据

        最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从集合中读取数据DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 10, 99, 53));source.print();env.execute();}

输出结果:

6> 1
8> 99
1> 53
7> 10

2.2 从文件读取数据

在实际场景中,可能要读取、处理日志文件这样的需求,这也是批处理最常见的读取方式。 

读取文件,需要添加文件连接器依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>

 代码如下:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/words.txt")).build();// !使用官方推荐的新的 Source 架构  => env.fromSource(Source实现类,Watermark,资源名称)env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();env.execute();}

输出结果:

3> hello flink
3> hello world
3> hello java

2.3 从 RabbitMQ 中读取数据

导入相关依赖:

<!--RabbitMQ 连接器-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
<!--amqp 客户端-->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.1</version>
</dependency>

相关代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** 从 RabbitMQ读取数据*/
public class RabbitMQSourceDemo {public static void main(String[] args) throws Exception {// 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 RabbitMQ 连接信息RMQConnectionConfig config = new RMQConnectionConfig.Builder().setHost("RabbitMQ服务器地址").setPort(RabbitMQ端口).setUserName(用户名).setPassword(密码).setVirtualHost(虚拟主机).build();// 添加 RabbitMQ 数据源(Flink 1.17 并不支持使用 env.fromSource 在 RabbitMQ 读取数据!)RMQSource<String> source = new RMQSource<>(config,                     //  连接配置"test_queue",               // 队列名称new SimpleStringSchema());  // 反序列化器// 添加数据源DataStreamSource<String> rabbitMQStream = env.addSource(source);// 打印rabbitMQStream.print();// 执行env.execute("RabbitMQ job");}
}

进入 RabbitMQ Web 页面,在对应的虚拟主机下创建相关的队列,进入队列中,使用 Web 中的 Publish message给队列发送消息:

输出结果:

2.4 从数据生成器读取数据 

Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version>
</dependency>

 代码:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Flink 数据生成器*/
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {// 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*** 数据生成器的四个参数:* 1、GeneratorFunction的map实现。重写返回值* 2、返回的个数 会从0开始依次返回(使用Long.MAX_VALUE可模拟出无界流)* 3、限速,每秒多少个数据* 4、返回值类型*/DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>((GeneratorFunction<Long, String>) num -> "Number:" + num,30,RateLimiterStrategy.perSecond(3),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(),"datagenerator-source").print();// 执行env.execute();}
}

输出:

1> Number:8
8> Number:12
4> Number:27
3> Number:0
2> Number:4
6> Number:24
5> Number:16
7> Number:20
3> Number:1
7> Number:21
5> Number:17
.
.
.

2.5 Flink支持的数据类型

2.5.1 Flink的类型系统

Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

2.5.2 Flink支持的数据类型

对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:

其中包含 Java 基本类型包装类数组类型复合数据类型、辅助类型(List、Map等)、泛型类型(GENERIC)。

符合类型又包括:

  • Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
  • Scala 样例类及Scala元组:不支持空字段。
  • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
  • POJO:Flink自定义的类似于Java bean模式的类。(POJO的类和属性是公有的、有一个无参构造、属性可序列化)

2.5.3 类型提示(Type Hints)

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。例如:

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

可写作:

.map(word -> Tuple2.of(word, 1L))
.returns(new TypeHint<Tuple2<String, Long>>(){})

三.转换算子(Transformation)

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

3.1 基本转换算子(map/ filter/ flatMap

准备工作

为了方便练习,这里使用WaterSensor作为数据模型。

字段名

数据类型

说明

id

String

水位传感器类型

ts

Long

传感器记录时间戳

vc

Integer

水位记录

代码如下:

public class WaterSensor {public String id;public Long ts;public Integer vc;// 省略getter、setter、构造器、toString
}

3.1.1 映射(map)

与 JDK1.8 中的Stream中的 Map 类似。Map 就是将一个元素映射成另一个元素。基于DataStream调用map()方法就可以进行转换处理。

例子:需要提取 WaterSensor 中的 id 属性:

public class MapDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 方法1:实现匿名内部类source.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}}).print();// 方法2:Lambda 表达式source.map(WaterSensor::getId).print();// 方法三:定义 MapFunction 实现类source.map(new MyMapFunction()).print();env.execute();}// 实现 MapFunction , 可以复用static class MyMapFunction implements MapFunction<WaterSensor , String>{@Overridepublic String map(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}}
}

结果输出:

2> id_3
8> id_1
1> id_2

3.1.2 过滤(filter)

与 JDK1.8 中的Stream中的 Fliter类似。对数据流进行过滤,满足条件的元素则会被输出,不满足则被过滤。

例子:过滤掉 WaterSensor 中 id 不为 “id_1” 的元素。

public class FilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 2),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 过滤数据中 id 不为 id_1 的元素source.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return "id_1".equals(waterSensor.getId());}}).print();env.execute();}}

结果输出:

3> WaterSensor{id='id_1', ts=1, vc=1}
4> WaterSensor{id='id_1', ts=12, vc=2}

3.1.2 扁平映射(flatMap)

flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

例子:如果 id 为 id_1 则输出 vc 属性,如果 id 为 id_2 则输出 ts、vc 属性。

public class FlatmapDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 2),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));/*** 如果 id 为 id_1 则输出 vc 属性* 如果 id 为 id_2 则输出 ts、vc 属性*/source.flatMap(new FlatMapFunction<WaterSensor, String>() {@Overridepublic void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {if("id_1".equals(waterSensor.getId())){// 将 vc 放入采集器collector.collect(waterSensor.getVc().toString());} else if ("id_2".equals(waterSensor.getId())) {// 将 ts、vc 放入采集器collector.collect(waterSensor.getVc().toString());collector.collect(waterSensor.getTs().toString());}}}).print();env.execute();}
}

结果输出:

2> 1
3> 2
4> 2
4> 2

3.2 聚合算子(Aggregation)

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。

3.2.1 按键分区(keyBy)

在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。 

  • 基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区一个子任务就可以理解为一个分区
  • KeyBy 返回的是 KeyedStream 键控流。
  • KeyBy 不是转换算子,只是对数据做重分区,不能设置并行度。
  • 分区是通过对 Key 进行 Hash 再对分区数取模来实现的。

例子:以 id 作为 Key 进行分区:

public class KeyByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 2),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 以 id 为 Key 进行分区source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}}).print();env.execute();}}

 结果输出:

2> WaterSensor{id='id_2', ts=2, vc=2}
3> WaterSensor{id='id_1', ts=1, vc=1}
3> WaterSensor{id='id_1', ts=12, vc=2}
3> WaterSensor{id='id_3', ts=3, vc=3}

 3.2.2 简单聚合(sum/min/max/minBy/maxBy)

所有的聚合操作都要基于按键分区的数据流KeyedStream。 Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作。
  • min():在输入流上,对指定的字段求最小值。
  • max():在输入流上,对指定的字段求最大值。
  • sumBy()、minBy()、maxBy():功能类似,xxxBy() 会返回包含符合要求的整条数据。而不加 By 只会保留第一次的非比较字段。

例子:

public class AggrDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 12l, 22),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 以 id 为 Key 进行分区KeyedStream<WaterSensor, String> keyBySource = source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}});keyBySource.sum("vc").print();keyBySource.min("vc").print();keyBySource.max("vc").print();/*** max结果:* 1> WaterSensor{id='id_1', ts=1, vc=1}* 1> WaterSensor{id='id_1', ts=1, vc=22}* 1> WaterSensor{id='id_2', ts=2, vc=2}* 1> WaterSensor{id='id_3', ts=3, vc=3}* ts 还是 第一次的值*/keyBySource.maxBy("vc").print();/*** max结果:* 1> WaterSensor{id='id_1', ts=1, vc=1}* 1> WaterSensor{id='id_1', ts=12, vc=22}* 1> WaterSensor{id='id_2', ts=2, vc=2}* 1> WaterSensor{id='id_3', ts=3, vc=3}* 取当前整列值 */env.execute();}}

3.2.3 归约聚合(reduce)

reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

例子:只保存每个分组中 VC 最大的那条数据

public class ReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 构造数据DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("id_1", 1l, 1),new WaterSensor("id_1", 21l, 21),new WaterSensor("id_1", 31l, 31),new WaterSensor("id_2", 2l, 2),new WaterSensor("id_3", 3l, 3));// 以 id 为 Key 进行分区KeyedStream<WaterSensor, String> sensorKs = source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId();}});/*** reduce:*  1.必须在KeyBy后调用*  2.输入类型 = 输出类型*  3.每个分区的第一条数据来的时候不会执行reduce,但是会存起来保存状态,直接输出,,“Flink有状态的体现”*  4.reduce( value1,  value2)*     a.value1 是上一次的计算结果*     b.value2 是当前进入的数据*/SingleOutputStreamOperator<WaterSensor> reduce = sensorKs.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {if(value2.getVc() > value1.getVc()){return new WaterSensor(value2.getId(), value2.getTs(), value2.getVc());}else {return value1;}}});reduce.print();env.execute();}
}

结果输出:

WaterSensor{id='id_1', ts=1, vc=1} // 分组的第一条数据直接返回
WaterSensor{id='id_1', ts=21, vc=21}
WaterSensor{id='id_1', ts=31, vc=31}
WaterSensor{id='id_2', ts=2, vc=2} // 分组的第一条数据直接返回
WaterSensor{id='id_3', ts=3, vc=3} // 分组的第一条数据直接返回

reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。

3.3 用户自定义函数(UDF

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类匿名函数富函数类

3.3.1 函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

匿名内部类实现

 source.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return "id_1".equals(waterSensor.getId());}}).print();

Lambda表达式实现:

source.filter((FilterFunction<WaterSensor>) waterSensor -> "id_1".equals(waterSensor.getId())).print();

实现 XxxFunction 接口

public class FilterFunctionImpl implements FilterFunction<WaterSensor> {public String id ;public FilterFunctionImpl(String id) {this.id = id;}@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return this.id.equals(waterSensor.getId());}
}

3.3.2 富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。

RichXxxFunction 与 XxxFunction 的区别是可以获取到任务运行时的一些上下文信息、环境信息以及对任务生命周期的管理。

典型的生命周期方法有:

  • 重写open()方法,每个子任务在启动时,会调用一次。
  • 重写close()方法,每个子任务在结束时会调用一次。
    • 程序异常退出不会调用 close() 方法。
    • 手动取消任务会调用 close() 方法。

在open、close中可以使用 getRuntimeContext() 来获取运行时上下文信息。

public class RichMapFunctionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("open:子任务名称"+getRuntimeContext().getTaskNameWithSubtasks());System.out.println("open:子任务编号"+getRuntimeContext().getIndexOfThisSubtask());super.open(parameters);}@Overridepublic void close() throws Exception {System.out.println("close:子任务名称"+getRuntimeContext().getTaskNameWithSubtasks());System.out.println("close:子任务编号"+getRuntimeContext().getIndexOfThisSubtask());super.close();}@Overridepublic Integer map(Integer value) throws Exception {return value + 1;}});map.print();env.execute();}
}

输出结果:

open:子任务名称Source: Collection Source -> Map -> Sink: Print to Std. Out (1/1)#0
open:子任务编号0
2
3
4
5
close:子任务名称Source: Collection Source -> Map -> Sink: Print to Std. Out (1/1)#0
close:子任务编号0

3.4 物理分区算子(Physical Partitioning

Flink 为我们提供了7种分区策略和一个用户自定义分区器。常见的物理分区策略有:随机分配(Random)轮询分配(Round-Robin)重缩放(Rescale)广播(Broadcast)

分区算子就是将数据按照某种策略分配到下游算子的子任务分区中。 

3.4.1 随机分区(shuffle)

通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务分区中去。

shuffle底层实现采用的是 生成随机数

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("ip", 端口);// 随机分区 random.nextInt(下游算子并行度)source.shuffle().print();env.execute();}
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
1
2
3
4
5输出:
1> 1
2> 2
2> 3
1> 4
1> 5

从控制台输出的左侧子任务编号可以看出子任务分区是随机分配的。

3.4.2 轮询分区(Round-Robin)

通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance采用的是对并行度取模,可以将输入流数据平均分配到下游的并行任务中去。可以解决 数据源数据倾斜 的问题。

 // 轮询重分区source.rebalance().print();
输入:
[root@VM-55-24-centos ~]# nc -lk 1234
1
2
3
4
5输出:
1> 1
2> 2
1> 3
2> 4
1> 5

3.4.3 重缩放分区(rescale)

与 rebalance 类似,也是轮询的效果,不过比轮询更加高效。rescale的做法是将数据在固定的几个分区中进行轮询,而不是轮询所有分区。

// 缩放轮询
source.rescale().print();

 3.4.4 广播(broadcast)

通过调用DataStream的broadcast()方法,会将数据发送到下游算子的所有并行任务中去。慎用!

 // 广播source.broadcast().print();

3.4.5 全局分区(global)

全局分区做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1。慎用,可能对程序造成很大的压力!

 // 全局分区source.global().print();

3.4.6 自定义分区(Custom) 

当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。

例子:实现将奇数与偶数分配到不同的分区

自定义分区器实现 Partitioner:

public class MyPartitioner implements Partitioner<String> {@Overridepublic int partition(String key, int numPartitions) {// key 为当前数据,numPartitions 为下游并行度return Integer.parseInt(key) % numPartitions;}
}

使用自定义分区

public class PartitionCustomDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("IP", 端口);source.partitionCustom(new MyPartitioner(), v -> v).print();env.execute();}
}

结果输出:

输入:
[root@VM-55-24-centos ~]# nc -lk 1234
2
3
4
6
8
10输出:
2> 1
1> 2
2> 3
1> 4
1> 6
1> 8
1> 10

3.5 分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里

与分区不同的是,分流是是将一条数据流拆分成多条流。而分区是将数据分配到下游算子的子任务中。

3.5.1 Filter 实现分流

例子:读取一个整数数字流,将数据流划分为奇数流和偶数流。

public class SplitByFilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("IP", 端口);source.filter(num -> Integer.parseInt(num) % 2 == 0).print("偶数流:");source.filter(num -> Integer.parseInt(num) % 2 == 1).print("奇数流:");env.execute();}
}

输入输出结果:

输入:
[root@VM-55-24-centos ~]# nc -lk 1234
1
2
45
324321
234325235
12312
11412输出:
奇数流::1> 1
偶数流::2> 2
奇数流::1> 45
偶数流::2> 324321
奇数流::1> 234325235
偶数流::2> 12312
偶数流::2> 11412

用 Filter 实现虽然简单但不够高效,因为每次数据流都会经过两次 Filter 过滤 。

 3.5.2 使用侧输出流

一条未被分类操作的流被称为“主流”,经过分流操作后,侧输出流可以理解为“主流”的“支流”。

需求:id 为 s1 、s2 的数据被到另外两条侧流 ,非 s1 、s2不受影响,放在主流:

public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> source = env.socketTextStream("IP", 端口);SingleOutputStreamOperator<WaterSensor> sensorDs = source.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] data = value.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));}});// 侧输出流的标记OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<WaterSensor> process = sensorDs.process(new ProcessFunction<WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1Tag, value);} else if ("s2".equals(value.getId())) {ctx.output(s2Tag, value);} else {out.collect(value);}}});// process 默认只会返回主流数据process.print("主流");// 根据输出标签(流的标签)找到 s1 这条支流斌输出process.getSideOutput(s1Tag).printToErr("测输出流S1");// 根据输出标签(流的标签)找到 s2 这条支流斌输出process.getSideOutput(s2Tag).printToErr("测输出流S2");env.execute();}
}

输入与输出结果:

输入:[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s3,3,3
s2,2,2
s9,9,9
s1,33,1134输出:测输出流S1:2> WaterSensor{id='s1', ts=1, vc=1}主流:1> WaterSensor{id='s3', ts=3, vc=3}测输出流S2:2> WaterSensor{id='s2', ts=2, vc=2}主流:1> WaterSensor{id='s9', ts=9, vc=9}测输出流S1:2> WaterSensor{id='s1', ts=33, vc=1134}
  • Process 算子非常灵活,基础算子底层都是调用 Process 来实现的。
  • OutputTag 可以理解为侧输出流的名称以流的数据类型。
  • 将数据放入侧输出流中需要使用 ctx.output()传入输出流标签和数据 ;
  • process 返回的流是主流,想获取侧输出流必须通过 process.getSideOutput()传入输出流标签来获取。

3.6 基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

3.6.1 联合(Union)

通过调用数据源的 Union() 就可以将一条或者多条流进行合并。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

public class UnionDemo {/***  Union : 合并一条或多条相同数据类型的流*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> source2 = env.fromElements(44, 55, 66);DataStreamSource<String> source3 = env.fromElements("777", "888", "999");// 写法1 : 一次合并一个流DataStream<Integer> union = source1.union(source2).union(source3.map(Integer::parseInt));// 写法2 : 一次合并多个流source1.union(source2,source3.map(Integer::parseInt));union.print();env.execute();}
}

结果输出:

1
2
3
44
55
66
777
888
999

3.6.2 连接(Connect)

Union 虽然使用简单,但是受限于只能合并相同类型的流,不太灵活。Flink 提供了另一个更方便的河流操作:连接(Connect)。

3.6.2.1 连接流(ConnectedStreams

通过 Connect 可以将两条不同类型的流进行连接,但是不再返回 DataStream ,而是返回 ConnectedStreams(连接流)。

且两条流连接后只是形式上的“合并”,对这条流进行处理转换则需要对原本的两条流单独处理。

public class ConnectDemo {/***  Connect : 连接(合并)两条流*      返回的是 ConnectedStreams(连接流) 而不是 DataStream*      只是名义上的统一,处理逻辑需要每条流单独处理*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<String> source2 = env.fromElements("777", "888", "999");ConnectedStreams<Integer, String> connect = source1.connect(source2);// 需要对两条单独处理 CoMapFunction(第一条流的类型,第二条流的类型,输出的类型)SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return value.toString();}@Overridepublic String map2(String value) throws Exception {return value;}});map.print();env.execute();}
}

结果输出:

1
777
2
888
3
999
3.6.2.2 CoProcessFunction

与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。

例子:有两条数据类型不同的流,需要根据各自数据的第一个字段进行匹配。类似于 MySQL中的 Inner Join。

public class ConnectKeyByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);SingleOutputStreamOperator<String> process = connect.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 为各自两条流定义中间变量用于存储匹配时的数据Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// 第一次出现该 Key 则直接将数据put进s1的数据集合中if (!s1Cache.containsKey(id)) {ArrayList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 不是第一次出现该 Key ,直接添加进该 Key 的数组中s1Cache.get(id).add(value);}// 去另外一条流的数据中寻找有没有 id 相匹配的,有则放入采集器if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("S1:" + value + "<---->" + "s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// 第一次出现该 Key 则直接将数据put进s2的数据集合中if (!s2Cache.containsKey(id)) {ArrayList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 不是第一次出现该 Key ,直接添加进该 Key 的数组中s2Cache.get(id).add(value);}// 去另外一条流的数据中寻找有没有 id 相匹配的,有则放入采集器if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("S2:" + value + "<---->" + "s1:" + s1Element);}}}});process.print();env.execute();}
}

结果:

S2:(1,aa1,1)<---->s1:(1,a1)
S1:(1,a2)<---->s2:(1,aa1,1)
S2:(1,aa2,2)<---->s1:(1,a1)
S2:(1,aa2,2)<---->s1:(1,a2)
S2:(2,bb,1)<---->s1:(2,b)
S2:(3,cc,1)<---->s1:(3,c)

注意:在多并行度下,以上匹配会出错,因为多并行度下,数据会被发往 Process 不同的子任务中(Slot),而不同的子任务间数据无法共享,导致读取不到另一个子任务的数组,从而匹配错误。所以需要在连接流后对要匹配的字段进行 KeyBy 操作,确保同一个 Key 被分配到同一个子任务中。

四.输出算子(Sink

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

 4.1 连接到外部系统

Flink 1.17 中的DataStream API专门提供了向外部写入数据的方法:sinkTo,对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

stream.sinkTo(…)

在大部分情况下,Sink 并不需要我们手动实现,Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

及第三方提供的连接器:

 地址:Overview | Apache Flink

4.2 输出到文件 

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

例子:使用数据生成器源源不断生成数据,并输出到文件夹的文本文件中,每隔一个小时生成一个新的文件夹,且每隔20秒或者文件大小达到 3KB 则新建一个文本文件。

导入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
public class SinkFileDemo {public static void main(String[] args) throws Exception {// 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 全局并行度设置为 2env.setParallelism(2);// 开启checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);/*** 数据生成器:无限生成数字,一秒生成 1000 条*/DataGeneratorSource<String> dataGenSource = new DataGeneratorSource<>((GeneratorFunction<Long, String>) num -> "Number:" + num,Long.MIN_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> streamSource = env.fromSource(dataGenSource, WatermarkStrategy.noWatermarks(),"data-generator");/*** 输出到文件系统*      Sink 算子同样会受到 并行度 的影响:例如会同时有 并行度个 个文件被写入*/FileSink<String> fileSink = FileSink// 指定要输出的 文件目录 及 文件编码.<String>forRowFormat(new Path("D:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 指定要生成文件的 前后缀.withOutputFileConfig(OutputFileConfig.builder() // 建造者模式// 文件的前缀.withPartPrefix("flink-file-test")// 文件的后缀.withPartSuffix(".txt").build())// 指定目录分桶:按照小时进行分桶(一小时生成一个新的目录),并设置时区为 Asia/Shanghai.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")))// 文件滚动策略:每隔多少秒 或 文件超过多大 就生成新的文件.withRollingPolicy(DefaultRollingPolicy.builder() // 建造者模式// 每隔 20S 生成一个新的文件.withRolloverInterval(Duration.ofSeconds(20))// 文件大小超过大于 3KB 则生成一个新的文件.withMaxPartSize(new MemorySize(1024 * 3)).build()).build();// 输出streamSource.sinkTo(fileSink);// 执行env.execute();}}

 结果:

FileSink

        .forRowFormat:指定要输出的文件目录及文件编码 

        .withOutputFileConfig:指定要生成文件的前后缀

        .withBucketAssigner:指定目录分桶

        .withRollingPolicy:文件滚动策略

4.3 输出到RabbitMQ

想要输出到 RabbitMQ,也需要调用对应的 Sink 算子--RMQSink 。 

例子:从 Socket 读数据,写入到 RabbitMQ 中,作为一条消息。

添加Kafka 连接器依赖:

<!--RabbitMQ 连接器-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
public class SinkRabbitMqDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataStreamSource<String> streamSource = env.socketTextStream("ip", 1234);// 配置 RabbitMQ 连接信息RMQConnectionConfig mqConfig = new RMQConnectionConfig.Builder().setHost("xxx.xxx.xxx.xxx") // RabbitMQ 服务地址.setPort(5379) // RabbitMQ 服务端口.setUserName("用户名") // 用户名.setPassword("密码") // 密码.setVirtualHost("/") // 虚拟主机名.build();// 创建一个RMQSink,用于将数据发送到RabbitMQ队列// mq配置信息,队列名称,序列化器RMQSink rmqSink = new RMQSink(mqConfig, "test_queue", new SimpleStringSchema());// 将数据流写入RabbitMQ队列 Flink 1.17 并不支持使用 sinkTo 对第三方系统进行输出streamSource.addSink(rmqSink);env.execute("flink connectors rabbitmq");}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
hello flink
hello rabbitmq

结果:

4.4 输出到MySQL(JDBC) 

同样的,要输出到 MySQL ,需要调用 JdbcSink.sink() 算子,且也只能使用 addSink 来添加输出。

例子: 在 Socket 中写入数据,写入MySQL中。

在 MySQL 中新建表:

CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

导入 MySQL 驱动:

<!-- MySQL 驱动-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>

导入 Flink - MySQL 连接器:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version>
</dependency>

代码:

public class SinkMySQLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> streamSource = env.socketTextStream("xxx.xxx.xxx.xxx", 1234);// 将从 Socket 中读到的字符串转成实体类SingleOutputStreamOperator<WaterSensor> map = streamSource.map((MapFunction<String, WaterSensor>) s -> {String[] data = s.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));});/*** jdbcSink 四大参数:* 1、要执行的 SQL 语句* 2、为占位符填充值* 3、执行选项:重试次数,攒批* 4、MySQL 连接信息*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values( ? , ? , ?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withBatchIntervalMs(3000) // 批次的时间.withBatchSize(100) // 批次的大小:条数.withMaxRetries(3) // 重试次数.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("Username").withPassword("Password").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());map.addSink(jdbcSink);env.execute("flink connectors MySQL");}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
hello,1,1
flink,2,2
mysql,3,3

输出:

 4.5 自定义Sink输出

Flink 为我们提供很多常用的连接器,一般不推荐自定义Sink,因为需要自行处理连接逻辑及错误逻辑。

如果要自定义Sink,Flink 为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

streamSource.addSink(new MySink());

推荐继承RichSinkDunction,实现其中的三个方法open()、close()、invoke(String value, Context context)。

public class  MySink extends RichSinkFunction<String>{@Overridepublic void open(Configuration parameters) throws Exception {// 启动时会被调用一次// 可以在这里创建连接}@Overridepublic void close() throws Exception {// 销毁时会被调用一次// 可以在这里销毁连接}// Sink 的核心逻辑@Overridepublic void invoke(String value, Context context) throws Exception {// 每条数据来都会调用一次// 具体的写入逻辑...}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/147461.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计选题推荐-二手交易跳蚤市场微信小程序/安卓APP-项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

ubuntu下载conda

系统&#xff1a;Ubuntu18.04 &#xff08;1&#xff09;下载安装包 wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/Anaconda3-2021.11-Linux-x86_64.sh 报错错误 403&#xff1a;Forbidden 解决方法 wget -U NoSuchBrowser/1.0 https://mirrors.tuna.tsingh…

Smart Tomcat的使用

文章目录 Smart Tomcat的作用Smart Tomcat的安装Smart Tomcat的配置Smart Tomcat的启动 Smart Tomcat的作用 我们知道使用Servlet来完成一个项目一共需要七个步骤&#xff0c;即创建maven项目、添加依赖、创建目录结构、编写代码、打包程序、部署程序、验证程序。这样的确是完…

让资产权利归于建设者:Kiosk使过程变得更简单

区块链凭借着其将人的权利地位置于平台之上的能力&#xff0c;可以重塑互联网&#xff0c;而自托管为个人提供了控制和管理其资产和数据的能力。链上交易支持建设者和客户之间的点对点交易。这些特质联合起来&#xff0c;可以将数字世界从基于价值提取的模式转变为基于价值创造…

应用场景丨迭代市政综合管廊监测系统建设

市政综合管廊是指在城市地下建造的隧道空间&#xff0c;将市政、电力、通讯、燃气、给排水等各种管线集于一体&#xff0c;实施统一规划、设计、建设和管理。综合管廊有利于解决反复开挖路面、架空线网密集、管线事故频发等问题&#xff0c;是保障城市运行的重要基础设施和“生…

V10 桌面版、服务器版系统加固

V10 桌面版、服务器版系统加固 一、 文档说明 本文档中涉及的加固方法主要包括&#xff1a;密码策略配置、防火墙规 则配置、禁用高风险服务等。 二、 V10 桌面版系统加固 2.1 密码策略配置 密码策略包括密码老化控制策略和密码复杂度策略。密码老化 控制策略需要配置/etc…

【C++入门到精通】右值引用 | 完美转发 C++11 [ C++入门 ]

阅读导航 引言一、左值引用和右值引用1. 什么是左值&#xff1f;什么是左值引用&#xff1f;2. 什么是右值&#xff1f;什么是右值引用&#xff1f;3. move( )函数 二、左值引用与右值引用比较三、右值引用使用场景和意义四、完美转发std::forward 函数完美转发实际中的使用场景…

【LearnOpenGL基础入门——3】绘制纯色三角形

目录 一.写在前面 二.顶点输入 三.顶点着色器 四.编译着色器 五.片段着色器 六.着色器程序 七.链接顶点属性 彩蛋 一.写在前面 我们先认识一下OpenGL常用的几个名词&#xff1a; 顶点数组对象&#xff1a;Vertex Array Object&#xff0c;VAO顶点缓冲对象&#xff1a;…

2023全新付费进群系统源码 带定位完整版 附教程

这源码是我付费花钱买的分享给大家&#xff0c;功能完整。 搭建教程 Nginx1.2 PHP5.6-7.2均可 最好是7.2 第一步上传文件程序到网站根目录解压 第二步导入数据库&#xff08;58soho.cn.sql&#xff09; 第三步修改/config/database.php里面的数据库地址 第四步修改/conf…

linux配置固定ip(两种方法)

首先刚下载的vm&#xff0c;刚创建的虚拟机&#xff0c;肯定是需要配置ip的 其次以前我的每次都是设置自动ip&#xff0c;这样每次登录都会自动获取ip地址&#xff0c;并且每次的ip都不相同。 ~方法&#xff1a; 开机登陆后 1)Cd /etc/sysconfig/network-scripts 2)Vi ifcf…

Elasticsearch备份与还原:使用elasticdump

在数据管理的世界里&#xff0c;备份和还原数据是重中之重的日常工作&#xff0c;特别是对于Elasticsearch这样的强大而复杂的搜索引擎。备份不仅可以用于灾难恢复&#xff0c;还可以在数据迁移、测试或者升级等场景中发挥重要作用。 在本博客中&#xff0c;我们将会重点介绍如…

轻量级 Java 日志组件

日志记录功能在开发中很常用&#xff0c;不仅可以记录程序运行的细节&#xff0c;方便调试&#xff0c;也可以记录用户的行为&#xff0c;是框架中不可或缺的组件。为最大程度复用现有的组件&#xff0c;我们就地取材使用了 JDK 自带的 JUL&#xff08;java.util.logging&#…

聚观早报 |联想集团Q2财季业绩;小鹏汽车Q3营收

【聚观365】11月17日消息 联想集团Q2财季业绩 小鹏汽车Q3营收 微软发布两款自研AI芯片 FAA批准SpaceX再次发射星际飞船 2023 OPPO开发者大会 联想集团Q2财季业绩 全球数字经济领导企业联想集团公布截至2023年9月30日的2023/24财年第二财季业绩&#xff1a;整体营收达到10…

微信小程序H5 uniapp

最近微信小程序对有视频播放的审核严&#xff0c;需要提供“文娱类资质”。而申请这个资质比较繁琐。所以我们在小程序上用web-view做跳转到H5&#xff0c;H5使用uniapp编写。这是小程序关于web-view文档说明。https://developers.weixin.qq.com/miniprogram/dev/component/web…

硬件工程师基础能力课

第一课时--基本定理、电阻、电容等 首先了解下面几个概念&#xff0c;基尔霍夫定理&#xff1a;KCL & KVL&#xff0c;叠加定理&#xff0c;戴维南定理&#xff08;电压源等效&#xff09;和诺顿定理&#xff08;电流源等效&#xff09;、奈奎斯特采样定理。 上面说的这些东…

asp.net core EF Sqlserver

一、EF CORE的使用 1、使用NuGet来安装EF CORE 使用程序包管理器控制台&#xff0c;进行命令安装 //安装 Microsoft.EntityFrameworkCoreInstall-Package Microsoft.EntityFrameworkCore //安装 Microsoft.EntityFrameworkCore.SqlServer Install-Package Microsoft.EntityF…

产品运营的场景和运营策略

一、启动屏 1&#xff0e;概念 启动屏&#xff0c;特指 APP 产品启动时即显示的界面&#xff0c;这个界面一般会停留几秒钟时间&#xff0c;在这个时间内 APP 会在后台加载服务框架、启动各种服务 SDK 、获取用户地理位置、判断有无新版本、判断用户账户状态以及其他系统级别的…

微机原理_12

一、单项选择题(本大题共15小题,每小题3分&#xff0c;共45分。在每小题给出的四个备选项中&#xff0c;选出一个正确的答案。〕 十进制正数56的 8位二进制补码是()。 A. 00011001 B. 10100110 C. 10011001 D. 00100110 若栈顶的物理地址为20100H&#xff0c;当执行完指令PUSH…

qt 重载信号,使用““方式进行connect()调用解决方案

问题 在Qt中&#xff0c;重载的信号默认是无法使用&这种方式调用的。 因为&只能绑定到一个具体的信号&#xff0c;而重载的信号名称相同&#xff0c;编译器无法确定要绑定哪一个信号。 解决方案 如果非要使用&绑定重载的信号&#xff0c;可以使用函数指针进行转…

2023年道路运输企业主要负责人证考试题库及道路运输企业主要负责人试题解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年道路运输企业主要负责人证考试题库及道路运输企业主要负责人试题解析是安全生产模拟考试一点通结合&#xff08;安监局&#xff09;特种作业人员操作证考试大纲和&#xff08;质检局&#xff09;特种设备作业人…