Apache Omid TSO 组件源码实现原理

Apache Omid TSO 组件实现原理

作用

独立进程,处理全局事务之间的并发冲突。

流程

TSOChannelHandler#channelRead -> AbstractRequestProcessor -> PersistenceProcessorHandler

总体流程

thread1TSOChannelHandler#channelReadAbstractRequestProcessor#timestampRequest 接收 client 请求,创建 RequestEvent 并 publish
thread2AbstractRequestProcessor#onEvent 处理 RequestEvent 请求AbstractRequestProcessor#handleRequestPersistenceProcessorImpl#addTimestampToBatch 创建 PersistEvent,当 batch 满了发送事件
thread3PersistenceProcessorHandler#onEvent 持久化事件处理

TSOChannelHandler

继承自 Netty 的 ChannelInboundHandlerAdapter,用于处理 TSO 的入站请求

channelRead

委托 requestProcessor 创建 timestampRequest 和 commitRequest 请求事件。

AbstractRequestProcessor

处理 timestamp 和 commit 事件。

onEvent

处理 RequestEvent 事件,按照事件类型派发给 handleTimestamp 和 handleCommit 方法进行处理。

handleTimestamp

1.通过 timestampOracle 获取下一个时间戳;
2.PersistenceProcessorImpl#addBatch 事件添加到 batch,但是后续对 timestamp 请求不会额外处理。

handleCommit

主要通过 hasConflictsWithCommittedTransactions 判断 writeSet 和 CommitHashMap 里是否有事务写冲突,如果没有则可以提交事务,分配 commitTimestamp。

private void handleCommit(RequestEvent event) throws Exception {long startTimestamp = event.getStartTimestamp(); // startTimestampIterable<Long> writeSet = event.writeSet(); // 写入集,存储的是 cellIdsCollection<Long> tableIdSet = event.getTableIdSet();boolean isCommitRetry = event.isCommitRetry();boolean nonEmptyWriteSet = writeSet.iterator().hasNext(); // 检查写集合是否为空,即事务是否有写操作if (startTimestamp > lowWatermark &&!hasConflictsWithFences(startTimestamp, tableIdSet) &&!hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) { // 检查事务是否满足提交条件,通过 hasConflictsWithCommittedTransactions 判断是否有事务写冲突// 可以进行事务提交long commitTimestamp = timestampOracle.next(); // 获取提交时间戳Optional<Long> forwardNewWaterMark = Optional.absent();if (nonEmptyWriteSet) { // 写集合非空long newLowWatermark = lowWatermark;for (long r : writeSet) { // 遍历写集合中的每个元素,更新其最新的写入时间戳,并计算新的低水位线long removed = hashmap.putLatestWriteForCell(r, commitTimestamp); // 更新 cellId 对应的 commitTimestamp, 返回之前的 oldest commitTimestampnewLowWatermark = Math.max(removed, newLowWatermark); // 更新低水位线}if (newLowWatermark != lowWatermark) { // 更新低水位线lowWatermark = newLowWatermark;forwardNewWaterMark = Optional.of(lowWatermark);}}forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark);  // 持久化 commit 请求} else { // 事务不满足提交条件if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replyingforwardCommitRetry(startTimestamp, c, event.getMonCtx());  // 若是提交重试,再次检查是否已提交以避免因响应延迟导致的重复提交} else {forwardAbort(startTimestamp, c, event.getMonCtx()); // 否则,中止事务}}
}

CommitHashMap

通过 LongCache 缓存 cellId -> lastCommittedTimestamp 的映射。

getLatestWriteForCell 方法:
根据 cellId 获取 lastCommittedTimestamp。

putLatestWriteForCell 方法:
更新 cellId 对应的 lastCommittedTimestamp。

LongCache

缓存 cellId -> lastCommittedTimestamp 的映射。

get 和 set 操作都是先将原始 cellId 进行 hash 操作找到位置,所以可能存在冲突。

set

更新 cellId 对应的 lastCommittedTimestamp。

public long set(long key, long value) {final int index = index(key); // cellId 取模返回下标,可能会冲突int oldestIndex = 0;long oldestValue = Long.MAX_VALUE;for (int i = 0; i < associativity; ++i) {int currIndex = 2 * (index + i); // 计算 key 下标if (cache[currIndex] == key) { // 相同事务 cellId, 替换场景oldestValue = 0;oldestIndex = currIndex;break;}if (cache[currIndex + 1] <= oldestValue) { // 没找到相同的key.通过和 oldestValue 比较会将最小的 timestamp 剔除oldestValue = cache[currIndex + 1];oldestIndex = currIndex;}}// 替换最旧的键值对,将其更新为新的键值对cache[oldestIndex] = key;cache[oldestIndex + 1] = value;return oldestValue;
}
get

