DataX DorisWriter 插件DorisWriterManager类详细解读

DorisWriterManager 的类,用于将数据写入到 Doris 中。以下是代码的具体作用和功能解释:

  1. 导入必要的包和类: 代码开头导入了所需的包和类,包括日志记录、线程池、字符编码和其他相关工具类。
  2. 类成员变量定义: 下面是一些类的成员变量定义,这些变量在类的不同方法中使用:
    • LOG: 用于记录日志的 Logger 对象。
    • visitorDorisStreamLoadObserver 类的实例,用于处理数据写入 Doris 的观察者。
    • optionsKeys 类的实例,包含了一些配置选项。
    • buffer: 存储待写入 Doris 的数据。
    • batchCount: 当前批次中的记录数量。
    • batchSize: 当前批次中的数据大小。
    • closed: 标志位,表示是否已关闭写入。
    • flushException: 异步刷新数据时可能发生的异常。
    • flushQueue: 用于异步刷新数据的队列。
    • scheduler: 用于定期刷新数据的调度器。
    • scheduledFuture: 用于取消定时任务的句柄。
  3. 构造函数 DorisWriterManager 构造函数接受一个 Keys 对象作为参数,设置了初始化的配置信息,并初始化了 visitor 和 flushQueue。接着,它调用 startScheduler() 启动定期刷新任务,以及 startAsyncFlushing() 启动异步刷新线程。
  4. startScheduler() 方法: 此方法负责启动定时刷新任务。它首先调用 stopScheduler() 停止之前的定时任务。然后,创建一个单线程的调度器(scheduler),并设置一个定时任务,定期触发数据刷新操作。在定时任务内部,它会检查是否关闭了写入操作,然后根据配置信息进行数据刷新。如果当前批次为空,重新启动定时任务,确保数据持续刷新。
  5. stopScheduler() 方法: 此方法用于停止定时任务。它会取消之前的定时任务并关闭调度器。
  6. writeRecord(String record) 方法: 该方法用于将记录写入缓冲区。它首先调用 checkFlushException() 方法检查是否存在刷新异常。然后,将记录转换成字节数组并添加到缓冲区中,同时更新批次计数和数据大小。如果当前批次的记录数量或数据大小超过了阈值,就会触发数据刷新。
  7. flush(String label, boolean waitUntilDone) 方法: 此方法用于手动触发数据刷新操作。它首先检查是否存在刷新异常,然后根据当前批次的情况决定是否执行刷新。如果当前批次为空,且 waitUntilDone 为真,它会等待之前的异步刷新操作完成。否则,它将当前批次的数据放入刷新队列,并根据 waitUntilDone 参数决定是否等待刷新操作完成。
  8. close() 方法: 此方法用于关闭 DorisWriterManager。它首先检查是否已经关闭,然后触发一次最终的数据刷新操作。如果当前批次有数据,会记录相应日志。最后,它检查是否有刷新异常并抛出相应异常。
  9. createBatchLabel() 方法: 此方法用于创建批次标签,用于标识一批数据。它根据配置的前缀和随机 UUID 生成标签。
  10. startAsyncFlushing() 方法: 此方法启动一个异步刷新线程。线程会循环调用 asyncFlush() 方法,将数据异步刷新到 Doris 中。
  11. waitAsyncFlushingDone() 方法: 该方法用于等待之前的异步刷新操作完成。它向刷新队列添加空的 WriterTuple,以确保之前的刷新操作完成。然后,它检查是否存在刷新异常。
  12. asyncFlush() 方法: 此方法用于异步刷新数据到 Doris。它从刷新队列中取出 WriterTuple,然后根据批次的标签执行数据刷新操作。如果发生异常,它会尝试多次,直到达到最大重试次数。如果需要重新创建批次标签,则生成新的标签。重试之间会休眠一段时间。成功后,重新启动定时任务。
  13. checkFlushException() 方法: 此方法用于检查是否存在刷新异常,如果存在则抛出异常。

