【自定义Source、Sink】Flink自定义Source、Sink对redis进行读写操作

使用ParameterTool读取配置文件

Flink读取参数的对象

  1. Commons-cli: Apache提供的,需要引入依赖
  2. ParameterTool:Flink内置

ParameterTool 比 Commons-cli 使用上简便;

ParameterTool能避免Jar包的依赖冲突

建议使用第二种

使用ParameterTool对象可以直接获取配置文件中的信息,需要如下依赖

        <!-- Flink基础依赖 【ParameterTool类 在该依赖中】 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId></dependency><!-- Flink流批处理依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId></dependency>

Java读取资源的方式

  1. Class.getResourceAsStream(Path):Path 必须以 “/”,表示从ClassPath的根路径读取资源
  2. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”,默认从ClassPath的根路径读取资源

推荐使用第2种,以类加载器的方式获取静态资源文件,不要通过ClassPath的相对路径查找

最基本的工具类

public class ParameterUtil {// 创建 ParameterTool 对象public static ParameterTool getParameters() {// 读取 resources 文件夹下 "flink.properties" 文件InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {return ParameterTool.fromPropertiesFile(inputStream);} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}}
}

image-20231209095849541

可以通过 ParameterUtil.getParameters().get("redis.port") 直接读取key对应的value值

Flink写入Redis方式

  1. 继承RichSinkFunction (Flink-Stream)
  2. 使用第3方的包 (Apache-Bahir-Flink)

Apache-Bahir-Flink 的 Redis-Connector的缺点:

  1. 使用Jedis, 没有使用Lettuce
  2. 没有对 Flink Table/SQL Api 的支持

不少基于bahir二开的例子解决了上述问题

gitee地址:https://gitee.com/jeff-zou/flink-connector-redis?_from=gitee_search

github地址:https://github.com/apache/bahir-flink

bahir 集成了许多连接器,其中就包含Redis

image-20231209103659812

Flink官网上也可以看到bahir的影子

image-20231209104014483

方便起见,接下来就基于bahir,Flink写入Redis集群

基于巴希尔(Bahir)-Flink写入Redis集群

引入connector连接器依赖

        <!-- Flink-Connector-Redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_${scala.binary.version}</artifactId></dependency>

依赖版本定义在父模块中

image-20231209100449996

实现RedisMapper接口自定义Sink

首先实现RedisMapper接口并指定泛型——处理元素的类型

/*** 基于apache bachir flink的RedisSink,作用于Redis String数据类型*/
public class RedisSinkByBahirWithString implements RedisMapper<Tuple2<String, String>> {/*** 指定Redis的命令*/@Overridepublic RedisCommandDescription getCommandDescription() {/* ************************ 如果Redis的数据类型是 hash 或 z-Set* RedisCommandDescription 的构造方法必须传入 additionalKey* additionalKey就是Redis的键** *********************/return new RedisCommandDescription(RedisCommand.SET);}/*** 从数据流里获取Key值*/@Overridepublic String getKeyFromData(Tuple2<String, String> input) {return input.f0;}/*** 从数据流里获取Value值*/@Overridepublic String getValueFromData(Tuple2<String, String> input) {return input.f1;}
}

写入Redis工具类

public class RedisWriteUtil {/* ************************ FlinkJedisClusterConfig:集群模式* FlinkJedisPoolConfig:单机模式* FlinkJedisSentinelConfig:哨兵模式** *********************/// Jedis配置private static final FlinkJedisClusterConfig JEDIS_CONF;static {ParameterTool parameterTool = ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* ************************ InetSocketAddress 是Java的套接字** *********************/InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));Set<InetSocketAddress> set = new HashSet<>();set.add(inetSocketAddress);JEDIS_CONF = new FlinkJedisClusterConfig.Builder().setNodes(set).build();}/*** 基于Bahir写入Redis,Redis的数据是String类型*/public static void writeByBahirWithString(DataStream<Tuple2<String, String>> input) {input.addSink(new RedisSink<>(JEDIS_CONF, new RedisSinkByBahirWithString()));}}

测试一下

class RedisWriteUtilTest {@DisplayName("测试基于Bahir写入Redis,Redis数据类型是String类型")@Testvoid writeByBahirWithString() throws Exception {LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();DataStreamSource<Tuple2<String, String>> dataStream = env.fromElements(Tuple2.of("k", "v"));RedisWriteUtil.writeByBahirWithString(dataStream);env.execute();}
}

非常完美!写入成功

image-20231209105850707

Flink读取Redis方式

