基于docker安装flink

文章目录

  • 环境准备
    • Flink
      • docker-compose方式
      • 二进制部署
    • Kafka
    • Mysql
  • Flink 执行 SQL命令
    • 进入SQL客户端CLI
    • 执行SQL查询
      • 表格模式
      • 变更日志模式
      • Tableau模式
      • 窗口计算
    • 窗口计算
      • 滚动窗口demo
      • 滑动窗口
  • 踩坑

环境准备

Flink

docker-compose方式

version: "3"
services:jobmanager:image: flink:latestexpose:- "6123"ports:- "8081:8081"command: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=jobmanagertaskmanager:image: flink:latestexpose:- "6121"- "6122"depends_on:- jobmanagercommand: taskmanagerlinks:- "jobmanager:jobmanager"environment:- JOB_MANAGER_RPC_ADDRESS=jobmanager

前端访问地址: http://192.168.56.112:8081/#/overview

二进制部署

wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgzvim conf/flink-conf.yamljobmanager.rpc.address: 192.168.56.112 # 修改为本机ip./bin/start-cluster.sh

Kafka

version: '2'
services:zookeeper:image: wurstmeister/zookeeper   ## 镜像ports:- "2181:2181"                 ## 对外暴露的端口号kafka:image: wurstmeister/kafka       ## 镜像volumes:- /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)ports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112    ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.56.112:2181       ## 卡夫卡运行是基于zookeeper的kafka-manager:image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面environment:ZK_HOSTS:                    ## 修改:宿主机IPports:- "9000:9000"

Mysql

docker run -d -p3306:3306 --name=mysql57 -e MYSQL_ROOT_PASSWORD=111111 mysql:5.7

在这里插入图片描述

Flink 执行 SQL命令

进入SQL客户端CLI

docker exec  -it flink_jobmanager_1  /bin/bash./bin/sql-client.sh

在这里插入图片描述

执行SQL查询

SELECT 'Hello World';

在这里插入图片描述

表格模式

表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:

SET sql-client.execution.result-mode = table;

可以使用如下查询语句查看不同模式的的运行结果:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

在这里插入图片描述

变更日志模式

变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。

SET sql-client.execution.result-mode = changelog;

在这里插入图片描述

Tableau模式

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):

SET sql-client.execution.result-mode = tableau;

在这里插入图片描述

注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

滚动窗口demo

根据订单信息使用kafka作为数据源表,JDBC作为数据结果表统计用户在5秒内的订单数量,并根据窗口的订单id和窗口开启时间作为主键,将结果实时统计到JDBC中:

  1. 在MySQL的flink数据库下创建表order_count,创建语句如下:
CREATE TABLE `flink`.`order_count` (`user_id` VARCHAR(32) NOT NULL,`window_start` TIMESTAMP NOT NULL,`window_end` TIMESTAMP NULL,`total_num` BIGINT UNSIGNED NULL,PRIMARY KEY (`user_id`, `window_start`)
)        ENGINE = InnoDBDEFAULT CHARACTER SET = utf8mb4COLLATE = utf8mb4_general_ci;
  1. 创建flink opensource sql作业,并提交运行作业
CREATE TABLE orders (order_id string,order_channel string,order_time timestamp(3),pay_amount double,real_pay double,pay_time string,user_id string,user_name string,area_id string,watermark for order_time as order_time - INTERVAL '3' SECOND
) WITH ('connector' = 'kafka','topic' = 'order_topic','properties.bootstrap.servers' = '192.168.56.112:9092','properties.group.id' = 'order_group','scan.startup.mode' = 'latest-offset','format' = 'json'
);CREATE TABLE jdbcSink (user_id string,window_start timestamp(3),window_end timestamp(3),total_num BIGINT,primary key (user_id, window_start) not enforced
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.56.112:3306/flink','table-name' = 'order_count','username' = 'root','password' = '111111','sink.buffer-flush.max-rows' = '1'
);SELECT 'WINDOW',-- window_start,window_end,group_key,record_num,create_time,SUM(record_num) OVER w AS sum_amount
FROM temp
WINDOW w AS (PARTITION BY group_keyORDER BY rowtimeRANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)select user_id,TUMBLE_START(order_time, INTERVAL '5' SECOND),TUMBLE_END(order_time, INTERVAL '5' SECOND),COUNT(*) from ordersGROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;SELECT 'WINDOW',user_id,order_id,real_pay,order_timeCOUNT(*) OVER w AS sum_amount
FROM orders
WINDOW w AS (PARTITION BY user_idORDER BY order_timeRANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) insert into jdbcSink select user_id,TUMBLE_START(order_time, INTERVAL '5' SECOND),TUMBLE_END(order_time, INTERVAL '5' SECOND),COUNT(*) from ordersGROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
  1. Kafka 相关操作
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --listbin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --create --replication-factor 1 --partitions 1 --topic order_topicbin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topicbin/kafka-console-consumer.sh --bootstrap-server 192.168.56.112:9092 --topic order_topic --from-beginningbin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --describe --topic order_topic bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --delete --topic order_topic 

