1、部署
1.1、修改flink-conf.yaml
1.1.1、flink-17
# flink-conf.yaml for the "flink-17" installation (JobManager host boshi-122).
jobmanager.rpc.address: boshi-122
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 2048m

# TaskManager memory budget. The explicitly sized components add up exactly to
# the process size, so changing one of them requires rebalancing the others:
#   3072 task heap + 128 framework heap + 128 managed + 128 framework off-heap
#   + 128 network (max) + 256 metaspace + 256 JVM overhead (max) = 4096m
taskmanager.memory.process.size: 4096m
taskmanager.memory.task.heap.size: 3072m
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.managed.size: 128m
taskmanager.memory.framework.off-heap.size: 128m
taskmanager.memory.network.max: 128m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
jobmanager.execution.failover-strategy: region
classloader.check-leaked-classloader: false

# Timeouts: akka.ask.timeout takes a duration string; the other three are in milliseconds.
akka.ask.timeout: 50s
web.timeout: 50000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# NOTE(review): file:// state directories must resolve to the same storage from
# every JobManager/TaskManager host (e.g. a shared mount) for restores after a
# failover to work — confirm /data/flink is shared.
state.savepoints.dir: file:///data/flink/savepoint/
state.checkpoints.dir: file:///data/flink/checkpoint/

# NOTE(review): the CMS collector was removed in JDK 14; these flags assume the
# cluster runs on JDK 8 or 11 — confirm.
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
1.1.2、flink-1-13
# flink-conf.yaml for the "flink-1-13" installation (JobManager host boshi-146).
jobmanager.rpc.address: boshi-146
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m

# TaskManager memory budget. The explicitly sized components add up exactly to
# the process size, so changing one of them requires rebalancing the others:
#   15360 task heap + 128 framework heap + 128 managed + 128 framework off-heap
#   + 128 network (max) + 256 metaspace + 256 JVM overhead (max) = 16384m
taskmanager.memory.process.size: 16384m
taskmanager.memory.task.heap.size: 15360m
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.managed.size: 128m
taskmanager.memory.framework.off-heap.size: 128m
taskmanager.memory.network.max: 128m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
jobmanager.execution.failover-strategy: region
classloader.check-leaked-classloader: false

# Timeouts: akka.ask.timeout takes a duration string; the other three are in milliseconds.
akka.ask.timeout: 100s
web.timeout: 100000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# State lives on the HDFS nameservice "hdfs-ha".
state.savepoints.dir: hdfs://hdfs-ha/flink/savepoint/
state.checkpoints.dir: hdfs://hdfs-ha/flink/checkpoint/

# NOTE(review): PrintGCTimeStamps / PrintGCDateStamps / UseGCLogFileRotation /
# NumberOfGCLogFiles / GCLogFileSize are JDK 8-only flags (removed with JDK 9
# unified logging); a newer JVM refuses to start with them — confirm JDK 8.
# NOTE(review): <LOG_DIR> is presumably the YARN container log-dir placeholder
# expanded by YARN at container launch — confirm this cluster runs on YARN.
env.java.opts: -server -XX:+UseG1GC -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=100M
1.2、masters
boshi-122:8081
1.3、workers
boshi-129
boshi-137
boshi-144
boshi-166
2、提交任务
2.1、mysql-to-kafka-starrocks
-- Pipeline: MySQL (CDC) -> Kafka topic ods_crawl_enterprise_website (upsert-kafka)
--           -> StarRocks table ods.ods_crawl_enterprise_website.
-- 'ip' hosts and empty passwords appear to be placeholders: supply real values
-- at deployment time and do not commit credentials.
-- Column identifiers must not contain spaces inside the backticks, otherwise
-- they no longer match the physical column names.

-- Source: changelog of db_enterprise_outer_resource.crawl_enterprise_website.
CREATE TABLE mysql_crawl_enterprise_website (
    `id`              INT,
    `eid`             STRING,
    `enterprise_name` STRING,
    `website`         STRING,
    `html`            STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'db_enterprise_outer_resource',
    'table-name' = 'crawl_enterprise_website',
    'scan.incremental.snapshot.enabled' = 'false'
);

-- Intermediate upsert topic, keyed by `id` (JSON key and value).
-- NOTE(review): group id 'source_province' looks reused from another job — confirm.
-- NOTE(review): max.request.size (~512 MB) is presumably sized for large `html`
-- payloads; the broker/topic max message size must allow it as well.
CREATE TABLE kafka_crawl_enterprise_website (
    `id`              INT,
    `eid`             STRING,
    `enterprise_name` STRING,
    `website`         STRING,
    `html`            STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_crawl_enterprise_website',
    'properties.bootstrap.servers' = 'ip:6667,ip:6667,ip:6667',
    'properties.group.id' = 'source_province',
    'properties.max.request.size' = '512000000',
    'properties.session.timeout.ms' = '60000',
    'properties.request.timeout.ms' = '40000',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Sink: StarRocks table ods.ods_crawl_enterprise_website, loaded as JSON.
CREATE TABLE starrock_ods_crawl_enterprise_website (
    `id`              INT,
    `eid`             STRING,
    `enterprise_name` STRING,
    `website`         STRING,
    `html`            STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://ip:9030',
    'load-url' = 'ip:8030',
    'database-name' = 'ods',
    'table-name' = 'ods_crawl_enterprise_website',
    'username' = 'starrocks',
    'password' = '',
    'sink.max-retries' = '5',
    'sink.buffer-flush.max-bytes' = '256000000',
    'sink.buffer-flush.interval-ms' = '3000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.properties.ignore_json_size' = 'true'
);

-- Step 1: replicate the MySQL changelog into Kafka.
-- Explicit projection so a schema change in the source fails loudly instead of
-- silently shifting columns.
INSERT INTO kafka_crawl_enterprise_website
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM mysql_crawl_enterprise_website;

-- Step 2: load the Kafka upsert stream into StarRocks.
INSERT INTO starrock_ods_crawl_enterprise_website
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM kafka_crawl_enterprise_website;
2.2、提交参数
# Per-job submit parameters (presumably passed as -D<key>=<value> — confirm).
jobmanager.memory.process.size=4096m
# TaskManager memory budget; the components add up exactly to the process size:
#   7168 task heap + 128 framework heap + 128 framework off-heap + 128 managed
#   + 128 network (max) + 256 metaspace + 256 JVM overhead (max) = 8192m
taskmanager.memory.process.size=8192m
taskmanager.memory.task.heap.size=7168m
taskmanager.memory.framework.heap.size=128m
taskmanager.memory.framework.off-heap.size=128m
taskmanager.memory.managed.size=128m
taskmanager.memory.network.max=128m
taskmanager.memory.jvm-metaspace.size=256m
taskmanager.memory.jvm-overhead.max=256m
# With one slot per TaskManager, parallelism 3 presumably needs 3 TaskManager containers.
parallelism.default=3
taskmanager.numberOfTaskSlots=1
yarn.containers.vcores=1