获取 cellId 对应的 lastCommittedTimestamp,找不到则返回 0.

public long get(long key) {final int index = index(key);for (int i = 0; i < associativity; ++i) { // associativity 里存储的元素key应该是相同的int currIndex = 2 * (index + i); // 计算 key 的下标if (cache[currIndex] == key) { // 找到 cache keyreturn cache[currIndex + 1]; // 返回对应的 value}}return 0;
}

PersistenceProcessorImpl

将 startTimestamp 和 commitTimestamp 放入 batch.

addCommitToBatch
创建 event,添加到 current batch
如果 current batch is fulltriggerCurrentBatchFlush
triggerCurrentBatchFlush

创建 PersistBatchEvent 并发送事件

PersistenceProcessorHandler

处理上面 PersistenceProcessorImpl 发送过来的事件,进行持久化处理。

onEvent

实际上只处理 commit 事件,会创建 put 对象将事务信息持久化到 hbase 的 commitTable (OMID_COMMIT_TABLE).

HBaseCommitTable

构造方法: 根据 HBaseCommitTableConfig 配置初始化

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/47383.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能边缘计算网关:实现工业自动化与数据处理的融合-天拓四方

随着物联网&#xff08;IoT&#xff09;技术的迅速发展和普及&#xff0c;越来越多的设备被连接到互联网上&#xff0c;产生了海量的数据。如何有效地处理和分析这些数据&#xff0c;同时确保数据的安全性和实时性&#xff0c;成为了摆在企业面前的一大挑战。智能边缘计算网关作…

广联达Linkworks ArchiveWebService XML实体注入漏洞复现

0x01 产品简介 广联达 LinkWorks(也称为 GlinkLink 或 GTP-LinkWorks)是广联达公司(Glodon)开发的一种BIM(建筑信息模型)协同平台。广联达是中国领先的数字建造技术提供商之一,专注于为建筑、工程和建筑设计行业提供数字化解决方案。 0x02 漏洞概述 广联达 LinkWorks…

在VScode中编译C程序

一&#xff0c;安装 VS Code 下载并安装VS code&#xff0c;安装简体中文和C/C插件。略。 二&#xff0c;配置gcc环境 下载并安装MinGW。添加环境变量。略。 在cmd中输入 gcc -v 能打印版本即可。 三&#xff0c;打开文件夹&#xff0c;创建工作区 1&#xff0c;打开文件夹…

数据库系统概论:数据库系统模式

数据库系统在我们的数字世界中扮演着至关重要的角色&#xff0c;无论是个人设备还是企业级应用&#xff0c;数据的有效管理和访问都是必不可少的。而数据库系统的模式结构是确保数据一致性和可访问性的关键组成部分。 数据库系统模式 基本概念 型和值 数据模型中有 型(type…

游戏中的敏感词算法初探

在游戏中起名和聊天需要服务器判断是否含有敏感词&#xff0c;从而拒绝或屏蔽敏感词显示&#xff0c;这里枚举一些常用的算法和实际效果。 1.字符串匹配算法 常用的有KMP&#xff0c;核心就是预处理出next数组&#xff0c;也就是失配信息&#xff0c;时间复杂度在O(mn) 。还有个…

微软研究人员为电子表格应用开发了专用人工智能LLM

微软的 Copilot 生成式人工智能助手现已成为该公司许多软件应用程序的一部分。其中包括 Excel 电子表格应用程序&#xff0c;用户可以在其中输入文本提示来帮助处理某些选项。微软的一组研究人员一直在研究一种新的人工智能大型语言模型&#xff0c;这种模型是专门为 Excel、Go…

Transformer系列专题(四)——Swintransformer

文章目录 九、SwinTransformer9.1 整体网络架构9.2 Transformer Blocks9.3 Patch Embedding&#xff08;将图像切割成小块&#xff08;Patch&#xff09;&#xff09;9.4 window_partition9.5 W-MSA&#xff08;Window Multi-head Self Attention&#xff09;9.6 window_revers…

Redis-应用

目录 应用 缓存雪崩、击穿、穿透和解决办法? 布隆过滤器是怎么工作的? 缓存的数据一致性怎么保证 Redis和Mysql消息一致性 业务一致性要求高怎么办? 数据库与缓存的一致性问题 数据库和缓存的一致性如何保证 如何保证本地缓存和分布式缓存的一致&#xff1f; 如果在…

【Pytorch】一文向您详细介绍 `tensor.max(1, keepdims=True)`

【&#x1f525;Pytorch】一文向您详细介绍 tensor.max(1, keepdimsTrue) 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff…

(一)原生js案例之图片轮播

原生js实现的两种播放效果 效果一 循环播放&#xff0c;单一的效果 代码实现 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-sc…

昇思学习打卡-20-生成式/GAN图像生成

文章目录 网络介绍生成器和判别器的博弈过程数据集可视化模型细节训练过程网络优缺点优点缺点 网络介绍 GAN通过设计生成模型和判别模型这两个模块&#xff0c;使其互相博弈学习产生了相当好的输出。 GAN模型的核心在于提出了通过对抗过程来估计生成模型这一全新框架。在这个…

今日安装了一下Eclipse,配置了SVN

Eclipse安装配置参考文章1&#xff1a; https://blog.csdn.net/maiya_yayaya/article/details/132208892 Eclipse配置SVN参考文章2&#xff1a; https://blog.csdn.net/zzh45828/article/details/106224375 Eclipse如何导入项目参考文章3&#xff1a; https://blog.csdn.n…

Linux上的系统服务——DNS、WEB、NFS 和 AutoFS 服务的详细配置步骤

现有主机 node01 和 node02&#xff0c;完成如下需求&#xff1a; 1、在 node01 主机上提供 DNS 和 WEB 服务 2、dns 服务提供本实验所有主机名解析 3、web服务提供 www.rhce.com 虚拟主机 4、该虚拟主机的documentroot目录在 /nfs/rhce 目录 5、该目录由 node02 主机提供的NFS…

RK3568笔记三十九:多个LED驱动开发测试(设备树)

若该文为原创文章&#xff0c;转载请注明原文出处。 通过设备树配置一个节点下两个子节点控制两个IO口&#xff0c;一个板载LED&#xff0c;一个外接LED。 一、介绍 通过学习设备树控制GPIO&#xff0c;发现有多种方式 一、直接通过寄存器控制 二、通过设备树&#xff0c;但…

C#调用非托管dll,并从dll中再调用C#中的方法

从Delphi DLL调用C#方法&#xff1a;一种高效的跨语言集成方案 在软件开发中&#xff0c;我们经常遇到需要集成不同语言编写的组件的情况。 例如&#xff0c;使用C#开发的现代应用程序可能需要调用一些用Delphi编写的老DLL。 如果直接在Delphi中实现某些功能存在困难&#xff…

基于STC89C52RC单片机的大棚温控系统(含文档、源码与proteus仿真,以及系统详细介绍)

本篇文章论述的是基于STC89C52RC单片机的大棚温控系统的详情介绍&#xff0c;如果对您有帮助的话&#xff0c;还请关注一下哦&#xff0c;如果有资源方面的需要可以联系我。 目录 摘要 原理图 仿真图 系统总体设计图 代码 系统论文 参考文献 资源下载 摘要 本文介绍的…

CSA笔记3-文件管理命令(补充)+vim+打包解包压缩解压缩命令

grep(-i -n -v -w) [rootxxx ~]# grep root anaconda-ks.cfg #匹配关键字所在的行 [rootxxx ~]# grep -i root anaconda-ks.cfg #-i 忽略大小写 [rootxxx ~]# grep -n root anaconda-ks.cfg #显示匹配到的行号 [rootxxx ~]# grep -v root anaconda-ks.cfg #-v 不匹配有…

甄选范文“论软件维护方法及其应用”软考高级论文,系统架构设计师论文

论文真题 软件维护是指在软件交付使用后,直至软件被淘汰的整个时间范围内,为了改正错误或满足 新的需求而修改软件的活动。在软件系统运行过程中,软件需要维护的原因是多种多样的, 根据维护的原因不同,可以将软件维护分为改正性维护、适应性维护、完善性维护和预防性 维护…

NumPy中np.clip()的用法

np.clip() 是 NumPy 库中的一个函数&#xff0c;用于限制数组中的数值在一个指定的最小值和最大值之间。它将数组中的所有元素逐个检查&#xff0c;并将它们限制在给定的下限&#xff08;min&#xff09;和上限&#xff08;max&#xff09;范围内。如果元素小于下限&#xff0c…

Linux 上 TTY 的起源

注&#xff1a;机翻&#xff0c;未校对。 What is a TTY on Linux? (and How to Use the tty Command) What does the tty command do? It prints the name of the terminal you’re using. TTY stands for “teletypewriter.” What’s the story behind the name of the co…