发送数据样例

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-09-26 15:20:11", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:28:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

滑动窗口

SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND));SELECT * FROM TABLE(HOP(DATA => TABLE orders,TIMECOL => DESCRIPTOR(order_time),SLIDE => INTERVAL '5' MINUTES,SIZE => INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(pay_amount)FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND))GROUP BY window_start, window_end;

踩坑

  1. Could not find any factory for identifier ‘kafka’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

查看flink version

flink-sql-connector-kafka-1.17.1.jar

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/1.17.1

下载对应版本jar,放到lib目录下,重启

  1. Could not find any factory for identifier ‘jdbc’ that implements 'org.apache.flink.table.factories.DynamicTableFactory
    flink-connector-jdbc-3.1.0-1.17.jar
    https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar

  2. Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.31

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

媲美Suno、Udio!AI铁了心,要砸音乐人的饭碗

5月10日凌晨,著名语音生成式AI平台ElevenLabs在社交平台宣布,推出文本生成歌曲产品ElevenLabs Music。 从其展示的效果来看,音乐的节奏感、和声、乐器的搭配、情感表达、创意性、风格的多样性、高/低音,可媲美该领域的两款头部产…

618精选好物推荐,五款品质与性价比并存的选择!

在繁忙的生活中,我们总是渴望找到那些能够提升生活品质的好物,让每一天都过得更加精彩。而618购物节,无疑是寻找这些好物的绝佳时机。在这个盛大的购物狂欢中,我们为大家精选了五款品质与性价比并存的选择,让大家在享受…

『大模型笔记』Google CEO Sundar Pichai(桑达尔·皮查伊)谈人工智能的未来!

Google CEO Sundar Pichai(桑达尔皮查伊)谈人工智能的未来! 文章目录 一. Google CEO谈人工智能的未来总结摘要观点时间线二. 参考文献中文字幕视频链接,欢迎关注我的xhs账号:Google CEO 皮查伊谈人工智能的未来! 一. Google CEO谈人工智能的未来

struct和union大小计算规则

Union 一:联合类型的定义 联合也是一种特殊的自定义类型,这种类型定义的变量也包含一系列的成员,特征是这些成员公用同一块空间(所以联合也叫共用体) 比如:共用了 i 这个较大的空间 二: 联合的…

AI怎么把图形分割下来

