1. 关闭防火墙和selinux
systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld
2.安装java8
yum list java-1.8* yum install java-1.8.0-openjdk* -yjava -version
3.下载和部署mysql
yum -y install wget
wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz
mv mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz /usr/local/
cd /usr/local/
tar xvJf mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz
mv mysql-8.0.28-linux-glibc2.12-x86_64 mysql-8.0
mkdir data
groupadd mysql
chown -R mysql.mysql /usr/local/mysql-8.0
cd mysql-8.0/bin/
mkdir data
./mysqld --user=mysql --basedir=/usr/local/mysql-8.0 --datadir=/usr/local/mysql-8.0/data/ --initialize
圈起来的部分为后面数据库登陆的初始密码
vi /etc/my.cnf
basedir=/usr/local/mysql-8.0/
datadir=/usr/local/mysql-8.0/data/
socket=/tmp/mysql.sock
character-set-server=UTF8MB4cp -a .././support-files/mysql.server /etc/init.d/mysql
chmod +x /etc/init.d/mysql
chkconfig --add mysql
service mysql status
service mysql start
ln -s /usr/local/mysql-8.0/bin/mysql /usr/bin
登陆
mysql -u root -p
输入之前初始化给的初始密码
修改mysql密码
登陆
4.下载部署flink
flink下载地址:
https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
下载flink sql所需驱动
1. flink-connector-jdbc-3.1.1-1.17.jar 下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/
2. flink-sql-connector-mysql-cdc-3.0.1.jar 下载地址:https://github.com/ververica/flink-cdc-connectors/releases
3. mysql-connector-java-8.0.28.jar 下载地址:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/
上传flink压缩包并解压
tar -zxvf flink-1.18.1-bin-scala_2.12.tgz
进入flink的lib目录上传三个依赖
cd flink-1.18.1/lib/
需要修改一下flink配置文件
不然会出现以下情况
vi flink-1.18.1/conf/flink-conf.yaml
原本是localhost修改为ip
./flink-1.18.1/bin/start-cluster.sh
访问 192.168.207.193:8081 (默认是8081端口 可在配置文件里修改)
5.实时同步mysql数据表
数据库先创建一个库,在库里创建表再添加数据
create database ljq;use ljqCREATE TABLE players (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);;
insert into players (player_id,team_id,player_name,height) values
(1001,1001,'韦德','1.93'),
(1002,1002,'雷吉','1.91'),
(1003,1003,'安德烈','2.11'),
(1004,1004,'索恩','2.16'),
(1005,1005,'兰斯顿','1.88'),
(1006,1006,'格伦','1.98'),
(1007,1007,'伊斯梅尔','1.83'),
(1008,1008,'扎扎','2.11'),
(1009,1009,'乔恩','2.08');
select * from players;
再创一个表不插入数据
CREATE TABLE players2 (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);
启动flink-sql
./flink-1.18.1/bin/sql-client.sh embedded
根据需要同步的数据创建源表
CREATE TABLE nbaplayers (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.207.193',
'port' = '3306',
'username' = 'root',
'password' = 'linux123',
'database-name' = 'ljq',
'table-name' = 'players',
'server-time-zone' = 'Asia/Shanghai'
);
查看表时出现报错!
解决方法:修改为允许所有ip访问
use mysql;
select user,host from user;
update user set host = '%' where user = 'root';
flush privileges;
select user,host from user;
重新查看表 select * from nbaplayers;·
创建结果表
CREATE TABLE nba (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.207.193:3306/ljq?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'linux123',
'table-name' = 'players2'
);
执行从源表插入结果表操作,生成同步作业
INSERT INTO nba
SELECT
player_id,
team_id,
player_name,
height
FROM nbaplayers;
Web端查看
查看是否同步数据到players2
如发现height同步数据有很多位小数点
解决方法:
ALTER TABLE players2 MODIFY height DOUBLE(3,2);
测试一下增删改是否同步
增 insert into players (player_id,team_id,player_name,height) values (10001,10002,'韦德2','1.95');
删delete from players where player_id = 10001;
改update players set player_name='姚明' where player_id=1001;