flink整合java,Flink使用SideOutPut替换Split实现分流

基于apache flink的流处理实时模型

44元

包邮

(需用券)

去购买 >

279e37e460736106fd1baddb12f82507.png

以前的数据分析项目(版本1.4.2),对从Kafka读取的原始数据流,调用split接口实现分流.

新项目决定使用Flink 1.7.2,使用split接口进行分流的时候,发现接口被标记为depracted(后续可能会被移除).

搜索相关文档,发现新版本Flink中推荐使用带外数据进行分流.

预先建立OutputTag实例(LogEntity是从kafka读取的日志实例类).

private static final OutputTag APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));

private static final OutputTag ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));

对kafka读取的原始数据,通过process接口,打上相应标记.

private static SingleOutputStreamOperator sideOutStream(DataStream rawLogStream) {

return rawLogStream

.process(new ProcessFunction() {

@Override

public void processElement(LogEntity entity, Context ctx, Collector out) throws Exception {

// 根据日志等级,给对象打上不同的标记

if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {

ctx.output(ANALYZE_METRIC_TAG, entity);

} else {

ctx.output(APP_LOG_TAG, entity);

}

}

})

.name("RawLogEntitySplitStream");

}

// 调用函数,对原始数据流中的对象进行标记

SingleOutputStreamOperator sideOutLogStream = sideOutStream(rawLogStream);

// 根据标记,获取不同的数据流,以便后续进行进一步分析

DataStream appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);

DataStream rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);

通过以上步骤,就实现了数据流的切分.

PS:

如果您觉得我的文章对您有帮助,请关注我的微信公众号,谢谢!

原文链接:https://www.cnblogs.com/jason1990/p/11610130.html

java 11官方入门(第8版)教材

79.84元

包邮

(需用券)

去购买 >

f0f3f55624fb396b1764d42d6df88864.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/453452.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WCF和webservice的区别

微软论坛的斑竹回答如下&#xff1a; 脑内&#xff1a;果然是高大上啊 1.WebService&#xff1a;严格来说是行业标准&#xff0c;不是技术&#xff0c;使用XML扩展标记语言来表示数据&#xff08;这个是夸语言和平台的关键&#xff09;。微 软的Web服务实现称为ASP.NET Web Ser…

链表和顺序表的一些区别

顺序表与链表是非常基本的数据结构&#xff0c;它们可以被统称为线性表。 线性表&#xff08;Linear List&#xff09;是由 n&#xff08;n≥0&#xff09;个数据元素&#xff08;结点&#xff09;a[0]&#xff0c;a[1]&#xff0c;a[2]…&#xff0c;a[n-1] 组成的有限序列。…

春节期间小游戏同时在线人数最高达2800万人/小时

微信官方发布2018年春节期间微信数据报告&#xff1a;除夕至初五&#xff0c;总共有2,297亿条微信消息&#xff0c;28亿条微信朋友圈成功发出&#xff0c;音视频通话总时长175亿乙分钟。其中&#xff0c;90后用广的消息发送量占总量的42.5%&#xff0c;80后用户25.9%&#xff0…

餐馆的故事-浅析职责链模式

我们在餐馆吃饭的时候&#xff0c;一般都是在拿到菜单后&#xff0c;选择喜欢的菜&#xff0c;然后通知服务员。服务员会将我们的定单交给大厨&#xff0c;大厨可能会亲自去做这道菜&#xff0c;也可能安排给小厨来做&#xff0c;总之&#xff0c;我们不用担心他们没有人做菜&a…

matlab非齐次方程组的通解,用matlab求非齐次线性方程组的通解?

先向大家介绍一下非齐次线性方程组。所谓非齐次线性方程组就是方程组等号右边的常数项不全为零的线性方程组。全部等于零时&#xff0c;就称为齐次线性方程组。下面我们就讲解一下如何利用matlab快速求非齐次线性方程组的通解。工具/材料matlab电脑操作方法01线性方程组Axb的求…

Linux 终端仿真程序Putty

PuTTY是一个Telnet、SSH、rlogin、纯TCP以及串行接口连接软件。较早的版本仅支持Windows平台&#xff0c;现在的版本中开始支持各类Unix平台。 用linux作为桌面系统&#xff0c;身为工程师很多时候需要通过Telnet、SSH协议进行远程管理&#xff0c;通过串口进行设备配置。Putty…

粗识静态链表

为了弥补链表在内存分配上的不足&#xff0c;出现了静态链表这么一个折中的办法。静态链表比较类似于内存池&#xff0c;它会预先分配一个足够长的数组&#xff0c;之后链表节点都会保存在这个数组里&#xff0c;这样就不需要频繁的进行内存分配了。 当然&#xff0c;这个方法的…

php用date语句获取时间,关于php date()函数获取时间的设置和使用方法