这个 DorisWriterManager 类的目的是管理数据写入到 Doris 数据库的操作。它通过定时任务和异步刷新线程来控制数据的批量写入,同时处理异常情况,确保数据的稳定写入。

添加详细注释代码如下:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DorisWriterManager {private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);private final DorisStreamLoadObserver visitor;private final Keys options;private final List<byte[]> buffer = new ArrayList<>(); // 缓冲区,用于存储待写入 Doris 的数据private int batchCount = 0; // 当前批次中的记录数量private long batchSize = 0; // 当前批次中的数据大小private volatile boolean closed = false; // 标志位,表示是否已关闭private volatile Exception flushException; // 异步刷新数据时可能发生的异常private final LinkedBlockingDeque<WriterTuple> flushQueue; // 用于异步刷新数据的队列private ScheduledExecutorService scheduler; // 用于定期刷新数据的调度器private ScheduledFuture<?> scheduledFuture;public DorisWriterManager(Keys options) {this.options = options;this.visitor = new DorisStreamLoadObserver(options);flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());this.startScheduler(); // 启动定期刷新调度器this.startAsyncFlushing(); // 启动异步刷新线程}// 启动定期刷新调度器public void startScheduler() {stopScheduler(); // 停止之前的调度器this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build());this.scheduledFuture = this.scheduler.schedule(() -> {synchronized (DorisWriterManager.this) {if (!closed) {try {String label = createBatchLabel();LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label));if (batchCount == 0) {startScheduler(); // 如果当前批次为空,重新启动定时任务}flush(label, false);} catch (Exception e) {flushException = e;}}}}, options.getFlushInterval(), TimeUnit.MILLISECONDS);}// 停止定期刷新调度器public void stopScheduler() {if (this.scheduledFuture != null) {scheduledFuture.cancel(false);this.scheduler.shutdown();}}// 写入一条记录到缓冲区public final synchronized void writeRecord(String record) throws IOException {checkFlushException(); // 检查是否有刷新异常try {byte[] bts = record.getBytes(StandardCharsets.UTF_8);buffer.add(bts);batchCount++;batchSize += bts.length;if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {String label = createBatchLabel();LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));flush(label, false); // 当记录数量或数据大小超过阈值时触发刷新}} catch (Exception e) {throw new IOException("Writing records to Doris failed.", e);}}// 手动触发刷新缓冲区的数据public synchronized void flush(String label, boolean waitUntilDone) throws Exception {checkFlushException(); // 检查是否有刷新异常if (batchCount == 0) {if (waitUntilDone) {waitAsyncFlushingDone(); // 如果当前批次为空,等待之前的刷新操作完成}return;}flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer))); // 将数据放入刷新队列if (waitUntilDone) {waitAsyncFlushingDone(); // 等待刷新操作完成}buffer.clear();batchCount = 0;batchSize = 0;}// 关闭 DorisWriterManager,触发最后一次刷新操作public synchronized void close() {if (!closed) {closed = true;try {String label = createBatchLabel();if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label));flush(label, true); // 关闭时触发刷新操作} catch (Exception e) {throw new RuntimeException("Writing records to Doris failed.", e);}}checkFlushException();}// 创建批次标签,通常用于标识一批数据public String createBatchLabel() {StringBuilder sb = new StringBuilder();if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {sb.append(options.getLabelPrefix());}return sb.append(UUID.randomUUID().toString()).toString();}// 启动异步刷新线程private void startAsyncFlushing() {Thread flushThread = new Thread(new Runnable() {public void run() {while (true) {try {asyncFlush(); // 异步刷新数据} catch (Exception e) {flushException = e;}}}});flushThread.setDaemon(true);flushThread.start();}// 等待之前的刷新操作完成private void waitAsyncFlushingDone() throws InterruptedException {for (int i = 0; i <= options.getFlushQueueLength(); i++) {flushQueue.put(new WriterTuple("", 0L, null));}checkFlushException();}// 异步刷新数据到 Dorisprivate void asyncFlush() throws Exception {WriterTuple flushData = flushQueue.take();if (Strings.isNullOrEmpty(flushData.getLabel())) {return;}stopScheduler(); // 停止定时任务LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));for (int i = 0; i <= options.getMaxRetries(); i++) {try {// 利用 DorisStreamLoadObserver 进行数据刷新visitor.streamLoad(flushData);LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));startScheduler(); // break;} catch (Exception e) {LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);if (i >= options.getMaxRetries()) {throw new IOException(e);}if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {String newLabel = createBatchLabel();LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));flushData.setLabel(newLabel);}try {Thread.sleep(1000l * Math.min(i + 1, 10));} catch (InterruptedException ex) {Thread.currentThread().interrupt();throw new IOException("Unable to flush, interrupted while doing another attempt", e);}}}}private void checkFlushException() {if (flushException != null) {throw new RuntimeException("Writing records to Doris failed.", flushException);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/64039.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python入门教程 - 判断语句(二)

