Java版Flink使用指南——从RabbitMQ中队列中接入消息流

大纲

  • 创建RabbitMQ队列
  • 新建工程
    • 新增依赖
    • 编码
      • 设置数据源配置
      • 读取、处理数据
      • 完整代码
    • 打包、上传和运行任务
    • 测试
  • 工程代码

在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。

创建RabbitMQ队列

我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。
在这里插入图片描述

后续我们将在后台通过默认交换器,给这个队列新增消息。

新建工程

我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>

编码

设置数据源配置

String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());

读取、处理数据

下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。

final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);

完整代码

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}

打包、上传和运行任务

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试

在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq
在这里插入图片描述
然后使用下面指令可以看到Flink读取到消息并执行了print方法

tail log/flink-*-taskexecutor-*.out

==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42355.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

判断对象能否回收的两种方法,以及JVM引用

判断对象能否回收的两种方法&#xff1a;引用计数算法&#xff0c;可达性分析算法 引用计数算法&#xff1a;给对象添加一个引用计数器&#xff0c;当该对象被其它对象引用时计数加一&#xff0c;引用失效时计数减一&#xff0c;计数为0时&#xff0c;可以回收。 特点&#xf…

自动驾驶SLAM又一开源巅峰之作!深挖时间一致性,精准构建超清地图

论文标题&#xff1a; DTCLMapper: Dual Temporal Consistent Learning for Vectorized HD Map Construction 论文作者&#xff1a; Siyu Li, Jiacheng Lin, Hao Shi, Jiaming Zhang, Song Wang, You Yao, Zhiyong Li, Kailun Yang 导读&#xff1a; 本文介绍了一种用于自动…

突发!马斯克3140亿参数Grok开源!Grok原理大公开!

BIG NEWS: 全球最大开源大模型&#xff01;马斯克Grok-1参数量3410亿&#xff0c;正式开源!!! 说到做到&#xff0c;马斯克xAI的Grok&#xff0c;果然如期开源了&#xff01; 就在刚刚&#xff0c;马斯克的AI创企xAI正式发布了此前备受期待大模型Grok-1&#xff0c;其参数量达…

硅纪元视角 | 虚拟神经科学的突破:AI「赛博老鼠」诞生

在数字化浪潮的推动下&#xff0c;人工智能&#xff08;AI&#xff09;正成为塑造未来的关键力量。硅纪元视角栏目紧跟AI科技的最新发展&#xff0c;捕捉行业动态&#xff1b;提供深入的新闻解读&#xff0c;助您洞悉技术背后的逻辑&#xff1b;汇聚行业专家的见解&#xff0c;…

企业需要什么样的MES?

MES&#xff08;英文全称&#xff1a;Manufacturing Execution System&#xff09;&#xff0c;即制造执行系统&#xff0c;是面向车间生产的管理系统。它位于上层计划管理系统&#xff08;如ERP&#xff09;与底层工业控制&#xff08;如PCS层&#xff09;之间&#xff0c;是制…

【Linux】:服务器用户的登陆、删除、密码修改

用Xshell登录云服务器。 1.登录云服务器 先打开Xshell。弹出的界面点。 在终端上输入命令ssh usernameip_address&#xff0c;其中username为要登录的用户名&#xff0c;ip_address为Linux系统的IP地址或主机名。 然后输入密码进行登录。 具体如下&#xff1a; 找到新建会话…

Windows与time.windows.com同步time出错(手把手操作)

今天我来针对Windows讲解Time同步 时间问题 计算机的时间不同&#xff0c;过快或者过慢。&#xff08;可以和自己的手机时间进行对比&#xff0c;手机的时间进行同步的频率会比计算机更快&#xff0c;因此更精准&#xff09;计算机time过快和过慢&#xff0c;会导致使用过程中…

想实现随时随地远程访问?解析可道云teamOS内网穿透功能

在数字化时代&#xff0c;无论是个人还是企业&#xff0c;都面临着数据共享与远程访问的迫切需求。 比如我有时会需要在家中加班&#xff0c;急需访问公司内网中的某个关键文件。 然而&#xff0c;由于公网与内网的天然隔阂&#xff0c;这些需求往往难以实现。这时&#xff0c…

代码随想录 链表章节总结

移除链表元素 && 设计链表 学会设置虚拟头结点 翻转链表 leetcode 206 https://leetcode.cn/problems/reverse-linked-list/description/ 方法一&#xff1a;非递归新开链表 头插法&#xff1a;创建一个新的链表&#xff0c;遍历旧链表&#xff0c;按顺序在新链表使…

