DataX DorisWriter 插件DorisStreamLoadObserver类详细解读

DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:

  1. DorisStreamLoadObserver(Keys options): 这是类的构造函数,用于初始化加载数据所需的配置选项。
  2. void streamLoad(WriterTuple data) throws Exception: 该方法是数据加载的主要方法。它将给定的数据(WriterTuple 对象)加载到 Doris 数据库中。它构建了用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态来确定加载是否成功。如果加载失败,它会抛出异常。
  3. private void checkStreamLoadState(String host, String label) throws IOException: 这个方法用于检查数据加载的状态。它会不断地轮询 Doris 服务器,以获取特定加载任务的最终状态。根据加载状态的不同,它可能会抛出异常或者在加载完成时返回。
  4. private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符。
  5. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。
  6. private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。
  7. private HttpEntity getHttpEntity(CloseableHttpResponse response): 这是一个实用方法,用于从 HTTP 响应中提取实体内容。
  8. private String getLoadHost(): 该方法从配置选项中获取用于加载数据的主机地址列表,并尝试连接到这些主机以检查其可用性。它会返回第一个可用的主机地址。

DorisStreamLoadObserver 类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class DorisStreamLoadObserver {private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);private Keys options;private long pos;private static final String RESULT_FAILED = "Fail";private static final String RESULT_LABEL_EXISTED = "Label Already Exists";private static final String LAEBL_STATE_VISIBLE = "VISIBLE";private static final String LAEBL_STATE_COMMITTED = "COMMITTED";private static final String RESULT_LABEL_PREPARE = "PREPARE";private static final String RESULT_LABEL_ABORTED = "ABORTED";private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";public DorisStreamLoadObserver(Keys options) {this.options = options;}// 数据写入 Doris 的主要方法public void streamLoad(WriterTuple data) throws Exception {String host = getLoadHost();if (host == null) {throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");}String loadUrl = new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/").append(options.getTable()).append("/_stream_load").toString();LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));final String keyStatus = "Status";if (null == loadResult || !loadResult.containsKey(keyStatus)) {throw new IOException("Unable to flush data to Doris: unknown result status.");}LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {throw new IOException(new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString());} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));checkStreamLoadState(host, data.getLabel());}}// 检查数据加载状态的方法private void checkStreamLoadState(String host, String label) throws IOException {int idx = 0;while (true) {try {TimeUnit.SECONDS.sleep(Math.min(++idx, 5));} catch (InterruptedException ex) {break;}try (CloseableHttpClient httpclient = HttpClients.createDefault()) {HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpGet.setHeader("Connection", "close");try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s].\n", label), null);}Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));String labelState = (String) result.get("state");if (null == labelState) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);}LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));switch (labelState) {case LAEBL_STATE_VISIBLE:case LAEBL_STATE_COMMITTED:return;case RESULT_LABEL_PREPARE:continue;case RESULT_LABEL_ABORTED:throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null, true);case RESULT_LABEL_UNKNOWN:default:throw new IOException(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null);}}}}}// 根据格式将数据行拼接成字节数组private byte[] addRows(List<byte[]> rows, int totalBytes) {if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);for (byte[] row : rows) {bos.put(row);bos.put(lineDelimiter);}return bos.array();}if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));bos.put("[".getBytes(StandardCharsets.UTF_8));byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);boolean isFirstElement = true;for (byte[] row : rows) {if (!isFirstElement) {bos.put(jsonDelimiter);}bos.put(row);isFirstElement = false;}bos.put("]".getBytes(StandardCharsets.UTF_8));return bos.array();}throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");}private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(120 * 1000).setConnectTimeout(120 * 1000).setConnectionRequestTimeout(120 * 1000).build();try (CloseableHttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).setRedirectStrategy(new DefaultRedirectStrategy()).build()) {HttpPut httpPut = new HttpPut(loadUrl);httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpPut.setEntity(new ByteArrayEntity(data));try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");}return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));}}}// 构造 HTTP 请求中的基本认证头部private String getBasicAuthHeader(String username, String password) {String credentials = username + ":" + password;byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);String base64Credentials = Base64.encodeBase64String(credentialsBytes);return "Basic " + base64Credentials;}// 从 HTTP 响应中获取实体内容private HttpEntity getHttpEntity(CloseableHttpResponse response) {if (response != null) {return response.getEntity();}return null;}// 获取用于加载数据的主机地址private String getLoadHost() {List<String> hosts = options.getDorisStreamLoadUrls();for (String host : hosts) {try {HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();connection.setRequestMethod("HEAD");int responseCode = connection.getResponseCode();if (responseCode == HttpURLConnection.HTTP_OK) {return host;}} catch (IOException e) {LOG.warn("Failed to connect to host: {}", host);}}return null;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/63348.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用CSS实现一个自适应等高布局?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 使用 Flexbox 布局⭐ 使用 Grid 布局⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发…

