基于docker安装flink

文章目录

  • 环境准备
    • Flink
      • docker-compose方式
      • 二进制部署
    • Kafka
    • Mysql
  • Flink 执行 SQL命令
    • 进入SQL客户端CLI
    • 执行SQL查询
      • 表格模式
      • 变更日志模式
      • Tableau模式
      • 窗口计算
    • 窗口计算
      • 滚动窗口demo
      • 滑动窗口
  • 踩坑

环境准备

Flink

docker-compose方式

version: "3"
services:jobmanager:image: flink:latestexpose:- "6123"ports:- "8081:8081"command: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=jobmanagertaskmanager:image: flink:latestexpose:- "6121"- "6122"depends_on:- jobmanagercommand: taskmanagerlinks:- "jobmanager:jobmanager"environment:- JOB_MANAGER_RPC_ADDRESS=jobmanager

前端访问地址: http://192.168.56.112:8081/#/overview

二进制部署

wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgzvim conf/flink-conf.yamljobmanager.rpc.address: 192.168.56.112 # 修改为本机ip./bin/start-cluster.sh

Kafka

version: '2'
services:zookeeper:image: wurstmeister/zookeeper   ## 镜像ports:- "2181:2181"                 ## 对外暴露的端口号kafka:image: wurstmeister/kafka       ## 镜像volumes:- /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)ports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112    ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.56.112:2181       ## 卡夫卡运行是基于zookeeper的kafka-manager:image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面environment:ZK_HOSTS:                    ## 修改:宿主机IPports:- "9000:9000"

Mysql

docker run -d -p3306:3306 --name=mysql57 -e MYSQL_ROOT_PASSWORD=111111 mysql:5.7

在这里插入图片描述

Flink 执行 SQL命令

进入SQL客户端CLI

docker exec  -it flink_jobmanager_1  /bin/bash./bin/sql-client.sh

在这里插入图片描述

执行SQL查询

SELECT 'Hello World';

在这里插入图片描述

表格模式

表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:

SET sql-client.execution.result-mode = table;

可以使用如下查询语句查看不同模式的的运行结果:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

在这里插入图片描述

变更日志模式

变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。

SET sql-client.execution.result-mode = changelog;

在这里插入图片描述

Tableau模式

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):

SET sql-client.execution.result-mode = tableau;

在这里插入图片描述

注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

滚动窗口demo

根据订单信息使用kafka作为数据源表,JDBC作为数据结果表统计用户在5秒内的订单数量,并根据窗口的订单id和窗口开启时间作为主键,将结果实时统计到JDBC中:

  1. 在MySQL的flink数据库下创建表order_count,创建语句如下:
CREATE TABLE `flink`.`order_count` (`user_id` VARCHAR(32) NOT NULL,`window_start` TIMESTAMP NOT NULL,`window_end` TIMESTAMP NULL,`total_num` BIGINT UNSIGNED NULL,PRIMARY KEY (`user_id`, `window_start`)
)        ENGINE = InnoDBDEFAULT CHARACTER SET = utf8mb4COLLATE = utf8mb4_general_ci;
  1. 创建flink opensource sql作业,并提交运行作业
CREATE TABLE orders (order_id string,order_channel string,order_time timestamp(3),pay_amount double,real_pay double,pay_time string,user_id string,user_name string,area_id string,watermark for order_time as order_time - INTERVAL '3' SECOND
) WITH ('connector' = 'kafka','topic' = 'order_topic','properties.bootstrap.servers' = '192.168.56.112:9092','properties.group.id' = 'order_group','scan.startup.mode' = 'latest-offset','format' = 'json'
);CREATE TABLE jdbcSink (user_id string,window_start timestamp(3),window_end timestamp(3),total_num BIGINT,primary key (user_id, window_start) not enforced
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.56.112:3306/flink','table-name' = 'order_count','username' = 'root','password' = '111111','sink.buffer-flush.max-rows' = '1'
);SELECT 'WINDOW',-- window_start,window_end,group_key,record_num,create_time,SUM(record_num) OVER w AS sum_amount
FROM temp
WINDOW w AS (PARTITION BY group_keyORDER BY rowtimeRANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)select user_id,TUMBLE_START(order_time, INTERVAL '5' SECOND),TUMBLE_END(order_time, INTERVAL '5' SECOND),COUNT(*) from ordersGROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;SELECT 'WINDOW',user_id,order_id,real_pay,order_timeCOUNT(*) OVER w AS sum_amount
FROM orders
WINDOW w AS (PARTITION BY user_idORDER BY order_timeRANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) insert into jdbcSink select user_id,TUMBLE_START(order_time, INTERVAL '5' SECOND),TUMBLE_END(order_time, INTERVAL '5' SECOND),COUNT(*) from ordersGROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
  1. Kafka 相关操作
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --listbin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --create --replication-factor 1 --partitions 1 --topic order_topicbin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topicbin/kafka-console-consumer.sh --bootstrap-server 192.168.56.112:9092 --topic order_topic --from-beginningbin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --describe --topic order_topic bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --delete --topic order_topic 