目录 一、布尔类型 二、比较运算符 三、if判断语句 一、布尔类型 True False result1 10 > 5 result2 10 < 5 print(result1) print(result2) print(type(result1)) True False <class bool> 二、比较运算符 ! > < > < 比较运算的结果是布尔…

wireshark 流量抓包例题

一、题目一(1.pcap) 题目要求&#xff1a; 1.黑客攻击的第一个受害主机的网卡IP地址 2.黑客对URL的哪一个参数实施了SQL注入 3.第一个受害主机网站数据库的表前缀&#xff08;加上下划线例如abc&#xff09; 4.第一个受害主机网站数据库的名字 看到题目SQL注入&#xff0c…

Cenos7安装小火车程序动画

运维Shell脚本小试牛刀(一) 运维Shell脚本小试牛刀(二) 运维Shell脚本小试牛刀(三)::$(cd $(dirname $0)&#xff1b; pwd)命令详解 运维Shell脚本小试牛刀(四): 多层嵌套if...elif...elif....else fi_蜗牛杨哥的博客-CSDN博客 Cenos7安装小火车程序动画 一&#xff1a;替换…

numpy学习:reshape和resize

.reshape 与 .resize reshape&#xff1a;有返回值&#xff0c;所谓有返回值&#xff0c;即不对原始多维数组进行修改&#xff1b; resize&#xff1a;无返回值&#xff0c;所谓无返回值&#xff0c;即会对原始多维数组进行修改&#xff1b;

电磁式电压互感器直流电阻测试

试验目的 测量电磁式电压互感器直流电阻的目 的是检查其一次、 二次绕组的质量及回路的完整性&#xff0c; 以发现各种原因所造成的导线断裂、 接头开焊、 接触不良、 匝间短路等缺陷。 试验设备 变压器直流电阻测试仪 厂家&#xff1a; 湖北众拓高试 试验方法 一次绕组直流…

C语言的类型转换

C语言的类型转换很重要&#xff0c;经常出现&#xff0c;但是往往不被人注意&#xff0c;而在汇编代码当中就暴露无遗了。 如下列代码&#xff1a; char ch; while ((ch getchar()) ! #) putchar(ch); 反汇编后&#xff1a; .text:00401006 mov…

【文心一言】学习笔记

学习资料 《听说文心一言App霸榜了&#xff0c;那必须来一波全方位实测了》 情感陪伴&#xff1a;文心一言 App 可以充当用户的情感树洞&#xff0c;提供知心姐姐、【暖男】等角色扮演&#xff0c;为用户提供情绪疏导、情感分析、约会建议等服务。 1. 模型属性 【提示词工具…

cobbler自动化安装CentOS、windows和ubuntu

环境介绍 同时玩cobbler3.3和cobbler2.8.5 cobbler3.3 系统CentOS8.3 VMware虚拟机 桥接到物理网络 IP: 192.168.1.33 cobbler2.8.5 系统CentOS7.9 VMWare虚拟机 桥接到物理网络 IP&#xff1a;192.168.1.33 安装cobbler3.3 yum源修改 cat /etc/yum.repo.d/Cento…

指针(一)------指针概念+指针类型+野指针+指针运算+二级指针

&#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;C语言 &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&#x1f69a; &#x1f339;&#x1f339;&#x1f339;关注我带你学习编程知识 指针&#xff08;一&#xff09; 指针是什么指针…

Debezium快问快答

什么是 debezium? debezium 是一系列分布式服务的集合,这些服务可以捕获数据库中行级别的更改,应用程序可以根据这些变化来做相应的处理。 debezium 在事务日志中记录提交给每个数据库表的所有行级别的更改,每个应用程序可以只读取自己感兴趣的事务日志,并按照更改时间发…

ModaHub魔搭社区:自动化机器学习Auto-Sklearn全面详细教程

Auto-Sklearn的简介 Auto-Sklearn(基于scikit-learn库的自动化的机器学习工具)的概述 简介 Auto-Sklearn,在2015年由德国图宾根大学的研究人员提出的,最初的版本于2016年发布。auto-sklearn基于scikit-learn库进行开发,支持多种机器学习任务,包括分类、回归、时间序列…

php 权限节点的位运算

一&#xff0c;概述 在 PHP 中&#xff0c;位运算可以用来进行权限节点的判断。通常&#xff0c;每个权限节点都会用一个不同的位表示&#xff08;2的n次方&#xff0c;从0开始&#xff09;&#xff0c;可以将这些位组合成一个权限值。然后&#xff0c;可以使用位运算符来检查…

【Unity】URP屏幕后处理UI模糊效果实现

这里Canvas(1)设置为Overlay能渲染出指定UI高清&#xff0c;其他UI模糊&#xff0c;然而这做法非常不好&#xff0c;如果此时再打开UI 以及 关闭模糊效果 要将这些置顶UI 恢复到原本Canvas里&#xff0c;也就是要管理2套Canvas using System; using System.Collections; using…

【算法与数据结构】404、LeetCode左叶子之和

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;思路比较简单&#xff0c;遍历所有节点然后判断该节点是否为左叶子节点&#xff0c;如果是&#xff0c…

Python3 命令行参数

Python 提供了 getopt 模块来获取命令行参数。 $ python test.py arg1 arg2 arg3 Python 中也可以所用 sys 的 sys.argv 来获取命令行参数&#xff1a; sys.argv 是命令行参数列表。 len(sys.argv) 计算命令行参数个数。 注&#xff1a;sys.argv[0] 表示脚本名。 实例 t…

静态成员(个人学习笔记黑马学习)

1、静态成员变量 所有对象共享同一份数据在编译阶段分配内存类内声明&#xff0c;类外初始化 #include <iostream> using namespace std; #include <string>class Person { public://1 所有对象都共享一份数据//2 编译阶段就分配内存//3 类内声明&#xff0c;类外初…

windows11 利用vmware17 安装ky10-server-x86操作系统

下载相关软件和镜像 vmware17 下载 下载页面 Download VMware Workstation Pro ky10server-x86镜像下载 官网 国产操作系统、银河麒麟、中标麒麟、开放麒麟、星光麒麟——麒麟软件官方网站 (kylinos.cn) 选择对应版本去下载 安装 选择镜像&#xff0c;点击下一步 磁盘设置要…

jupyter notebook中查看python版本的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

ssm+vue宠物领养系统源码和论文

ssmvue宠物领养系统源码和论文103 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 摘 要 本课题是根据用户的需要以及网络的优势建立的一个宠物领养系统&#xff0c;来满足用宠物领养的需求。 本宠物领养系统…

Android JNI系列详解之生成指定CPU的库文件

一、前提 这次主要了解Android的cpu架构类型&#xff0c;以及在使用CMake工具的时候&#xff0c;如何指定生成哪种类型的库文件。 如上图所示&#xff0c;是我们之前使用CMake工具默认生成的四种cpu架构的动态库文件&#xff1a;arm64-v8a、armeabi-v7a、x86、x86_64&#xff0…