Doris:StreamLoad导入数据

    

目录

1.基本原理

2.支持数据格式

3.StreamLoad语法

3.1.请求参数

3.2.返回参数

4.StreamLoad实践

4.1.使用 curl命令

4.2.使用Java代码


    Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

1.基本原理

        Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
        用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
        导入的最终结果由 Coordinator BE 返回给用户。

2.支持数据格式

        目前 Stream Load 支持数据格式:CSV(文本)、JSON,1.2+ 支持PARQUET 和 ORC。 

3.StreamLoad语法

        Stream Load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。用户也可以通过其他 HTTP client 进行操作。

3.1.请求参数

        Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。

参数参数说明
user/passwdStream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。
label导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。
column_separator用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
line_delimiter用于指定导入文件中的换行符,默认为\n。
max_filter_ratio导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
where导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。
Partitions待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL
columns待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
format

指定导入数据格式,支持csv、json,默认是csv

支持csv_with_names(支持csv文件行首过滤)、csv_with_names_and_types(支持csv文件前两行过滤)

exec_mem_limit导入内存限制。默认为 2GB,单位为字节。
merge_type 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE。
APPEND是默认值,表示这批数据全部需要追加到现有数据中,
DELETE 表示删除与这批数据key相同的所有行,
MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
two_phase_commitStream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
enable_profile当 enable_profile 为 true 时,Stream Load profile将会打印到日志中。否则不会打印。

3.2.返回参数

        Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。

参数参数说明
TxnId导入的事务ID。
Label导入 Label。由用户指定或系统自动生成。
Status导入完成状态:
"Success":表示导入成功。
"Publish Timeout":该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
"Label Already Exists":Label 重复,需更换 Label。
"Fail":导入失败。
ExistingJobStatus已存在的 Label 对应的导入作业的状态。
Message导入错误信息。
NumberTotalRows导入总处理的行数。
NumberLoadedRows成功导入的行数。
NumberFilteredRows数据质量不合格的行数。
NumberUnselectedRows被 where 条件过滤的行数。
LoadBytes导入的字节数。
LoadTimeMs导入完成时间。单位毫秒。
BeginTxnTimeMs向Fe请求开始一个事务所花费的时间,单位毫秒。
StreamLoadPutTimeMs向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。
ReadDataTimeMs读取数据所花费的时间,单位毫秒。
WriteDataTimeMs执行写入数据操作所花费的时间,单位毫秒。
CommitAndPublishTimeMs向Fe请求提交并且发布事务所花费的时间,单位毫秒。
ErrorURL如果有数据质量问题,通过访问这个 URL 查看具体错误行。

4.StreamLoad实践

4.1.使用 curl命令

curl命令格式如下:

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

# Header 中支持属性见下面的 ‘导入任务参数’ 说明
# 格式为: -H "key1:value1"

csv文件数据如下:

id,username,age,sex,phone,register_time
3,user_3,24,0,13212345678,2023-11-03 10:23:34
4,user_4,31,0,13312345678,2023-11-03 12:34:56
5,user_5,53,1,13412345678,2023-11-03 09:12:34

执行导入:

 curl --location-trusted -u root -T /home/weisx/opt/doris/user.csv -H "label:label_user" -H "column_separator:," -H "format:csv_with_names" http://localhost:8030/api/demo/user/_stream_load

134c8f4632454e3c9a0991b84cac3ee2.png

8cb5af8095b34e4983df3c15c74144bc.png

4.2.使用Java代码

