Java版Flink使用指南——定制RabbitMQ数据源的序列化器

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

新增Json库依赖

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/43286.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧科技照亮水利未来:深入剖析智慧水利解决方案如何助力水利行业实现高效、精准、可持续的管理

目录 一、智慧水利的概念与内涵 二、智慧水利解决方案的核心要素 1. 物联网技术&#xff1a;构建全面感知网络 2. 大数据与云计算&#xff1a;实现数据高效处理与存储 3. GIS与三维可视化&#xff1a;提升决策支持能力 4. 人工智能与机器学习&#xff1a;驱动决策智能化 …

LibreOffice的国内镜像安装地址和node.js国内快速下载网站

文章目录 1、LibreOffice1.1、LibreOffice在application-conf.yml中的配置2、node.js 1、LibreOffice 国内镜像包网址&#xff1a;https://mirrors.cloud.tencent.com/libreoffice/libreoffice/ 1.1、LibreOffice在application-conf.yml中的配置 jodconverter:local:enable…

Java面试八股之MySQL中int(10)和bigint(10)能存储读的数据大小一样吗

MySQL中int(10)和bigint(10)能存储读的数据大小一样吗 在MySQL中&#xff0c;int(10)和bigint(10)的数据存储能力并不相同&#xff0c;尽管括号内的数字&#xff08;如10&#xff09;看起来似乎暗示着某种关联&#xff0c;但实际上这个数字代表的是显示宽度&#xff0c;而不是…

vue学习day03-指令修饰符、v-bind对于样式控制的增强、v-model应用于其他表单元素

7、指令修饰符 &#xff08;1&#xff09;概念&#xff1a; 通过“.”指明一些指令后缀&#xff0c;不同后缀封装了不同的处理操作->简化代码 &#xff08;2&#xff09;按键修饰符 keyup.enter->键盘回车监听 &#xff08;3&#xff09;v-model修饰符 v-model.tri…

vue + element ui 实现侧边栏导航栏折叠收起

首页布局如下 要求点击按钮,将侧边栏收缩, 通过 row 和 col 组件&#xff0c;并通过 col 组件的 span 属性我们就可以自由地组合布局。 折叠前 折叠后 <template><div class"app-layout" :class"{ collapse: app.isFold }"><div class&…

Onekey正版steam分流下载工具

今天给大家介绍的是一款下载steam游戏的工具。Onekey工具&#xff0c;是一款游戏下载器&#xff0c;可以下载steam正版分流游戏。下载正版分流的网站很多&#xff0c;但是都是网盘或者迅雷下载&#xff0c;或者游戏盒子下载&#xff0c;速度都很慢。这款软件是用steam下载的&am…

Flask项目搭建及部署 —— Python

flask搭建及部署 pip 19.2.3 python 3.7.5 Flask 1.1.1 Flask-SQLAlchemy 2.4.1 Pika 1.1.0 Redis 3.3.11 flask-wtf 0.14.2 1、创建flask项目&#xff1a; 创建完成后整个项目结构树&#xff1a; app.py: 项⽬管理⽂件&#xff0c;通过它管理项⽬。 static: 存放静态…

2021版本的idea热部署的详细步骤

背景&#xff1a;我是自己用的是2021版本的idea,然后发现跟2023版本的热部署不太一样&#xff0c;所以&#xff0c;今天自己出一期这样的文章吧&#xff01;&#xff01;&#xff01;其他人配置的时候根据自己的情况&#xff0c;来阅读吧&#xff01; 第一步&#xff1a;方式一…

MyBatis是如何分页的及原理

MyBatis 是一种持久层框架&#xff0c;支持通过配置文件和注解将 SQL 映射为 Java 对象。在实际开发中&#xff0c;查询数据时经常需要进行分页处理。 MyBatis 也提供了支持分页的方案&#xff0c;其主要思路是使用 Limit 偏移量和限制个数&#xff0c;来获取指定数量的数据。下…

音视频入门基础:H.264专题(10)——FFmpeg源码中,存放SPS属性的结构体和解码SPS的函数分析

一、引言 FFmpeg源码对AnnexB包装的H.264码流解码过程中&#xff0c;通过ff_h2645_extract_rbsp函数拿到该H.264码流中的某个NALU的NALU Header RBSP后&#xff08;具体可以参考&#xff1a;《FFmpeg源码&#xff1a;ff_h2645_extract_rbsp函数分析》&#xff09;&#xff0c…

