Flink自定义Source模拟数据流

maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zxl</groupId><artifactId>FlinkJoin</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!--com.mysql.cj.jdbc.Driver--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--这个一定要加,否则会报错(5001好像是,记不清了)--><dependency><groupId>org.glassfish.jersey.inject</groupId><artifactId>jersey-hk2</artifactId><version>2.34</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version><scope>compile</scope></dependency></dependencies>
</project>

实体类

订单类

package com.zxl.bean;// TODO: 2024/1/6 订单类 public class Orders {//订单IDprivate Long order_id;//用户IDprivate Long user_id;//订单日期private Long order_date;//订单金额private Integer order_amount;//商品IDprivate Integer product_id;//订单数量private Long order_num;public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Long getUser_id() {return user_id;}public void setUser_id(Long user_id) {this.user_id = user_id;}public Long getOrder_date() {return order_date;}public void setOrder_date(Long order_date) {this.order_date = order_date;}public Integer getOrder_amount() {return order_amount;}public void setOrder_amount(Integer order_amount) {this.order_amount = order_amount;}public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public Long getOrder_num() {return order_num;}public void setOrder_num(Long order_num) {this.order_num = order_num;}public Orders() {}public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {this.order_id = order_id;this.user_id = user_id;this.order_date = order_date;this.order_amount = order_amount;this.product_id = product_id;this.order_num = order_num;}@Overridepublic String toString() {return "Orders{" +"order_id=" + order_id +", user_id=" + user_id +", order_date=" + order_date +", order_amount=" + order_amount +", product_id=" + product_id +", order_num=" + order_num +'}';}
}

支付类

package com.zxl.bean;// TODO: 2024/1/6 支付类 public class Payments {//支付IDprivate Long payment_id;//订单号private Long order_id;//支付金额private Integer payment_amount;//支付类型private String payment_type;//支付日期private Long payment_date;public Long getPayment_id() {return payment_id;}public void setPayment_id(Long payment_id) {this.payment_id = payment_id;}public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Integer getPayment_amount() {return payment_amount;}public void setPayment_amount(Integer payment_amount) {this.payment_amount = payment_amount;}public String getPayment_type() {return payment_type;}public void setPayment_type(String payment_type) {this.payment_type = payment_type;}public Long getPayment_date() {return payment_date;}public void setPayment_date(Long payment_date) {this.payment_date = payment_date;}public Payments() {}public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {this.payment_id = payment_id;this.order_id = order_id;this.payment_amount = payment_amount;this.payment_type = payment_type;this.payment_date = payment_date;}@Overridepublic String toString() {return "payments{" +"payment_id=" + payment_id +", order_id=" + order_id +", payment_amount=" + payment_amount +", payment_type='" + payment_type + '\'' +", payment_date=" + payment_date +'}';}
}

商品类

用作维表测试

package com.zxl.bean;// TODO: 2024/1/6 商品类public class Products {//商品IDprivate Integer product_id;//商品名称private String product_name;//商品价格private Integer product_price;//商品库存private Long product_num;//商品分类private String product_type;public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public String getProduct_name() {return product_name;}public void setProduct_name(String product_name) {this.product_name = product_name;}public Integer getProduct_price() {return product_price;}public void setProduct_price(Integer product_price) {this.product_price = product_price;}public Long getProduct_num() {return product_num;}public void setProduct_num(Long product_num) {this.product_num = product_num;}public String getProduct_type() {return product_type;}public void setProduct_type(String product_type) {this.product_type = product_type;}public Products() {}public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {this.product_id = product_id;this.product_name = product_name;this.product_price = product_price;this.product_num = product_num;this.product_type = product_type;}@Overridepublic String toString() {return "products{" +"product_id=" + product_id +", product_name='" + product_name + '\'' +", product_price=" + product_price +", product_num=" + product_num +", product_type='" + product_type + '\'' +'}';}
}

数据生成

订单数据生成

package com.zxl.datas;import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class OrdersData implements SourceFunction<Orders> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//订单日期private static Long getOrder_date() {//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//用户IDprivate static Long getUser_id() {return random.nextLong();}//订单金额private static Integer getOrder_amount() {return random.nextInt(100);}//商品IDprivate static Integer getProduct_id() {return random.nextInt(100);}//订单数量private static Long getOrder_num() {return random.nextLong();}//订单类private static Orders getOrders() {Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());return orders;}@Overridepublic void run(SourceContext<Orders> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getOrders());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}

