Java版Flink使用指南——从RabbitMQ中队列中接入消息流

大纲

  • 创建RabbitMQ队列
  • 新建工程
    • 新增依赖
    • 编码
      • 设置数据源配置
      • 读取、处理数据
      • 完整代码
    • 打包、上传和运行任务
    • 测试
  • 工程代码

在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。

创建RabbitMQ队列

我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。
在这里插入图片描述

后续我们将在后台通过默认交换器,给这个队列新增消息。

新建工程

我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>

编码

设置数据源配置

String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());

读取、处理数据

下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。

final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);

完整代码

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}

打包、上传和运行任务

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试

在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq
在这里插入图片描述
然后使用下面指令可以看到Flink读取到消息并执行了print方法

tail log/flink-*-taskexecutor-*.out

==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42355.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

判断对象能否回收的两种方法,以及JVM引用

判断对象能否回收的两种方法&#xff1a;引用计数算法&#xff0c;可达性分析算法 引用计数算法&#xff1a;给对象添加一个引用计数器&#xff0c;当该对象被其它对象引用时计数加一&#xff0c;引用失效时计数减一&#xff0c;计数为0时&#xff0c;可以回收。 特点&#xf…

自动驾驶SLAM又一开源巅峰之作!深挖时间一致性,精准构建超清地图

论文标题&#xff1a; DTCLMapper: Dual Temporal Consistent Learning for Vectorized HD Map Construction 论文作者&#xff1a; Siyu Li, Jiacheng Lin, Hao Shi, Jiaming Zhang, Song Wang, You Yao, Zhiyong Li, Kailun Yang 导读&#xff1a; 本文介绍了一种用于自动…

突发!马斯克3140亿参数Grok开源!Grok原理大公开!

BIG NEWS: 全球最大开源大模型&#xff01;马斯克Grok-1参数量3410亿&#xff0c;正式开源!!! 说到做到&#xff0c;马斯克xAI的Grok&#xff0c;果然如期开源了&#xff01; 就在刚刚&#xff0c;马斯克的AI创企xAI正式发布了此前备受期待大模型Grok-1&#xff0c;其参数量达…

硅纪元视角 | 虚拟神经科学的突破:AI「赛博老鼠」诞生

在数字化浪潮的推动下&#xff0c;人工智能&#xff08;AI&#xff09;正成为塑造未来的关键力量。硅纪元视角栏目紧跟AI科技的最新发展&#xff0c;捕捉行业动态&#xff1b;提供深入的新闻解读&#xff0c;助您洞悉技术背后的逻辑&#xff1b;汇聚行业专家的见解&#xff0c;…

企业需要什么样的MES?

MES&#xff08;英文全称&#xff1a;Manufacturing Execution System&#xff09;&#xff0c;即制造执行系统&#xff0c;是面向车间生产的管理系统。它位于上层计划管理系统&#xff08;如ERP&#xff09;与底层工业控制&#xff08;如PCS层&#xff09;之间&#xff0c;是制…

【Linux】:服务器用户的登陆、删除、密码修改

用Xshell登录云服务器。 1.登录云服务器 先打开Xshell。弹出的界面点。 在终端上输入命令ssh usernameip_address&#xff0c;其中username为要登录的用户名&#xff0c;ip_address为Linux系统的IP地址或主机名。 然后输入密码进行登录。 具体如下&#xff1a; 找到新建会话…

Windows与time.windows.com同步time出错(手把手操作)

今天我来针对Windows讲解Time同步 时间问题 计算机的时间不同&#xff0c;过快或者过慢。&#xff08;可以和自己的手机时间进行对比&#xff0c;手机的时间进行同步的频率会比计算机更快&#xff0c;因此更精准&#xff09;计算机time过快和过慢&#xff0c;会导致使用过程中…

想实现随时随地远程访问?解析可道云teamOS内网穿透功能

在数字化时代&#xff0c;无论是个人还是企业&#xff0c;都面临着数据共享与远程访问的迫切需求。 比如我有时会需要在家中加班&#xff0c;急需访问公司内网中的某个关键文件。 然而&#xff0c;由于公网与内网的天然隔阂&#xff0c;这些需求往往难以实现。这时&#xff0c…

代码随想录 链表章节总结

移除链表元素 && 设计链表 学会设置虚拟头结点 翻转链表 leetcode 206 https://leetcode.cn/problems/reverse-linked-list/description/ 方法一&#xff1a;非递归新开链表 头插法&#xff1a;创建一个新的链表&#xff0c;遍历旧链表&#xff0c;按顺序在新链表使…

