0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT,PRIMARY KEY (word) NOT ENFORCED) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()

这一步只能创建表和连接器,具体执行还要执行下一步

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

完整代码如下

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')# define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT,PRIMARY KEY (word) NOT ENFORCED) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()# execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/115803.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python爬虫:ad广告引擎的模拟登录

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据…

Mybatis 相关模块以及设计模式分析

一、缓存模块 MyBatis作为一个强大的持久层框架,缓存是其必不可少的功能之一,Mybatis中的缓存分为一级缓存和二级缓存。但本质上是一样的,都是使用Cache接口实现的。缓存位于 org.apache.ibatis.cache包下。 通过结构我们能够发现Cache其实使…

螺杆支撑座是如何维持精度和稳定性的?

螺杆支撑座是机械设备中重要的支撑元件,主要用于支撑和固定螺杆,以确保其精度和稳定性,以下是螺杆支撑座在实际使用中的优势: 1、良好的耐腐蚀性:螺杆支撑座通常采用防腐蚀材料制造,能够抵抗各种腐蚀性介质…

linux安装visual studio code

下载 https://code.visualstudio.com/ 下载.deb文件 安装 假如文件被下载到了 /opt目录下 进入Opt目录,右键从当前目录打开终端。 输入下面的安装命令。 sudo apt-get install ./code_1.83.1-1696982868_amd64.deb 安装成功。 配置 打开 visual studio cod…

【蓝桥每日一题]-动态规划 (保姆级教程 篇11)#方格取数2.0 #传纸条

目录 题目:方格取数 思路: 题目:传纸条 思路: 题目:方格取数 (跑两次) 思路: 如果记录一种方案后再去跑另一个方案,影响因素太多了,所以两个方案要同时开…

手搭手Ajax经典基础案例省市联动

环境介绍 技术栈 springbootmybatis-plusmysql 软件 版本 mysql 8 IDEA IntelliJ IDEA 2022.2.1 JDK 1.8 Spring Boot 2.7.13 mybatis-plus 3.5.3.2 pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http:/…

分类预测 | MATLAB实现SSA-CNN-BiLSTM-Attention数据分类预测(SE注意力机制)

分类预测 | MATLAB实现SSA-CNN-BiLSTM-Attention数据分类预测&#xff08;SE注意力机制&#xff09; 目录 分类预测 | MATLAB实现SSA-CNN-BiLSTM-Attention数据分类预测&#xff08;SE注意力机制&#xff09;分类效果基本描述模型描述程序设计参考资料 分类效果 基本描述 1.MAT…

Java中的常量管理:接口还是枚举,您如何选择?

在企业项目开发中&#xff0c;我们经常会需要设计一些常量的定义&#xff0c;两种常见的方式用于常量管理&#xff1a;接口和枚举。他们分别适用于什么场景了&#xff1f; 使用接口管理常量 接口中的常量通常具有以下特点&#xff1a; 通过 public static final 修饰&#x…

认识哈希表和哈希表的实现

哈希函数的定义&#xff1a; out f(in) 1&#xff09;in -> ∞&#xff0c; out -> S 输入域是无穷的&#xff0c;输出域是有限的&#xff0c;也就是S域&#xff1b; 2&#xff09;相同的输入一定会得到相同的输出&#xff1b; 3&#xff09;不同的输入可能会有相同…

Swift 判断 A B 两个时间是不是同一天,A 是不是 B 的昨天

1. 今天要做这个效果&#xff08;在时间旁边显示今天&#xff0c;昨天&#xff09; 2. Preview 3. Code: // 添加 今天 昨天 func show_today_yesterday(d: Date Date()) -> String {let calendar Calendar.currentlet today: Date Date()if calendar.isDate(today, inS…

C++DAY50

源文件代码 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);if(!db.contains()){db QSqlDatabase::addDatabase("QSQLITE");db.setDatabaseName(&q…

用来生成二维矩阵的dcgan

有大量二维矩阵作为样本&#xff0c;为连续数据。数据具有空间连续性&#xff0c;因此用卷积网络&#xff0c;通过dcgan生成二维矩阵。因为是连续变量&#xff0c;因此损失采用nn.MSELoss()。 import torch import torch.nn as nn import torch.optim as optim import numpy a…

QTday05(TCP的服务端客户端通信)

实现聊天室功能 服务端代码&#xff1a; pro文件需要导入 network 头文件&#xff1a; #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTcpServer>//服务端 #include <QTcpSocket>//客户端 #include <QList> #include <QMes…

超声波清洗机频率如何选择?高频和低频有什么区别

超声波清洗原理就是在清洗液中产生“空化效应”&#xff0c;即清洗液产生拉伸和压缩现象&#xff0c;清洗液拉伸时会产生大量微小气泡&#xff0c;清洗液压缩时气泡会被压碎破裂。这些气泡产生和破裂的局部压强可达到上千个大气压的冲击力&#xff0c;这种极强大的压力足以使得…

Leetcode 1089. 复写零

复写零 题目链接1089. 复写零 给你一个长度固定的整数数组 arr &#xff0c;请你将该数组中出现的每个零都复写一遍&#xff0c;并将其余的元素向右平移。 注意&#xff1a;请不要在超过该数组长度的位置写入元素。请对输入的数组 就地 进行上述修改&#xff0c;不要从函数返回…

网络协议--UDP:用户数据报协议

11.1 引言 UDP是一个简单的面向数据报的运输层协议&#xff1a;进程的每个输出操作都正好产生一个UDP数据报&#xff0c;并组装成一份待发送的IP数据报。这与面向流字符的协议不同&#xff0c;如TCP&#xff0c;应用程序产生的全体数据与真正发送的单个IP数据报可能没有什么联…

并查集学习笔记

在一些有 n n n 个元素的集合应用问题中&#xff0c;我们通常是在开始时让每个元素构成一个单元素的集合&#xff0c;我们通常是在开始时让每个元素构成一个单元素的集合&#xff0c;然后按一定顺序将属于同一族的元素所在的集合合并&#xff0c;期间要反复查找一个元素在哪个…

老卫带你学---leetcode刷题(56. 合并区间)

56. 合并区间 问题&#xff1a; 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a;输…

一分钟带你了解什么是0day攻击什么是Nday攻击

1. 什么是零日漏洞 零日攻击是指利用零日漏洞对系统或软件应用发动的网络攻击。 零日漏洞也称零时差漏洞&#xff0c;通常是指还没有补丁的安全漏洞。由于零日漏洞的严重级别通常较高&#xff0c;所以零日攻击往往也具有很大的破坏性。目前&#xff0c;任何安全产品或解决方案…

吃瓜教程2|线性模型

线性回归 “广义的线性模型”&#xff08;generalized linear model&#xff09;&#xff0c;其中&#xff0c;g&#xff08;*&#xff09;称为联系函数&#xff08;link function&#xff09;。 线性几率回归&#xff08;逻辑回归&#xff09; 线性判别分析 想让同类样本点的…