package com.yichenkeji.dataplus.core.drois.util;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;@Slf4j
public class StreamLoadTest {public static void main(String[] args) throws IOException {List<Map<String,Object>> datas = loadData();String label = "label_user_java";String username = "root";String password = "";String db = "demo";String table ="user";String loadUrl = String.format("http://192.168.179.131:8030/api/%s/%s/_stream_load",db,table);List<String> columns = Arrays.asList("id,username,age,sex,phone,register_time".split(","));String columnSeparator = ",";String format = "csv";String  loadData = datas.stream().map(data -> columns.stream().map(column -> data.get(column).toString()).collect(Collectors.joining(columnSeparator))).collect(Collectors.joining("\n"));sendData(label,username,password,loadUrl,columns,loadData,columnSeparator,null,format,null);}/*** 加载数据* @return*/private static List<Map<String, Object>> loadData() {List<Map<String,Object>> datas = new ArrayList<>();Map<String,Object> map = new HashMap<>();map.put("id",6);map.put("username","user_6");map.put("age",52);map.put("sex",1);map.put("phone","13612345678");map.put("register_time","2023-11-02-12:34:36");datas.add(map);return  datas;}/*** Basic access authentication 签名* @param username doris用户名* @param password doris用户密码* @return*/public static String basicAuthHeader(String username, String password) {final String tobeEncode = username + ":" + password;byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));return "Basic " + new String(encoded);}/*** Stream load 导入数据* @param label 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。* @param username* @param password* @param loadUrl* @param columns 待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。* @param loadData* @param columnSeparator 用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。* @param lineDelimiter 用于指定导入文件中的换行符,默认为\n。* @param format 指定导入数据格式,支持csv、json,默认是csv* @param mergeType 数据的合并类型:一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值* @throws IOException*/public static void sendData(String label, String username, String password, String loadUrl, List<String> columns, String loadData, String columnSeparator, String lineDelimiter, String format, String mergeType) throws IOException {HttpClientBuilderhttpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {@Overrideprotected boolean isRedirectable(String method) {return true;}});log.info("loadUrl:{},columns:{}",loadUrl,columns);try (CloseableHttpClient client = httpClientBuilder.build()) {HttpPut put = new HttpPut(loadUrl);StringEntity entity = new StringEntity(loadData, "UTF-8");put.setHeader(HttpHeaders.EXPECT, "100-continue");put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));// the label header is optional, not necessary// use label header can ensure at most once semanticsput.setHeader("label", label);if(StringUtils.isNotBlank(columnSeparator)){put.setHeader("column_separator", columnSeparator);}if(StringUtils.isNotBlank(lineDelimiter)){put.setHeader("line_delimiter", lineDelimiter);}put.setHeader("format", format);put.setHeader("merge_type", mergeType);//字段if (null != columns && !columns.isEmpty()) {put.setHeader("columns", String.join(",",columns.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));}//数据put.setEntity(entity);try (CloseableHttpResponse response = client.execute(put)) {String loadResultStr  =  null;if (response.getEntity() != null) {loadResultStr  =  EntityUtils.toString(response.getEntity());}final int statusCode = response.getStatusLine().getStatusCode();log.info("statusCode:{},loadResultStr:{}",statusCode,loadResultStr);}}}
}

c88d986aeb55462ca5f1e256b52faa02.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/129022.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

免费记课时小程序-全优学堂

1. 教师使用小程序记上课 使用步骤 创建了员工账号&#xff0c;员工需设置为教师为班级进行排课使用系统账号绑定小程序&#xff0c;记上课 #1.1 创建员工账号 通过系统菜单’机构设置->员工管理‘&#xff0c;添加本机构教师及其他员工。 添加过程中&#xff0c;可设置…

ffmpeg mp3截取命令,视频与mp3合成带音频视频命令

从00:00:03.500开始截取往后长度到结尾的mp3音频&#xff08;这个更有用&#xff0c;测试好用&#xff09; ffmpeg -i d:/c.mp3 -ss 00:00:03.500 d:/output.mp3 将两个音频合并成一个音频&#xff08;测试好用&#xff09; ffmpeg -i "concat:d:/c.mp3|d:/output.mp3&…

CSS3设计动画样式

CSS3动画包括过渡动画和关键帧动画&#xff0c;它们主要通过改变CSS属性值来模拟实现。我将详细介绍Transform、Transitions和Animations 3大功能模块&#xff0c;其中Transform实现对网页对象的变形操作&#xff0c;Transitions实现CSS属性过渡变化&#xff0c;Animations实现…

Git复制代码

目录 一、常用下载代码 1.登录Git克隆SSH​编辑 2.新建文件然后右键点击Git Bash Here 3.git clone Paste 二. 本地下载 1.从本地进入页面 2.生成代码——>导入——>生成代码后下载 3.解压道相应位置 一、常用下载代码 1.登录Git克隆SSH 2.新建文件然后右键点击…

C# list<T>去重

文章目录 C# list<T>去重值类型去重List<object>object is intobject is decimalobject is charobject is boolobject is string List<int>List<string> 引用类型去重 C# list去重 值类型去重 List object is int //object is intList<object&g…

图像二值化阈值调整——cv2.threshold方法

二值化阈值调整&#xff1a;调整是指在进行图像二值化处理时&#xff0c;调整阈值的过程。阈值决定了将图像中的像素分为黑色和白色的界限&#xff0c;大于阈值的像素被设置为白色&#xff0c;小于等于阈值的像素被设置为黑色。 首先画出灰度直方图&#xff1a;横坐标是灰度值…

制造行业数字化运维破局之道

项目背景 某大型汽车制造集团&#xff0c;致力于通过数字化、智能化运营手段为用户提升提供高品质的汽车产品和服务。IT部门不仅为内外部持续提供服务&#xff0c;同时为业务运营与核心系统运行提供重要支撑。数字化运维作为数字化转型的核心基础&#xff0c;不但要保障数据安…