AIGC | 在机器学习工作站安装NVIDIA CUDA® 并行计算平台和编程模型

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] 0x02.初识与安装 CUDA 并行计算平台和编程模型 什么是 CUDA? CUDA&#xff08;Compute Unified Device Architecture&#xff09;是英伟达&#xff08;NVIDIA&#xff09;推出的并行计算平台和编…

idea提交代码或更新代码一直提示token然后登陆失败无法提交或者更新代码

最近因为换了电脑需要对开发环境做配置&#xff0c; 遇到了这个问题&#xff0c; 应该是因为我们用到了gitlab&#xff0c;默认的最新的idea会有gitlab插件 强制录入gitlab的token&#xff0c;如果gitlab不支持token的验证那么问题就来了 &#xff0c; 不管怎么操作都无法提交或…

FPGA之术语

FPGA之术语 IOSTANDARDDIFF_SSTL12:LVCMOS33:sys_clk_p/n:rst_n:UART时钟JTAG:GPIOONFIPCIe IOSTANDARD 在电子工程领域&#xff0c;DIFF_SSTL12和LVCMOS33是两种不同的电气标准&#xff0c;用于定义信号的电压级别和特性。 IOSTANDARD是一个在FPGA&#xff08;现场可编程门阵…

Spring MVC深入理解之源码实现

1、SpringMVC的理解 1&#xff09;谈谈对Spring MVC的了解 MVC 是模型(Model)、视图(View)、控制器(Controller)的简写&#xff0c;其核心思想是通过将业务逻辑、数据、显示分离来组织代码。 Model&#xff1a;数据模型&#xff0c;JavaBean的类&#xff0c;用来进行数据封装…

【cocos2dx】【iOS工程】如何保存用户在游戏内的绘画数据,并将数据以图像形式展示在预览界面

【cocos2dx】【iOS工程】如何保存用户在应用内的操作数据&#xff0c;并将数据以图像形式展示在预览界面 设备/引擎&#xff1a;Mac&#xff08;11.6&#xff09;/Mac Mini 开发工具&#xff1a;Xcode&#xff08;15.0.1&#xff09; 开发需求&#xff1a;如何保存用户在应用…

富格林:抓住正规稳健出金思路

富格林指出&#xff0c;凡事要学会抓住正规思路避繁就简&#xff0c;才会顺利达到终点。在现货黄金市场中&#xff0c;投资者必须学会抓对正规趋势&#xff0c;才是走向盈利出金的根本保障。以下是富格林投资总结的几个观点和建议&#xff0c;希望能帮助投资者实现稳健出金。 …

算法基础之分治法

算法原理 对于一个规模为 n n n 的子问题&#xff0c;若该问题可以容易地解决则直接解决&#xff0c;否则将其分解为 k k k 个规模较小的子问题&#xff0c;这些子问题相互独立且与原问题形式相同。递归地解决这些子问题&#xff0c;然后将各子问题的解合并得到原问题的解&a…

单链表详解(2)

三、函数定义 查找节点 //查找结点 SLTNode* SLTNodeFind(SLTNode* phead, SLTDataType x) {assert(phead);SLTNode* pcur phead;while (pcur){if (pcur->data x){return pcur;}pcur pcur->next;}return NULL; } 查找节点我们是通过看数据域来查找的&#xff0c;查…

Arm64 基础指令集介绍

按照字母排序顺序&#xff1a; ● ADC&#xff1a;带进位加法。 ● ADCS&#xff1a;带进位加法&#xff0c;设置标志位。 ● ADD (extended register)&#xff1a;扩展寄存器加法。 ● ADD (immediate)&#xff1a;立即数加法。 ● ADD (shifted register)&#xff1a;移位寄存…

【MySQL05】【 undo 日志】

文章目录 一、前言二、undo 日志&#xff08;回滚日志&#xff09;1. 事务 id2. undo 日志格式2.1 INSERT 对应的 undo 日志2.2 DELETE 对应的 undo 日志2.3 UPDATE 对应的 undo 日志2.3.1 不更新主键2.3.2 更新主键 2.3 增删改操作对二级索引的影响2.4 roll_pointer 3. FIL_PA…

Windows 网络重置

netsh int ip reset 命令是用于重置 Windows 操作系统中的网络设置和配置的命令。 在网络故障排除、修复网络连接问题以及清除可能存在的网络配置冲突时非常有用。 命令详解&#xff1a; netsh: 用于配置各种网络设置 int: 用于管理网络接口 ip: 用于管理网络接口的 IP 配…