org.apache.flink.table.api.TableException: Sink does not exists

FlinkSQL_1.12_用DDL实现Kafka到MySQL的数据传输_实现按照条件进行过滤写入MySQL_flink从kafka拉取数据并过滤数据写入mysql_旧城里的阳光的博客-CSDN博客

参考这篇文章,写了kafka到mysql的代码例子,因为自己改了表结构,运行下面代码:

package org.test.flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;//TODO 用DDL实现Kafka到MySQL的数据传输
public class FlinkSQL15_SQL_DDL_Kafka_MySQL {public static void main(String[] args) throws Exception {//1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.使用DDL的方式加载数据--注册SourceTabletableEnv.executeSql("create table source_sensor(account_id  BIGINT)" +"with (" +"'connector.type' = 'kafka'," +"'connector.version' = 'universal'," +"'connector.topic' = 'testtopic'," +"'connector.properties.bootstrap.servers' = '11.0.24.216:9092'," +"'connector.properties.group.id' = 'bigdata1109'," +"'format.type' = 'json'"+ ")");Table table = tableEnv.sqlQuery("select * from source_sensor");//3.注册SinkTable:MysqltableEnv.executeSql("CREATE TABLE spend_report (\n" +"    account_id BIGINT,\n" +"    PRIMARY KEY (account_id) NOT ENFORCED)" +"with (" +"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://11.0.24.216:4306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false',"+"'table-name' = 'spend_report',"+"'username' = 'root',"+"'password' = '123456'"+ ")");//4.执行查询kafka数据
//        Table source_sensor = tableEnv.from("source_sensor");
//        //5.将数据写入Mysql
//        source_sensor.executeInsert("sink_sensor");
//table.executeInsert("sink_sensor");//6.执行任务env.execute();}
}

发现报错如下:

Exception in thread "main" org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`sink_sensor` does not existsat org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:159)at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)at scala.collection.Iterator.foreach(Iterator.scala:943)at scala.collection.Iterator.foreach$(Iterator.scala:943)at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)at scala.collection.IterableLike.foreach(IterableLike.scala:74)at scala.collection.IterableLike.foreach$(IterableLike.scala:73)at scala.collection.AbstractIterable.foreach(Iterable.scala:56)at scala.collection.TraversableLike.map(TraversableLike.scala:286)at scala.collection.TraversableLike.map$(TraversableLike.scala:279)at scala.collection.AbstractTraversable.map(Traversable.scala:108)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)at org.test.flink.FlinkSQL15_SQL_DDL_Kafka_MySQL.main(FlinkSQL15_SQL_DDL_Kafka_MySQL.java:50)

点击table.executeInsert看了下源码:

    /*** Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,* and then execute the insert operation.** <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link* TableEnvironment#useCatalog(String)} for the rules on the path resolution.** <p>A batch {@link Table} can only be written to a {@code* org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code* org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code* org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code* org.apache.flink.table.sinks.UpsertStreamTableSink}.** <p>Example:** <pre>{@code* Table table = tableEnv.fromQuery("select * from MyTable");* TableResult tableResult = table.executeInsert("MySink");* tableResult...* }</pre>** @param tablePath The path of the registered TableSink to which the Table is written.* @return The insert operation execution result.*/TableResult executeInsert(String tablePath);

 发现executeInsert方法的参数tablePath需要传入表名,这里的表名应该和

