一: Set up Kafka.
1. Run the following commands on all three machines:
cd /opt
wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar zxvf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1/config
vim server.properties
Modify the following two settings (see the sketch after this list):
1. Give each of the three machines its own broker.id.
2. Configure the ZooKeeper connection (zookeeper.connect).
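For reference, the two settings in server.properties might look like the sketch below; the broker IDs and ZooKeeper addresses are placeholders, not values from the original notes:
broker.id=0   # use 1 and 2 on the other two machines
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181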
3. Start and test.
3.1 Start in the background.
Step 1: start ZooKeeper.
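If ZooKeeper is not already running, the copy bundled with Kafka can be started in the background like this (a sketch; an external ZooKeeper cluster would use its own start script):
nohup /opt/kafka_2.13-3.6.1/bin/zookeeper-server-start.sh /opt/kafka_2.13-3.6.1/config/zookeeper.properties > /dev/null 2>&1 &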
Step 2: run the Kafka start command:
nohup /opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh /opt/kafka_2.13-3.6.1/config/server.properties > /dev/null 2>&1 &
3.2 Test.
Run the topic-creation command on one machine:
/opt/kafka_2.13-3.6.1/bin/kafka-topics.sh --create --topic my-topic-kraft --bootstrap-server localhost:9092
Run the topic-list command on another machine:
/opt/kafka_2.13-3.6.1/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
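As a further smoke test, you can push a few messages through the topic with the console tools that ship with Kafka:
# on one machine, type a few lines, then Ctrl+C
/opt/kafka_2.13-3.6.1/bin/kafka-console-producer.sh --topic my-topic-kraft --bootstrap-server localhost:9092
# on another machine, read them back
/opt/kafka_2.13-3.6.1/bin/kafka-console-consumer.sh --topic my-topic-kraft --from-beginning --bootstrap-server localhost:9092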
二: Set up Flink.
1. Download Flink on all three machines:
cd /opt
wget https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar zxvf flink-1.13.6-bin-scala_2.12.tgz
2. Edit the configuration parameters.
- Edit flink-conf.yaml on all three machines:
cd /opt/flink-1.13.6/conf
vim flink-conf.yaml
Set the master (JobManager) address.
Set the ZooKeeper addresses for high availability.
high-availability.storageDir: hdfs://10.15.250.196/flink/ha/
state.checkpoints.dir: hdfs://10.15.250.196/flink-checkpoints
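Putting the pieces together, the relevant flink-conf.yaml entries might look like this sketch (master-node and zk1..zk3 are placeholder host names, not values from the original notes):
jobmanager.rpc.address: master-node
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs://10.15.250.196/flink/ha/
state.checkpoints.dir: hdfs://10.15.250.196/flink-checkpoints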
- Edit masters:
vim masters
3. Edit workers.
vim workers
List the addresses of the other two machines here, one per line (see the example below).
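For example, with placeholder host names master-node, worker-1 and worker-2, the two files would contain:
# conf/masters
master-node:8081
# conf/workers
worker-1
worker-2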
4. Add the required jars to the lib directory (see the note below).
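The notes do not name the jars. If they are the Hadoop dependencies needed for the hdfs:// paths above, the alternative that Flink 1.13 documents is to expose the Hadoop classpath on each machine instead of copying jars into lib:
export HADOOP_CLASSPATH=$(hadoop classpath)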
5. Start Flink.
cd /opt/flink-1.13.6/bin
./start-cluster.sh
Check the web UI: the IP is the master node's, the port is 8081.
三: Set up Dinky.
1. Initialize MySQL.
mysql -uroot -p123456
create database dinky;
grant all privileges on dinky.* to 'dinky'@'%' identified by 'dinky' with grant option;
flush privileges;
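Note that GRANT ... IDENTIFIED BY is MySQL 5.7 syntax; on MySQL 8.0+ the user must be created first:
create user 'dinky'@'%' identified by 'dinky';
grant all privileges on dinky.* to 'dinky'@'%' with grant option;
flush privileges;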
2. Upload Dinky.
Upload the release package to /opt:
tar -zxvf dlink-release-0.6.6.tar.gz
mv dlink-release-0.6.6 dinky  # rename the extracted directory
cd dinky
# first log in to MySQL
mysql -udinky -pdinky
mysql>use dinky;
mysql>source /opt/dinky/sql/dlink.sql
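A quick check that the schema was imported (the tables created by dlink.sql should be listed):
mysql>show tables;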
3. Configure the MySQL connection.
cd config/
vim application.yml
spring:
  datasource:
    url: jdbc:mysql://xxxx:3306/dinky?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: dinky
    password: dinky
4. Add the required jars to the plugins directory.
mkdir /opt/dinky/plugins
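The notes do not say which jars go here; one common setup for Dinky 0.6 is to make the Flink dependencies available by copying the Flink lib jars into plugins (an assumption, adjust for your deployment):
cp /opt/flink-1.13.6/lib/*.jar /opt/dinky/plugins/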
5. Start the service.
cd /opt/dinky
sh auto.sh start 1.13
# to stop later: sh auto.sh stop
jps  # verify the Dinky process is running
URL: http://192.168.50.60:8888/#/datastudio
Username: admin
Password: admin
四: A small real-time computation example.
1. Have Flink request YARN resources:
/opt/flink-1.13.6/bin/yarn-session.sh -n 4 -tm 1024m -s 2 &
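To confirm the session came up, list the running YARN applications; the Flink session should appear in the output:
yarn application -list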
2. Create the Kafka topic (same command as in section 一; skip if it already exists):
/opt/kafka_2.13-3.6.1/bin/kafka-topics.sh --create --topic my-topic-kraft --bootstrap-server localhost:9092
3. In Dinky, write SQL that generates data and streams it into Kafka in real time:
set execution.checkpointing.interval = 30s;
set state.checkpoints.dir=hdfs://192.168.50.60:8020/cluster/flink/checkpointes_;
set state.savepoints.dir=hdfs://192.168.50.60:8020/cluster/flink/savepointkes_;

CREATE TABLE source_table (
  age INT,
  sex STRING,
  t_insert_time AS localtimestamp,
  WATERMARK FOR t_insert_time AS t_insert_time
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.age.min' = '1',
  'fields.age.max' = '1000',
  'fields.sex.length' = '10'
);

CREATE TABLE KafkaTable (
  `age` INT,
  `sex` STRING,
  t_insert_time TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic-kraft',
  'properties.bootstrap.servers' = '192.168.50.60:9092',
  -- 'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

insert into KafkaTable
select age, sex, t_insert_time from source_table;
4. Read the data from Kafka, compute a cumulative aggregation, and insert the results into MySQL:
set execution.checkpointing.interval = 30s;
SET execution.type = streaming;
set state.checkpoints.dir=hdfs://192.168.50.60:8020/cluster/flink/checkpointes_;
set state.savepoints.dir=hdfs://192.168.50.60:8020/cluster/flink/savepointkes_;

CREATE TABLE MyUserTable (
  window_end_time TIMESTAMP,
  create_time TIMESTAMP,
  window_proctime_time TIMESTAMP,
  age INT,
  count_sum BIGINT,
  PRIMARY KEY (age) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.50.60:3306/test',
  'table-name' = 'test_kafka',
  'username' = 'root',
  'password' = '123456',
  'sink.buffer-flush.max-rows' = '1'
);
-- select * from MyUserTable;

CREATE TABLE KafkaTable (
  `age` INT,
  `sex` STRING,
  t_insert_time TIMESTAMP,
  `ts1` AS CAST(t_insert_time AS TIMESTAMP_LTZ(3)),
  WATERMARK FOR ts1 AS ts1 - INTERVAL '5' SECOND  -- a 5-second watermark delay on ts1
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic-kraft',
  'properties.bootstrap.servers' = '192.168.50.60:9092',
  -- 'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

insert into MyUserTable
select
  window_end as window_end_time,
  window_start as create_time,
  PROCTIME() as window_proctime_time,
  age,
  count(*) as count_sum
FROM TABLE(CUMULATE(TABLE KafkaTable, DESCRIPTOR(ts1), INTERVAL '5' MINUTES, INTERVAL '1' DAY))
group by
  window_end,
  window_start,
  age;
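The JDBC sink above writes to a test_kafka table that must already exist in the test database. The original notes do not include its DDL; a minimal sketch with assumed column types:
create table test_kafka (
  window_end_time datetime,
  create_time datetime,
  window_proctime_time datetime,
  age int primary key,
  count_sum bigint
);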