DorisWriterManager
的类,用于将数据写入到 Doris 中。以下是代码的具体作用和功能解释:
- 导入必要的包和类: 代码开头导入了所需的包和类,包括日志记录、线程池、字符编码和其他相关工具类。
- 类成员变量定义: 下面是一些类的成员变量定义,这些变量在类的不同方法中使用:
LOG
: 用于记录日志的 Logger 对象。visitor
:DorisStreamLoadObserver
类的实例,用于处理数据写入 Doris 的观察者。options
:Keys
类的实例,包含了一些配置选项。buffer
: 存储待写入 Doris 的数据。batchCount
: 当前批次中的记录数量。batchSize
: 当前批次中的数据大小。closed
: 标志位,表示是否已关闭写入。flushException
: 异步刷新数据时可能发生的异常。flushQueue
: 用于异步刷新数据的队列。scheduler
: 用于定期刷新数据的调度器。scheduledFuture
: 用于取消定时任务的句柄。
- 构造函数
DorisWriterManager
: 构造函数接受一个Keys
对象作为参数,设置了初始化的配置信息,并初始化了visitor
和flushQueue
。接着,它调用startScheduler()
启动定期刷新任务,以及startAsyncFlushing()
启动异步刷新线程。 startScheduler()
方法: 此方法负责启动定时刷新任务。它首先调用stopScheduler()
停止之前的定时任务。然后,创建一个单线程的调度器(scheduler),并设置一个定时任务,定期触发数据刷新操作。在定时任务内部,它会检查是否关闭了写入操作,然后根据配置信息进行数据刷新。如果当前批次为空,重新启动定时任务,确保数据持续刷新。stopScheduler()
方法: 此方法用于停止定时任务。它会取消之前的定时任务并关闭调度器。writeRecord(String record)
方法: 该方法用于将记录写入缓冲区。它首先调用checkFlushException()
方法检查是否存在刷新异常。然后,将记录转换成字节数组并添加到缓冲区中,同时更新批次计数和数据大小。如果当前批次的记录数量或数据大小超过了阈值,就会触发数据刷新。flush(String label, boolean waitUntilDone)
方法: 此方法用于手动触发数据刷新操作。它首先检查是否存在刷新异常,然后根据当前批次的情况决定是否执行刷新。如果当前批次为空,且waitUntilDone
为真,它会等待之前的异步刷新操作完成。否则,它将当前批次的数据放入刷新队列,并根据waitUntilDone
参数决定是否等待刷新操作完成。close()
方法: 此方法用于关闭DorisWriterManager
。它首先检查是否已经关闭,然后触发一次最终的数据刷新操作。如果当前批次有数据,会记录相应日志。最后,它检查是否有刷新异常并抛出相应异常。createBatchLabel()
方法: 此方法用于创建批次标签,用于标识一批数据。它根据配置的前缀和随机 UUID 生成标签。startAsyncFlushing()
方法: 此方法启动一个异步刷新线程。线程会循环调用asyncFlush()
方法,将数据异步刷新到 Doris 中。waitAsyncFlushingDone()
方法: 该方法用于等待之前的异步刷新操作完成。它向刷新队列添加空的WriterTuple
,以确保之前的刷新操作完成。然后,它检查是否存在刷新异常。asyncFlush()
方法: 此方法用于异步刷新数据到 Doris。它从刷新队列中取出WriterTuple
,然后根据批次的标签执行数据刷新操作。如果发生异常,它会尝试多次,直到达到最大重试次数。如果需要重新创建批次标签,则生成新的标签。重试之间会休眠一段时间。成功后,重新启动定时任务。checkFlushException()
方法: 此方法用于检查是否存在刷新异常,如果存在则抛出异常。
这个 DorisWriterManager
类的目的是管理数据写入到 Doris 数据库的操作。它通过定时任务和异步刷新线程来控制数据的批量写入,同时处理异常情况,确保数据的稳定写入。
添加详细注释代码如下:
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DorisWriterManager {private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);private final DorisStreamLoadObserver visitor;private final Keys options;private final List<byte[]> buffer = new ArrayList<>(); // 缓冲区,用于存储待写入 Doris 的数据private int batchCount = 0; // 当前批次中的记录数量private long batchSize = 0; // 当前批次中的数据大小private volatile boolean closed = false; // 标志位,表示是否已关闭private volatile Exception flushException; // 异步刷新数据时可能发生的异常private final LinkedBlockingDeque<WriterTuple> flushQueue; // 用于异步刷新数据的队列private ScheduledExecutorService scheduler; // 用于定期刷新数据的调度器private ScheduledFuture<?> scheduledFuture;public DorisWriterManager(Keys options) {this.options = options;this.visitor = new DorisStreamLoadObserver(options);flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());this.startScheduler(); // 启动定期刷新调度器this.startAsyncFlushing(); // 启动异步刷新线程}// 启动定期刷新调度器public void startScheduler() {stopScheduler(); // 停止之前的调度器this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build());this.scheduledFuture = this.scheduler.schedule(() -> {synchronized (DorisWriterManager.this) {if (!closed) {try {String label = createBatchLabel();LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label));if (batchCount == 0) {startScheduler(); // 如果当前批次为空,重新启动定时任务}flush(label, false);} catch (Exception e) {flushException = e;}}}}, options.getFlushInterval(), TimeUnit.MILLISECONDS);}// 停止定期刷新调度器public void stopScheduler() {if (this.scheduledFuture != null) {scheduledFuture.cancel(false);this.scheduler.shutdown();}}// 写入一条记录到缓冲区public final synchronized void writeRecord(String record) throws IOException {checkFlushException(); // 检查是否有刷新异常try {byte[] bts = record.getBytes(StandardCharsets.UTF_8);buffer.add(bts);batchCount++;batchSize += bts.length;if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {String label = createBatchLabel();LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));flush(label, false); // 当记录数量或数据大小超过阈值时触发刷新}} catch (Exception e) {throw new IOException("Writing records to Doris failed.", e);}}// 手动触发刷新缓冲区的数据public synchronized void flush(String label, boolean waitUntilDone) throws Exception {checkFlushException(); // 检查是否有刷新异常if (batchCount == 0) {if (waitUntilDone) {waitAsyncFlushingDone(); // 如果当前批次为空,等待之前的刷新操作完成}return;}flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer))); // 将数据放入刷新队列if (waitUntilDone) {waitAsyncFlushingDone(); // 等待刷新操作完成}buffer.clear();batchCount = 0;batchSize = 0;}// 关闭 DorisWriterManager,触发最后一次刷新操作public synchronized void close() {if (!closed) {closed = true;try {String label = createBatchLabel();if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label));flush(label, true); // 关闭时触发刷新操作} catch (Exception e) {throw new RuntimeException("Writing records to Doris failed.", e);}}checkFlushException();}// 创建批次标签,通常用于标识一批数据public String createBatchLabel() {StringBuilder sb = new StringBuilder();if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {sb.append(options.getLabelPrefix());}return sb.append(UUID.randomUUID().toString()).toString();}// 启动异步刷新线程private void startAsyncFlushing() {Thread flushThread = new Thread(new Runnable() {public void run() {while (true) {try {asyncFlush(); // 异步刷新数据} catch (Exception e) {flushException = e;}}}});flushThread.setDaemon(true);flushThread.start();}// 等待之前的刷新操作完成private void waitAsyncFlushingDone() throws InterruptedException {for (int i = 0; i <= options.getFlushQueueLength(); i++) {flushQueue.put(new WriterTuple("", 0L, null));}checkFlushException();}// 异步刷新数据到 Dorisprivate void asyncFlush() throws Exception {WriterTuple flushData = flushQueue.take();if (Strings.isNullOrEmpty(flushData.getLabel())) {return;}stopScheduler(); // 停止定时任务LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));for (int i = 0; i <= options.getMaxRetries(); i++) {try {// 利用 DorisStreamLoadObserver 进行数据刷新visitor.streamLoad(flushData);LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));startScheduler(); // break;} catch (Exception e) {LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);if (i >= options.getMaxRetries()) {throw new IOException(e);}if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {String newLabel = createBatchLabel();LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));flushData.setLabel(newLabel);}try {Thread.sleep(1000l * Math.min(i + 1, 10));} catch (InterruptedException ex) {Thread.currentThread().interrupt();throw new IOException("Unable to flush, interrupted while doing another attempt", e);}}}}private void checkFlushException() {if (flushException != null) {throw new RuntimeException("Writing records to Doris failed.", flushException);}}
}