tableEnv.executeSql("create table source_sensor(account_id  BIGINT)"

的表名source_sensor一致。

将:

table.executeInsert("sink_sensor");

改成:

table.executeInsert("source_sensor");

后执行成功。

flink1.2的demo完整代码:flink-java-1.12.7: flink1.12.7的java demo,包括flink wordcount示例,如何连接kafka

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/79462.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ceph入门到精通-Macvlan网络模式

Docker中的Macvlan网络模式提供了一种将容器直接连接到宿主机网络的方式&#xff0c;使得容器可以拥有自己的MAC地址和与宿主机网络的直接连接。以下是使用Macvlan网络模式的一般步骤&#xff1a; 创建Macvlan网络&#xff1a; docker network create -d macvlan --subnet<s…

Java中的队列Queue

Queue(队列)是一种在计算机科学中常见的数据结构,它基于先进先出(FIFO)的原则,即最先进入队列的元素最先出队。在Java中,Queue是一个接口,定义了一组操作队列的方法,而具体的实现类可以选择性地实现这些方法。 以下是Queue的一些常见用途和操作: 添加元素: 使用off…

【openscreen】FrameList的插入

新版本也有此代码 E:\chromium\src\media\cast\test\receiver\frame_buffer.cc Framer 类的代码都没加锁。 FrameList :frame_id 和 FrameBuffer 的对应关系 typedef std::map<uint32, linked_ptr<FrameBuffer> > FrameList;Framer 管理 FrameList bool Framer::

淘宝双11数据分析与预测课程案例中(林子雨)错误点总结

问题一&#xff1a;可视化代码中男女买家各个年龄段对比散点图中数值不显示以及坐标不正确问题如下图 解决方法&#xff1a; 1修改坐标 2修改数值 修改后散点图 问题二&#xff1a;各省份的总成交量对比中地图显示不出来&#xff0c;因为该部分代码源码中没有需要自己编写…

Python3.11教程6:标准库简介1——os、shutil、sys、random、time、datetime、 threading

文章目录 一、 文件和目录处理模块1.1 os模块1.2 shutil 模块1.3 文件通配符glob1.4 stat 二、 sys模块2.1 命令行参数列表2.2 -c和-m选项2.3 argparse2.3.1 argparse使用逻辑2.3.2 add_argument()语法 三、 数学3.1 math3.2 random3.3 numpy生成随机数 四、 日期和时间4.1 tim…

排序算法-----插入排序

目录 前言&#xff1a; 插入排序 原理图 代码实现 分析总结 二分法插入排序 代码实现 前言&#xff1a; 嗨嗨^_^&#xff0c;米娜桑&#xff0c;今天我们继续学习排序算法中的插入排序&#xff0c;激不激动&#xff0c;兴不兴奋呢&#xff01;好了废话不多说&#xff0c;…

RFID服装管理系统改善零售供应链

随着时尚零售业的竞争日益激烈&#xff0c;RFID技术正快速地改变着服装管理的方式。我们将探讨RFID服装管理系统的核心优点&#xff0c;以及如何在零售供应链中充分利用它。 首先&#xff0c;让我们了解一下RFID技术是什么。RFID是一种无线通信技术&#xff0c;通过使用RFID标…

国家网络安全周 | 金融日,一起 get金融行业数据安全

2023国家网络安全宣传周 热度一直在持续&#xff01; 9月15日是国家网络安全宣传金融日。 目前随着国际形势愈发严峻&#xff0c;金融机构基础设施的全面数字化升级&#xff0c;带来了全新的安全问题。数据安全不单是技术问题&#xff0c;更是已经成为一个关系社会稳定发展的…

【算法专题突破】滑动窗口 - 水果成篮(13)

目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后&#xff1a; 1. 题目解析 题目链接&#xff1a;904. 水果成篮 - 力扣&#xff08;Leetcode&#xff09; 题目有很长一段话&#xff0c;但是我们读一遍题目可以提炼转化出题目的要求 &#xff1a; 其实就是找出一个最长…

怎么样把下载到的补丁集成到Win10 ISO镜像中?

怎么样把下载到的补丁集成到Win10 ISO镜像中&#xff1f; 我们需要使用Dism 命令来进行&#xff0c;Win10中已经内置dism 命令如下&#xff1a; 1、首先挂载镜像命令&#xff0c;其中盘符自己更改 dism /mount-wim /wimfile:"i:\install.wim" /index:1 /mountdir:&q…

数字IC验证23915--寄存器方法

文章目录 镜像值与期望值predication的分类自动预测显示预测 uvm_reg的访问方法寄存器健康检查![在这里插入图片描述](https://img-blog.csdnimg.cn/8b1832ab43854068970bb5a66d851d06.png) 镜像值与期望值 寄存器模型中的每一个寄存器&#xff0c;都应该有两个值&#xff0c;…

我的C#基础

using System; namespace HelloWorldApplication }TOC 欢迎使用Markdown编辑器 你好&#xff01; 这是你第一次使用 Markdown编辑器 所展示的欢迎页。 为帮助您在CSDN创作的文章获得更多曝光和关注&#xff0c;我们为您提供了专属福利&#xff1a; 已注册且未在CSDN平台发布过…

ESIM实战文本匹配

引言 今天我们来实现ESIM文本匹配&#xff0c;这是一个典型的交互型文本匹配方式&#xff0c;也是近期第一个测试集准确率超过80%的模型。 我们来看下是如何实现的。 模型架构 我们主要实现左边的ESIM网络。 从下往上看&#xff0c;分别是 输入编码层(Input Ecoding) 对前…

网工内推 | 网络安全工程师,上市公司,13薪,食宿有补贴

01 苏州奖多多科技有限公司 招聘岗位&#xff1a;网络安全工程师&#xff08;安服渗透&#xff09; 职责描述&#xff1a; 1、负责客户网络安全攻击入侵事件溯源分析、处置等工作&#xff1b; 2、根据攻击告警/入侵事件&#xff0c;进行取证调查&#xff0c;攻击溯源反制&…

.netcore 连接 apache doris

apache doris 兼容mysql协议&#xff1b;所以我们在.netcore项目中&#xff0c;可以使用Mysql的驱动 dotnet add package MySqlConnector 测试代码&#xff1a; [HttpGet]public async Task<string> Get2(){//打开连接await using var connection new MySqlConnectio…

入职美团近三个月,闲聊几句

校招入职美团近3个月&#xff0c;随便聊聊 今天和组内的小伙伴们团建来着&#xff0c;聊了很多&#xff0c;感触颇深&#xff0c;碎碎念一下。 作为组内的唯一的校招生&#xff0c;刚入职时面对复杂的业务&#xff0c;各种不熟悉的工具&#xff0c;真的是一脸懵。至少对我自己…

Jmeter系列-环境部署、详细介绍、安装目录介绍(1)

环境部署 官网下载Jmeter http://jmeter.apache.org/下载最新版本的 JMeter&#xff0c;解压文件到任意目录 安装JDK&#xff0c;配置Java环境 1、下载&#xff08;注意选择操作系统对应的位数32/64&#xff09; 官网 &#xff1a;http://www.oracle.com 2、安装&#xff0…

commons-io版本变动在windows环境下引发的NTFS ADS separator问题

起因 因业务需求&#xff0c;项目中引入了一个对方的业务jar包&#xff0c;但是发现代码却启动不起来了&#xff0c;报错&#xff1a; org.springframework.beans.PropertyBatchUpdateException; nested PropertyAccessExceptions (1) are: PropertyAccessException 1: org.s…

程序与保持健康的六个秘诀

虽然编程并不被视为是一个高危职业,但我们发现一大批数量惊人的开发人员正遭受健康问题的折磨。坐在办公桌很舒适,但有研究表明,它并不像你想象的那样健康。幸运的是,有很多非常容易做到的方法可以改善你的健康。 1.锻炼 尽管这可能是最明显的方法,但很多开发人员常常会…

变压器寿命预测(python代码,Logistic Regression模型预测效果一般,可以做对比实验)

1.数据来源官网&#xff1a;Data for: Root cause analysis improved with machine learning for failure analysis in power transformers - Mendeley Data 点Download All 10kb即可下载数据 2.下载下来后是这样 每一列的介绍&#xff1a; Hydrogen 氢气&#xff1b; Oxyge…