  1. 继承RichSourceFunction (实现自定义Source)
  2. 继承RichParallelSourceFunction (实现自定义Source)【可以指定并行度】
  3. 实现SourceFunction接口 (实现自定义Source)

RichParallelSourceFunction 和 RichSourceFunction区别

RichParallelSourceFunction 可以设置并行度

RichParallelSourceFunction 和 RichSourceFunction 代码是可以互相套用

RichParallelSourceFunction 默认的并行度是cpu 的 核心数(core数)

RichSourceFunction 的并行度只能是1

继承RichSourceFunction类-Flink读取Redis集群

前置准备

定义枚举类

Redis数据类型枚举类

@Getter
public enum RedisDataType {STRING,HASH,LIST,SET,SORTED_SET,;RedisDataType() {}
}

定义Redis命令的枚举类,便于Source判断操作

@Getter
public enum RedisCommand {// get stringGET(RedisDataType.STRING);private final RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType = redisDataType;}
}

Jedis配置类

bahir依赖中自带jedis依赖一般不用,自行引入jedis,jedis依赖版本要与巴希尔中jedis版本保持一致

image-20231209111800457

public class JedisConf {public static JedisCluster getJedisCluster() throws IOException {ParameterTool parameterTool =ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* *********************** Jedis对象** JedisPool : 用于redis单机版* JedisCluster: 用于redis集群** JedisCluster对象能够自动发现正常的redis节点** *********************/HostAndPort hostAndPort = new HostAndPort(host,Integer.parseInt(port));Set<HostAndPort> nodes = new HashSet<>();nodes.add(hostAndPort);return new JedisCluster(nodes);}
}

封装Jedis对象的redis方法

封装Jedis对象的redis方法,方便统一调用和维护

public class JedisBuilder {private JedisCluster jedis = null;public JedisBuilder(JedisCluster jedisCluster) {this.jedis = jedisCluster;}public void close() {if (this.jedis != null) {this.jedis.close();}}/*** Redis的Get方法*/public String get(String key) {return jedis.get(key);}
}

自定义Source

Redis数据的映射对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RedisPO implements Serializable {private String data;}

Flink 自定义Redis Source读取Redis

/* *********************** 【富函数类】 比函数类提供了更多函数生命周期,提供了获取上下文的方法* 富函数类通常是抽象类* *********************/
public class RedisSource extends RichSourceFunction<RedisPO> {/*** Jedis对象*/private JedisBuilder jedisBuilder;/*** Redis命令枚举对象*/private final RedisCommand redisCommand;/*** redis key*/private final String key;public RedisSource(RedisCommand redisCommand, String key) {this.redisCommand = redisCommand;this.key = key;}/*** volatile 修饰的变量,它的更新都会通知其他线程.*/private volatile boolean isRunning = true;/*** Redis的连接初始化*/@Overridepublic void open(Configuration parameters) throws Exception {JedisCluster jedisCluster = JedisConf.getJedisCluster();jedisBuilder = new JedisBuilder(jedisCluster);}/*** Redis数据的读取*/@Overridepublic void run(SourceContext<RedisPO> output) throws Exception {/* ************************ 一直监听Redis数据的读取** *********************/String data = null;// while (isRunning) {switch (redisCommand.getRedisDataType()) {case STRING:data = jedisBuilder.get(key);}output.collect(new RedisPO(data));// }}@Overridepublic void cancel() {this.isRunning = false;}}

读取Redis工具类

public class RedisReadUtil {public static DataStream<RedisPO> read(StreamExecutionEnvironment env,RedisCommand redisCommand,String key) {return env.addSource(new RedisSource(redisCommand, key));}
}

测试一下

class RedisReadUtilTest {@DisplayName("测试自定义Source读取Redis,Redis数据类型是String类型")@Testvoid testReadByCustomSourceWithString() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<RedisPO> dataStream = RedisReadUtil.read(env,RedisCommand.GET,"k");dataStream.print();env.execute();}
}

测试成功!

image-20231209113539037

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/210643.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

西工大网络空间安全学院计算机网络实验五——ACL配置

实验五、ACL配置 一. 实验目的 1. 掌握ACL的基本配置方法 二. 实验内容 1. 基于如下图所示的拓扑图&#xff0c;对路由器进行正确的RIP协议配置&#xff1b; ​ 首先引入3台2811 IOS15型号的路由器、3台2950-T24型号的交换机、4台PC-PT型号的PC机、两台Server-PT型号的服务…

kafka学习笔记--生产者消息发送及原理

本文内容来自尚硅谷B站公开教学视频&#xff0c;仅做个人总结、学习、复习使用&#xff0c;任何对此文章的引用&#xff0c;应当说明源出处为尚硅谷&#xff0c;不得用于商业用途。 如有侵权、联系速删 视频教程链接&#xff1a;【尚硅谷】Kafka3.x教程&#xff08;从入门到调优…

JavaScript 的节流与防抖

// 函数防抖&#xff1a; 在事件被触发 n 秒后再执行回调&#xff0c;如果在这 n 秒内事件又被触发&#xff0c;则重新计时。// 函数节流&#xff1a; 规定一个单位时间&#xff0c;在这个单位时间内&#xff0c;只能有一次触发事件的回调函数执行&#xff0c;如果在同一个单位…

Redis各种数据结构应用场景

Redis各种数据结构应用场景 一、基本类型 Redis的基本数据类型时&#xff0c;以下是它们的实际场景示例&#xff1a; 字符串&#xff08;String&#xff09;&#xff1a; 实际场景 缓存数据&#xff1a;将频繁访问的数据缓存在Redis中&#xff0c;以提高读取速度。会话管理&…

Ubuntu与Windows通讯传输文件(FTP服务器版)(没用的方法,无法施行)

本文介绍再Windows主机上建立FTP服务器&#xff0c;并且在Ubuntu虚拟机上面访问Windows上FTP服务器的方法 只要按照上图配置就可以了 第二部&#xff1a;打开IIS管理控制台 右击网站&#xff0c;新建FTP站点。需要注意的一点是在填写IP地址的时候&#xff0c;只需要填写Window…

遥感卫星综述(下载和预处理)(持续更新)

遥感卫星综述&#xff08;下载和预处理&#xff09; 目录 遥感卫星综述&#xff08;下载和预处理&#xff09;一、国产卫星GF-1 WFV 二、国外卫星Sentinel-1Sentinel-2 一、国产卫星 GF-1 WFV 下载 分辨率波段16m4(蓝、绿、红、近红) 预处理&#xff1a; ENVI预处理GF-1号W…

用友T3如何反结账、反记账、反审核及删除凭证

在T3总账中已经进行了总账记账和月末结账&#xff0c;但是需要去修改凭证或删除凭证&#xff0c;这个时候就需要去进行反结账、反记账等操作&#xff0c;以下是具体的操作流程 第一步、反结账 1、进入用友T3件&#xff0c;打开总账系统模块&#xff0c;点月末结账&#xff0c…

uc_15_TCP协议

1 TCP协议 TCP提供客户机与服务器的链接。一个完整TCP通信过程需要经历三个阶段 1&#xff09;首先&#xff0c;客户机必须建立与服务器的连接&#xff0c;所谓虚电路 2&#xff09;然后&#xff0c;凭借已建立好的连接&#xff0c;通信双方相互交换数据 3&#xff09;最后&am…

短信验证码无法获取,通过获取cookies直接登录

web端&#xff0c;selenium短信验证码无法获取&#xff0c;通过获取cookies直接登录 1&#xff0c;先获取cookies driver webdriver.Chrome() driver.get("") driver.implicitly_wait(2) # 获取弹窗&#xff0c;并取消 driver.find_element(By.XPATH,"/html/…

智能优化算法应用:基于粒子群算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于粒子群算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于粒子群算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.粒子群算法4.实验参数设定5.算法结果6.参考文…

Python---异常的综合案例

☆ 异常的传递 需求&#xff1a; ① 尝试只读方式打开python.txt文件&#xff0c;如果文件存在则读取文件内容&#xff0c;文件不存在则提示用户即可。 ② 读取内容要求&#xff1a;尝试循环读取内容&#xff0c;读取过程中如果检测到用户意外终止程序&#xff0c;则except捕…

个人博客网站如何实现https重定向(301)到http

对于个人网站站注册比较少的&#xff0c;服务器配置不是很好的&#xff0c;没必要https,https跳转到http是要时间的&#xff0c;会影响网站打开的速度。免费的https每年都要更换。个人博客网站https有一段时间了&#xff0c;而且很多页面都有收录排名&#xff0c;现在已去掉htt…

基于JavaWeb+SSM+Vue实习记录微信小程序系统的设计和实现

基于JavaWebSSMVue实习记录微信小程序系统的设计和实现 源码获取入口Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 Lun文目录 目 录 摘 要 III Abstract 1 1 系统概述 1 1.1 概述 2 1.2课题意义 3 1.3 主要内…

【Linux系统编程】进度条的编写

目录 一&#xff0c;进度条的必备知识 1&#xff0c;缓冲区的粗略介绍 2&#xff0c;回车与换行 二&#xff0c;进度条的初步制作 1&#xff0c;进度条的初步矿建 2&#xff0c;进度条的版本一 3&#xff0c;进度条的版本二 一&#xff0c;进度条的必备知识 1&#xff…

详细了解STM32----GPIO

提示&#xff1a;永远支持免费开源知识文档&#xff0c;喜欢的点个关注吧&#xff01;谢谢&#xff01; 文章目录 一、什么是GPIO&#xff1f;二、GPIO基本结构三、GPIO的输入输出模式1、推挽输出2、开漏输出3、复用推挽4、复用开漏1、浮空输入2、上拉输入&#xff13;、下拉输…

FastAPI之嵌套模型

请求体 - 嵌套模型 使用 FastAPI&#xff0c;你可以很随意的实现模型的嵌套、定义、校验、记录文档&#xff0c;并使用任意深度嵌套的模型&#xff0c;这其实都是FastAPI的核心模块P一单提成进行做的。。 List 字段 from fastapi import FastAPI from pydantic import BaseM…

基于JavaWeb+SSM+Vue童装商城小程序系统的设计和实现

基于JavaWebSSMVue童装商城小程序系统的设计和实现 源码获取入口Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 Lun文目录 目 录 摘 要 III Abstract 1 1 系统概述 2 1.1 概述 3 1.2课题意义 4 1.3 主要内容 5…

BearPi Std 板从入门到放弃 - 先天篇(1)(阶段 : 智慧城市 - 智慧路灯)

简介 对前面几篇整合, 做个小小汇总试验, 使用BearPi E53_SC1扩展板主芯片: STM32L431RCT6串口: Usart1扩展板与主板连接: I2C : I2C1 (光照强度传感器&#xff1a;BH1750)LED: PB9步骤 创建项目 参考 BearPi Std 板从入门到放弃 - 引气入体篇&#xff08;1&#xff09;(由零创…

【测试人生】数据同步和迁移的变更注意事项

数据同步或者迁移操作也算是线上数据变更的一种类型。由于涉及的数据量非常大&#xff0c;一旦发生故障&#xff0c;会直接影响线上业务&#xff0c;并且较难止损。从变更风险管控的角度考虑&#xff0c;数据同步或迁移操作也需要走合理的发布窗口&#xff0c;并且在操作前也需…

浅谈Google Play ASO 优化

什么是ASO ASO即APP Store Optimization&#xff0c;是用于提高APP在应用市场排名的工具&#xff0c;其实也就是移动产品的SEO工作。 ASO是为了提高该产品的搜索结果成绩&#xff0c;提升APP的下载量&#xff0c;针对Google Play来说&#xff0c;ASO就是优化APP页面。 为什么…