1 画一个图形 2 画一条直线分割 用直线段工具,画一条直线 3 分割操作 用 直接选择工具,先选中直线,按shift键,再选中矩形,把他两都选上 路径查找器,点分割(路径查找器面板如果没有,在窗口 菜单…

接口自动化框架篇:接口框架中的日志记录封装!

接口自动化框架中的日志记录是一个重要的环节,它能帮助我们追踪接口的执行情况、调试问题、分析测试结果等。通过规范的日志记录,我们可以更好地管理和维护接口自动化测试代码。 以下是一个从0到1的详细规范,来进行接口框架中的日志记录封装…

Java方法和数组

方法 Java中的方法就是c语言中的函数。 方法的定义 定义格式如下 修饰符 返回值 方法名([参数列表]){代码块[return 返回值;] } //方括号[]括起来代表可以没有,不是必须有的方法名采用小驼峰命名(就是有多个单词,第一个单词首字母小写其…

酷柚易汛ERP源码部署/售后更新/搭建/上线维护

一款基于FastAdminThinkPHPLayui开发的ERP管理系统,帮助中小企业实现ERP管理规范化,此系统能为你解决五大方面的经营问题:1.采购管理 2.销售管理 3.仓库管理 4.资金管理 5.生产管理,适用于:服装鞋帽、化妆品、机械机电…

Vue3自定义指令封装-按钮权限控制v-permission、hasPermissions

背景:平常所接触到的系统权限控制,大部分都是菜单、路由级别的控制,但后台管理系统中,很多操作都是与职责和角色挂钩的,同样一个列表,不同人的操作列并不都一样,有些页面存在一些含有重要数据的…

excel中怎么跳转到指定的单元格?

也许你会有这样的需求,如A1单元格中显示B100这种单元格地址,怎么做以点一下就跳转到B100? 一、设置公式 B1HYPERLINK("#"&MID(CELL("FILENAME",A1),FIND("]",CELL("FILENAME",A1))1,99)&&…

java-函数式编程-jdk

背景 函数式接口很简单,但是不是每一个函数式接口都需要我们自己来写jdk 根据 有无参数,有无返回值,参数的个数和类型,返回值的类型 提前定义了一些通用的函数式接口 IntPredicate 参数:有一个,类型是int类…

VSCode(安装)

前言 VSCode(全称:Visual Studio Code)是一款由微软开发且跨平台的免费源代码编辑器。该软件支持语法高亮、代码自动补全(又称 IntelliSense)、代码重构、查看定义功能,并且内置了命令行工具和 Git …

5月游戏市场迎来新的体验,网易两款游戏重磅出炉

易采游戏网5月9日消息,随着科技的飞速发展,手机游戏已经成为人们休闲娱乐的重要方式。在这个领域,网易作为国内领先的游戏开发商,一直致力于为玩家带来高品质的游戏体验。近日,网易携手国际大厂Square Enix&#xff0c…

2024年数维杯高校数学建模竞赛(B题) 建模解析| 生物质和煤共热解问题的研究 |小鹿学长带队指引全代码文章与思路

我是鹿鹿学长,就读于上海交通大学,截至目前已经帮200人完成了建模与思路的构建的处理了~ 本篇文章是鹿鹿学长经过深度思考,独辟蹊径,实现综合建模。独创复杂系统视角,帮助你解决数维杯的难关呀。 完整内容可…

创建Spring Boot项目及配置

目录 一、创建项目所需要的插件 1、安装插件 二、创建项目 三、创建项目所面临的常见问题。 1、IDEA不能识别 2、无效的发行版本 3、确认jar包是否下载成功 一、创建项目所需要的插件 1、安装插件 首先需要在IDEA插件里面搜索Spring,选择Spring Boot Helper…

政安晨:【Keras机器学习示例演绎】(四十一)—— 使用预先训练的词嵌入

目录 设置 简介 下载新闻组 20 数据 让我们来看看这些数据 清洗数据并将数据分成训练集和验证集 创建词汇索引 加载预训练的词嵌入 建立模型 训练模型 导出端到端模型 政安晨的个人主页:政安晨 欢迎 👍点赞✍评论⭐收藏 收录专栏: TensorFlow与…

Python批量修改图片文件名中的指定名称

批量处理图像时,图片名有时需要统一,本教程仅针对图片中名如:0001x4.png,批量将图片名中的x4去除,只留下0001.png的情况。 如果想要按照原图片顺序批量修改图片名,参考其它博文:按照原顺序批量…

Vue中常用指令

Vue中的常用指令 Vue中的常用指令内容渲染指令条件渲染指令事件绑定指令内联语句事件处理函数给事件处理函数传参 属性绑定指令列表渲染指令v-for中的key 双向绑定指令 Vue中的常用指令 概念:指令 是 Vue 提供的带有 v- 前缀 的 特殊 标签属性。Vue 会根据不同的【…

硬性清空缓存的方法

前端发布代码后,我们是需要刷新页面再验证的。有时候仅仅f5 或者ctrlshiftdelete快捷键仍然有历史缓存,这时可以通过下面的方法硬性清空缓存。 以谷歌浏览器为例,打开f12,右键点击刷新按钮,选择【清空缓存并硬性加载】…