flink 1.18 sql demo

flink 1.18 sql demo

更换flink-table-planner 为 flink-table-planner-loader pom.xml

    <dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>1.18.0</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.18.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.18.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.0.2-1.18</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-table-planner_2.12</artifactId>-->
<!--            <version>1.18.0</version>-->
<!--        </dependency>-->
<!--         https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader--><dependency><groupId>org.apache.flink</groupId><artifactId> </artifactId><version>1.18.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>my.programs.main.clazz</mainClass></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>

demo

package com.cn;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Classname app* @Description TODO* @Date 2024/1/12 11:26* @Created by typezhou*/
public class App {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(1000L);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);String str = "CREATE TABLE KafkaTable (\n" +"  `user_id` STRING,\n" +"  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'aaaa',\n" +"  'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +"  'properties.group.id' = 'testGrou1p',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'csv'\n" +")";tableEnv.executeSql(str);Table tableResult = tableEnv.sqlQuery("SELECT user_id  FROM KafkaTable group by user_id");
//        DataStream<ResultBean> tuple2DataStream = tableEnv.toDataStream(result, ResultBean.class);
//        SingleOutputStreamOperator<ResultBean> map = tuple2DataStream.map(new MapFunction<ResultBean, ResultBean>() {
//            @Override
//            public ResultBean map(ResultBean s) throws Exception {
//                Thread.sleep(3000L);
//                return s;
//            }
//        });
//        tuple2DataStream.print();String sqlPri = "CREATE TABLE print_table (\n" +"  `user_id` STRING \n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'bbbb',\n" +"  'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +"  'format' = 'csv'\n" +")";tableEnv.executeSql(sqlPri);tableEnv.executeSql("insert into  print_table SELECT user_id FROM KafkaTable");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625487.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2020年财政收支

偶感兴趣&#xff0c;花了点时间整理 有兴趣的可以参照下面的链接整理完整2022年的数据&#xff0c;2023年的数据还有12月份的数据未出&#xff0c;估计在这几天出。 附 2022年的财政收支情况 2022年基金支出预算表 2020年的社保收入是7.6万亿。 上图个税金额写错了&#xff0c…

信息安全应急演练方案-模板

文章目录 信息安全应急演练方案一、应急演练总体目标二、安全应急演练的具体目标为&#xff1a;三、应急演练计划安排&#xff1a;3.1 黑客攻击服务器应急演练时间安排&#xff1a;演练地点&#xff1a;演练步骤&#xff1a; 3.2 大规模病毒&#xff08;含恶意软件&#xff09;…

Mindspore 公开课 - CodeGeeX

CodeGeeX: 多语言代码生成模型 CodeGeeX 是一个具有130亿参数的多编程语言代码生成预训练模型。CodeGeeX采用华为MindSpore框架实现&#xff0c;在鹏城实验室“鹏城云脑II”中的192个节点&#xff08;共1536个国产昇腾910 AI处理器&#xff09;上训练而成。截至2022年6月22日&…

数据库结构文档生成方法二(EZDML)

EZDML 下载链接&#xff1a;EZDML - 下载 我们常用的是数据建模有PowerDesigner,EZDML也是一款数据建模工具&#xff0c;而且功能很多&#xff0c;除了生成sql&#xff0c;还可以生成前端后端代码等等。 我们直接下载最新版后点击安装&#xff0c;打开后会默认打开示例&#…

基于springboot数码论坛系统源码和论文

网络的广泛应用给生活带来了十分的便利。所以把数码论坛与现在网络相结合&#xff0c;利用java技术建设数码论坛系统&#xff0c;实现数码论坛的信息化。则对于进一步提高数码论坛发展&#xff0c;丰富数码论坛经验能起到不少的促进作用。 数码论坛系统能够通过互联网得到广泛…

window系统安装MySQL -- MySQL(1)

第一步&#xff1a; 下载mysql安装包 1&#xff09;打开MySQL官方链接&#xff1a;https://www.mysql.com 2&#xff09;选择 DOWNLOADS 3&#xff09;往下滑&#xff0c;点击社区版本下载 4&#xff09;点击 MySQL installer for Windows 5&#xff09;点击安装 第二步&#…

2024年腾讯云服务器购买价格,真便宜

腾讯云服务器租用价格表&#xff1a;轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月&#xff0c;云服务器CVM S5实例2核2G配置280.8元一年…

细说JavaScript对象(JavaScript对象详解)

在JavaScript中对象作为数据类型之一&#xff0c;它的数据结构区别于其余5中数据类型&#xff0c;从数据结构角度看对象就是数据值的几个&#xff0c;其书就结构就是若干组名值对&#xff0c;类似于其他语言中的哈希、散列 关联数组等&#xff0c;但对象在JavaScript中不仅仅扮…

