大纲
- 新建工程
- 编码
- Pom.xml
- 自定义无界流
- 分流
- 测试
- 工程代码
在之前的案例中,我们一直使用的是单个Sink来做数据的输出。实际上,Flink是支持多个输出流的。本文我们就来讲解如何在Flink数据输出时做分流处理。
我们将基于《Java版Flink使用指南——自定义无界流生成器》的输入流,按生成数字的奇偶性,将其分流输出到不同的RabbitMQ队列中。
新建工程
我们新建一个名字叫MultiSinkTo的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
编码
Pom.xml
因为我们要往RabbitMQ中输出,所以需要引入相关连接组件。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>
自定义无界流
新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
这块的代码可以见《Java版Flink使用指南——自定义无界流生成器》
它会每隔1秒钟生成一个递增的数字
package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}
分流
我们通过下面的代码生成数据流
DataStreamSource<Long> longDataStreamSource = env.addSource(new UnBoundedStreamGenerator());
然后奇数发布到odd.data.to.rbtmq队列;偶数发布到even.data.to.rbtmq。
分流主要是通过filter来区分数据,然后针对不同的数据addSink来发布到不同的队列。
如果不需要区分数据,只是将相同的数据发布到不同的目的地,则可以直接多次addSink来达成。
String host = "172.25.103.252"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();int parallelism = 1;String oddSinkQueueName = "odd.data.to.rbtmq"; RMQSink<String> oddRMQSink = new RMQSink<>(rmqConnectionConfig, oddSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 != 0).map(Object::toString).addSink(oddRMQSink).setParallelism(parallelism).name("oddSink");String evenSinkQueueName = "even.data.to.rbtmq";RMQSink<String> evenRMQSink = new RMQSink<>(rmqConnectionConfig, evenSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 == 0).map(Object::toString).addSink(evenRMQSink).setParallelism(parallelism).name("evenSink");
测试
执行一段时间后,我们看到两个队列相序增加
奇数队列
偶数队列
工程代码
https://github.com/f304646673/FlinkDemo