原先的配置
[INFO] StarRocksSourceBeReader [open Scan params.mem_limit 8589934592 B]
[INFO] StarRocksSourceBeReader [open Scan params.query-timeout-s 600 s]
[INFO] StarRocksSourceBeReader [open Scan params.keep-alive-min 100 min]
[INFO] StarRocksSourceBeReader [open Scan params.batch_size 1000]
错误
程序读取starrocks跑10分钟左右报错
Caused by: java.lang.RuntimeException: Failed to get next from be -> ip:[172.24.5.172] CANCELLED msg:[Set cancelled by MemoryScratchSinkOperator]
解决方案
原因:是因为参数params.query-timeout-s 设置600秒,导致未读取完数据,直接取消了。
修改后的配置
[open Scan params.mem_limit 8589934592 B]
[open Scan params.query-timeout-s 6000 s]
[open Scan params.keep-alive-min 100 min]
[open Scan params.batch_size 1000]
各个参数说明
/*** StarRocks Source* @return*/public static StarRocksSourceOptions createStarRocksSourceOptions(String db,String tableName){StarRocksSourceOptions.Builder builder = StarRocksSourceOptions.builder().withProperty("connector", SR_SOURCE_CONNECTOR).withProperty("scan-url", SR_SOURCE_SCAN_URL).withProperty("jdbc-url", SR_SOURCE_JDBC_URL).withProperty("username", SR_SOURCE_USERNAME).withProperty("password", SR_SOURCE_PASSWORD)//BE 节点中单个查询的内存上限。单位:字节。默认值:1073741824(即 1 GB)。104857600:100M.withProperty("scan.params.mem-limit-byte","10737418240")//数据读取任务的超时时间,在任务执行过程中进行检查。单位:秒。默认值:600。如果超过该时间,仍未返回读取结果,则停止数据读取任务。.withProperty("scan.params.query-timeout-s","2592000")//Flink 连接器连接 StarRocks 集群的时间上限。单位:毫秒。默认值:1000。超过该时间上限,则数据读取任务会报错。.withProperty("scan.connect.timeout-ms","2592000")//数据读取任务的保活时间,通过轮询机制定期检查。单位:分钟。默认值:10。建议取值大于等于 5。.withProperty("scan.params.keep-alive-min","8")//数据读取失败时的最大重试次数。默认值:1。超过该数量上限,则数据读取任务报错。.withProperty("scan.max-retries","100").withProperty("table-name",tableName).withProperty("database-name",db);return builder.build();