flink table view datastream互转

case class outer(f1:String,f2:Inner)
case class outerV1(f1:String,f2:Inner,f3:Int)
case class Inner(f3:String,f4:Int)

测试代码

package com.yy.table.convertimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataTypeobject streamPOJO2table {case class outer(f1:String,f2:Inner)case class outerV1(f1:String,f2:Inner,f3:Int)case class Inner(f3:String,f4:Int)def main(args: Array[String]): Unit = {// flink1.13 流处理环境初始化val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)import org.apache.flink.streaming.api.scala._val ds1: DataStream[outer] = env.fromElements(outer("a",Inner("b",2)),outer("d",Inner("e",4)))val table1: Table = tEnv.fromDataStream(ds1)
//    table1
//      .execute()
//      .print()/*+----+--------------------------------+--------------------------------+
| op |                             f1 |                             f2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                   (f3=b, f4=2) |
| +I |                              d |                   (f3=e, f4=4) |
+----+--------------------------------+--------------------------------+*///    table1
//      .print()/*5> +I[d, Inner(e,4)]
4> +I[a, Inner(b,2)]*/tEnv.createTemporaryView("view1", ds1)val tableResult1: TableResult = tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")tableResult1.print()/*+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+*///val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
//    t1.print()//    println(t1.getResolvedSchema)/*
+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
2 rows in set
(`f1` STRING,`f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,`f3` INT NOT NULL
)*/println("---- 1 -------")// tableResult转datastreamval o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1,classOf[outerV1])
//    o1.print()println("---- 2 -------")tEnv.executeSql("""|select|f1|,f2.f3|,f2.f4|from view1|""".stripMargin)
//      .print()/*+----+--------------------------------+--------------------------------+--------------------------------+
| op |                             f1 |                             f3 |                             f4 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                              a |                              b |                              c |
| +I |                              d |                              e |                              f |
+----+--------------------------------+--------------------------------+--------------------------------+*/tEnv.executeSql("""|select|f1|,(f2.f3,f2.f4)|from view1|""".stripMargin)
//      .print()env.execute("jobName1")}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/604593.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

w18认证崩溃之暴力破解DVWA

一、实验环境 攻击工具&#xff1a;burpsuite2021.12 靶场&#xff1a;DVWA二、实验目的 演示暴破DVWA的medium和high两个级别&#xff0c;low级别请查看w18认证崩溃之暴力破解4种攻击模式 三、实验步骤 1.设置靶场medium级别 2.开启谷歌代理插件&#xff0c;开启bp拦截&…

AI教我学编程之AI自刀

AI教我学编程系列学习第二课 — C#变量类型 上节回顾知识梳理C#基本变量类型 对话AI分歧产生本段总结 它说得对吗&#xff1f;我随即发问经典AI自刀他来了 总结 上节回顾 在上一节中我们发现&#xff0c;AI工具似乎还不能达到教学的水平&#xff0c;所以在本节中&#xff0c;…

数据在内存中的存储方式

前言&#xff1a; 期末临近&#xff0c;继续复习&#xff01; 今天要复习的内容是数据在内存中的存储&#xff0c;主要是整型与浮点两种&#xff0c;还有大小端的介绍。 提出问题 打印结果是255 -1 为什么&#xff1f; 首先我们要知道数据都是以二进制的形式存…

Spring Framework和SpringBoot的区别

目录 一、前言 二、什么是Spring 三、什么是Spring Framework 四、什么是SpringBoot 五、使用Spring Framework构建工程 六、使用SpringBoot构建工程 七、总结 一、前言 作为Java程序员&#xff0c;我们都听说过Spring&#xff0c;也都使用过Spring的相关产品&#xff0…

uni-app 前后端调用实例 基于Springboot 详情页实现

锋哥原创的uni-app视频教程&#xff1a; 2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中..._哔哩哔哩_bilibili2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中...共计23条视频&#xff0c;包括&#xff1a;第1讲 uni…

mysql进阶-重构表

目录 1. 原因 2. 如何重构表呢&#xff1f; 2.1 命令1&#xff1a; 2.2 命令2&#xff1a; 2.3 命令3&#xff1a; 1. 原因 正常的业务开发&#xff0c;为什么需要重构表呢&#xff1f; 原因1&#xff1a;某张表存在大量的新增和删除操作&#xff0c;导致表经历过大量的…

JavaScript异常处理实战

前言 之前在对公司的前端代码脚本错误进行排查&#xff0c;试图降低 JS Error 的错误量&#xff0c;结合自己之前的经验对这方面内容进行了实践并总结&#xff0c;下面就此谈谈我对前端代码异常监控的一些见解。 本文大致围绕下面几点展开讨论&#xff1a; JS 处理异常的方式…

Keil 5 ARMCC编译错误和警告解释大全(2) 序列号1000-2000

1001&#xff1a;由 using 声明指定的类成员必须在直接基类中可见 1003&#xff1a;Sun 模式与 cfront 模式不兼容 1004&#xff1a;严格模式与 Sun 模式不兼容 1005&#xff1a;只有在编译 C 时才允许使用太阳模式 1006&#xff1a; 模板模板参数不能与其模板参数之一同名…

解决 Postman 报错问题:一份综合指南

Postman 是一个流行的 API 测试工具&#xff0c;它可以帮助开发者和测试人员快速地创建和发送各种 HTTP 请求&#xff0c;并查看响应结果。但是&#xff0c;在使用 Postman 的过程中&#xff0c;有时候会遇到一些报错或异常情况&#xff0c;影响了正常的测试流程。本文将介绍一…

图像分割-Grabcut法

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 本文的C#版本请访问&#xff1a;图像分割-Grabcut法(C#)-CSDN博客 GrabCut是一种基于图像分割的技术&#xff0c;它可以用于将图像…

上海东海职业技术学院低代码实训平台建设项目竞争性磋商公告

上海东海职业技术学院低代码实训平台建设项目竞争性磋商公告 招标&#xff5c;招标公告 上海市|闵行区 项目编号&#xff1a;0773-2340GNSHFWCS2823 招标单位&#xff1a;上海东海职业技术学院 代理单位&#xff1a;中金招标有限责任公司 预算金额&#xff1a;59万元 联系方式&…

Python:sqlalchemy报错DetachedInstanceError

sqlalchemy 对象commit之后再使用就会报错 DetachedInstanceError既然这样&#xff0c;提交后还需要使用&#xff0c;就拷贝一个副本出来 一个思路是 # -*- coding: utf-8 -*- """ File : sqlalchemy_util.py Date : 2024-01-07 Author : Peng Shiyu …

循环队列的队空队满情况

有题目&#xff1a; 循环队列放在一维数组A[0....M-1]中&#xff0c;end1指向队头元素&#xff0c;end2指向队尾元素的后一个位置。假设队列两端均可进行入队和出队操作&#xff0c;队列中最多能容纳M-1个元素。初始时为空。下列判断队空和队满的条件中&#xff0c;正确的是 …

vim/vi 模式切换和常用快捷键

vim/vi 切换模式&#xff1a; vim/vi 常用快捷键&#xff1a; 一般模式&#xff1a; gg&#xff1a;文件开头、G&#xff1a;文件结尾 shift^ &#xff1a;光标当前行首、shift^&#xff1a;光标当前行尾 yy&#xff1a;复制、p&#xff1a;粘贴、dd&#xff1a;删除当前行、…

CRM的request管理笔记

1 request类型 request有两种&#xff0c;device request和link request。 link request link req是对link进行精确控制。 link req是对每个link的请求&#xff0c;比如某一帧是否需要bubble recovery、某一帧是否需要长曝光等feature。device request 对一个设备进行每帧控制…

JDK 11:崭新特性解析

JDK 11&#xff1a;崭新特性解析 JDK 11&#xff1a;崭新特性解析1. HTTP Client&#xff08;标准化&#xff09;示例代码 2. 局部变量类型推断的扩展示例代码 3. 新的字符串方法示例代码 4. 动态类文件常量示例代码 5. Epsilon 垃圾收集器使用方式 结语 JDK 11&#xff1a;崭新…

MySQL数据库进阶-事务

事务 事务由单独单元的一个或多个SQL语句组成&#xff0c;在这 个单元中&#xff0c;每个MySQL语句是相互依赖的。而整个单独单 元作为一个不可分割的整体&#xff0c;如果单元中某条SQL语句一 旦执行失败或产生错误&#xff0c;整个单元将会回滚。所有受到影 响的数据将返回到…

导致服务器重启的原因都有哪些,要如何处理

计算机重启主要作用是保存对系统的设置和修改以及立即启动相关服务等。重新启动的最通常的理由是因为新软件或硬件的安装需要&#xff0c;或因为应用软件因为一些理由没有回应。有时候&#xff0c;一些站长或GM在租用使用服务器过程&#xff0c;有时会遇到自动重启的情况。由于…

现有网络模型的使用及修改(VGG16为例)

VGG16 修改默认路径 import os os.environ[TORCH_HOME] rD:\Pytorch\pythonProject\vgg16 # 下载位置太大了&#xff08;140多G&#xff09;不提供直接下载 train_set torchvision.datasets.ImageNet(root./data_image_net, splittrain, downloadTrue, transformtorchvis…

数据库原理与应用期末复习试卷2

数据库原理技术与应用 一.单项选择题 设有属性A&#xff0c;B&#xff0c;C&#xff0c;D&#xff0c;以下表示中不是关系的是( C) ​ A、R(A) B、R(A, B, C, D) C、R&#xff08;AxBxCxD&#xff09; D、R(A&#xff0c;B) 在SQL语言中的视图VIEW是数据库的(A&#xff09;…