Deploying Flink and Dinky with docker-compose

Server environment: CentOS 7
1. Configure hosts

Edit /etc/hosts (`vim /etc/hosts`) and add entries for the three services:

```
x.x.x.x jobmanager
x.x.x.x taskmanager
x.x.x.x dinky-mysql
```
2. File and directory structure
```
.
├── conf
│   ├── JobManager
│   │   ├── flink-conf.yaml
│   │   ├── log4j-cli.properties
│   │   ├── log4j-console.properties
│   │   ├── log4j.properties
│   │   ├── log4j-session.properties
│   │   ├── logback-console.xml
│   │   ├── logback-session.xml
│   │   ├── logback.xml
│   │   ├── masters
│   │   ├── workers
│   │   └── zoo.cfg
│   └── TaskManager
│       ├── flink-conf.yaml
│       ├── log4j-cli.properties
│       ├── log4j-console.properties
│       ├── log4j.properties
│       ├── log4j-session.properties
│       ├── logback-console.xml
│       ├── logback-session.xml
│       ├── logback.xml
│       ├── masters
│       ├── workers
│       └── zoo.cfg
├── dinky
│   ├── config
│   │   └── application.yml
│   └── lib
├── dinky-mysql
│   ├── conf
│   │   └── my.cnf
│   └── data
├── docker-compose-flink-dinky.yml
├── jar
├── plugins
└── sql
```
Notes:

(Recommendation: on the first container start, do not map the config-file and lib paths. Copy the default config files and lib jars out of the containers to the host first, then map those paths on the second start. This makes later extension much easier.)
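The first-start copy step described above can be sketched with `docker cp` (container names and paths follow the compose file in section 3; adjust if yours differ):

```shell
# After the first start WITHOUT the conf/lib volume mappings, copy the
# defaults out of the containers into the host directories listed below.
docker cp jobmanager:/opt/flink/conf/. /opt/flink/conf/JobManager/
docker cp taskmanager:/opt/flink/conf/. /opt/flink/conf/TaskManager/
docker cp jobmanager:/opt/flink/lib/. /opt/flink/jar/
docker cp dinky:/opt/dinky/config/application.yml /opt/flink/dinky/config/application.yml
docker cp dinky:/opt/dinky/lib/. /opt/flink/dinky/lib/
# Then stop the stack, uncomment/add the volume mappings, and start it again.
```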
- flink/conf/JobManager/flink-conf.yaml - configuration file for the Flink JobManager
- flink/conf/TaskManager/flink-conf.yaml - configuration file for the Flink TaskManager
- flink/jar/ - dependency jars for the JobManager and TaskManager, mapped to /opt/flink/lib/ inside the containers
- flink/dinky/config/application.yml - Dinky configuration file; this is where the MySQL database Dinky connects to is configured
- flink/dinky/lib/ - Dinky dependency jars, mapped to /opt/dinky/lib/ inside the container
- flink/dinky-mysql/conf/my.cnf - configuration file for dinky-mysql; it must set max_allowed_packet=1073741824, otherwise importing data will fail
- flink/dinky-mysql/data/ - data directory mapping for dinky-mysql
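A minimal my.cnf for dinky-mysql based on the note above (only `max_allowed_packet` is required by this guide; anything else you add is your own choice):

```ini
[mysqld]
# Required: without this, importing Dinky's data can fail on large packets
max_allowed_packet=1073741824
```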
3. Create the containers

docker-compose-flink-dinky.yml:

```yaml
version: '3'
services:
  jobmanager:
    image: flink:1.14.0-scala_2.12-java8
    container_name: jobmanager
    hostname: jobmanager
    expose:
      - "6123"
    ports:
      - "48081:8081"  # Flink Web UI
      - "6123:6123"   # JobManager RPC
      - "6124:6124"   # JobManager REST
      - "8082:8082"   # Metrics
      - "6125:6125"   # Blob Server
    command: jobmanager
    networks:
      - flink-net
    restart: always
    volumes:
      - /opt/flink/conf/JobManager/:/opt/flink/conf/  # skip on the first start; copy conf to the host, then map it on the second start
      - /opt/flink/jar/:/opt/flink/lib/               # skip on the first start; copy lib to the host, then map it on the second start
      - /opt/flink/plugins/:/opt/flink/plugins/
      - /opt/flink/sql/:/opt/flink/sql/
  taskmanager:
    image: flink:1.14.0-scala_2.12-java8
    container_name: taskmanager
    hostname: taskmanager
    depends_on:
      - jobmanager
    scale: 1  # increase the number of TaskManagers as needed
    command: taskmanager
    networks:
      - flink-net
    restart: always
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - /opt/flink/conf/TaskManager/:/opt/flink/conf/  # skip on the first start; copy conf to the host, then map it on the second start
      - /opt/flink/jar/:/opt/flink/lib/                # skip on the first start; copy lib to the host, then map it on the second start
      - /opt/flink/plugins/:/opt/flink/plugins/
      - /opt/flink/sql/:/opt/flink/sql/
  dinky:
    image: dinkydocker/dinky-standalone-server:0.7.0-flink14
    container_name: dinky
    hostname: dinky
    ports:
      - "38081:8081"
      - "38888:8888"
    networks:
      - flink-net
    restart: always
    environment:
      - SPRING_PROFILES_ACTIVE=prod
      - MYSQL_HOST=dinky-mysql
      - MYSQL_PORT=33306
      - MYSQL_DATABASE=dlink
      - MYSQL_USERNAME=dlink
      - MYSQL_PASSWORD=dlink
    depends_on:
      - dinky-mysql
    volumes:
      - /opt/flink/dinky/config/application.yml:/opt/dinky/config/application.yml  # skip on the first start; copy application.yml to the host, then map it on the second start
      - /opt/flink/dinky/lib/:/opt/dinky/lib/                                      # skip on the first start; copy lib to the host, then map it on the second start
  dinky-mysql:
    image: dinkydocker/dinky-mysql-server:0.7.0
    container_name: dinky-mysql
    hostname: dinky-mysql
    ports:
      - "33306:3306"
    networks:
      - flink-net
    restart: always
    environment:
      - MYSQL_ROOT_PASSWORD=dlink
      - MYSQL_DATABASE=dlink
    volumes:
      - /opt/flink/dinky-mysql/data/:/var/lib/mysql/
      - /opt/flink/dinky-mysql/conf/:/etc/mysql/conf.d/
networks:
  flink-net:
    driver: bridge
```
Create the containers:

```shell
docker-compose -f docker-compose-flink-dinky.yml up -d
```

If container creation fails with `iptables: No chain/target/match by that name.`, start the firewall (`systemctl start firewalld`); restarting the Docker service (`systemctl restart docker`) may also be needed so Docker can recreate its iptables chains.
Check the container status. If all containers are in the up state but the web pages are unreachable, check the following two points:

- whether the firewall is disabled, or the required ports are opened
- the logs under the container's logs directory
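If you keep firewalld enabled, the mapped host ports can be opened individually instead of disabling the firewall. A sketch (port numbers follow the compose file above):

```shell
firewall-cmd --permanent --add-port=48081/tcp   # Flink Web UI
firewall-cmd --permanent --add-port=38888/tcp   # Dinky Web UI
firewall-cmd --permanent --add-port=38081/tcp   # Dinky's mapped 8081
firewall-cmd --permanent --add-port=33306/tcp   # dinky-mysql, only if accessed from outside the host
firewall-cmd --reload
```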
4. Web access verification

- Flink: http://ip:48081/
- Dinky: http://ip:38888/ (username/password: admin/admin)
5. Dinky configuration and testing

5.1 Register a Flink instance

In Dinky, go to Registration Center --> Cluster Management --> Flink Instance Management and register the address of the Flink JobManager (e.g. jobmanager:8081, which the dinky container can reach since both are on the flink-net network).
5.2 Run SQL

- In the test database, create the tables user_source and user_target.
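A possible schema for the two MySQL tables, matching the id/name/age columns used in the Flink DDL below (the column types and sample rows are assumptions for illustration):

```sql
CREATE TABLE user_source (
  id   VARCHAR(32) NOT NULL PRIMARY KEY,
  name VARCHAR(64),
  age  INT
);

-- Same structure for the target table
CREATE TABLE user_target LIKE user_source;

-- A couple of sample rows so the sync job has something to move
INSERT INTO user_source VALUES ('1', 'zhangsan', 20), ('2', 'lisi', 25);
```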
- In Dinky, create and run the following SQL:

```sql
-- Source table mapping
DROP TABLE IF EXISTS user_source_1;
CREATE TABLE IF NOT EXISTS user_source_1 (
  `id`   STRING,
  `name` STRING,
  `age`  INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://ip:3306/test_database?characterEncoding=UTF-8&allowMultiQueries=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true',
  'table-name' = 'user_source',
  'username' = 'root',
  'password' = '123456'
);

-- Target table mapping
DROP TABLE IF EXISTS user_target_1;
CREATE TABLE IF NOT EXISTS user_target_1 (
  `id`   STRING,
  `name` STRING,
  `age`  INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://ip:3306/test_database?characterEncoding=UTF-8&allowMultiQueries=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true',
  'table-name' = 'user_target',
  'username' = 'root',
  'password' = '123456'
);

-- Data synchronization
INSERT INTO user_target_1 SELECT * FROM user_source_1;
```
- Choose an execution mode and run the SQL. After it completes, the job also shows up in the Flink management page.
- If running the SQL fails with the error below, the corresponding connector jar is missing and needs to be placed in Dinky's lib directory:

```
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:587)
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:561)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:146)
	... 112 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:399)
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:583)
	... 114 more
```
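For the 'jdbc' connector specifically, the missing pieces are the Flink JDBC connector and a MySQL driver. A sketch of fetching them from Maven Central (the artifact versions here are assumptions; match them to your Flink 1.14.0 / Scala 2.12 setup):

```shell
cd /opt/flink/dinky/lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.0/flink-connector-jdbc_2.12-1.14.0.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Depending on the execution mode, the jars may also be needed on the Flink
# classpath (the /opt/flink/jar/ host directory mapped to /opt/flink/lib/).
cp ./*.jar /opt/flink/jar/
docker restart dinky jobmanager taskmanager
```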