Java版Flink使用指南——自定义无界流生成器

大纲

  • 新建工程
    • 自定义无界流
  • 使用
  • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们让外部组件RabbitMQ充当了无界流的数据源,使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中,我们使用了Flink自带的数据生成器,生成了有限数据,从而让Flink以批处理形式运行了该任务。
本文我们将自定义一个无界流生成器,以方便后续测试。

新建工程

我们新建一个名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator实现RichSourceFunction接口

public abstract class RichSourceFunction<OUT> extends AbstractRichFunctionimplements SourceFunction<OUT> {private static final long serialVersionUID = 1L;
}

主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取,cancel方法用于终止任务。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

在run方法中,我们每隔一秒产生一条数据,且这个数字自增。

使用

我们使用addSource方法,将该无界流生成器添加成数据源。然后将其输出到日志。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source").setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute("Flink Java API Skeleton");}
}

打包、提交、运行

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用下面命令查看日志输出

tail -f log/*

在这里插入图片描述
然后我们在后台点击Cancel Job
在这里插入图片描述
可以看到输出
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/868400.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32智能电网监控系统教程

目录 引言环境准备智能电网监控系统基础代码实现&#xff1a;实现智能电网监控系统 4.1 数据采集模块 4.2 数据处理与分析 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景&#xff1a;电网监控与优化问题解决方案与优化收尾与总结 1. 引言 智能电网监控系统通过S…

thingsboard v3.7 win编译相关问题记录

遇到的问题总结 node\yarn 相关版本问题 3.7 开始需要 JDK17 ui-ngx 模块 yarn 相关问题报错 报错信息 [INFO] Downloading https://github.com/yarnpkg/yarn/releases/download/v1.22.10/yarn-v1.22.10.tar.gz to D:\soft\maven\com\github\eirslett\yarn\1.22.10\yarn-1.2…

视频图文理解关联技术与创业团队(二)

上一篇&#xff1a;google gemini1.5 flash视频图文理解能力初探&#xff08;一&#xff09;提到了gemini 1.5 flash 可以对视频进行理解以及分析&#xff0c;但是整体在检索任务上效果不佳。 这几天参加了人工智能大会 网上收集&#xff0c;看一看有相似能力的一些技术点、创…

安全防御(防火墙)

第二天&#xff1a; 1.恶意程序---一般会具有一下多个或则全部特点 1.非法性&#xff1a;你未经授权它自动运行或者自动下载的&#xff0c;这都属于非法的。那恶意程序一般它会具有这种特点&#xff0c; 2.隐蔽性&#xff1a;一般隐藏的会比较深&#xff0c;目的就是为了防止…

【MySQL备份】Percona XtraBackup压缩备份实战篇

目录 1.前言 2.准备工作 2.1.环境信息 2.2.配置/etc/my.cnf文件 2.3.授予root用户BACKUP_ADMIN权限 2.4.安装qpress 3. 压缩备份 3.1.创建压缩备份 3.2.创建全量备份 3.3.对比两个备份的大小 4.解压备份 5.准备备份 6.备份恢复 ​7.问题分析 8.总结 "实战…

JavaScript 原型链那些事

在讲原型之前我们先来了解一下函数。 在JS中&#xff0c;函数的本质就是对象&#xff0c;它与其他对象不同的是&#xff0c;创建它的构造函数与创建其他对象的构造函数不一样。那产生函数对象的构造函数是什么呢&#xff1f;是一个叫做Function的特殊函数&#xff0c;通过newFu…

单片机软件架构连载(4)-结构体

枚举、指针、结构体&#xff0c;我愿称为C语言"三板斧"。 用人话来讲&#xff0c;几乎所有c语言高阶编程&#xff0c;都离不开这这3个知识点的应用。 今天站在实际产品常用的角度&#xff0c;给大家讲一下结构体。 1.结构体概念 结构体可以用来构建更复杂的数据结…

中霖教育怎么样?二建继续教育什么意思?

1. 为什么要继续教育? 根据现行规定&#xff0c;二级建设师在获取资格证书后&#xff0c;若时间超过三年无论是否已进行注册&#xff0c;均需参加继续教育。此举旨在确保专业技术人员能够不断更新和补充其专业知识与技能&#xff0c;进而提升其创新能力、创造能力以及专业技术…

事件mousePressEvent、paintEvent、closeEvent、keyPressEvent】

事件 mousePressEvent、paintEvent、closeEvent、keyPressEvent 鼠标样式的设置 按WSAD通过keyPressEvent事件移动按钮 通过事件mousePressEvent获取鼠标位置的相对位置&#xff0c;绝对位置 cusor 鼠标样式设置成十字星 .h #ifndef DEFAULTHANDLEREXAMPLE_H #define DEFAUL…

GANs算法简介、学习步骤及具体实现

生成对抗网络&#xff08;Generative Adversarial Networks&#xff0c;GANs&#xff09;自从2014年由Ian Goodfellow等人提出以来&#xff0c;已经成为深度学习领域中最活跃的研究方向之一。GAN的基本思想是利用两个神经网络——生成器&#xff08;Generator&#xff09;和判别…

做好私域服务就是赢得用户的心

私域流量的概念在当今的商业环境中已经变得极为重要&#xff0c;许多品牌和企业都投入大量资源尝试通过各种策略吸引并保留用户。然而&#xff0c;单纯的流量积累并不足以确保商业成功。当面对用户的沉默、缺乏活跃度以及无法变现的困境时&#xff0c;我们必须重新审视私域流量…

Perforce发布白皮书,解读电动汽车初创公司如何加速进入市场并降低软件开发中的风险和成本

电动汽车&#xff08;EV&#xff09;领域的初创企业正迅速崛起&#xff0c;创新速度显著加快。然而&#xff0c;随着消费者对电动汽车需求的激增&#xff0c;老牌汽车制造商正加速进军这一市场&#xff0c;加剧了行业竞争。为在竞争中生存并发展&#xff0c;电动汽车初创企业必…

硬盘错误0x80071ac3如何修复?5大免费修复法,轻松找回硬盘数据

今天我们要聊的是一个让大家头疼不已的问题——硬盘错误0x80071ac3。你是否也曾经遇到过这个烦人的错误代码&#xff0c;导致数据无法读取、文件丢失&#xff0c;甚至整个硬盘都无法正常使用&#xff1f;别担心今天小编就为大家详细解析这个错误的原因&#xff0c;并分享5个免费…

27_电子电路设计基础

电路设计 电路板的设计 电路板的设计主要分三个步骤&#xff1a;设计电路原理图、生成网络表、设计印制电路板。 (1)设计电路原理图&#xff1a;将元器件按逻辑关系用导线连接起来。设计原理图的元件来源是“原理图库”,除了元件库外还可以由用户自己增加建立新的元件&#…

WAIC | 2024年世界人工智能大会“数学与人工智能”学术会议成功举办!

由斯梅尔数学与计算研究院&#xff08;Smale Institue of Mathematics & Computation&#xff09;主办的2024年世界人工智能大会(WAIC)“数学与人工智能”学术会议7月4日在上海世博中心圆满落幕&#xff01;作为全球性高级别学术研讨会&#xff0c;此次会议由华院计算技术&…

html+js+css练手小项目

文章目录 练手小项目前言1.多轮播图1.1 效果展示1.2 实现思路1.2.1 三张轮播图1.2.2 左侧轮播图 1.2.3 右侧轮播图1.2.4 整体结合 2.图片变色2.1 效果展示2.2 实现 练手小项目 ☀️作者简介&#xff1a;大家好我是言不及行yyds &#x1f40b;个人主页&#xff1a;言不及行yyds的…

字节码编程javassist之打印方法耗时和入参

写在前面 本文看下如何实现打印方法耗时和入参。 1&#xff1a;程序 需要增强的类&#xff1a; public class ApiTest1 {public Integer strToInt(String str01, String str02) {return Integer.parseInt(str01);}}插桩类 package com.dahuyou.javassist.huohuo.aa;import…

tableau条形图绘制 - 2

tableau条形图绘制 1. 条形图绘制-11.1 创建工作表1.2 修改工作表名称1.3 条形图绘制1.4 显示标签1.5 行列转换 2. 条形图绘制-22.1 新建工作表2.2 修改工作表名称2.3 条形图绘制2.4 价格度量选平均值2.5 标签度量选平均值2.6 升序&#xff0c;整个视图显示2.7 行列转换 3. 堆积…

模型训练结果可视化

&#x1f4da;博客主页&#xff1a;knighthood2001 ✨公众号&#xff1a;认知up吧 &#xff08;目前正在带领大家一起提升认知&#xff0c;感兴趣可以来围观一下&#xff09; &#x1f383;知识星球&#xff1a;【认知up吧|成长|副业】介绍 ❤️如遇文章付费&#xff0c;可先看…

AntDesign上传组件upload二次封装+全局上传hook使用

文章目录 前言a-upload组件二次封装1. 功能分析2. 代码详细注释3. 使用到的全局上传hook代码4. 使用方式5. 效果展示 总结 前言 在项目中&#xff0c;ant-design是我们常用的UI库之一&#xff0c;今天就来二次封装常用的组件a-upload批量上传组件,让它用起来更方便。 a-uploa…