支付数据生成

package com.zxl.datas;import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Date;
import java.util.Random;public class PaymentData implements SourceFunction<Payments> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//支付IDprivate static Long getPayment_id(){return random.nextLong();}//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//支付金额private static Integer getPayment_amount(){return random.nextInt(1000);}//支付类型private static String getPayment_type(){String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};int are= random.nextInt(6);String area=type[are];return area;}//支付日期private static Long getPayment_date(){//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//支付类private static Payments getPayments(){Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());return payments;}@Overridepublic void run(SourceContext<Payments> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getPayments());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}

测试数据打印

package com.zxl.flink;import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class flinkWorks {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据 DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/6 支付数据 DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());//打印数据paymentsDataStreamSource.print();ordersDataStreamSource.print();//启动程序environment.execute();}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/605039.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV | 光流估计

光流估计 光流是空间运动物体在观测成像平面上的像素运动的“瞬时速度”&#xff0c;根据各个像素点的速度的速度矢量特征&#xff0c;可以对图像进行动态分析&#xff0c;例如目标跟踪 高度恒定&#xff1a;同一点随着时间的变化&#xff0c;其亮度不会发生改变。小运动&…

ProtoBuf一些踩坑记录

一、Protobuf学习基础 学习的资料很多也很全&#xff0c;这里添加几个链接进行Protobuf的基础学习的链接&#xff0c;链接中的案例使用C编辑&#xff1a; 链接&#xff1a;Protobuf介绍及简单使用(上&#xff09;_google_protobuf_version-CSDN博客 Protobuf介绍及简单使用(下&…

MFC综合实验二学习记录

文章目录 如何确定消息映射宏中命令对应的菜单项资源虚函数和纯虚函数的区别&#xff1f;MFC中什么是UPDATE_COMMAND_UI 消息如何查看控件对应的成员变量模态对话框的理解HGDIOBJ" 类型的值不能用于初始化 "CBrush *" 类型的实体错误MFC编程中CDC类型和HDC类型有…

TOP 9 安卓手机系统和应用程序修复工具,可修复各种Android 系统问题

您的新 Android 手机可能因其令人兴奋的性能而印象深刻。然而&#xff0c;随着时间的推移&#xff0c;您可能会发现系统有些地方与以前不太一样。您可能会遇到屏幕无响应、 Android应用程序崩溃、连接问题、电池耗尽等现象。 好吧&#xff0c;在这些情况下您不必感到不安&…

centos通过yum安装redis

