1、版本介绍:
- doris版本: 1.2.8
- Spark Connector for Apache Doris 版本: spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT
- spark版本:spark-3.3.1
2、Spark Doris Connector
Spark Doris Connector - Apache Doris
目前最新发布版本: Release Apache Doris Spark Connector 1.3.0 Release · apache/doris-spark-connector · GitHub
2.1、Spark Doris Connector概述
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
代码库地址:GitHub - apache/doris-spark-connector: Spark Connector for Apache Doris
- 支持从
Doris
中读取数据 - 支持
Spark DataFrame
批量/流式 写入Doris
- 可以将
Doris
表映射为DataFrame
或者RDD
,推荐使用DataFrame
。 - 支持在
Doris
端完成数据过滤,减少数据传输量。
2.2、Doris 和 Spark 列类型映射关系
Doris Type | Spark Type |
---|---|
NULL_TYPE | DataTypes.NullType |
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DATE | DataTypes.DateType |
DATETIME | DataTypes.StringType1 |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
LARGEINT | DecimalType |
VARCHAR | DataTypes.StringType |
TIME | DataTypes.DoubleType |
HLL | Unsupported datatype |
Bitmap | Unsupported datatype |
- 注:Connector 中,将
DATETIME
映射为String
。由于Doris
底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用String
类型直接返回对应的时间可读文本。
3、doris所有所有类型测试与报错解决方案
3.1、doris建表并插入数据语句如下:
CREATE TABLE spark_connector_test_decimal_v1 (
c1 int NOT NULL,
c2 VARCHAR(25) NOT NULL,
c3 VARCHAR(152),
c4 boolean,
c5 tinyint,
c6 smallint,
c7 bigint,
c8 float,
c9 double,
c10 datev2,
c11 datetime,
c12 char,
c13 largeint,
c14 varchar,
c15 decimalv3(15, 5)
)
DUPLICATE KEY(c1)
COMMENT "OLAP"
DISTRIBUTED BY HASH(c1) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);insert into spark_connector_test_decimal_v1 values(10000,'aaa','abc',true, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal_v1 values(10000,'aaa','abc',true, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal_v1 values(10001,'aaa','abc',false, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal_v1 values(10002,'aaa','abc',True, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal_v1 values(10003,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
3.2、spark-sql中创建临时视图,读取数据
spark-sql 中建表:CREATE
TEMPORARY VIEW spark_connector_test_decimal_v1
USING doris
OPTIONS("table.identifier"="ods.spark_connector_test_decimal_v1","fenodes"="172.xxx.99.199:8030","user"="syncxxx","password"="xxxxx"
);select * from spark_connector_test_decimal_v1;
3.2.1、报错如下:
spark-sql (default)> select * from spark_connector_test_decimal_v1;
17:42:13.979 [task-result-getter-3] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7) (hadoop4 executor 1): org.apache.spark.util.TaskCompletionListenerException: nullPrevious exception in task: null
3.2.2、通过yarn页面追查错误log,具体报错如下:
Log Type: stdoutLog Upload Time: Wed Jan 10 14:54:47 +0800 2024Log Length: 200041Showing 4096 bytes of 200041 total. Click here for the full log..spark.exception.ConnectedFailedException: Connect to Doris BE{host='172.xxx.yyyy.10', port=9060}failed.at org.apache.doris.spark.backend.BackendClient.openScanner(BackendClient.java:153) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]at org.apache.doris.spark.rdd.ScalaValueReader.$anonfun$openResult$1(ScalaValueReader.scala:138) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]at org.apache.doris.spark.rdd.ScalaValueReader.org$apache$doris$spark$rdd$ScalaValueReader$$lockClient(ScalaValueReader.scala:239) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]at org.apache.doris.spark.rdd.ScalaValueReader.<init>(ScalaValueReader.scala:138) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]at org.apache.doris.spark.sql.ScalaDorisRowValueReader.<init>(ScalaDorisRowValueReader.scala:32) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]... 20 more
13:58:50.002 [Executor task launch worker for task 0.3 in stage 7.0 (TID 29)] ERROR org.apache.spark.executor.Executor - Exception in task 0.3 in stage 7.0 (TID 29)
org.apache.spark.util.TaskCompletionListenerException: nullPrevious exception in task: null
3.3、报错查找,定位为spark读取doris类型问题
直接查,报错,是spark-doris-connector连接器的类型转换问题:
https://github.com/apache/doris-spark-connector/issues/101
https://github.com/apache/doris-spark-connector/issues/101#issuecomment-1563765357
3.4、查看数据的该doris表底层字段列存储情况:
select * FROM information_schema.`columns` where TABLE_NAME='spark_connector_test_decimal_v1'
TABLE_CATALOG |TABLE_SCHEMA |TABLE_NAME |COLUMN_NAME |ORDINAL_POSITION |COLUMN_DEFAULT |IS_NULLABLE |DATA_TYPE |CHARACTER_MAXIMUM_LENGTH |CHARACTER_OCTET_LENGTH |NUMERIC_PRECISION |NUMERIC_SCALE |DATETIME_PRECISION |CHARACTER_SET_NAME |COLLATION_NAME |COLUMN_TYPE |COLUMN_KEY |EXTRA |PRIVILEGES |COLUMN_COMMENT |COLUMN_SIZE |DECIMAL_DIGITS |GENERATION_EXPRESSION |SRS_ID |
--------------|-------------|-----------------------------|------------|-----------------|---------------|------------|----------------|-------------------------|-----------------------|------------------|--------------|-------------------|-------------------|---------------|-----------------|-----------|------|-----------|---------------|------------|---------------|----------------------|-------|
internal |ods |spark_connector_test_decimal_v1 |c1 |1 | |NO |int | | |10 |0 | | | |int(11) |DUP | | | |10 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c2 |2 | |NO |varchar |25 |100 | | | | | |varchar(25) | | | | |25 | | | |
internal |ods |spark_connector_test_decimal_v1 |c3 |3 | |YES |varchar |152 |608 | | | | | |varchar(152) | | | | |152 | | | |
internal |ods |spark_connector_test_decimal_v1 |c4 |4 | |YES |tinyint | | | |0 | | | |tinyint(1) | | | | | |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c5 |5 | |YES |tinyint | | |3 |0 | | | |tinyint(4) | | | | |3 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c6 |6 | |YES |smallint | | |5 |0 | | | |smallint(6) | | | | |5 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c7 |7 | |YES |bigint | | |19 |0 | | | |bigint(20) | | | | |19 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c8 |8 | |YES |float | | |7 |7 | | | |float | | | | |7 |7 | | |
internal |ods |spark_connector_test_decimal_v1 |c9 |9 | |YES |double | | |15 |15 | | | |double | | | | |15 |15 | | |
internal |ods |spark_connector_test_decimal_v1 |c10 |10 | |YES |date | | | | | | | |datev2 | | | | | | | | |
internal |ods |spark_connector_test_decimal_v1 |c11 |11 | |YES |datetime | | | | | | | |datetime | | | | | | | | |
internal |ods |spark_connector_test_decimal_v1 |c12 |12 | |YES |char |1 |4 | | | | | |char(1) | | | | |1 | | | |
internal |ods |spark_connector_test_decimal_v1 |c13 |13 | |YES |bigint unsigned | | |39 | | | | |largeint | | | | |39 | | | |
internal |ods |spark_connector_test_decimal_v1 |c14 |14 | |YES |varchar |1 |4 | | | | | |varchar(1) | | | | |1 | | | |
internal |ods |spark_connector_test_decimal_v1 |c15 |15 | |YES |decimal | | |15 |5 | | | |decimalv3(15, 5) | | | | |15 |5 | | |
3.5、Dbeaver 修改doris表的字段类型:
修改doris表的字段类型: 在 1.2.0 版本之后, 开启 "light_schema_change"="true" 选项时,可以支持修改列名。alter table ods.spark_connector_test_decimal_v1 MODIFY COLUMN c15 DECIMAL(15,5);alter table ods.spark_connector_test_decimal_v1 MODIFY COLUMN c10 datetime;
在 1.2.0 版本之后, 开启 "light_schema_change"="true" 选项时,可以支持修改列名。
建表成功后,即可对列名进行修改,语法:
alter table ods.spark_connector_test_decimal_v1 RENAME COLUMN c10 c100; insert into spark_connector_test_decimal_v1 values(10003,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
3.6、修改字段类型的后doris表字的column_type取值
select * FROM information_schema.`columns` where TABLE_NAME='spark_connector_test_decimal_v1'TABLE_CATALOG |TABLE_SCHEMA |TABLE_NAME |COLUMN_NAME |ORDINAL_POSITION |COLUMN_DEFAULT |IS_NULLABLE |DATA_TYPE |CHARACTER_MAXIMUM_LENGTH |CHARACTER_OCTET_LENGTH |NUMERIC_PRECISION |NUMERIC_SCALE |DATETIME_PRECISION |CHARACTER_SET_NAME |COLLATION_NAME |COLUMN_TYPE |COLUMN_KEY |EXTRA |PRIVILEGES |COLUMN_COMMENT |COLUMN_SIZE |DECIMAL_DIGITS |GENERATION_EXPRESSION |SRS_ID |
--------------|-------------|--------------------------------|------------|-----------------|---------------|------------|----------------|-------------------------|-----------------------|------------------|--------------|-------------------|-------------------|---------------|--------------|-----------|------|-----------|---------------|------------|---------------|----------------------|-------|
internal |ods |spark_connector_test_decimal_v1 |c1 |1 | |NO |int | | |10 |0 | | | |int(11) |DUP | | | |10 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c2 |2 | |NO |varchar |25 |100 | | | | | |varchar(25) | | | | |25 | | | |
internal |ods |spark_connector_test_decimal_v1 |c3 |3 | |YES |varchar |152 |608 | | | | | |varchar(152) | | | | |152 | | | |
internal |ods |spark_connector_test_decimal_v1 |c4 |4 | |YES |tinyint | | | |0 | | | |tinyint(1) | | | | | |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c5 |5 | |YES |tinyint | | |3 |0 | | | |tinyint(4) | | | | |3 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c6 |6 | |YES |smallint | | |5 |0 | | | |smallint(6) | | | | |5 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c7 |7 | |YES |bigint | | |19 |0 | | | |bigint(20) | | | | |19 |0 | | |
internal |ods |spark_connector_test_decimal_v1 |c8 |8 | |YES |float | | |7 |7 | | | |float | | | | |7 |7 | | |
internal |ods |spark_connector_test_decimal_v1 |c9 |9 | |YES |double | | |15 |15 | | | |double | | | | |15 |15 | | |
internal |ods |spark_connector_test_decimal_v1 |c10 |10 | |YES |datetime | | | | | | | |datetime | | | | | | | | |
internal |ods |spark_connector_test_decimal_v1 |c11 |11 | |YES |datetime | | | | | | | |datetime | | | | | | | | |
internal |ods |spark_connector_test_decimal_v1 |c12 |12 | |YES |char |1 |4 | | | | | |char(1) | | | | |1 | | | |
internal |ods |spark_connector_test_decimal_v1 |c13 |13 | |YES |bigint unsigned | | |39 | | | | |largeint | | | | |39 | | | |
internal |ods |spark_connector_test_decimal_v1 |c14 |14 | |YES |varchar |1 |4 | | | | | |varchar(1) | | | | |1 | | | |
internal |ods |spark_connector_test_decimal_v1 |c15 |15 | |YES |decimal | | |15 |5 | | | |decimal(15,5) | | | | |15 |5 | | |
3.7、删除视图成功后,再次创建视图,即可查询成功
修改表类型之后,需要删除视图,重新建视图,否则直接查,会报如下错误:
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of date
spark-sql (default)> drop view spark_connector_test_decimal_v1;
Response code
Time taken: 0.039 secondsspark-sql (default)> select * from spark_connector_test_decimal_v1;
c1 c2 c3 c4 c5 c6 c7 c8 c9 c10 c11 c12 c13 c14 c15
10000 aaa abc true 100 3000 100000 1234.567 12345.678 2022-12-01 00:00:00 2022-12-01 12:00:00 a 200000 g 1000.12345
10001 aaa abc false 100 3000 100000 1234.567 12345.678 2022-12-01 00:00:00 2022-12-01 12:00:00 a 200000 g 1000.12345
10002 aaa abc true 100 3000 100000 1234.567 12345.678 2022-12-01 00:00:00 2022-12-01 12:00:00 a 200000 g 1000.12345
10003 aaa abc false 100 3000 100000 1234.567 12345.678 2022-12-01 00:00:00 2022-12-01 12:00:00 a 200000 g 1000.12345
Time taken: 0.233 seconds, Fetched 4 row(s)
spark-sql (default)>
参考:
Spark Doris Connector - Apache Doris
Release Apache Doris Spark Connector 1.3.0 Release · apache/doris-spark-connector · GitHub
[Bug] ConnectedFailedException: Connect to Doris BE{host='xxx', port=9060}failed · Issue #101 · apache/doris-spark-connector · GitHub