1. Background
A Flink job reads data from a single source and writes it to multiple sinks.
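2. Official example
To write to multiple sinks, the INSERT statements must be wrapped in a statement set that starts with BEGIN STATEMENT SET; and ends with END;.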
-- Source table
CREATE TEMPORARY TABLE datagen_source (
  name VARCHAR,
  score BIGINT
) WITH ('connector' = 'datagen');

-- Result table A
CREATE TEMPORARY TABLE blackhole_sinkA (
  name VARCHAR,
  score BIGINT
) WITH ('connector' = 'blackhole');

-- Result table B
CREATE TEMPORARY TABLE blackhole_sinkB (
  name VARCHAR,
  score BIGINT
) WITH ('connector' = 'blackhole');

-- DML
BEGIN STATEMENT SET; -- Required when writing to multiple sinks.
INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END; -- Required when writing to multiple sinks.
3. Hands-on
3.1. Start a Standalone cluster
Change into the Flink distribution directory and start the cluster in Standalone mode.
./bin/start-cluster.sh
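Before opening the SQL client, it can help to confirm that the cluster actually came up. The following is a minimal sketch, assuming the JobManager REST endpoint listens on port 8081 of the local machine. The `FLINK_REST` variable and the `overview_url` and `check_cluster` helpers are illustrative names, not part of Flink.

```shell
#!/usr/bin/env bash
# Assumption: the JobManager REST endpoint is reachable at this address.
FLINK_REST="${FLINK_REST:-http://localhost:8081}"

# Hypothetical helper: build the URL of the cluster overview endpoint.
overview_url() {
  printf '%s/overview\n' "$1"
}

# Hypothetical helper: print the cluster overview JSON,
# or a hint on stderr if the endpoint cannot be reached.
check_cluster() {
  curl -sf --max-time 5 "$(overview_url "$FLINK_REST")" \
    || echo "Flink REST endpoint not reachable at $FLINK_REST" >&2
}
```

Calling `check_cluster` after `start-cluster.sh` should print a JSON summary of the cluster, including the registered TaskManagers and task slots.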
3.2. Start the Flink SQL client
./bin/sql-client.sh embedded
3.3. Execute the SQL
Flink SQL> CREATE TEMPORARY TABLE datagen_source (
> name VARCHAR,
> score BIGINT
> ) WITH (
> 'connector' = 'datagen'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkA(
> name VARCHAR,
> score BIGINT
> ) WITH (
> 'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.

Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkB(
> name VARCHAR,
> score BIGINT
> ) WITH (
> 'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.

Flink SQL> BEGIN STATEMENT SET;
[INFO] Begin a statement set.

Flink SQL> INSERT INTO blackhole_sinkA
> SELECT UPPER(name), sum(score)
> FROM datagen_source
> GROUP BY UPPER(name);
[INFO] Add SQL update statement to the statement set.

Flink SQL> INSERT INTO blackhole_sinkB
> SELECT LOWER(name), max(score)
> FROM datagen_source
> GROUP BY LOWER(name);
[INFO] Add SQL update statement to the statement set.

Flink SQL> END;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 37a1390129c356374601a267cb8080b6
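The interactive session above can also be run non-interactively. The sketch below writes the same statements to a script file and passes it to the SQL client with the `-f` option. This option exists in recent Flink releases, so check `./bin/sql-client.sh --help` for your version. The file name `multi_sink.sql` and the `FLINK_HOME` and `SQL_FILE` variables are assumptions made for illustration.

```shell
#!/usr/bin/env bash
# Assumption: FLINK_HOME points at the Flink distribution directory.
FLINK_HOME="${FLINK_HOME:-.}"
SQL_FILE="${SQL_FILE:-multi_sink.sql}"   # hypothetical file name

# Write the DDL and the statement set to a script file.
cat > "$SQL_FILE" <<'EOF'
CREATE TEMPORARY TABLE datagen_source (
  name VARCHAR,
  score BIGINT
) WITH ('connector' = 'datagen');

CREATE TEMPORARY TABLE blackhole_sinkA (
  name VARCHAR,
  score BIGINT
) WITH ('connector' = 'blackhole');

CREATE TEMPORARY TABLE blackhole_sinkB (
  name VARCHAR,
  score BIGINT
) WITH ('connector' = 'blackhole');

BEGIN STATEMENT SET;
INSERT INTO blackhole_sinkA
  SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB
  SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END;
EOF

# Submit the script only if the SQL client is present; otherwise just keep the file.
if [ -x "$FLINK_HOME/bin/sql-client.sh" ]; then
  "$FLINK_HOME/bin/sql-client.sh" -f "$SQL_FILE"
else
  echo "sql-client.sh not found under $FLINK_HOME; wrote $SQL_FILE only" >&2
fi
```

Keeping the statements in a file makes the job easy to resubmit and to keep under version control, at the cost of losing the per-statement feedback of the interactive prompt.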
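3.4. Check the Flink UI
Open the Flink web UI and verify that both INSERT statements were submitted as a single job that reads from one source and writes to the two sinks.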
http://master01:8081/#/job/37a1390129c356374601a267cb8080b6/overview
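The same check can be scripted against Flink's REST API, which serves job details under `/jobs/<job id>` on the same port as the web UI. The following is a minimal sketch. The `job_url` and `show_job` helpers are illustrative names, and the host and Job ID defaults are the ones from the session above.

```shell
#!/usr/bin/env bash
# Host and Job ID taken from the session above; override them for your own cluster.
FLINK_REST="${FLINK_REST:-http://master01:8081}"
JOB_ID="${JOB_ID:-37a1390129c356374601a267cb8080b6}"

# Hypothetical helper: build the REST URL for one job's details.
job_url() {
  printf '%s/jobs/%s\n' "$1" "$2"
}

# Hypothetical helper: fetch the job details as JSON,
# or print a hint on stderr if the request fails.
show_job() {
  curl -sf --max-time 5 "$(job_url "$FLINK_REST" "$JOB_ID")" \
    || echo "could not fetch job $JOB_ID from $FLINK_REST" >&2
}
```

Running `show_job` against the live cluster returns a JSON document describing the job. Its `vertices` array lists the job's operators, so both blackhole sinks should appear there under the single Job ID.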