发送数据样例

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-09-26 15:20:11", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:28:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

滑动窗口

SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND));SELECT * FROM TABLE(HOP(DATA => TABLE orders,TIMECOL => DESCRIPTOR(order_time),SLIDE => INTERVAL '5' MINUTES,SIZE => INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(pay_amount)FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND))GROUP BY window_start, window_end;

踩坑

  1. Could not find any factory for identifier ‘kafka’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

查看flink version

flink-sql-connector-kafka-1.17.1.jar

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/1.17.1

下载对应版本jar,放到lib目录下,重启

  1. Could not find any factory for identifier ‘jdbc’ that implements 'org.apache.flink.table.factories.DynamicTableFactory
    flink-connector-jdbc-3.1.0-1.17.jar
    https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar

  2. Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.31

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

媲美Suno、Udio!AI铁了心,要砸音乐人的饭碗

5月10日凌晨,著名语音生成式AI平台ElevenLabs在社交平台宣布,推出文本生成歌曲产品ElevenLabs Music。 从其展示的效果来看,音乐的节奏感、和声、乐器的搭配、情感表达、创意性、风格的多样性、高/低音,可媲美该领域的两款头部产…

c++ 查看线程状态

在C标准库中,std::thread类并没有直接提供查询线程状态的方法。std::thread类提供了创建线程、等待线程结束(join())或分离线程(detach())的接口,但并没有提供一个函数来检查线程是否仍在运行、是否阻塞、是…

618精选好物推荐,五款品质与性价比并存的选择!

在繁忙的生活中,我们总是渴望找到那些能够提升生活品质的好物,让每一天都过得更加精彩。而618购物节,无疑是寻找这些好物的绝佳时机。在这个盛大的购物狂欢中,我们为大家精选了五款品质与性价比并存的选择,让大家在享受…

『大模型笔记』Google CEO Sundar Pichai(桑达尔·皮查伊)谈人工智能的未来!

Google CEO Sundar Pichai(桑达尔皮查伊)谈人工智能的未来! 文章目录 一. Google CEO谈人工智能的未来总结摘要观点时间线二. 参考文献中文字幕视频链接,欢迎关注我的xhs账号:Google CEO 皮查伊谈人工智能的未来! 一. Google CEO谈人工智能的未来

【C++】n个一位数能够组成的最大数

文章目录 题目题目描述输入输出样例输入样例输出 思路AC代码 题目 题目描述 请问n个一位数能够组成的最大的整数是多少。 比如, n 3 n3 n3,3个整数为 1 、 3 、 9 1、3、9 1、3、9,那么组成的最大整数是 931 931 931。 比如, n…

补充RequestAttribute通用工具类

补充https://blog.csdn.net/qq_37148232/article/details/138303873?spm1001.2014.3001.5501 通用工具类&#xff1a; SuppressWarnings("ALL") public class Share {public static <T> void set(String key, T value) {HttpServletRequest request ZYReque…

【Mysql】——收银查询与退货

&#x1f4bb;博主现有专栏&#xff1a; C51单片机&#xff08;STC89C516&#xff09;&#xff0c;c语言&#xff0c;c&#xff0c;离散数学&#xff0c;算法设计与分析&#xff0c;数据结构&#xff0c;Python&#xff0c;Java基础&#xff0c;MySQL&#xff0c;linux&#xf…

Java并发编程:Kilim协程框架

文章目录 一、介绍1、Kilim协程框架解析2、Kilim协程框架应用 一、介绍 Kilim是一个专为Java设计的轻量级协程框架&#xff0c;它通过字节码操纵技术实现了轻量级的协程&#xff0c;为Java开发者提供了更为灵活的并发编程选项。以下是关于Kilim协程框架的解析与应用&#xff1…