【沐风老师】3DMAX建筑体块生成插件BuildingBlocks使用方法详解

BuildingBlocks建筑体块生成插件使用方法详解 听说你还在手动建配景楼&#xff1f;有了BuildingBlocks这个插件&#xff0c;一分钟搞定喔&#xff01; 3DMAX建筑体块生成插件BuildingBlocks&#xff0c;用于快速自定义街道及生成配景楼区块。 【适用版本】 3dMax2019及更高版…

分布式I/O从站的认知

为什么需要分布式I/O从站&#xff1f; 当PLC与控制机构距离过远时&#xff0c;远距离会带来信号干扰&#xff0c;分布式I/O从站只需要一个网络线缆连接。 ET200分布式I/O从站家族 体积紧凑、功能强大。 ET200SP ET200M ET200S ET200iSP ET200 AL ET200pro ET200 eco PN 通讯协议…

DSSM双塔特征交互

传统的DSSM双塔无法在早期进行user和item侧的特征交互&#xff0c;这在一定程度上降低了模型性能。我们想要对双塔模型进行细粒度的特征交互&#xff0c;同时又不失双塔模型离线建向量索引的解耦性。下面介绍两篇这方面的工作。 美团-Dual Augmented Two-tower 在user和item的特…

1. CSS Grid 网格布局教程

CSS Grid 网格布局教程 一、概述 网格布局&#xff08;Grid&#xff09;是最强大的 CSS 布局方案。 它将网页划分成一个个网格&#xff0c;可以任意组合不同的网格&#xff0c;做出各种各样的布局。以前&#xff0c;只能通过复杂的 CSS 框架达到的效果&#xff0c;现在浏览器…

Scrapy crawl spider 停止工作

Scrapy是一个用于爬取网站数据的流行框架&#xff0c;有时爬虫可能会停止工作&#xff0c;这通常是由多种原因引起的。以下是一些常见问题及其解决方法&#xff1a; 1、问题背景 用户在使用 Scrapy 0.16.2 版本进行网络爬取时遇到问题&#xff0c;具体表现为爬虫在运行一段时间…

Android 开发中 C++ 和Java 日志调试

在 C 中添加堆栈日志 先在 Android.bp 中 添加 ‘libutilscallstack’ shared_libs:["liblog"," libutilscallstack"]在想要打印堆栈的代码中添加 #include <utils/CallStack.h> using android::CallStack;// 在函数中添加 int VisualizerLib_Crea…

生物素结合金纳米粒子(Bt@Au-NPs ) biotin-conjugated Au-NPs

一、定义与特点 定义&#xff1a;生物素结合金纳米粒子&#xff0c;简称BtAu-NPs或biotin-conjugated Au-NPs&#xff0c;是指通过特定的化学反应或物理方法将生物素修饰到金纳米粒子表面&#xff0c;形成稳定的纳米复合材料。 特点&#xff1a; 高稳定性&#xff1a;生物素的修…

【VUE基础】VUE3第七节—Vue Router路由基础

Vue Router 是 Vue 官方的客户端路由解决方案。 客户端路由的作用是在单页应用 (SPA) 中将浏览器的 URL 和用户看到的内容绑定起来。当用户在应用中浏览不同页面时&#xff0c;URL 会随之更新&#xff0c;但页面不需要从服务器重新加载。 Vue Router 基于 Vue 的组件系统构建&…

LabVIEW在半导体自动化测试中的应用

半导体制造的复杂性和精密度要求极高&#xff0c;每一个生产步骤都需要严格的控制和监测。自动化测试设备在半导体制造中起到了关键作用&#xff0c;通过精密测量和数据分析&#xff0c;确保产品质量和生产效率。本文介绍如何使用LabVIEW结合研华硬件&#xff0c;开发一个用于半…

C语言编程3:运算符,运算符的基本用法

C语言3&#x1f525;&#xff1a;运算符&#xff0c;运算符的基本用法 一、运算符&#x1f33f; &#x1f387;1.1 定义 运算符是指进行运算的动作&#xff0c;比如加法运算符"“&#xff0c;减法运算符”-" 算子是指参与运算的值&#xff0c;这个值可能是常数&a…