Java版Flink使用指南——分流导出

大纲

  • 新建工程
  • 编码
    • Pom.xml
    • 自定义无界流
    • 分流
  • 测试
  • 工程代码

在之前的案例中,我们一直使用的是单个Sink来做数据的输出。实际上,Flink是支持多个输出流的。本文我们就来讲解如何在Flink数据输出时做分流处理。
我们将基于《Java版Flink使用指南——自定义无界流生成器》的输入流,按生成数字的奇偶性,将其分流输出到不同的RabbitMQ队列中。

新建工程

我们新建一个名字叫MultiSinkTo的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

编码

Pom.xml

因为我们要往RabbitMQ中输出,所以需要引入相关连接组件。

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
这块的代码可以见《Java版Flink使用指南——自定义无界流生成器》
它会每隔1秒钟生成一个递增的数字

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

分流

我们通过下面的代码生成数据流

		DataStreamSource<Long> longDataStreamSource = env.addSource(new UnBoundedStreamGenerator());

然后奇数发布到odd.data.to.rbtmq队列;偶数发布到even.data.to.rbtmq。
分流主要是通过filter来区分数据,然后针对不同的数据addSink来发布到不同的队列。
如果不需要区分数据,只是将相同的数据发布到不同的目的地,则可以直接多次addSink来达成。

		String host = "172.25.103.252"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();int parallelism = 1;String oddSinkQueueName = "odd.data.to.rbtmq"; RMQSink<String> oddRMQSink = new RMQSink<>(rmqConnectionConfig, oddSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 != 0).map(Object::toString).addSink(oddRMQSink).setParallelism(parallelism).name("oddSink");String evenSinkQueueName = "even.data.to.rbtmq";RMQSink<String> evenRMQSink = new RMQSink<>(rmqConnectionConfig, evenSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 == 0).map(Object::toString).addSink(evenRMQSink).setParallelism(parallelism).name("evenSink");

测试

执行一段时间后,我们看到两个队列相序增加
在这里插入图片描述
奇数队列
在这里插入图片描述
偶数队列
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/43849.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【目标检测】使用自己的数据集训练并预测yolov8模型

1、下载yolov8的官方代码 地址&#xff1a; GitHub - ultralytics/ultralytics: NEW - YOLOv8 &#x1f680; in PyTorch > ONNX > OpenVINO > CoreML > TFLite 2、下载目标检测的训练权重 yolov8n.pt 将 yolov8n.pt 放在ultralytics文件夹下 3、数据集分布 注…

国际网课平台Udemy上的亚马逊云科技AWS免费高分课程和创建、维护EC2动手实践

亚马逊云科技(AWS)是全球云行业最&#x1f525;火的云平台&#xff0c;在全球经济形势不好的大背景下&#xff0c;通过网课学习亚马逊云科技AWS基础备考亚马逊云科技AWS证书&#xff0c;对于找工作或者无背景转行做AWS帮助巨大。欢迎大家关注小李哥&#xff0c;及时了解世界最前…

文件操作和IO流(Java版)

前言 我们无时无刻不在操作文件。可以说&#xff0c;我们在电脑上能看到的图片、视频、音频、文档都是一个又一个的文件&#xff0c;我们需要从文件中读取我们需要的数据&#xff0c;将数据运算后也需要将结果写入文件中长期保存。可见文件的重要性&#xff0c;今天我们就来简…

分布式锁(仅供自己参考)

分布式锁&#xff1a;满足分布式系统或集群式下多进程可见并且互斥的锁&#xff08;使用外部的锁&#xff0c;因为如果是集群部署&#xff0c;每台服务器都有一个对应的tomcat&#xff0c;则每个tomcat的jvm就不同&#xff0c;锁对象就不同&#xff08;加锁的机制&#xff0c;每…

独立开发者系列(23)——Linux掌握小结

只要开发系统&#xff0c;就绕不开使用Linux服务器 &#xff0c;而Linux除了使用BT面板进行初级管理&#xff0c;很多稍微高级点的管理&#xff0c;还是需要命令行进行的。这里总结在不需要精通的情况下&#xff0c;掌握常见命令和环境的相关配置。 &#xff08;1&#xff09…

HI3559AV100四路IMX334非融合拼接8K视频记录

下班无事&#xff0c;写篇博客记录海思hi3559av100四路4K视频采集拼接输出8K视频Demo 一、准备工作&#xff1a; 软件&#xff1a;Win11系统、VMware虚拟机Ubuntu14、Hitool、Xshell等 硬件&#xff1a;HI3559AV100开发板4路imx334摄像头、串口线、电源等 附硬件图&#xff1…

来一场栈的大模拟(主要是单调栈)

一.栈模拟 二.单调栈求最大矩形面积 通常&#xff0c;直方图用于表示离散分布&#xff0c;例如&#xff0c;文本中字符的频率。 现在&#xff0c;请你计算在公共基线处对齐的直方图中最大矩形的面积。 图例右图显示了所描绘直方图的最大对齐矩形。 输入格式 输入包含几个测…

哪里有主机游戏店收费系统,佳易王电玩ps5ps4计时计费系统操作教程

