Flink实时电商数仓之旁路缓存

撤回流的处理

撤回流是指流式处理过程中,两表join过程中的数据是一条一条跑过来的,即原本可以join到一起的数据在刚开始可能并没有join上。

  • 撤回流的格式:
    在这里插入图片描述
  • 解决方案
    • 定时器:使用定时器定时10s(数据最大的时间差值),定时器触发时将状态中的数据发送过来
    • 如果重复计算这些数据,如何保持结果正确即可;通过每次度量值修改为当次度量值 - 上次度量值即可

异步IO

在这里插入图片描述

  • 减少等待的时间,充分利用已有的资源
  • 使用异步IO时,必须保证从头到尾都是异步的操作;即使用异步的连接器
/*** 获取到 redis 的异步连接** @return 异步链接对象*/public static StatefulRedisConnection<String, String> getRedisAsyncConnection() {RedisClient redisClient = RedisClient.create("redis://hadoop102:6379/2");return redisClient.connect();}/*** 关闭 redis 的异步连接** @param redisAsyncConn*/public static void closeRedisAsyncConnection(StatefulRedisConnection<String, String> redisAsyncConn) {if (redisAsyncConn != null) {redisAsyncConn.close();}}/*** 获取到 Hbase 的异步连接** @return 得到异步连接对象*/public static AsyncConnection getHBaseAsyncConnection() {Configuration conf = new Configuration();conf.set("hbase.zookeeper.quorum", "hadoop102");conf.set("hbase.zookeeper.property.clientPort", "2181");try {return ConnectionFactory.createAsyncConnection(conf).get();} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭 hbase 异步连接** @param asyncConn 异步连接*/public static void closeAsyncHbaseConnection(AsyncConnection asyncConn) {if (asyncConn != null) {try {asyncConn.close();} catch (IOException e) {throw new RuntimeException(e);}}}

异步IO关联

  1. AsyncDataStream.unorderedWait(异步核心逻辑, 60, TimeUnit.SECONDS) 异步关联维度表
  2. CompletableFuture.supplyAsync(new Supplier<>(){ 异步访问读取Redis中的数据 }),返回的数据类型是Future类型
    • 先拼写访问的redisKey
    • 获取到dimSkuInfoFuture期货
    • 使用dimSkuInfoFuture.get()获取异步结果
  3. thenApplyAsync(new Function<>()), 旁路缓存判断,判断是否在redis中读取到相关数据,如果没有读取到,需要访问HBase.
  4. 需要重写HBase的getCells方法,改为getAsyncCells方法
    • 连接更换为异步连接
    • Future类型数据需要再get()方法获取具体的值
    • 无需关闭连接
  5. 将从HBase读取的数据保存到redis, redisAsyncConnection.async().setex(redisKey,24*60*60,dimJsonObj.toJSONString());

异步维度关联封装

  1. 继承RichAsyncFunction<TradeSkuOrderBean, TradeSkuOrderBean>接口
  2. 将表名和rowkey拼接的方法抽象化,让方法调用者自己传进来
  3. 封装join方法, join(TradeSkuOrderBean input, JSONOjbect dim); join方法里面填写度量值的聚合逻辑
  4. 将抽象方法和具体方法分离,把抽象方法放到接口中,在实现该接口
  5. TradeSkuOrderBean类改为泛型方法T
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T>
implements DimJoinFunction<T>{StatefulRedisConnection<String, String> redisAsyncConnection;AsyncConnection hBaseAsyncConnection;String tableName;@Overridepublic void open(Configuration parameters) throws Exception {redisAsyncConnection = RedisUtil.getRedisAsyncConnection();hBaseAsyncConnection = HBaseUtil.getHBaseAsyncConnection();}@Overridepublic void close() throws Exception {RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);HBaseUtil.closeAsyncHbaseConnection(hBaseAsyncConnection);}@Overridepublic void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {//java的异步编程方式String tableName = getTableName();String rowKey = getId(input);String redisKey = RedisUtil.getRedisKey(tableName, rowKey);CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {//第一步异步访问得到的数据RedisFuture<String> dimSkuInfoFuture = redisAsyncConnection.async().get(redisKey);String dimInfo = null;try {dimInfo = dimSkuInfoFuture.get();} catch (Exception e) {e.printStackTrace();}return dimInfo;}}).thenApplyAsync(new Function<String, JSONObject>() {@Overridepublic JSONObject apply(String dimInfo) {JSONObject dimJsonObj = null;//旁路缓存判断if (dimInfo == null || dimInfo.isEmpty()) {try {//需要访问HBasedimJsonObj = HBaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, tableName, rowKey);//将读取的数据保存到redisredisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());} catch (Exception e) {e.printStackTrace();}} else {//redis中存在缓存数据dimJsonObj = JSONObject.parseObject(dimInfo);}return dimJsonObj;}}).thenAccept(new Consumer<JSONObject>() {public void accept(JSONObject dim) {//合并维度信息if (dim == null) {//无法关联到维度信息System.out.println("无法关联到当前的维度信息:" + tableName + ":" + rowKey);} else {join(input,dim);}//返回结果resultFuture.complete(Collections.singletonList(input));}});}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/596459.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7.14解数独(LC37-H)

算法&#xff1a; 二维递归&#xff08;递归时需要两层for循环&#xff09; 一个for循环放行 另一个for循环放列 画树&#xff1a; 因为这个树形结构太大了&#xff0c;我抽取一部分&#xff0c;如图所示&#xff1a; 回溯三部曲&#xff1a; 1.确定函数参数和返回值 返…

在Gradle工程中使用checkstyle来规范你的项目

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 系列专栏目录 [Java项…

CSS transition详解

文章目录 属性transition-propertytransition-durationtransition-timing-functiontransition-delaytransition 简写属性 方法Element&#xff1a;transitionrun 事件Element&#xff1a;transitionstart 事件Element&#xff1a;transitionend 事件Element&#xff1a;transit…

音频DAC,ADC,CODEC高性能立体声

想要让模拟信号和数字信号顺利“交往”&#xff0c;就需要一座像“鹊桥”一样的中介&#xff0c;将两种不同的语言转变成统一的语言&#xff0c;消除无语言障碍。这座鹊桥就是转换器芯片&#xff0c;也就是ADC芯片。ADC芯片的全称是Analog-to-Digital Converter, 即模拟数字转换…

【白盒测试】逻辑覆盖和路径测试的设计方法

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

互联网演进历程:从“全球等待”到“全球智慧”的技术革新与商业变革

文章目录 一、导言二、World Wide Wait (全球等待)阶段1. 技术角度2. 用户体验3. 企业收益4. 教育影响 三、World Wide Web (万维网)阶段1. 技术角度2. 用户体验3. 企业收益4. 教育影响 四、World Wide Wisdom (全球智慧)阶段1. 技术角度2. 用户体验3. 企业收益4. 教育影响 五、…

Java编程中的IO模型详解:BIO,NIO,AIO的区别与实际应用场景分析

IO模型 IO模型就是说用什么样的通道进行数据的发送和接收&#xff0c;Java 共支持3种网络编程IO 模式&#xff1a;BIO,NIO,AIO BIO(Blocking lO) 同步阻塞模型&#xff0c; 一个客户端连接对应一个处理线程 代码示例&#xff1a; package com.tuling.bio; import java.io.…

DeepL翻译器,一直想使用怎么办?

作为一个独立开发者&#xff0c;将应用程序翻译到不同语言是个让我很头大的事情。请专业人员翻译太贵无法承受&#xff0c;谷歌翻译质量太差时常词不达意。 如何使用 DeepL 使用起来很直观&#xff0c;打开此网页粘贴要翻译的内容即可。它也支持 macOS 和 PC 端。 这里开我们开…

LinuxShell

一、 新建用户 在Linux上新建一个用户并赋予超级用户权限&#xff0c;建立家目录并设置默认shell为bash&#xff0c;并设置Linux在输入sudo密码时显示星号。请提交全部命令及输出截图&#xff08;表明完成需求即可&#xff09;。 1.sudo useradd -m ymhs(用户名) 增加用户 2.su…

SpingBoot的项目实战--模拟电商【5.沙箱支付】

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于SpringBoot电商项目的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一. 沙箱支付是什么 二.Sp…

2分钟了解什么是socket?

文章目录 概念比喻类型Socket 与 TCP、UDP的关系 概念 Socket 是提供网络通信功能的编程接口&#xff08;API&#xff09;&#xff0c;提供了网络通信的基本操作&#xff0c;允许程序或进程之间进行数据交换。是传输层协议的具体软件实现&#xff0c;它封装了协议底层的复杂实…

【干货】Windows中定时删除system32目录下的.dmp文件教程

旭帆科技的技术人员除了给用户答疑解惑以外&#xff0c;还会主动测试软件性能&#xff0c;进行平台优化&#xff0c;除此之外&#xff0c;技术人员还会总结一些技术干货&#xff0c;这不&#xff0c;近期又提供了一份如何在Windows中定时删除system32目录下的.dmp文件的教程。感…

CRM软件对企业发展起着哪些作用?CRM的功能解析

虽然不少科技成果昙花一现&#xff0c;但CRM管理系统作为销售和营销领域的核心技术&#xff0c;已经牢牢占据了不可撼动的地位。拥有一个部署得当的CRM系统能为企业带来诸多好处。它可以跟踪和管理销售人员与潜在/现有客户的所有互动和沟通&#xff0c;并帮助他们识别出需要重点…

msvcp140.dll文件缺失要怎么修复?msvcp140.dll重新安装的解决方法

使用Windows系统时&#xff0c;dll文件丢失的问题时有发生&#xff0c;特别是msvcp140.dll文件缺失问题&#xff0c;它会导致某些程序无法运行。针对这一常见问题&#xff0c;本文将详细阐述如何应对和修复msvcp140.dll文件缺失的状况&#xff0c;提供多个解决方案&#xff0c;…

Java多线程-14

目录 程序线程进程 并发并行​编辑 创建线程的基本方式 程序线程进程 并发并行 创建线程的基本方式&#xff08;1&#xff09; package com.edu.threaduse;public class Demo01 {public static void main(String[] args) throws InterruptedException {//创建Cat对象&…

DolphinScheduler实际应用

前言 最近公司新启动了一个项目&#xff0c;然后领导想用一下新技术&#xff0c;并且为公司提供多个大数据调度解决方案&#xff0c;我呢就根据领导要求调研了下当前的开源调度工具&#xff0c;最终决定采用DolphinScheduler&#xff0c; 因此研究了一下DolphinScheduler &…

今日聊聊寒假假期如何不颓废

&#x1d649;&#x1d65e;&#x1d658;&#x1d65a;!!&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦ &#x1f44f;&#x1f3fb;‧✧̣̥̇:Solitary-walk ⸝⋆ ━━━┓ - 个性标签 - &#xff1a;来于“云”的“羽球人”。…

Linux-进程间通信_管道

项目场景&#xff1a; 须熟知文件管理和进程方面的基础知识 通过Xshell和VScode 相互进行远程开发&#xff0c;学习进程间通信的其中一种方式——管道。 问题描述 依照我们曾经所学的知识&#xff0c;我们仅仅只能在单个进程中进行数据的交互&#xff0c;但是在实际应用中&a…

算法第七天-粉刷房子Ⅲ

粉刷房子Ⅲ 题目要求 解题思路 来自[宫水三叶] 动态规划 定义 f[i][j][k] 为开了前i间房子&#xff0c;且第 i 间房子的颜色编号为 j&#xff0c; 前 i 间房子形成的分区数量为 k 的所有方案中的[最小上色成本]。 我们不失一般性的考虑 f[i][j][k] 该如何转移&#xff0c;由…

Sentinel使用

前言&#xff1a; 所有的准备工作都做好了&#xff0c;就可以进入到Sentinel的具体使用上了&#xff0c;这里还需要一个测试工具叫做jmeter&#xff0c;是一个很好的测试工具&#xff0c;专门针对并发的&#xff0c;准备好以后&#xff0c;就可以直接开干了。 一、Sentinel作用…