flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作

package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;public class Test41 {//demo 是MySQL中已经创建好的表//create table demo (userId varchar(50) not null,total bigint,avgVal double);private static String FILE_PATH = "info.txt";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path(FILE_PATH)).withFormat(new Csv()).withSchema(new Schema().field("userId", DataTypes.VARCHAR(50)).field("ts", DataTypes.INT()).field("val", DataTypes.DOUBLE())).createTemporaryTable("input");Table dataTable = tableEnv.from("input");Table aggregateTable = dataTable.groupBy("userId").select("userId, userId.count as total, val.avg as avgVal");String sql="create table jdbcOutputTable (" +" userId varchar(50) not null,total bigint,avgVal double " +") with (" +" 'connector.type' = 'jdbc', " +" 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +" 'connector.table' = 'demo', " +" 'connector.driver' = 'com.mysql.jdbc.Driver', " +" 'connector.username' = 'root', " +" 'connector.password' = 123456' )";tableEnv.sqlUpdate(sql);aggregateTable.insertInto("jdbcOutputTable");tableEnv.execute("my job");}
}

文件info.txt

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/738936.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

portainer管理远程docker和docker-swarm集群

使用前请先安装docker和docker-compose,同时完成docker-swarm集群初始化 一、portainer-ce部署 部署portainer-ce实时管理本机docker,使用docker-compose一键拉起 docker-compose.yml version: 3 services:portainer:container_name: portainer#imag…

LeetCode 26. 删除有序数组中的重复项。(通过JavaScript实现)

给你一个 非严格递增排列 的数组 nums ,请你 原地 删除重复出现的元素,使每个元素 只出现一次 ,返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。然后返回 nums 中唯一元素的个数。 考虑 nums 的唯一元素的数量为 k ,你…

2-Docker-应用-多容器部署Django+Vue项目(nginx+uwsgi+mysql)

摘要: 本文详细介绍了如何使用Docker部署一个多容器DjangoVue项目,包括nginx、uwsgi和mysql。文章内容涵盖了基础知识回顾、需求分析、设计方案、实现步骤、技巧与实践、性能优化与测试、常见问题与解答以及结论与展望。 阅读时长:约60分钟…

要将这个本地HTML文件转换为图片

要将这个本地HTML文件转换为图片,您可以按照以下步骤操作: 1. 打开文件: - 使用您的网页浏览器(如Google Chrome、Mozilla Firefox、Microsoft Edge等)打开这个本地HTML文件。您可以通过在浏览器地址栏中输入文件的…

制冷系统管道焊接气焊安全操作

气焊操作安全教育: 1、检查气焊用具完好牢固无损,不得贴粘有(机油); 2、气瓶余压(2KG)停止使用;清除动火 10 米范围内易燃易爆物料; 3、库房内动火要 做好通风排气&…

python基础——函数进阶【多个返回值,多种传参方式,匿名函数】

📝前言: 这篇文章主要记录一下在python中,关于函数的进阶常用知识,建议对编程中的函数有一定了解的读者阅读,如果想了解一下函数的最基础内容,也可先看这篇文章C语言——函数 在本文,我会主要讲…

ETAS入门篇-1、ISOLAR-A软件介绍

目录 ISOLAR-A介绍 体系结构 特性 架构 Explorers和Views 编辑 Editors

ORM(对象关系映射)的概念,并说明在Python中如何使用

ORM(对象关系映射)的概念,并说明在Python中如何使用 ORM(对象关系映射)是一种编程技术,它实现了将关系型数据库中的数据映射到程序中的对象模型,使得开发者能够使用面向对象的方式来操作数据…

海格里斯HEGERLS智能托盘四向车系统为物流仓储自动化升级提供新答案

随着实体企业面临需求多样化、订单履行实时化、商业模式加速迭代等挑战,客户对物流仓储解决方案的需求也逐渐趋向于柔性化、智能化。作为近十年来发展起来的新型智能仓储设备,四向车系统正是弥补了先前托盘搬运领域柔性解决方案的空白。随着小车本体设计…

1051:分段函数