1. 安装yum添加epel源(此步根据环境&#xff0c;如果有源则可跳过&#xff0c;在阿里去可跳过&#xff09; yum install epel-release 2 使用yum安装Redis yum install redis 出现如下图所示的内容&#xff0c;默认的安装路径是在 /usr/bin目录下&#xff1a; 文件安装路径…

uniapp 微信小程序跳转至其他小程序

一、背景&#xff1a; 需要在目前的小程序中跳转到另一个小程序&#xff0c;跳转的目标小程序需要已经发布上线了 二、具体实现 使用uni.navigateToMiniProgram打开另一个小程序 官网指引&#x1f449;&#xff1a;uni.navigateToMiniProgram(OBJECT) | uni-app官网 <t…

iview 选择框远程搜索 指定筛选的参数

问题&#xff1a;开启了filterable之后&#xff0c;选择框是允许键盘输入的&#xff0c;但是会对选择列表进行过滤&#xff0c;如果不想使用再次过滤&#xff0c;可以试下下面这个方法。 场景&#xff1a;输入加密前的关键字筛选&#xff0c;选择框显示加密后的数据 说明一&a…

Baumer工业相机堡盟工业相机如何通过NEOAPI SDK实现相机的高速图像保存(C++)

Baumer工业相机堡盟工业相机如何通过NEOAPI SDK实现相机的高速图像保存&#xff08;C&#xff09;&#xff09; Baumer工业相机Baumer工业相机的图像高速保存的技术背景Baumer工业相机通过NEOAPI SDK函数图像高速保存在NEOAPI SDK里实现线程高速图像保存&#xff1a;工业相机高…

rke2 offline install kubernetes v1.26

文章目录 1. 准备2. 安装 ansible3. 基础配置3.1 配置 hosts3.2 安装软件包3.3 内核参数3.4 连接数限制3.5 关闭swap 、selinux、防火墙3.6 时间同步 4. RKE2 安装4.1 下载安装4.2 配置其他管理节点4.3 新增 worker 节点 1. 准备 7 台主机 主机名ipcpu内存diskos角色user密码…

11.3编写Linux串口驱动

编写串口驱动主要步骤 构建并初始化 struct console 对象&#xff0c;若串口无需支持 console 可省略此步骤 //UART驱动的console static struct uart_driver virt_uart_drv; static struct console virt_uart_console {//console 的名称&#xff0c;配合index字段使用&…

unity C#中Array、Stack、Queue、Dictionary、HashSet优缺点和使用场景总结

文章目录 数组 (Array)列表 (List<T>)栈 (Stack<T>)队列 (Queue<T>)链表 (LinkedList<T>)哈希表 (Dictionary<TKey, TValue>) 或 HashSet<T>集合 (Collection<T>) 数组 (Array) 优点&#xff1a; 高效访问&#xff1a;通过索引可以…

1-04C语言执行过程

一、概述 本小节主要讲解一个C程序从源代码到最终执行的过程&#xff0c;这个过程又可以细分为两部分&#xff1a; 源代码到可执行文件的过程可执行文件在内存中执行 本小节是C语言基础当中&#xff0c;比较容易被初学者忽视的知识点。而实际上&#xff1a; 熟悉C程序从源文…

前端超好玩的小游戏合集来啦--周末两天用html5做一个3D飞行兔子萝卜小游戏

文章目录 💖飞行兔子萝卜小游戏💟效果展示💟代码展示源码获取💖飞行兔子萝卜小游戏 💟效果展示 💟代码展示 <body> <script src=

如何选猫粮:买主食冻干猫粮需要注意什么?

随着养猫的人越来越多&#xff0c;铲屎官们对猫咪的饮食也越来越注重。除了猫粮&#xff0c;很多铲屎官还会给猫咪准备小零食。那么&#xff0c;猫咪是不是除了猫粮就没有其他可吃的了呢&#xff1f;答案当然不是。猫咪还有猫冻干、冻干猫粮、猫条等可以选择。每个铲屎官都希望…

【MySQL】索引基础

文章目录 1. 索引介绍2. 创建索引 create index…on…2.1 explain2.2 创建索引create index … on…2.3 删除索引 drop index … on 表名 3. 查看索引 show indexes in …4. 前缀索引4.1 确定最佳前缀长度&#xff1a;索引的选择性 5. 全文索引5.1 创建全文索引 create fulltex…

Vue3.4更新 “Slam Dunk“发布!!!

Announcing Vue 3.4 | The Vue Point. vue3.4更新官方文档 在vue2即将结束更新的时候&#xff0c;vue3迎来了一个重要的更新。代号为“&#x1f3c0; Slam Dunk”&#xff0c;即"灌篮高手"。这个版本进行了很多显著的内部改进&#xff0c;最重要的是模版解析的底层逻…

Github 2024-01-08开源项目周报 Top14

根据Github Trendings的统计&#xff0c;本周(2024-01-08统计)共有14个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目5TypeScript项目3C项目2Dart项目1QML项目1Go项目1Shell项目1Rust项目1JavaScript项目1C#项目1 免费…

IO进程线程Day5

1> 将互斥机制代码重新实现一遍 #include<myhead.h>char buf[128]; //临界资源pthread_mutex_t mutex; //创建锁资源//分支线程 void* task(void* arg) {while(1){//获取锁资源pthread_mutex_lock(&mutex);printf("这里是分支线程:%s\n",buf);st…

多线程模板应用实现(实践学习笔记)

出处&#xff1a;B站码出名企路 个人笔记&#xff1a;因为是跟着b站的教学视频以及文档初步学习&#xff0c;可能存在诸多的理解有误&#xff0c;对大家仅供借鉴&#xff0c;参考&#xff0c;然后是B站up阳哥的视频&#xff0c;我是跟着他学。大家有兴趣的可以到b站搜索。加油…

CAD安装教程

CAD安装教程 目录 一&#xff0e; 下载CAD二&#xff0e; 安装CAD 一&#xff0e; 下载CAD 如果需要CAD安装包请私信。 二&#xff0e; 安装CAD 解压压缩包AutoCAD2022中文版&#xff0c;以管理员身份运行AutoCAD_2022_Simplified_Chinese_Win_64bit_dlm.sfx。 选择解压路径。…