AIGC | 在机器学习工作站安装NVIDIA CUDA® 并行计算平台和编程模型

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] 0x02.初识与安装 CUDA 并行计算平台和编程模型 什么是 CUDA? CUDA&#xff08;Compute Unified Device Architecture&#xff09;是英伟达&#xff08;NVIDIA&#xff09;推出的并行计算平台和编…

idea提交代码或更新代码一直提示token然后登陆失败无法提交或者更新代码

最近因为换了电脑需要对开发环境做配置&#xff0c; 遇到了这个问题&#xff0c; 应该是因为我们用到了gitlab&#xff0c;默认的最新的idea会有gitlab插件 强制录入gitlab的token&#xff0c;如果gitlab不支持token的验证那么问题就来了 &#xff0c; 不管怎么操作都无法提交或…

Spring MVC深入理解之源码实现

1、SpringMVC的理解 1&#xff09;谈谈对Spring MVC的了解 MVC 是模型(Model)、视图(View)、控制器(Controller)的简写&#xff0c;其核心思想是通过将业务逻辑、数据、显示分离来组织代码。 Model&#xff1a;数据模型&#xff0c;JavaBean的类&#xff0c;用来进行数据封装…

单链表详解(2)

三、函数定义 查找节点 //查找结点 SLTNode* SLTNodeFind(SLTNode* phead, SLTDataType x) {assert(phead);SLTNode* pcur phead;while (pcur){if (pcur->data x){return pcur;}pcur pcur->next;}return NULL; } 查找节点我们是通过看数据域来查找的&#xff0c;查…

【MySQL05】【 undo 日志】

文章目录 一、前言二、undo 日志&#xff08;回滚日志&#xff09;1. 事务 id2. undo 日志格式2.1 INSERT 对应的 undo 日志2.2 DELETE 对应的 undo 日志2.3 UPDATE 对应的 undo 日志2.3.1 不更新主键2.3.2 更新主键 2.3 增删改操作对二级索引的影响2.4 roll_pointer 3. FIL_PA…

layui项目中的layui.define、layui.config以及layui.use的使用

第一步:创建一个layuiTest项目&#xff0c;结构如下 第二步&#xff1a;新建一个test.js,利用layui.define定义一个模块test,并向外暴露该模块&#xff0c;该模块里面有两个方法method1和method2. 第三步&#xff1a;新建一个test.html&#xff0c;在该页面引入layui.js&#x…

neo4j 图数据库:Cypher 查询语言、医学知识图谱

neo4j 图数据库&#xff1a;Cypher 查询语言、医学知识图谱 Cypher 查询语言创建数据查询数据查询并返回所有节点查询并返回所有带有特定标签的节点查询特定属性的节点及其所有关系和关系的另一端节点查询从名为“小明”的节点到名为“小红”的节点的路径 更新数据更新一个节点…

python爬虫和用腾讯云API接口进行翻译并存入excel,通过本机的Windows任务计划程序定时运行Python脚本!

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a;定时爬取外网的某个页面&#xff0c;并将需要的部分翻译为中文存入excel 接下了的&#xff0c;没学过的最好看一下 基本爬虫的学习 【爬虫】requests 结合 BeautifulSoup抓取网页数据_requests beauti…

Vue CoreVideoPlayer 一款基于 vue.js 的轻量级、优秀的视频播放器组件

大家好,我是程序视点的小二哥!今天小二哥给大家推荐一款非常优秀的视频播放组件 效果欣赏 介绍 Vue-CoreVideoPlayer 一款基于vue.js的轻量级的视频播放器插件。 采用Adobd XD进行UI设计&#xff0c;支持移动端适配,不仅功能强大&#xff0c;颜值也是超一流&#xff01; Vue-…

第一次构建一个对话机器人流程解析(二)

1. 问答机器人的组成-基于知识图谱的搜索 在教育场景下&#xff0c;若学生有关于学习内容的提问&#xff0c;或业务层面的提问&#xff0c;则要求问答机器人的回答必须精准&#xff0c;来满足业务的要求因此需要通过知识图谱来快速检索&#xff0c;所提内容的相关信息&#xf…

数字系统与进制转换

数字系统 数字逻辑是计算机科学的基础&#xff0c;它研究的是如何通过逻辑门电路&#xff08;与门、或门、非门等&#xff09;实现各种逻辑功能。数字系统则是由数字逻辑电路组成的系统&#xff0c;可以实现各种复杂的运算和控制功能。在计算机科学中&#xff0c;数字逻辑和数…