【题目描述】 编写程序&#xff0c;计算下列分段函数yf(x)的值。结果保留到小数点后三位。 y−x2.5;0≤x<5 y2−1.5(x−3)(x−3);5≤x<10 yx/2−1.5;10≤x<20 【输入】 一个浮点数N(0≤N<20)。 【输出】 输出 N 对应的分段函数值&#xff1a;f(N)。结果保留到…

仅需 5% 训练样本达到最优性能,清华大学研究团队发布条件去噪扩散模型 SPDiff,实现长程人流移动模拟

人流移动模拟 (Crowd Simulation) 是在特定情境中模拟大量人员移动的过程。这项技术主要应用于计算机游戏、城市规划、建筑设计以及交通组织等领域。例如&#xff0c;模拟人群在不同条件&#xff08;如人群密度、流量等&#xff09;下在建筑物内的移动&#xff0c;帮助决策者评…

sql server 如何创建触发器

要在 SQL Server 中新增触发器&#xff0c;您需要使用 CREATE TRIGGER 语句。以下是具体的步骤和示例&#xff1a; 确定触发器逻辑&#xff1a; 首先&#xff0c;确定触发器应该在表的何时触发&#xff0c;以及触发时应该执行哪些逻辑操作。这包括确定触发器是在数据插入、更新…

Enshrouded/雾锁王国服务器配置选择要求,CPU内存带宽

雾锁王国/Enshrouded服务器CPU内存配置如何选择&#xff1f;阿里云服务器网aliyunfuwuqi.com建议选择8核32G配置&#xff0c;支持4人玩家畅玩&#xff0c;自带10M公网带宽&#xff0c;1个月90元&#xff0c;3个月271元&#xff0c;幻兽帕鲁服务器申请页面 https://t.aliyun.com…

Linux命令-cupsdisable命令(停止指定的打印机)

说明 cupsdisable命令 用于停止指定的打印机。 语法 cupsdisable(选项)(参数)选项 -E&#xff1a;当连接到服务器时强制使用加密&#xff1b; -U&#xff1a;指定连接服务器时使用的用户名&#xff1b; -u&#xff1a;指定打印任务所属的用户&#xff1b; -c&#xff1a;取…

勾八头歌之数据科学导论—数据预处理

第1关&#xff1a;引言-根深之树不怯风折&#xff0c;泉深之水不会涸竭 第2关&#xff1a;数据清理-查漏补缺 import numpy as np import pandas as pd import matplotlib.pyplot as pltdef student():# Load the CSV file and replace #NAME? with NaNtrain pd.read_csv(Tas…

精通 Python 装饰器:代码复用与功能增强技巧

精通 Python 装饰器&#xff1a;代码复用与功能增强技巧 引言装饰器基础装饰器的定义基本装饰器的实现方法理解 符号的用法简单装饰器示例代码 使用装饰器增强函数功能日志记录性能测试事务处理小结 装饰器进阶应用管理用户认证缓存机制的实现参数化装饰器的创建和应用多个装饰…

智慧公厕的意义:高效智能的公共厕所运营、服务、协作管理

现代城市的发展离不开智慧技术的引领&#xff0c;而智慧公厕作为城市基础设施的重要组成部分&#xff0c;正在逐渐展现其巨大的意义和价值。通过采用智能管理系统&#xff0c;智慧公厕实现了更高效的管理、更贴心的服务和更协同的业务流程。本文以智慧公厕源头实力厂家广州中期…

【猫头虎科技角】深入Drools:规则引擎的艺术与实践

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

UI学习 一 可访问性基础

教程&#xff1a;Accessibility – Material Design 3 需要科学上网&#xff0c;否则图片显示不出来。设计教程没有图片说明&#xff0c;不容易理解。 优化UI方向 清晰可见的元素足够的对比度和尺寸重要性的明确等级一眼就能辨别的关键信息 传达某一事物的相对重要性 将重…

数据库乐观锁

目录 数据库中实现乐观锁的示例&#xff1a;实现乐观锁时&#xff0c;常见的错误主要包括以下几个方面&#xff1a; 乐观锁&#xff08;Optimistic Locking&#xff09;是一种在数据库系统中用于解决并发问题的技术。它假设多个事务在并发执行时不会彼此冲突&#xff0c;只有在…