Flink SQL Redis Connector

Anyone who develops with Flink knows that connecting to Redis is cumbersome, and parsing Redis data is even more so. If Redis could be connected with Flink SQL like an ordinary database, with its data rendered as a table, development would be much more convenient.

After many days of work I have finally written a Flink SQL Redis connector and verified that it can parse Redis data with SQL. The code and execution results are shown below; the complete code is on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to the Flink community; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org).

Please consider forking and starring the repository. I will keep updating this connector, and you are welcome to try it out. If you run into problems, report them to me or in the community, and feel free to contact me with ideas.

1. Usage examples

1. Reading data

CREATE TABLE orders (
    `order_id` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hgetall',
    'key' = 'orders'
);

SELECT * FROM orders;
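For completeness, here is a minimal sketch of driving the same DDL and query from a Java program with Flink's Table API. It assumes the connector jar is on the classpath and uses only the standard TableEnvironment API, nothing specific to this connector:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RedisReadExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Register the Redis-backed table (same DDL as above).
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  `order_id` STRING," +
                "  `price` STRING," +
                "  `order_time` STRING," +
                "  PRIMARY KEY (order_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'redis'," +
                "  'mode' = 'single'," +
                "  'single.host' = '192.168.10.101'," +
                "  'single.port' = '6379'," +
                "  'password' = 'xxxxxx'," +
                "  'command' = 'hgetall'," +
                "  'key' = 'orders'" +
                ")");
        // Print the rows parsed out of Redis.
        tEnv.executeSql("SELECT * FROM orders").print();
    }
}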

Note: a Redis table must define a primary key, either a single column or a composite key.

The query parses the Redis data directly into the tabular form we need (result screenshot omitted here).

2. Writing data

-- 1. Generate source data.
CREATE TABLE order_source (
    `order_number` BIGINT,
    `price` DECIMAL(32,2),
    `order_time` TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '5',
    'fields.order_number.min' = '1',
    'fields.order_number.max' = '20',
    'fields.price.min' = '1001',
    'fields.price.max' = '1100'
);

-- 2. Define the Redis sink table.
CREATE TABLE redis_sink (
    `order_number` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hmset',
    'key' = 'orders'
);

-- 3. Insert into the Redis sink table (cast every column to STRING).
INSERT INTO redis_sink
SELECT
    CAST(order_number AS STRING) AS order_number,
    CAST(price AS STRING) AS price,
    CAST(order_time AS STRING) AS order_time
FROM order_source;

A Redis table does not store data types, so every column must be cast to STRING before it is written. Each row lands in Redis under a key built by concatenating key + primary-key column name + primary-key value, which guarantees that every row is unique; this is also why a Redis table must declare a primary key. For the example above, the row with order_number = 10 is stored in the hash orders:order_number:10.
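To make the key layout concrete, one way to inspect a written row is to read it back with plain Jedis. This is just a sketch: the host, password, and the sample order_number = 10 are placeholders taken from the example above:

import java.util.Map;
import redis.clients.jedis.Jedis;

public class ReadBackExample {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("192.168.10.101", 6379);
        jedis.auth("xxxxxx");
        // The sink stores each row as a hash under key:pkColumn:pkValue.
        Map<String, String> row = jedis.hgetAll("orders:order_number:10");
        row.forEach((field, value) -> System.out.println(field + " = " + value));
        jedis.close();
    }
}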

3. Supported features

1. The connector currently supports multiple read and write commands:

        Read:  get, hget, hgetall, hscan, lrange, smembers, zrange

        Write: set, hset, hmset, lpush, rpush, sadd

2. For hash data, the most commonly used type, fuzzy matching is supported: supplying only the table name (the key prefix) queries the entire table.

4. Connector options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | none | String | connector name |
| mode | required | none | String | redis cluster mode (single or cluster) |
| single.host | optional | none | String | redis single mode machine host |
| single.port | optional | none | Int | redis single mode running port |
| password | optional | none | String | redis database password |
| command | required | none | String | redis write data or read data command |
| key | required | none | String | redis key |
| expire | optional | none | Int | set key ttl |
| field | optional | none | String | get a value with field when using hget command |
| cursor | optional | none | Int | cursor used by the hscan command (e.g. 1, 2) |
| start | optional | 0 | Int | start index when using the lrange command |
| end | optional | 10 | Int | end index when using the lrange command |
| connection.max.wait-mills | optional | none | Int | redis connection parameter |
| connection.timeout-ms | optional | none | Int | redis connection parameter |
| connection.max-total | optional | none | Int | redis connection parameter |
| connection.max-idle | optional | none | Int | redis connection parameter |
| connection.test-on-borrow | optional | none | Boolean | redis connection parameter |
| connection.test-on-return | optional | none | Boolean | redis connection parameter |
| connection.test-while-idle | optional | none | Boolean | redis connection parameter |
| so.timeout-ms | optional | none | Int | redis connection parameter |
| max.attempts | optional | none | Int | redis connection parameter |
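The connection.* options are plain Jedis pool settings. The RedisUtil helper that applies them is not included in this post, so the method shape below is an assumption; only the JedisPoolConfig calls are standard Jedis API. A minimal sketch of what getSingleJedisPool presumably does:

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisPoolSketch {
    // Hypothetical mapping of the connection.* options onto Jedis pool settings.
    public static JedisPool buildPool(String host, int port,
                                      int maxTotal, int maxIdle, long maxWaitMillis,
                                      boolean testOnBorrow, boolean testOnReturn,
                                      boolean testWhileIdle) {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxTotal);            // connection.max-total
        config.setMaxIdle(maxIdle);              // connection.max-idle
        config.setMaxWaitMillis(maxWaitMillis);  // connection.max.wait-mills
        config.setTestOnBorrow(testOnBorrow);    // connection.test-on-borrow
        config.setTestOnReturn(testOnReturn);    // connection.test-on-return
        config.setTestWhileIdle(testWhileIdle);  // connection.test-while-idle
        return new JedisPool(config, host, port);
    }
}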

2. Factory class for the dynamic source and sink

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    private ReadableConfig options;

    public RedisSourceSinkFactory() {}

    public RedisSourceSinkFactory(ReadableConfig options) {
        this.options = options;
    }

    // DynamicTableSourceFactory method: must be implemented to read data with Flink SQL.
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options, columnNames, primaryKey);
    }

    // DynamicTableSinkFactory method: must be implemented to write to Redis with Flink SQL.
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options, columnNames, primaryKey);
    }

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    // Required options of the SQL connector.
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }

    // Optional options of the SQL connector.
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);
        return options;
    }
}
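One step the post does not show explicitly: Flink discovers this factory through Java SPI, so the connector jar must contain a service descriptor file named META-INF/services/org.apache.flink.table.factories.Factory whose content is the fully qualified name of the factory class. The package prefix below is only a guess based on the imports above; use whatever package the repository actually declares:

org.apache.flink.factory.RedisSourceSinkFactory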

3. Redis source classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public DynamicTableSource copy() {
        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table source";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
        // The second argument marks the source as unbounded.
        return SourceFunctionProvider.of(redisSourceFunction, false);
    }
}

The source function below supports reading Redis string, set, zset, and hash data and parsing it into RowData for Flink:

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;

public class RedisSourceFunction extends RichSourceFunction<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String[] fields;
    private String cursor;
    private Integer start;
    private Integer end;
    private String[] keySplit;
    private static int position = 1;
    private GenericRowData rowData;

    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null, please set a value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null, please set a value for key");
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);

        // If the configured command is a write command, there is nothing to read: stop here.
        List<String> sourceCommand = Arrays.asList(RedisCommandOptions.SET, RedisCommandOptions.HSET,
                RedisCommandOptions.HMSET, RedisCommandOptions.LPUSH, RedisCommandOptions.RPUSH,
                RedisCommandOptions.SADD);
        if (sourceCommand.contains(command.toUpperCase())) {
            return;
        }

        Preconditions.checkNotNull(command, "command is null, please set a value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null, please set a value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedis.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedis.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        // The value of the i-th primary-key column sits at index 2 * (i + 1)
                        // of a key laid out as table:pkName:pkValue[:pkName:pkValue...].
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        // Several complete keys were passed in, separated by commas.
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        // A single complete key was passed in.
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedis.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // Fuzzy matching: only the table name was given, so read the whole table.
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedis.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;
                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;
                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedis.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    // position is a mutable field so that it can be updated inside the lambda.
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedis.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedis.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
            // HGETALL collects inside the switch; every other command emits one row here.
            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }
        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            // Identical command handling to single mode, executed through JedisCluster.
            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedisCluster.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // Fuzzy matching: only the table name was given, so read the whole table.
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;
                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;
                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }
        } else {
            LOG.error("Unsupported mode: {}", mode);
        }
    }

    @Override
    public void cancel() {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}

4. Redis sink classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options, this.columns, this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;

public class RedisSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private String fields;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String[] fieldsArr;
    private StringBuffer redisTableKey;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null, please set a value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null, please set a value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command, "command is null, please set a value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null, please set a value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(String.valueOf(key), String.valueOf(value));
                    break;
                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    // Construct the redis key: table_name:pk_col_name:pk_value[...]
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    value = rowData.getString(1).toString();
                    jedis.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    break;
                case RedisCommandOptions.HMSET:
                    // Construct the redis key: table_name:pk_col_name:pk_value[...]
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    // Write every non-primary-key column as a hash field.
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))) {
                            value = rowData.getString(i).toString();
                            jedis.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                        }
                    }
                    break;
                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key, value);
                    break;
                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key, value);
                    break;
                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key, value);
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            // Identical command handling to single mode, executed through JedisCluster.
            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(String.valueOf(key), String.valueOf(value));
                    break;
                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    value = rowData.getString(1).toString();
                    jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    break;
                case RedisCommandOptions.HMSET:
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))) {
                            value = rowData.getString(i).toString();
                            jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                        }
                    }
                    break;
                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key, value);
                    break;
                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key, value);
                    break;
                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key, value);
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
        } else {
            LOG.error("Unsupported mode: {}", mode);
        }
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}

If it is unclear why the code above is written this way, see my previous post:

Flink Sql-用户自定义 Sources & Sinks_source表和sink表-CSDN博客

Finally, once again, please support the project on GitHub or in the community so that this connector can be formally open-sourced.
