Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、maven依赖
- 二、redis环境及相关内容说明
- 三、redis作为Flink的source异步数据交互示例
- 1、maven依赖
- 2、redis异步交互数据实现
- 1)、读取redis数据时以string进行输出
- 2)、读取redis数据时以pojo进行输出
- 3、使用示例
- 4、验证
- 1)、准备redis环境数据
- 2)、启动应用程序,并观察控制台输出
本文主要介绍Flink 的redis作为数据源的异步读取使用示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文依赖redis环境好用。
本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版
一、maven依赖
本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。
如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。
二、redis环境及相关内容说明
1、本示例是需要redis环境的,至于redis是集群或单击和本示例关系不大。
关于redis的更多信息请参考其官网。
2、本示例是以redis作为外部数据进行异步交互的例子,也是实际中应用中常见的例子。关于异步数据交互参考文章:55、Flink之用于外部数据访问的异步 I/O
三、redis作为Flink的source异步数据交互示例
本示例是模拟根据外部数据用户姓名查询redis中用户的个人信息。
本示例外部数据就以flink的集合作为示例,redis数据中存储的为hash表,下面验证中会有具体展示。
1、maven依赖
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.12</artifactId><version>1.1.0</version><exclusions><exclusion><artifactId>flink-streaming-java_2.12</artifactId><groupId>org.apache.flink</groupId></exclusion><exclusion><artifactId>flink-runtime_2.12</artifactId><groupId>org.apache.flink</groupId></exclusion><exclusion><artifactId>flink-core</artifactId><groupId>org.apache.flink</groupId></exclusion><exclusion><artifactId>flink-java</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions>
</dependency>
2、redis异步交互数据实现
1)、读取redis数据时以string进行输出
package org.datastreamapi.source.custom.redis;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import com.sun.jdi.IntegerValue;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** @author alanchan**/
public class CustomRedisSource extends RichAsyncFunction<String, String> {private JedisPoolConfig config = null;private static String ADDR = "192.168.10.41";private static int PORT = 6379;// 等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时private static int TIMEOUT = 10000;private JedisPool jedisPool = null;private Jedis jedis = null;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);config = new JedisPoolConfig();jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);jedis = jedisPool.getResource();}@Overridepublic void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {// 文件中读取的内容System.out.println("输入参数input----:" + input);// 发起一个异步请求,返回结果CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {String[] arrayData = input.split(",");String name = arrayData[1];String value = jedis.hget("AsyncReadUser_Redis", name);System.out.println("查询结果output----:" + value);return value;}}).thenAccept((String dbResult) -> {// 设置请求完成时的回调,将结果返回resultFuture.complete(Collections.singleton(dbResult));});}// 连接超时的时候调用的方法@Overridepublic void timeout(String input, ResultFuture<String> resultFuture) throws Exception {System.out.println("redis connect timeout!");}@Overridepublic void close() throws Exception {super.close();if (jedis.isConnected()) {jedis.close();}}@Data@AllArgsConstructor@NoArgsConstructorstatic class User {private int id;private String name;private int age;private double balance;User(String value) {String[] str = value.split(",");this.setId(Integer.valueOf(str[0]));this.setName(str[1]);this.setAge(Integer.valueOf(str[2]));this.setBalance(Double.valueOf(str[0]));}}}
2)、读取redis数据时以pojo进行输出
package org.datastreamapi.source.custom.redis;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** @author alanchan**/
public class CustomRedisSource2 extends RichAsyncFunction<String, User> {private JedisPoolConfig config = null;private static String ADDR = "192.168.10.41";private static int PORT = 6379;// 等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时private static int TIMEOUT = 10000;private JedisPool jedisPool = null;private Jedis jedis = null;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);config = new JedisPoolConfig();jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);jedis = jedisPool.getResource();}@Overridepublic void asyncInvoke(String input, ResultFuture<User> resultFuture) throws Exception {System.out.println("输入查询条件:" + input);CompletableFuture.supplyAsync(new Supplier<User>() {@Overridepublic User get() {String[] arrayData = input.split(",");String name = arrayData[1];String value = jedis.hget("AsyncReadUser_Redis", name);System.out.println("查询redis结果:" + value);return new User(value);}}).thenAccept((User dbResult) -> {// 设置请求完成时的回调,将结果返回resultFuture.complete(Collections.singleton(dbResult));});}// 连接超时的时候调用的方法@Overridepublic void timeout(String input, ResultFuture<User> resultFuture) throws Exception {System.out.println("redis connect timeout!");}@Overridepublic void close() throws Exception {super.close();if (jedis.isConnected()) {jedis.close();}}}
3、使用示例
package org.datastreamapi.source.custom.redis;import java.util.concurrent.TimeUnit;import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;/*** @author alanchan**/
public class TestCustomRedisSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// id,nameDataStreamSource<String> lines = env.fromElements("1,alan", "2,alanchan", "3,alanchanchn", "4,alan_chan", "5,alan_chan_chn");SingleOutputStreamOperator<String> result = AsyncDataStream.orderedWait(lines, new CustomRedisSource(), 10, TimeUnit.SECONDS, 1);SingleOutputStreamOperator<User> result2 = AsyncDataStream.orderedWait(lines, new CustomRedisSource2(), 10, TimeUnit.SECONDS, 1);result.print("result-->").setParallelism(1);result2.print("result2-->").setParallelism(1);env.execute();}
}
4、验证
1)、准备redis环境数据
hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'127.0.0.1:6379> hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hgetall AsyncReadUser_Redis1) "alan"2) "1,alan,18,20,alan.chan.chn@163.com"3) "alanchan"4) "2,alanchan,19,25,alan.chan.chn@163.com"5) "alanchanchn"6) "3,alanchanchn,20,30,alan.chan.chn@163.com"7) "alan_chan"8) "4,alan_chan,27,20,alan.chan.chn@163.com"9) "alan_chan_chn"
10) "5,alan_chan_chn,36,10,alan.chan.chn@163.com"
2)、启动应用程序,并观察控制台输出
输入查询条件:5,alan_chan_chn
输入参数input----:2,alanchan
输入参数input----:5,alan_chan_chn
输入查询条件:3,alanchanchn
输入查询条件:1,alan
输入参数input----:1,alan
输入查询条件:2,alanchan
输入查询条件:4,alan_chan
输入参数input----:4,alan_chan
输入参数input----:3,alanchanchn
查询结果output----:3,alanchanchn,20,30,alan.chan.chn@163.com
查询redis结果:1,alan,18,20,alan.chan.chn@163.com
查询结果output----:1,alan,18,20,alan.chan.chn@163.com
查询redis结果:4,alan_chan,27,20,alan.chan.chn@163.com
查询redis结果:2,alanchan,19,25,alan.chan.chn@163.com
查询结果output----:2,alanchan,19,25,alan.chan.chn@163.com
查询redis结果:3,alanchanchn,20,30,alan.chan.chn@163.com
查询结果output----:4,alan_chan,27,20,alan.chan.chn@163.com
查询结果output----:5,alan_chan_chn,36,10,alan.chan.chn@163.com
查询redis结果:5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 4,alan_chan,27,20,alan.chan.chn@163.com
result-->> 5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 3,alanchanchn,20,30,alan.chan.chn@163.com
result-->> 2,alanchan,19,25,alan.chan.chn@163.com
result-->> 1,alan,18,20,alan.chan.chn@163.com
result2-->> CustomRedisSource.User(id=4, name=alan_chan, age=27, balance=4.0)
result2-->> CustomRedisSource.User(id=1, name=alan, age=18, balance=1.0)
result2-->> CustomRedisSource.User(id=3, name=alanchanchn, age=20, balance=3.0)
result2-->> CustomRedisSource.User(id=5, name=alan_chan_chn, age=36, balance=5.0)
result2-->> CustomRedisSource.User(id=2, name=alanchan, age=19, balance=2.0)
以上,本文主要介绍Flink 的redis作为数据源的异步读取使用示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版