3D网页游戏外包开发引擎

3D网页开发引擎是用于创建具有三维图形、虚拟现实和交互性的网页应用程序的工具。以下是一些常用的3D网页开发引擎以及它们的主要特点&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.Three.js&…

使用 Curl 和 DomCrawler 下载抖音视频链接并存储到指定文件夹

项目需求 假设我们需要从抖音平台上下载一些特定的视频&#xff0c;以便进行分析、编辑或其他用途。为了实现这个目标&#xff0c;我们需要编写一个爬虫程序来获取抖音视频的链接&#xff0c;并将其保存到本地文件夹中。 目标分析 在开始编写爬虫之前&#xff0c;我们需要了…

Redis Twemproxy 集群,水平扩展 ,扩容方案

文章目录 一、概述二、Twemproxy 分布模式三、测试规划四、Redis 服务实例准备4.1 配置Redis实例4.2 创建关资源4.3 启动Redis服务实例 五、Twemproxy 安装准备六、Twemproxy 安装及集群配置6.1 安装 Twemproxy6.2 配置 Twemproxy6.3 启动 twemproxy6.4 测试 twemproxy 集群 如…

如何使用 NFTScan NFT API 在 Polygon 网络上开发 Web3 应用

Polygon 以前被称为 Matic Network&#xff0c;是一种扩展的解决方案&#xff0c;它提供多种工具来加快并降低区块链网络上交易的成本和复杂性。然而&#xff0c;其区块链上的大量活动使以太坊因增长的传输成本和拥挤的流量几乎瘫痪。Polygon 诞生的主要目的是帮助以太坊解决链…

Docker学习——①

文章目录 1、什么是虚拟化、容器化&#xff1f;2、为什么要虚拟化、容器化&#xff1f;3、虚拟化实现方式3.1 应用程序执行环境分层3.2 虚拟化常见类别3.3 常见虚拟化实现3.3.1 主机虚拟化(虚拟机)实现3.3.2 容器虚拟化实现3.3.3 空间隔离实战--基础知识3.3.4 PID 隔离3.3.5 Mo…

springboot+vue基于Hadoop短视频流量数据分析与可视化系统的设计与实现【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…

HarmonyOS(二)—— 初识ArkTS开发语言(中)之ArkTS的由来和演进

前言 在上一篇文章HarmonyOS&#xff08;二&#xff09;—— 初识ArkTS开发语言&#xff08;上&#xff09;之TypeScript入门&#xff0c;我初识了TypeScript相关知识点&#xff0c;也知道ArkTS是华为基于TypeScript发展演化而来。 从最初的基础的逻辑交互能力&#xff0c;到…

C++类和对象万字详解(典藏版)

文章目录 前言认识类和对象使用 struct 定义类class 定义类类的声明和定义分离类大小的计算this指针this指针的常见的面试题 构造函数与构析函数构造函数初始化列表 构析函数默认生成的构造函数和构析函数 拷贝构造函数默认类型转化与 explicit 关键字 static 成员变量运算符重…

【云原生基础】了解云原生,什么是云原生?

&#x1f4d1;前言 本文主要讲了云原生的基本概念和原则的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&#x…

分享一个抖音视频解析神器~

怎么样下载抖音视频&#xff1f;相信很多人都有过这样的困惑。作为一个资深短视频剪辑工作者&#xff0c;常常需要用到各种视频素材&#xff0c;其中不乏需要从抖音上下载的&#xff0c;因此我也尝试过许多下载工具&#xff0c;但是效果都不大满意&#xff0c;直到有一次朋友给…

SpringBoot可以同时处理多少请求?

前言 前两天面试的时候&#xff0c;面试官问我&#xff1a;一个ip发请求过来&#xff0c;是一个ip对应一个线程吗&#xff1f;我突然愣住了&#xff0c;对于SpringBoot如何处理请求好像从来没仔细思考过&#xff0c;所以面试结束后就仔细研究了一番&#xff0c;现在就来探讨一…

C++——list

目录 list介绍 list的函数接口 构造函数 push_front和pop_front push_back和pop_back insert erase 迭代器 front和back size resize empty clear list::sort unique reverse 迭代器的实现 list介绍 list是一种可以在常数范围内在任意位置进行插入和删除的序列…

“AI换脸诈骗”来势汹汹,三个层面科学应对……

当前&#xff0c;AI技术的广泛应用为社会公众提供了个性化智能化的信息服务&#xff0c;也给网络诈骗带来可乘之机&#xff0c;如不法分子通过面部替换语音合成等方式制作虚假图像、音频、视频仿冒他人身份实施诈骗、侵害消费者合法权益。你认为AI诈骗到底应该如何防范&#xf…