Flink 维表关联

1、实时查询维表

实时查询维表是指用户在 Flink 算子中直接访问外部数据库,比如用 MySQL 来进行关联,这种方式是同步方式,数据保证是最新的。但是,当我们的流计算数据过大,会对外 部系统带来巨大的访问压力,一旦出现比如连接失败、线程池满等情况,由于我们是同步调用,所以一般会导致线程阻塞、Task 等待数据返回,影响整体任务的吞吐量。而且这种方案对外部系统的 QPS 要求较高,在大数据实时计算场景下,QPS 远远高于普通的后台系统,峰值高达十万到几十万,整体作业瓶颈转移到外部系统


public class DimSync  extends RichMapFunction<fplOverview, String> {private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);private Connection conn = null;public void open(Configuration parameters) throws Exception {super.open(parameters);conn = DriverManager.getConnection("jdbc:test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");//根据 dp_id 查询  上周的 fpl_amount,ywPreparedStatement pst = conn.prepareStatement("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = ? \n" +"group by dp_id");pst.setString(1,dp_id);ResultSet resultSet = pst.executeQuery();String fpl_amount = null;String yw = null ;while (resultSet.next()){fpl_amount = resultSet.getString(1);yw = resultSet.getString(2);}pst.close();jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw)return jsonObject.toString();}public void close() throws Exception {super.close();conn.close();}

2、LRU 缓存 (flink 异步Id)

利用 Flink 的 RichAsyncFunction 读取 mysql 的数据到缓存中,我们在关联维度表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询 mysql,然后插入到缓存中。


public class JDBCAsyncFunction  extends RichAsyncFunction<fplOverview, JsonObject> {private SQLClient client;@Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config = new JsonObject().put("url", "jdbc:mysql://rm-bp161be65d56kbt4nzo.mysql.rds.aliyuncs.com:3306/mysqldb?characterEncoding=UTF-8;useSSL=false").put("driver_class", "com.mysql.cj.jdbc.Driver").put("max_pool_size", 10).put("user", "root").put("password", "");client = JDBCClient.createShared(vertx, config);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(fplOverview fplOverview, ResultFuture<JsonObject> resultFuture) throws Exception {client.getConnection(conn -> {if (conn.failed()) {return;}final SQLConnection connection = conn.result();// 执行sqlconnection.query("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = '" + fplOverview.getDp_id() + " ' " +"group by dp_id ", res2 -> {ResultSet rs = new ResultSet();if (res2.succeeded()) {rs = res2.result();}else{System.out.println("查询数据库出错");}List<JsonObject> stores = new ArrayList<>();for (JsonObject json : rs.getRows()) {stores.add(json);}connection.close();resultFuture.complete(stores);});});}}

3、预加载全量mysql数据

预加载全量mysql数据 使用 ScheduledExecutorService 每隔 5 分钟拉取一次维表数据,这种方式适用于那些实时场景不是很高,维表数据较小的场景

public class WholeLoad extends RichMapFunction<fplOverview,String> {private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);// 定义map的结果,key为关联字段private static Map<String,String> cache ;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);cache = new HashMap<>();ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);executor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {load();} catch (Exception e) {e.printStackTrace();}}},0,5, TimeUnit.MINUTES); //从现在开始每隔5分钟查询数据}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");// 获取对应id的结果String rs = cache.get(dp_id);JSONObject rsObject = JSONObject.parseObject(rs);jsonObject.putAll(rsObject);return jsonObject.toString();}public   void  load() throws Exception {Class.forName("com.mysql.jdbc.Driver");Connection con = DriverManager.getConnection("jdbc:mysql://test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");// 执行查询的SQLPreparedStatement statement = con.prepareStatement("select  dp_id,max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"group by dp_id");ResultSet rs = statement.executeQuery();while (rs.next()) {// 查询结果放入缓存String dp_id = rs.getString("dp_id");String fpl_amount = rs.getString("fpl_amount");String yw = rs.getString("yw");JSONObject jsonObject = JSONObject.parseObject("{}");jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw);cache.put(dp_id,jsonObject.toString());}System.out.println("数据输出测试:"+cache.toString());con.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ui设计要学插画吗?优漫动游

现如今很多UI设计培训班都开设了商业插画的课程&#xff0c;有不少同学表示真的要学吗&#xff1f;商业插画都有什么用处呢&#xff1f;今天我们就来给大家介绍一下商业插画在UI设计中的运用。 ui设计要学插画吗&#xff1f;   商业插画属于实用型插画&#xff0c;是一种…

详解预处理(1)

目录 预定义符号 预处理指令#define #define定义符号 #define定义宏 #define替换规则 #和##&#xff08;C语言预处理操作符&#xff09; # ## 带副作用的宏参数 宏和函数的对比 命名约定 在之前我们学习了一个文本文件.c生成一个可执行程序。今天我们详细讲解其中的…

腾讯云国际站服务器端口开放失败怎么办?

腾讯云服务器是腾讯公司推出的一种云服务&#xff0c;用户能够经过这种方式在互联网上进行数据存储和计算。然而&#xff0c;用户在运用腾讯云服务器时或许会遇到各种问题&#xff0c;其间端口敞开失利是一个常见问题。本文将具体介绍如何解决腾讯云服务器端口敞开失利的问题。…

01 # 手写 new 的原理

new 做了什么? 在构造器内部创建一个新的对象这个对象内部的隐式原型指向该构造函数的显式原型让构造器中的 this 指向这个对象执行构造器中的代码如果构造器中没有返回对象&#xff0c;则返回上面的创建出来的对象 手写 new 的过程 new 是一个运算符&#xff0c;只能通过函…

CSS隐藏元素的N种方法,你知道哪一种最适合你?

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! 目录 ⭐ 专栏简介 &#x1f4d8; 文章引言 一、前…

Camtasia2024永久激活码

真的要被录屏软件给搞疯了&#xff0c;本来公司说要给新人做个培训视频&#xff0c;想着把视频录屏一下&#xff0c;然后简单的剪辑一下就可以了。可谁知道录屏软件坑这么多&#xff0c;弄来弄去头都秃了&#xff0c;不过在头秃了几天之后&#xff0c;终于让我发现了一个值得“…

el-date-picker日期选择器奇怪的问题解决

问题描述&#xff1a;点击日期选择器&#xff0c;出现日历下拉框&#xff0c;选择了日期后绑定的value值不变&#xff1b;绑定 change 事件&#xff0c;监听不到 解决方案&#xff1a;添加input事件&#xff0c;$forceUpdate() <el-date-pickerv-model"value1"ty…

用Python做数据分析之数据处理及数据提取

1、数据预处理 第四部分是数据的预处理&#xff0c;对清洗完的数据进行整理以便后期的统计和分析工作。主要包括数据表的合并&#xff0c;排序&#xff0c;数值分列&#xff0c;数据分组及标记等工作。 1&#xff09;数据表合并 首先是对不同的数据表进行合并&#xff0c;我们…

【Linux】gdb调试

目录 进入调试查看代码运行代码断点打断点查断点删断点从一个断点转跳至下一个断点保留断点但不会运行该断点 退出调试逐过程逐语句监视跳转至指定行运行结束当前函数 进入调试 指令&#xff1a;gdb 【可执行文件】&#xff1a; 查看代码 &#xff1a;l 【第几行】如果输入指…

安全设备

一.防火墙 5层应用层 防火墙 4层 udp tcp 协议 华为 厂商 华为 h3 1.区域划分 Dmz 停火区 Untrust 不安全区域 Trust 安全区域 防火墙 默认禁止所有 二.Waf Web 应用防火墙 放到web前面 产品 雷池 绿盟 软件 安…

递归神经网络 (RNN)

弗朗西斯科佛朗哥 一、说明 循环神经网络非常有趣&#xff0c;因为与前馈网络不同&#xff0c;在前馈网络中&#xff0c;数据只能在一个方向上传播&#xff0c;每个神经元可以与连续层的一个或多个神经元连接&#xff0c;在这种类型的网络中&#xff0c;神经元还可以环回自身或…

MAXScript - tyFlow for 3dsMax

MAXScript - tyFlow for 3dsMax tyFlow 信息 tyFlow_version(): 返回当前安装的tyFlow DLO文件的版本号。tyFlow_found_in_scene(): 返回有关场景中活动tyFlow类&#xff08;对象、修改器等&#xff09;数量的信息。 tyFlow 许可 tyFlow_activate_license(): 尝试激活tyFlo…

安卓端GB28181设备接入模块如何实现实时位置订阅(MobilePosition)

技术背景 实时位置&#xff08;MobilePosition&#xff09;订阅和上报&#xff0c;对GB28281设备接入终端尤其重要&#xff0c;如移动单兵设备、执法记录仪、智能安全帽、车载终端等&#xff0c;Android国标接入设备通过获取到实时经纬度信息&#xff0c;按照一定的间隔上报到…

竞赛选题 深度学习卫星遥感图像检测与识别 -opencv python 目标检测

文章目录 0 前言1 课题背景2 实现效果3 Yolov5算法4 数据处理和训练5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **深度学习卫星遥感图像检测与识别 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐…

gstreamer插件开发-The event function

event 函数 event函数通知您数据流中发生的特殊事件(如大写、流结束、新段、标记等)。事件可以在上游和下游传播&#xff0c;因此您可以在汇聚节点和源节点上接收它们。 下面是一个非常简单的事件函数&#xff0c;我们将其安装在元素的接收器上。 static gboolean gst_my_fil…

如何使用 VuePress 搭建博客网站并 Vercel 部署

先来看一下网站截图&#xff1a; 快速上手 1.创建并进入一个新目录 mkdir vuepress-starter && cd vuepress-starter2.使用你喜欢的包管理器进行初始化 yarn init # npm init3.将 VuePress 安装为本地依赖 yarn add -D vuepress # npm install -D vuepress4.创建你…

【LeetCode:2520. 统计能整除数字的位数 | 模拟 | HashMap】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

Sentinel授权规则和规则持久化

大家好我是苏麟 , 今天说说Sentinel规则持久化. 授权规则 授权规则可以对请求方来源做判断和控制。 授权规则 基本规则 授权规则可以对调用方的来源做控制&#xff0c;有白名单和黑名单两种方式。 白名单&#xff1a;来源&#xff08;origin&#xff09;在白名单内的调用…

问题与分类

设计问题 是否已经有类似的解决方案&#xff0c;是否需要当前的设计设计思路的文档话&#xff0c;背景-》 设计思路-》 好处与不足 -》 其他设计思路的对比&#xff08;淘汰其他设计思路的原因&#xff09; 设计思路的评审&#xff0c;如何评审&#xff0c;如何量化&#xff…