基于OpenCV+Keras+tensorflow 实现的变电站作业管控平台源代码。含人脸识别考勤,移动目标跟踪,越线检测,安全措施检测,姿态识别等功能

#综述 使用该作业现场安全生产智能管控平台来实现变电站的安全生产的智能化管理&#xff0c;通过人脸识别功能进行人员的考勤&#xff1b; 通过人员、车辆的检测和识别来实现变电站的智能化管理&#xff1b;通过安全行为识别和安全区域报警功能来实现对变电站内人员和设备安全的…

python使用字典暴力解析wifi密码

前言 最近无wifi可用,搜到了很多高质量但是没有密码的WiFi,我在想应该可以用python调用常见的wifi字典包来暴力破解一下这些WiFi,也许可以成功 原理 使用pip install pywifi命令安装pywifi 使用它调用本机网卡,设置wifi加密方式,对字典包扫描密码逐个尝试 扫描失败的密码会被…

【若依框架RuoYi-Vue-Plus 图片回显不显示问题,OSS文件上传或者本地上传】

一、问题 1.设计表 product&#xff08;商品表&#xff09; 有 id &#xff08;id&#xff09; name&#xff08;商品名&#xff09;icon&#xff08;图标&#xff09; 2.使用若依代码生成功能&#xff0c;导入product表&#xff0c;代码生成。 3.将生成的代码导入到项目中得到…

专访张少光---国内著名牛散、实战专家

导读&#xff1a;新财富最佳分析师评选作为中国本土第一份市场化的分析师评选&#xff0c;自2003年开启至今已20年&#xff0c;通过公正、公平、公开的评选&#xff0c;与市场各方共同挖掘了大量优秀分析师。值此新财富最佳分析师评选20周年之际&#xff0c;我们期望通过《对话…

51单片机智能电风扇控制系统proteus仿真设计( 仿真+程序+原理图+报告+讲解视频)