date()函数是PHP自带的时间函数&#xff0c;可以获取当前服务器的时间echo date(Y-m-d H:i:s); //输出:2020-05-18 11:02:35date()函数中可以使用的字母含义&#xff1a;a-"am"(上午)或者"pm"(下午)A-"AM"或者"PM"Y-年&#xff0c;显示…

Django_form补充

问题1: 注册页面输入为空&#xff0c;报错&#xff1a;keyError&#xff1a;找不到passworddef clean(self): print("---",self.cleaned_data) # if self.cleaned_data["password"]self.cleaned_data["repeat_password"]: …

WF4.0:NativeActivity中的错误处理

备注&#xff1a;这篇文章的使用环境是.NET framework 4.0 RC 1 在WF4中创建native活动时&#xff0c;NativeActivity是非常强大的。其众多的功能之一是围绕错误处理。 调度子活动的时的基本错误处理。 当NativeActivity执行的时候&#xff0c;它是通过一个NativeActivityConte…

Cadence 电源完整性仿真实践(二)

转载于:http://blog.csdn.net/wu20093346/article/details/38050917 通过以上步骤对每个平面进行了单节点分析并观测了响应曲线&#xff0c;接下来将观测平面对的目标阻抗是否满足要求&#xff0c;通过选择电容器的方法来减小含有电容器阻抗响应曲线中的反谐振波峰。在SigWave窗…

Johnson 全源最短路径算法

解决单源最短路径问题&#xff08;Single Source Shortest Paths Problem&#xff09;的算法包括&#xff1a; Dijkstra 单源最短路径算法&#xff1a;时间复杂度为 O(E VlogV)&#xff0c;要求权值非负&#xff1b; Bellman-Ford 单源最短路径算法&#xff1a;时间复杂度为 O…

Machine Learning 学习笔记1 - 基本概念以及各分类

What is machine learning? 并没有广泛认可的定义来准确定义机器学习。以下定义均为译文&#xff0c;若以后有时间&#xff0c;将补充原英文...... 定义1、来自Arthur Samuel&#xff08;上世纪50年代、西洋棋程序&#xff09; 在进行特定编程的情况下给予计算机学习能力的领域…

蒙特 卡罗方法matlab,蒙特·卡罗方法中的数学之美,你一定不想错过

原标题&#xff1a;蒙特卡罗方法中的数学之美&#xff0c;你一定不想错过有方教育——我们致力于为中学生提供学界和业界前沿的学术科研教育内容&#xff0c;帮助学生参加海外科研项目&#xff0c;在提升申请竞争力的同时&#xff0c;获得领跑优势。一、概述蒙特卡罗方法(Monte…

【 CDN 最佳实践】CDN 命中率优化思路

CDN 在静态资源的加速场景中是将静态资源缓存在距离客户端较近的CDN 节点上&#xff0c;然后客户端访问该资源即可通过较短的链路直接从缓存中获取资源&#xff0c;而避免再通过较长的链路回源获取静态资源。因此 CDN的缓存命中率的高低直接影响客户体验&#xff0c;而保证较高…

Python基础-time and datetime

一、在Python中&#xff0c;通常有这几种方式来表示时间&#xff1a; 时间戳格式化的时间字符串元组&#xff08;struct_time&#xff09;共九个元素。由于Python的time模块实现主要调用C库&#xff0c;所以各个平台可能有所不同。1.时间戳&#xff08;timestamp&#xff09;的…

matlab中欧姆如何表示,在excel中欧姆符号怎么打

在excel中欧姆符号怎么打&#xff0c;相信对于好多熟练用excel的朋友来说&#xff0c;是很简单不过的&#xff0c;但是对于有些初学者来说&#xff0c;就是菜鸟啦&#xff0c;就有点懵懵懂懂的感觉了&#xff0c;毕竟刚接触的东西还没用过嘛。但是&#xff0c;没关系今天笔者就…

zookeeper伪集群(在一台机器上集群)

2019独角兽企业重金招聘Python工程师标准>>> 创建一下的目录结构zookeeper-3.4.10是你下载的zookeeper的解压包 /zookeeper_cluster----/server_one|---/data|myid(文件)|---/datalog|---/zookeeper-3.4.10|---/bin|---/conf|---zoo.cfg|---..... |---/....----/ser…

Spring核心接口之Ordered

一、Ordered接口介绍Spring中提供了一个Ordered接口。从单词意思就知道Ordered接口的作用就是用来排序的。Spring框架是一个大量使用策略设计模式的框架&#xff0c;这意味着有很多相同接口的实现类&#xff0c;那么必定会有优先级的问题。于是Spring就提供了Ordered这个接口&a…

将本地代码上传至github

注册github账号 https://github.com/ 安装git工具 https://git-for-windows.github.io 1.在github中创建一个项目 2.填写相应信息&#xff0c;点击create Repository name: 仓库名称 Description(可选): 仓库描述介绍 Public, Private : 仓库权限&#xff08;公开共享&#xff…