1. Flink自定义Source

一. Source 简介

DataStream是Flink的低级API,用于进行数据的实时处理,Flink编程模型分为Source、Transformation、Sink三个部分,如下图所示。
在这里插入图片描述

默认Flink提供了大量的内置Source,常见的Source如下:

  • 基于文件的Source
  • 基于Socket的Source
  • 基于集合的Source
  • 基于Kafka消息队列的Source

当以上内置Source不能满足业务需要时,可以实现自定义Source。

Flink中有关Source的接口类的继承关系如下:
在这里插入图片描述

  • SourceFunction:单并行度Source的基类
  • RichSourceFunction:单并行度增强型Source的基类
  • ParallelSourceFunction:多并行度Source的基类
  • RichParallelSourceFunction:多并行度增强型Source的基类
二. 自定义单并行度Source

自定义单并行度的source需要实现SourceFunction接口。

代码实现

MySource.java

package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;public class MySource implements SourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {// "Num"加上0~100的随机数生成一个字符串ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource());source.print();env.execute("source_demo");}
}

运行结果

5> Num: 62
6> Num: 91
7> Num: 13
8> Num: 53
三. 自定义多并行度Source

自定义多并行度的source需要实现ParallelSourceFunction接口。

代码实现

MyParallelSource.java

package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;public class MyParallelSource implements ParallelSourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MyParallelSource());source.print();env.execute("source_demo");}
}

运行结果

7> Num: 43
8> Num: 30
1> Num: 92
2> Num: 50
5> Num: 39
6> Num: 6
4> Num: 20
3> Num: 2
四. 自定义单并行度增强型Source

增强型Source额外提供了open和close方法,可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。

在mysql中创建student表,并插入三条数据。

create table student (id int primary key,name varchar(50),age int
);insert into student values(1, "name1", 20),(2, "name2", 30), (3, "name3", 15);

实现代码

Student.java

package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id = id;this.name = name;this.age = age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", age=" + age +'}';}
}

MysqlSource.java

package flink.basic.source;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;public class MysqlSource extends RichSourceFunction<Student> {Connection conn;Statement stmt;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://192.168.47.130:3306/test";String user = "root";String password = "root";conn = DriverManager.getConnection(url,user,password);stmt = conn.createStatement();}@Overridepublic void run(SourceContext<Student> ctx) throws Exception {ResultSet rs = stmt.executeQuery("select * from student");while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");ctx.collect(new Student(id, name, age));}rs.close();}@Overridepublic void cancel() {}@Overridepublic void close() throws Exception {stmt.close();conn.close();}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSource<Student> source = env.addSource(new MysqlSource());source.print();env.execute("source_demo");}
}

运行结果

1> Student{id=3, name='name3', age=15}
8> Student{id=2, name='name2', age=30}
7> Student{id=1, name='name1', age=20}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/889616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Yolov8界面可视化

本教程使用的是Pyside6 1、安装PySide6模块 pip install pyside6 安装完成之后&#xff0c;会有一个designer.exe可执行文件&#xff0c;打开之后&#xff0c;我们可以通过拖拉拽的方式来布局我们的界面。 designer.exe文件位置&#xff0c;一般位于当前虚拟环境下面的路径…

谷粒商城—分布式高级①.md

1. ELASTICSEARCH 1、安装elastic search dokcer中安装elastic search (1)下载ealastic search和kibana docker pull elasticsearch:7.6.2 docker pull kibana:7.6.2(2)配置 mkdir -p /mydata/elasticsearch/config mkdir -p /mydata/elasticsearch/data echo "h…

系统性能优化

一、概述 性能优化的目标&#xff1a;是提高系统或应用程序的响应时间、吞吐量、cpu、内存、磁盘IO、网络、流量、JVM、Tomcat、DB等方面的性能指标。 性能优化需要有一些技巧&#xff1a;对于整个产品或项目而言&#xff0c;比如可以从前端优化、后端优化、架构优化、高并发…

基于STM32设计的粮食仓库(粮仓)环境监测系统

一、前言 当前项目使用的相关软件工具、传感器源代码工程已经上传到网盘&#xff08;实时更新项目内容&#xff09;&#xff1a;https://ccnr8sukk85n.feishu.cn/wiki/QjY8weDYHibqRYkFP2qcA9aGnvb?fromfrom_copylink 1.1 项目开发背景 随着现代农业的发展和粮食储存规模的…

基于STM32的智能导盲/智能拐杖系统

基于STM32的智能导盲/智能拐杖系统 持续更新&#xff0c;欢迎关注!!! ** 基于STM32的智能导盲/智能拐杖系统 ** 据统计&#xff0c;全球视障人士的数量已经超过2.5亿&#xff0c;其中大部分人需要一种有效的辅助器具来帮助他们感知周围环境&#xff0c;安全行走。 近年来&am…

关于idea-Java-servlet-Tomcat-Web开发中出现404NOT FOUND问题的解决

在做web项目时&#xff0c;第一次使用servlet开发链接前端和后端的操作&#xff0c;果不其然&#xff0c;遇到了诸多问题&#xff0c;而遇到最多的就是运行项目打开页面时出现404NOT FOUND的情况。因为这个问题我也是鼓捣了好久&#xff0c;上网查了许多资料才最终解决&#xf…

【数据结构——栈与队列】链栈的基本运算(头歌实践教学平台习题)【合集】