51单片机智能电风扇控制系统仿真设计( proteus仿真程序原理图报告讲解视频&#xff09; 讲解视频1.主要功能&#xff1a;2.仿真3. 原理图4. 程序代码5.设计报告6. 设计资料内容清单 51单片机智能电风扇控制系统仿真设计( proteus仿真程序原理图报告讲解视频&#xff09; 仿真图…

API管理风险:如何确保您的API安全与可靠?

API管理风险&#xff1a;如何确保您的API安全与可靠&#xff1f; 随着数字化时代的到来&#xff0c;应用程序接口&#xff08;API&#xff09;在现代软件开发中发挥着关键的作用。然而&#xff0c;API管理过程中存在着各种潜在的风险。本文将探讨如何有效地管理和缓解这些风险…

C ++ 学习之分文件 实现类

前言 当您在 C 中编写较大的程序时&#xff0c;将所有代码都放在一个文件中可能会变得混乱和不可维护。为了更好地组织代码并提高可维护性&#xff0c;您可以使用分文件实现&#xff08;Separate Compilation&#xff09;的概念。 正文 我的 circle.h 文件 #pragma once #i…

8、监测数据采集物联网应用开发步骤(6)

监测数据采集物联网应用开发步骤(5.3) 定时器插件化开发 在com.zxy.common.Com_Para.py中添加如下内容 #定时器正在运行标签 bTimeFlag False #定时器插件拦截器 TimeREFLECT_IN_CLASS "com.plugins.usereflect.testCustTimeReflectClass1" 创建自定义定时器执…

UE5 里的一些常用的了解

# ACharacter、APawn的继承关系 ACharacter -继承自-> APawn -继承自-> AActor和 INavAgentInterface AActor -继承自-> UObject -继承自->UObjectBaseUtility -继承自-> UObjectBase&#xff08;一个独立的类&#xff09;INavAgentInterface是一个独立的类 #…

vue v-on 艾特@

vue v-on 内联代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</titl…

多目标应用:基于多目标人工蜂鸟算法(MOAHA)的微电网多目标优化调度MATLAB

一、微网系统运行优化模型 参考文献&#xff1a; [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、多目标人工蜂鸟算法MOAHA 多目标人工蜂鸟算法&#xff08;multi-objective artificial hummingbird algorithm&…

构建稳定的爬虫系统:如何选择合适的HTTP代理服务商

在构建一个稳定、高效的爬虫系统中&#xff0c;选择合适的HTTP代理服务商是至关重要的一步。本文将介绍如何选取可靠且性能优秀的HTTP代理服务供应商&#xff0c;来完成搭建一个强大而稳定的爬虫系统。 1.了解不同类型和特点 -免费公开代理服务器:提供免费但可能存在限制或不…

【Linux】目录结构、路径

目录 1. 目录结构 1.1 基本概念 1.2 具体的目录结构 2. 路径 2.1 绝对路径和相对路径 2.2 特殊路径符 1. 目录结构 1.1 基本概念 Linux的目录结构是一个树形结构。 Windows系统可以拥有多个盘符&#xff0c;如 C盘、D盘、E盘。Linux没有盘符这个概念&#xff0c;只有一…

一阴一阳之谓道,乃自然规律也!

阴阳&#xff0c;在我们国家&#xff0c;是一切传统文化的基础。作为一个有着五千年文化的国家&#xff0c;作为世界上仅存的四大文明古国&#xff0c;峰民觉得&#xff0c;我们的传统文化&#xff0c;不能被当成迷信&#xff0c;慢慢的没落。 有时&#xff0c;不得不承认&…

【100天精通python】Day50:python web编程_Django框架从安装到使用

目录 1 安装Django Web框架 2 创建一个Django 项目 3 数据模型 3.1 在应用程序的 models.py 文件中定义数据模 3.2 创建模型的迁移文件并应用 3.2.1 查询模型对象&#xff1a; 3.2.2 创建新模型对象&#xff1a; 3.2.3 更新模型对象&#xff1a; 3.2.4 删除模型对象&a…

BDCC - 闲聊数据仓库的架构

文章目录 典型数据仓库架构图数据仓库ETL vs ELTETLELT区别联系 数据仓库分层&#xff08;1&#xff09;数据仓库ODS层&#xff08;2&#xff09;数据仓库CDM层DWD数据明细层DWS数据汇总层 &#xff08;3&#xff09;数据仓库ADS层 典型数据仓库架构图 按自下而上的顺序&#x…

plsql ebs 工作中的简单笔记

工作流中给系统界面发送消息&#xff1a; PROCEDURE wf_notify(p_sender IN VARCHAR2 DEFAULT SYSADMIN,p_receiver IN VARCHAR2,p_subject IN VARCHAR2,p_content_text IN VARCHAR2);PROCEDURE wf_notify(p_sender IN VARCHAR2 DEFAULT SYSADMIN,---发送…

外贸企业如何借助CRM提升企业发展?

外贸企业竞争激烈&#xff0c;提高自身竞争力&#xff0c;扩大海外业务市场&#xff0c;是每个外贸企业的目标。为了实现这一目标&#xff0c;不少外贸企业借助CRM系统&#xff0c;优化业务流程&#xff0c;管理维护客户&#xff0c;从而实现可持续发展。那么&#xff0c;外贸企…

2023年高教社杯数学建模思路 - 案例:异常检测

文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 建模资料 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常…