Flink SQL 常用作业sql

目录

  • flink sql常用配置
  • kafka source to mysql sink
  • 窗口函数 开窗
  • datagen 自动生成数据表
    • tumble 滚动窗口
    • hop 滑动窗口
    • cumulate 累积窗口
  • grouping sets 多维分析
  • over 函数
  • TopN

flink sql常用配置

设置输出结果格式
SET sql-client.execution.result-mode=tableau;

kafka source to mysql sink

kafka 
topic: bop_log_realtime
数据结构:
{"timestamp":"2023-10-31 14:26:02.528","serverip":"10.13.177.209","level":"INFO","servicename":"bop-fms-query-info","traceid":"","spanid":"","parent":"","message":"Resolving eureka endpoints via configuration"}mysql表:
库名:flink_test
CREATE TABLE `bop_log_realtime_warning` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`serverip` varchar(255) NOT NULL DEFAULT '',`timestamp` varchar(255) NOT NULL DEFAULT '',`level` varchar(255) NOT NULL DEFAULT '',`servicename` varchar(255) NOT NULL DEFAULT '',`traceid` varchar(255) NOT NULL DEFAULT '',`spanid` varchar(255) NOT NULL DEFAULT '',`parent` varchar(255) NOT NULL DEFAULT '',`message` varchar(255) NOT NULL DEFAULT '',`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;CREATE TABLE kafka_log_realtime_json (`serverip` STRING,`timestamp` STRING,`level` STRING,`servicename` STRING,`traceid` STRING,`spanid` STRING,`parent` STRING,`message` STRING
) WITH ('connector' = 'kafka','topic' = 'bop_log_realtime','properties.bootstrap.servers' = '10.2.25.221:9092,10.2.25.221:9093','properties.group.id' = 'testGroup2','format' = 'json','scan.startup.mode' = 'latest-offset'
);CREATE TABLE bop_log_realtime_warning (`serverip` STRING,`timestamp` STRING,`level` STRING,`servicename` STRING,`traceid` STRING,`spanid` STRING,`parent` STRING,`message` STRING
) WITH (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://m3309i.hebe.grid.xx.com.cn:3309/flink_test?zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai'
,'username' = 'super_mis'
,'password' = 'mis_password'
,'table-name' = 'bop_log_realtime_warning'
);insert into bop_log_realtime_warning 
SELECT`serverip` ,`timestamp` ,`level` ,`servicename` ,`traceid` ,`spanid` ,`parent` ,`message` FROM kafka_log_realtime_json;

窗口函数 开窗

datagen 自动生成数据表

CREATE TABLE ws (id INT,vc INT,pt AS PROCTIME(), --处理时间et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间WATERMARK FOR et AS et - INTERVAL '5' SECOND --watermark
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.id.min' = '1','fields.id.max' = '3','fields.vc.min' = '1','fields.vc.max' = '100'
);CREATE TABLE sink (id INT,ts BIGINT,vc INT
) WITH ('connector' = 'print'
);

tumble 滚动窗口

滚动窗口 窗口大小5selectid,sum(vc) vcSum,window_start,window_endfrom table(TUMBLE(table ws, descriptor(et), INTERVAL '5' SECOND))group by id, window_start, window_end;

hop 滑动窗口

滑动窗口 滑动步长5秒 窗口大小10秒
注意:窗口大小=滑动步长的整数倍(底层会优化成多个小滚动窗口)
selectid,sum(vc) vcSum,window_start,window_endfrom table(hop(table ws, descriptor(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))group by id, window_start, window_end;

cumulate 累积窗口

注意:窗口大小=累积步长的整数倍
selectid,sum(vc) vcSum,window_start,window_endfrom table(CUMULATE(table ws, descriptor(et), INTERVAL '5' SECOND))group by id, window_start, window_end;

grouping sets 多维分析

selectid,sum(vc) vcSum,window_start,window_endfrom table(TUMBLE(table ws, descriptor(et), INTERVAL '5' SECOND))group by window_start, window_end,grouping sets ( (id) );

over 函数

TopN

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/132003.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【linux编程】linux文件IO的系统函数(close/read/fcntl/dup/dup2)

close函数 函数原型&#xff1a; #include <unistd.h> int close(int fd);参数&#xff1a;fd&#xff1a;要关闭的文件描述符 返回值&#xff1a;成功返回0&#xff0c;失败返回-1并设置errno 功能&#xff1a;关闭一个已经打开的文件&#xff0c;释放相关的资源。在…

最新 vie-vite框架下 jtopo安装使用

官方地址 官方源码 安装下载 1.官方好像都没有给git地址&#xff0c;尝试npm安装报错 2.找到1.0.5之前的版本npm i jtopo2&#xff0c;安装成功后使用报错&#xff0c;应该是版本冲突了 1.本地引入&#xff0c; 点击官方源码下载&#xff0c;需要jtopo_npm文件 2.引入到本…

Jetpack:030-Jetpack中的状态

文章目录 1. 概念介绍2. 使用方法2.1 可监听对象2.2 获取状态值2.3 修改状态值2.4 重组函数 3. 示例代码4. 内容总结 我们在上一章回中介绍了Jetpack中网格布局相关的内容&#xff0c;本章回中主要 介绍状态。闲话休提&#xff0c;让我们一起Talk Android Jetpack吧&#xff0…

【SpringCloud Alibaba -- Nacos】Linux 搭建 Nacos 集群

搭建 Nacos 集群 架构 centos安装docker https://docs.docker.com/engine/install/centos/ 详细配置过程 MySql8 mysql数据库配置 数据库脚本 nacos/conf/nacos-mysql.sql Nacos2 application.properties 修改为mysql spring.datasource.platformmysqldb.num1 db.url…

【工具】Github统计代码行数工具推荐(VScode插件、兼容任何平台、不用下载安装包)

需求&#xff1a; 1&#xff09;被要求统计代码行数&#xff1b; 2&#xff09;不想打开Linux&#xff0c;懒得下载Windows版本GitStats&#xff1b; 3&#xff09;打开了Linux但也不记得find命令行怎么用&#xff1b; 4&#xff09;打开了Linux&#xff0c;装好了Gitstats但自…

wagtail的使用

文章目录 安装虚拟环境新建项目时指定虚拟环境打开已有项目添加虚拟环境 安装wagtail查看安装后的包 创建wagtail项目安装依赖迁移创建超级用户运行项目 管理工作台内容扩展首页的数据模型更新数据库修改模板页创建一个页面的过程 models中的基本字段templates字符型文本字段富…

汽车标定技术(四)--问题分析:多周期测量时上位机显示异常

目录 1.问题现象 2.数据流分析 ​​​​3.代码分析 3.1 AllocDAQ 3.2 AllocOdt 3.3 AllocOdtEntry 4.根因分析及解决方法 4.1 根因分析 4.2 解决方案 1.问题现象 在手撸XCP代码时&#xff0c; DAQ的实现是一大头痛的事情。最初单周期实现还好一点&#xff0c;特别是…

MATLAB - Gazebo 联合仿真 —— 使用 UR10 机械臂检测和采摘水果

系列文章目录 文章目录 系列文章目录前言一、设置 Gazebo 仿真环境二、在 Gazebo 中模拟和控制机器人2.1 概述2.2 任务调度器2.3 感知和目标生成系统2.4 运动规划2.5 机械臂和关节控制系统 三、分配用于控制机器人的参数3.1 定义机器人模型和运动规划参数&#xff0c;3.2 定义机…

OpenCV(应用) —— 目标轮廓的相关应用

文章目录 一、目标轮廓的获取与绘制二、轮廓的信息&#xff08;面积和周长&#xff09;三、轮廓外接形状的三种表达方式 一、目标轮廓的获取与绘制 通常&#xff0c;使用findContours() 函数是为了获取一张图像内目标对象的所有轮廓&#xff0c;并且在 OpenCV4.x 版本中&#…

Python 深度学习导入的一些包的说明

Python 深度学习导入的一些包的说明 这段代码导入了一些Python库和模块&#xff0c;并定义了一些数据转换操作。 from future import print_function, division&#xff1a;这是一个Python 2和Python 3兼容性的导入语句。它确保在Python 2中使用Python 3的print函数和除法运算符…

学习c++的第十一天

目录 继承和派生 基类 & 派生类 访问控制和继承 派生类的构造函数 派生类的析构函数 继承类型 多继承 重载运算符和重载函数 函数重载 运算符重载 可重载运算符/不可重载运算符 运算符重载实例 继承和派生 先来说继承&#xff0c;这与现实生活中的继承意思差不…

[直播自学]-[汇川easy320]搞起来(1)给PLC供电

从没正儿八经的用一用PLC&#xff0c;所以双11在淘宝入手一个EASY320&#xff0c;大概1000出头。 到货后&#xff0c;汇川官网搜了一下资料&#xff0c;搜到这几个&#xff1a; 首先是给PLC供电吧&#xff0c;看了下PLC前面是24V&#xff0c;不知道供电范围多宽&#xff0c;于…

YoloV8目标检测与实例分割——目标检测onnx模型推理

一、模型转换 1.onnxruntime ONNX Runtime&#xff08;ONNX Runtime或ORT&#xff09;是一个开源的高性能推理引擎&#xff0c;用于部署和运行机器学习模型。它的设计目标是优化执行使用Open Neural Network Exchange&#xff08;ONNX&#xff09;格式定义的模型&#xff0c;…

helm一键部署grafana

一键部署命令 helm repo add prometheus-community https://prometheus-community.github.io/helm-charts helm repo update helm install prometheus prometheus-community/kube-prometheus-stack暴露服务 kubectl port-forward --address 0.0.0.0 deployment/prometheus-gr…

https原理

首先说一下几个概念&#xff1a;对称加密、非对称加密 对称加密&#xff1a; 客户端和服务端使用同一个秘钥&#xff0c;分两种情况&#xff1a; 1、所有的客户端和服务端使用同一个秘钥&#xff0c;这个秘钥被泄漏后数据不再安全 2、每个客户端生成一个秘钥&…

[云原生案例2.1 ] Kubernetes的部署安装 【单master集群架构 ---- (二进制安装部署)】节点部分

文章目录 1. 常见的K8S安装部署方式1.1 Minikube1.2 Kubeadm1.3 二进制安装部署 2. Kubernetes单master集群架构 ---- &#xff08;二进制安装部署&#xff09;2.1 前置准备2.2 操作系统初始化2.3 部署 docker引擎 ---- &#xff08;所有 node 节点&#xff09;2.4 部署 etcd 集…

重启某个节点、重启电脑服务器后,kubernetes无法运行,k8s无法运行

问题描述 环境&#xff1a;ubuntu18.04 LTS 现象&#xff1a;按步骤安装kubernetes后&#xff0c;正常启动&#xff0c;各个命令均可正常使用。服务器重启后&#xff0c;执行命令错误信息如下&#xff1a; sudo kubectl get nodesThe connection to the server 127.0.0.1:644…

HTML_案例1_注册页面

用纯html页面&#xff0c;不用css画一个注册页面。 最终效果如下&#xff1a; html页面代码如下&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>注册页面</title> </head>…

ffmpeg命令帮助文档

一&#xff1a;帮助文档的命令格式 ffmpeg -h帮助的基本信息ffmpeg -h long帮助的高级信息ffmpeg -h full帮助的全部信息 ffmpeg的命令使用方式&#xff1a;ffmpeg [options] [[infile options] -i infile] [[outfile options] outfile] 二&#xff1a;将帮助文档输出到文件 …

部署ELK

一、elasticsearch #拉取镜像 docker pull elasticsearch:7.12.1 #创建ELK docker网络 docker network create elk #启动ELK docker run -d --name es --net elk -P -e "discovery.typesingle-node" elasticsearch:7.12.1 #拷贝配置文件 docker cp es:/usr/share/el…