哪里有主机游戏店收费系统&#xff0c;佳易王电玩ps5ps4计时计费系统操作教程 以下软件操作教程以&#xff0c;佳易王计时计费管理系统为例说明 软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 一、软件程序图文讲解 1、主机游戏计时软件、电玩店计费软…

HumanoidBench——模拟仿人机器人算法有未来

概述 论文地址&#xff1a;https://arxiv.org/pdf/2403.10506 仿人机器人具有类似人类的外形&#xff0c;有望在各种环境和任务中为人类提供支持。然而&#xff0c;昂贵且易碎的硬件是这项研究面临的挑战。因此&#xff0c;本研究开发了使用先进模拟技术的 HumanoidBench。该基…

GTK是如何加密WLAN组播和广播数据的?

1. References WLAN 4-Way Handshake如何生成GTK&#xff1f;_tk bigtk gtk igtk-CSDN博客 2. 概述 在Wi-Fi网络中&#xff0c;单播、组播和广播帧的加密算法是由AP决定的。其中组播帧和广播帧的加密使用GTK密钥&#xff0c;其PTK的密钥结构如下图所示&#xff1a; GTK的组成…

2024 Q3 NAND闪存价格|企业级依然猛涨,消费级放缓

在企业领域持续投资于服务器基础设施&#xff0c;特别是在人工智能应用的推动下&#xff0c;企业级SSD需求增加的同时&#xff0c;消费电子市场却依旧疲软。加之NAND供应商在2024年下半年积极扩大生产&#xff0c;预计到2024年第三季度&#xff0c;NAND闪存供应充足率将上升至2…

“郑商企航”暑期社会实践赴美丽美艳直播基地开展调研

马常旭文化传媒网讯&#xff08;记者张明辉报道&#xff09;导读&#xff1a;2024 年 7 月 3 日&#xff0c;商学院暑期社会实践团“郑商企航”在河南省郑州市新密市岳村镇美丽美艳直播基地&#xff0c;展开了一场意义非凡的考察活动&#xff0c;团队成员深度调研了直播基地的产…

【系统架构设计】计算机组成与体系结构(二)

计算机组成与体系结构 计算机系统组成存储器系统前言主存储器存储器存储数量&#xff08;计算&#xff09; 辅助存储器&#xff08;以磁盘为例&#xff09;Cache存储器 流水线 计算机系统组成 存储器系统 前言 存储器用来存放程序和数据的部件&#xff0c;是一个记忆装置&am…

【自动驾驶/机器人面试C++八股精选】专栏介绍

目录 一、自动驾驶和机器人技术发展前景二、C在自动驾驶和机器人领域的地位三、专栏介绍四、订阅需知 一、自动驾驶和机器人技术发展前景 随着人工智能、机器学习、传感器技术和计算能力的进步&#xff0c;自动驾驶和机器人的技术水平不断提升&#xff0c;使得它们更加智能、可…

fatal error: napi.h: No such file or directory

使用Cmake-js构建基于node-addon-api的C扩展 基于node-addon官方的eample改造测试&#xff1a;https://github.com/nodejs/node-addon-examples Cmake-js的github给了一个例子&#xff0c;但是是基于NAN的&#xff0c;而不是node-addon-api&#xff1a;https://github.com/cma…

如何压缩视频大小不改变画质,视频太大怎么压缩变小

在现代生活中&#xff0c;视频已经成为我们记录生活、分享快乐的重要工具。但随之而来的问题就是视频文件体积过大&#xff0c;不仅占用大量存储空间&#xff0c;还难以在社交平台上快速分享。别担心&#xff0c;下面我就来教大家几种简单有效的方法&#xff0c;让视频文件轻松…

回溯算法-以医院信息管理系统为例

1.回溯算法介绍 1.来源 回溯算法也叫试探法&#xff0c;它是一种系统地搜索问题的解的方法。 用回溯算法解决问题的一般步骤&#xff1a; 1、 针对所给问题&#xff0c;定义问题的解空间&#xff0c;它至少包含问题的一个&#xff08;最优&#xff09;解。 2 、确定易于搜…

移除元素的讲解,看这篇就够了!

一&#xff1a;题目 博主本文将用指向来形象的表示下标位的移动。 二&#xff1a;思路 1&#xff1a;两个整形&#xff0c;一个start&#xff0c;一个end&#xff0c;在一开始都 0&#xff0c;即这里都指向第一个元素。 2&#xff1a;在查到val之前&#xff0c;查一个&…

Jackson与FastJson时间Date转换问题

今天在开发Excel导出时发现一个Date类型的属性导出的不对&#xff0c;因为导出时将Vo装换成了Json, Vo类Date字段也添加了DateTimeFormat(pattern "yyyy-MM-dd"),JsonFormat(timezone "GMT8", pattern "yyyy-MM-dd"),那么这是为什么呢&#…

渲染农场怎么用更省钱?渲染100邀请码1a12

现在越来越多的设计师开始使用渲染农场&#xff0c;其中收费是个大问题&#xff0c;怎么用渲染农场才能更省钱呢&#xff1f;今天我们就来看下吧。 1、明确渲染方式 要根据不同情况选择合理的渲染方式&#xff0c;比如渲染农场就适合大场景渲染和紧急出图情况&#xff0c;其他…