目录&#x1f60b; 任务描述 相关知识 测试说明 我的通关代码: 测试结果&#xff1a; 任务描述 本关任务&#xff1a;编写一个程序实现链栈的基本运算。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a; 初始化栈、销毁栈、判断栈是否为空、进栈、出栈、取栈…

与 Cursor AI 对话编程:2小时开发报修维修微信小程序

本文记录了如何通过与 Cursor AI 对话&#xff0c;全程不写一行代码的情况下&#xff0c;完成一个完整的报修小程序。整个过程展示了 AI 如何帮助我们&#xff1a; 生成代码 、解决问题、优化实现、完善细节。 先看一下效果图&#xff1a; 一、项目配置 首先我是这样和 AI 对…

基于windows环境使用nvm安装多版本nodejs

目录 前言 一、卸载node 二、nvm是什么&#xff1f; 三、nvm安装 1.官网下载 nvm 包 2. 安装nvm-setup.exe 3. 配置路径和下载镜像 4. 检查安装是否完成 四、 使用nvm安装node 五、修改npm默认镜像源为淘宝镜像 六、环境变量配置 1. 新建目录 2. 设置环境变量 七…

MVP模式的理解和实践

MVP&#xff08;Model-View-Presenter&#xff09;模式是一种用于组织代码的架构模式&#xff0c;主要用于用户界面的开发。它通过将应用程序的三个主要组件分开&#xff0c;提高了应用的可维护性和可测试性。本文将详细介绍MVP模式的理解和实践&#xff0c;并通过Java语言提供…

在Liunx中安装JDK、Tomcat、mysql、lrzsz、Nginx

一.软件安装方式 在Linux系统中&#xff0c;安装软件的方式主要有四种&#xff0c;这四种安装方式的特点如下&#xff1a; 二.安装JDK 上述我们介绍了Linux系统软件安装的四种形式&#xff0c;接下来我们就通过第一种(二进制发 布包)形式来安装JDK。 在/下创建soft目录&…

神经网络基础-初识神经网络

人工神经网络&#xff08; Artificial Neural Network&#xff0c; 简写为ANN&#xff09;也简称为神经网络&#xff08;NN&#xff09;&#xff0c;是一种模仿生物神经网络结构和功能的计算模型。人脑可以看做是一个生物神经网络&#xff0c;由众多的神经元连接而成。各个神经…

Python中PyTorch详解

文章目录 Python中PyTorch详解一、引言二、PyTorch核心概念1、张量&#xff08;Tensor&#xff09;1.1、创建张量1.2、张量操作 2、自动求导&#xff08;Autograd&#xff09;2.1、自动求导示例 三、构建神经网络1、使用nn模块2、优化器&#xff08;Optimizer&#xff09; 四、…

云服务器挖矿程序占用资源处理

云服务器挖矿程序占用资源处理 文章目录 云服务器挖矿程序占用资源处理top查看服务器后台运行情况关闭病毒删除病毒文件top 云服务器通过手机短信发送了多次预警&#xff0c;疑似出现挖矿程序&#xff0c;登录口令可能已经被暴力破解。处理方法是立即更改口令&#xff0c;然后处…

电脑文件夹打不开了,能打开但是会闪退,提示“找不到iUtils.dll”是什么原因?

电脑运行时常见问题解析&#xff1a;文件夹打不开、闪退及“找不到iUtils.dll”报错 在使用电脑的过程中&#xff0c;我们可能会遇到文件夹打不开、软件闪退或系统报错等问题&#xff0c;特别是提示“找不到iUtils.dll”的报错&#xff0c;更是让人困惑不已。今天我将为大家详…

【教程】让Jupyter支持打开CSV和Excel(xlsx)文件

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 支持CSV JupyterLab本身支持直接打开CSV文件&#xff0c;因此只需要在JupyterLab的文件浏览器中找到CSV文件并双击它&#xff0c;就可以在JupyterLab的…

自动驾驶域控制器简介

汽车智能驾驶功能持续高速渗透&#xff0c;带来智能驾驶域控制器市场空间快速增 长。智驾域控制器是智能驾驶决策环节的重要零部件&#xff0c;主要功能为处理感知 信息、进行规划决策等。其核心部件主要为计算芯片&#xff0c;英伟达、地平线等芯 片厂商市场地位突出。随着消费…

计算机网络-传输层 TCP协议(上)

目录 报头结构 TCP的可靠传输机制 核心机制一&#xff1a;确认应答 TCP的序号和确认序号 核心机制二&#xff1a;丢包重传 核心机制三&#xff1a;连接管理 建立连接-三次握手 断开连接-四次挥手 核心机制四&#xff1a;滑动窗口 数据包已经抵达, ACK被丢了 数据包就…

5.2章节python字符串的格式化三种方式

在Python中&#xff0c;格式化字符串是编程中常见的任务&#xff0c;它用于将变量或表达式的值嵌入到字符串中。以下是三种常见的格式化字符串的方式&#xff1a; 1.百分号&#xff08;%&#xff09;格式化&#xff1a; 这是Python早期版本中常用的字符串格式化方法。通过在字…

【经验分享】容器云运维的知识点

最近忙于备考没关注&#xff0c;有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源&#xff0c;但我以交流、交换为主&#xff0c;笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟&#xff0c;为了避免更多人花没必要的钱&#xff0c;所以决定公…