struct和union大小计算规则

Union 一&#xff1a;联合类型的定义 联合也是一种特殊的自定义类型&#xff0c;这种类型定义的变量也包含一系列的成员&#xff0c;特征是这些成员公用同一块空间&#xff08;所以联合也叫共用体&#xff09; 比如&#xff1a;共用了 i 这个较大的空间 二&#xff1a; 联合的…

【程序员侠】李飞往事之wifi恶魔

程序员侠李飞是一名技术高超的年轻程序员&#xff0c;他在城市中打击各种网络犯罪活动&#xff0c;保护市民的网络安全。一天&#xff0c;他接到了一个任务&#xff0c;说是城市中有一个邪恶的wifi恶魔正在肆虐&#xff0c;许多人的个人信息被盗取&#xff0c;银行账户被盗刷&a…

AI怎么把图形分割下来

1 画一个图形 2 画一条直线分割 用直线段工具&#xff0c;画一条直线 3 分割操作 用 直接选择工具&#xff0c;先选中直线&#xff0c;按shift键&#xff0c;再选中矩形&#xff0c;把他两都选上 路径查找器&#xff0c;点分割(路径查找器面板如果没有&#xff0c;在窗口 菜单…

接口自动化框架篇:接口框架中的日志记录封装!

接口自动化框架中的日志记录是一个重要的环节&#xff0c;它能帮助我们追踪接口的执行情况、调试问题、分析测试结果等。通过规范的日志记录&#xff0c;我们可以更好地管理和维护接口自动化测试代码。 以下是一个从0到1的详细规范&#xff0c;来进行接口框架中的日志记录封装…

Java方法和数组

方法 Java中的方法就是c语言中的函数。 方法的定义 定义格式如下 修饰符 返回值 方法名([参数列表]){代码块[return 返回值;] } //方括号[]括起来代表可以没有&#xff0c;不是必须有的方法名采用小驼峰命名&#xff08;就是有多个单词&#xff0c;第一个单词首字母小写其…

酷柚易汛ERP源码部署/售后更新/搭建/上线维护

一款基于FastAdminThinkPHPLayui开发的ERP管理系统&#xff0c;帮助中小企业实现ERP管理规范化&#xff0c;此系统能为你解决五大方面的经营问题&#xff1a;1.采购管理 2.销售管理 3.仓库管理 4.资金管理 5.生产管理&#xff0c;适用于&#xff1a;服装鞋帽、化妆品、机械机电…

Vue3自定义指令封装-按钮权限控制v-permission、hasPermissions

背景&#xff1a;平常所接触到的系统权限控制&#xff0c;大部分都是菜单、路由级别的控制&#xff0c;但后台管理系统中&#xff0c;很多操作都是与职责和角色挂钩的&#xff0c;同样一个列表&#xff0c;不同人的操作列并不都一样&#xff0c;有些页面存在一些含有重要数据的…

B+树(B+ Tree)

B树&#xff08;B Tree&#xff09;是一种对B树&#xff08;B-Tree&#xff09;的改进版本&#xff0c;它在数据库系统和文件系统中作为索引结构得到了广泛的应用&#xff0c;特别是在磁盘存储的场景下。B树保留了B树的基本特征&#xff0c;如自平衡、多路分支等&#xff0c;但…

excel中怎么跳转到指定的单元格?

也许你会有这样的需求&#xff0c;如A1单元格中显示B100这种单元格地址&#xff0c;怎么做以点一下就跳转到B100&#xff1f; 一、设置公式 B1HYPERLINK("#"&MID(CELL("FILENAME",A1),FIND("]",CELL("FILENAME",A1))1,99)&&…

java-函数式编程-jdk

背景 函数式接口很简单&#xff0c;但是不是每一个函数式接口都需要我们自己来写jdk 根据 有无参数&#xff0c;有无返回值&#xff0c;参数的个数和类型&#xff0c;返回值的类型 提前定义了一些通用的函数式接口 IntPredicate 参数&#xff1a;有一个&#xff0c;类型是int类…

VSCode(安装)

前言 VSCode&#xff08;全称&#xff1a;Visual Studio Code&#xff09;是一款由微软开发且跨平台的免费源代码编辑器。该软件支持语法高亮、代码自动补全&#xff08;又称 IntelliSense&#xff09;、代码重构、查看定义功能&#xff0c;并且内置了命令行工具和 Git …