如何通过pytest+requests+allure自动化测试接入Jenkins?测开必备

最近在这整理知识&#xff0c;发现在pytest的知识文档缺少系统性&#xff0c;这里整理一下&#xff0c;方便后续回忆。 在python中&#xff0c;大家比较熟悉的两个框架是unittest和pytest&#xff1a; Unittest是Python标准库中自带的单元测试框架&#xff0c;Unittest有时候也…

2024腾讯云服务器购买指南一步步全流程攻略(超详细)

腾讯云服务器购买流程很简单&#xff0c;有两种购买方式&#xff0c;直接在官方活动上购买比较划算&#xff0c;在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵&#xff0c;但是自定义购买云服务器CPU内存带宽配置选择范围广&#xff0c;活动上购买只能选择固定的活动…

flutter base64图片保存到相册

首先base64转成uint8List&#xff0c;然后再用插件保存到相册&#xff08;没有内置的方法处理&#xff09; 保存图片的插件 photo_manager: ^2.6.0完整代码如下 //保存图片savePhotoJs(String base64String) async {Uint8List bytes UriData.parse(base64String).contentAsB…

音视频中的DTS和PTS区别

一、什么是PTS、DTS 1.PTS:即显示时间戳&#xff0c;用来告诉播放器在什么时候显示这一帧数据 2.DTS:即解码时间戳&#xff0c;用来告诉播放器在什么时间来解码这一帧的数据 3.GOP&#xff1a;MPEG使用的一种视频压缩技术 总的来说PTS和DTS用于指导播放器端的行为 二&…

导图解房(04)验房流程总结

本章主要是对模型 流程的使用&#xff0c;雷老板也说过&#xff1a;99%的问题实际上都有明确的解决方案和流程&#xff0c;只要问一问就好了&#xff0c;验房这件事而也是如此。 1 验三书一表一证 验房第一步并不是直接看房&#xff0c;而是看三书一表一证&#xff0c;先确保…

基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用

1.插件化开发概述 插件化开发模式正在很多编程语言或技术框架中得以广泛的应用实践&#xff0c;比如大家熟悉的jenkins&#xff0c;docker可视化管理平台rancher&#xff0c;以及日常编码使用的编辑器idea&#xff0c;vscode等。 实现服务模块之间解耦的方式有很多&#xff0…

PHP中excel带图片数据导入

前提&#xff1a;有个需求需要实现带图片的excel数据导入数据库中&#xff0c;发现PHPExcel - Excel 操作库已经停止维护&#xff0c;在PHP8的版本中&#xff0c;有些语法不支持&#xff0c;报错一堆&#xff0c;改了一堆&#xff0c;又还有一堆。所以决定找个替代的扩展&#…

【wow-ts】前端学习笔记Typescript基础语法(一)

项目地址是https://github.com/datawhalechina/wow-ts。 我选择的是ts前端课程 Typescript笔记 TypeScript 入门介绍基础数据类型TypeScript 基础数据结构TypeScript 变量声明变量作用域 TypeScript 入门介绍 第一次接触ts&#xff0c;先去了解了下ts的内容&#xff0c;复制内…

22.实战演练--记住密码和登录状态

在登录注册案例的基础上&#xff0c;实现一个相对完整的登录注册模块 (1).记住密码 (2).记住登录状态&#xff08;自动登录&#xff09; (3).注册成功&#xff0c;登录成功&#xff0c;退出登录时的页面跳转

Java基础面试题(一)

Java 语言有哪些特点? 简单易学&#xff1b;面向对象&#xff08;封装&#xff0c;继承&#xff0c;多态&#xff09;&#xff1b;平台无关性&#xff08; Java 虚拟机实现平台无关性&#xff09;&#xff1b;支持多线程&#xff08; C 语言没有内置的多线程机制&#xff0c;…

雍禾医疗好医生:雍禾植发张华医生立志服务好毛发患者

作为中国领先的专门从事毛发医疗服务的医疗集团&#xff0c;雍禾医疗提供诊疗、植发、养固等一站式毛发医疗服务&#xff0c;旗下拥有由专业植发品牌“雍禾植发”、医疗养固品牌“史云逊”、女性美学植发品牌“雍禾发之初”及医学假发品牌“哈发达”等组成的全产业链品牌矩阵。…

MySQL 协议(非常详细适合小白学习)

MySQL 查询过程 MySQL 查询过程大致如下&#xff1a; 1&#xff09;客户端与服务器端建立连接&#xff1b; 2&#xff09;客户端登陆 MySQL&#xff1b; 3&#xff09;客户端向服务器端发起一条请求&#xff1b; 4&#xff09;服务器端先检查查询缓存&#xff0c;如果命中缓…