Spring SpEL在Flink中的应用-与Filter结合实现数据动态分流

文章目录

  • 前言
  • 一、POM依赖
  • 二、主函数代码示例
  • 三、FilterFunction实现
  • 总结


前言

SpEL表达式与Flink fiter结合可以实现基于表达式的灵活动态过滤。有关SpEL表达式的使用请参考Spring SpEL在Flink中的应用-SpEL详解
可以将过滤规则放入数据库,根据不同的数据设置不同的过滤表达式,从而实现只需修改过滤表达式不用修改Flink代码的效果。


一、POM依赖

首先在 pom.xml 中加入依赖:

<dependency><groupId>org.springframework</groupId><artifactId>spring-expression</artifactId><version>5.2.0.RELEASE</version>
</dependency>

二、主函数代码示例


import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;import java.text.SimpleDateFormat;public class FlinkSpelFilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Row row=Row.of("张三","001",getTimestamp("2016-10-24 21:59:06"),23);Row row2=Row.of("张三","002",getTimestamp("2016-10-24 21:50:06"),33);Row row3=Row.of("张三","003",getTimestamp("2016-10-24 21:51:06"),43);Row row4=Row.of("李四","004",getTimestamp("2016-10-24 21:50:56"),13);Row row5=Row.of("李四","005",getTimestamp("2016-10-24 00:48:36"),53);Row row6=Row.of("李四","006",getTimestamp("2016-10-24 00:48:36"),34);Row row7=Row.of("李四","007",getTimestamp("2016-10-24 00:48:36"),23);Row row8=Row.of("李四","008",getTimestamp("2016-10-24 00:48:36"),26);Row row9=Row.of("李四","009",getTimestamp("2016-10-24 00:48:36"),63);DataStreamSource<Row> source =env.fromElements(row,row2,row3,row4,row5,row6,row7,row8,row9);//spel表达式,实现日期的比较过滤String spel="compareDate(#row.getField(2), \"2016-10-24 00:48:36\")==0";//实现对数字的过滤
//        spel="#row.getField(3)>33";SingleOutputStreamOperator<Row> filterStream = source.filter(new FilterSpelFunction(spel));filterStream.print();env.execute();}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
//		String string = "2016-10-24 21:59:06";SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");java.util.Date date=sdf.parse(str);java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());return s;}

三、FilterFunction实现


import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import spel.demo.util.SpelMethodUtil;/*** 基于spel 表达式的过滤*/
public class FilterSpelFunction extends RichFilterFunction<Row> {private static final Logger logger = LoggerFactory.getLogger(FilterSpelFunction.class);private transient Expression exp;private String filterExpr;public FilterSpelFunction(String filterSpel) {filterExpr=filterSpel;logger.info("filterExpr:{}",filterExpr);}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);SpelExpressionParser parser = new SpelExpressionParser();exp = parser.parseExpression(filterExpr);}@Overridepublic boolean filter(Row row) throws Exception {try {//注册自定义函数类StandardEvaluationContext conetxt = new StandardEvaluationContext(new SpelMethodUtil());//设置变量conetxt.setVariable("row",row);Boolean value = exp.getValue(conetxt, Boolean.class);if (value == null) {logger.error("表达式结果为null");throw new Exception("表达式结果为null");}return value;}catch (Exception e){logger.error("filter 异常", e);throw e;}}
}

自定义函数类


import org.apache.commons.lang3.StringUtils;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;public class SpelMethodUtil {public static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATE_FORMAT = "yyyy-MM-dd";public static final String TIME_FORMAT = "HH:mm:ss";public static Integer compareDate(Date date, String strDate){Integer result;if(date==null&& StringUtils.isBlank(strDate)){return 0;}else{if(date==null || StringUtils.isBlank(strDate)){return -2;}}String trimDate=strDate.trim();String format = findFormat(trimDate);Date date2 = stringToDate(trimDate, format);result=date.compareTo(date2);return result;}public static Integer compareDate(Date first, Date second){if(first==null&& second==null){return 0;}else{if(first==null || second==null){return -2;}}return first.compareTo(second);}public static Date stringToDate(String dateStr,String format){SimpleDateFormat sdf = new SimpleDateFormat(format);Date date=null;try {date= sdf.parse(dateStr);} catch (ParseException e) {e.printStackTrace();}return date;}/*** 查找与输入的字符型日期相匹配的format* @param strDate* @return*/public static String findFormat(String strDate){String result=null;String trimDate=strDate.trim();int len=trimDate.length();String dateRegex = "";if(len==TIMESTAMP_FORMAT.length()){dateRegex = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$";if(trimDate.matches(dateRegex)){result=TIMESTAMP_FORMAT;}}else if(len==DATE_FORMAT.length()){dateRegex = "^\\d{4}-\\d{2}-\\d{2}$";if(trimDate.matches(dateRegex)){result=DATE_FORMAT;}}else if(len==TIME_FORMAT.length()){dateRegex = "^\\d{2}:\\d{2}:\\d{2}$";if(trimDate.matches(dateRegex)){result=TIME_FORMAT;}}else{throw  new RuntimeException("不可识别的日期格式!"+strDate);}return result;}public static Integer addAge(Integer age){return age+4;}
}

总结

以上只是简单的示例,在实际应用中可以将过滤表达式放到数据库,将过滤规则放入缓存定时刷新。大家可以根据实际需求进行扩展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/645295.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【c++学习】数据结构中的顺序表

c顺序表 数据结构中的顺序表代码 数据结构中的顺序表 顺序表强调数据的存储结构&#xff0c;表示数据在内存中连续存储。&#xff08;线性表与链表相对&#xff0c;链表数据在内存中的存储空间是不连续的&#xff09; 代码 下述代码实现了线性表及其接口 包括增、删、查、改…

《游戏-01_3D-开发》之—人物动画控制器

创建变量&#xff0c; 创建线&#xff0c; 连接&#xff0c; 选中线会变为蓝色&#xff0c;新增变量&#xff0c; 设置线&#xff0c; 双击子层进入子层&#xff0c; 创建变量&#xff0c; 双击SkillPanel 拖拽好之后返回上一层&#xff0c; 依次连接&#xff0c; 设置线&#…

解释LoRA参数

目录 LoRA参数含义 LoRA在深度学习中的作用 示例代码中的LoRA应用 结论 LoRA参数含义 LoRA (lora_r): LoRA代表"Low-Rank Adaptation"&#xff0c;是一种模型参数化技术&#xff0c;用于在不显著增加参数数量的情况下调整预训练模型。lora_r参数指的是LoRA中的秩&…

[Tomcat] [从安装到关闭] MAC部署方式

安装Tomcat 官网下载&#xff1a;Apache Tomcat - Apache Tomcat 9 Software Downloads 配置Tomcat 1、输入cd空格&#xff0c;打开Tomca目录&#xff0c;把bin文件夹直接拖拉到终端 2、授权bin目录下的所有操作&#xff1a;终端输入[sudo chmod 755 *.sh]&#xff0c;回车 …

springboot(ssm干洗店预约洗衣系统 衣物清洗预约系统Java系统

springboot(ssm干洗店预约洗衣系统 衣物清洗预约系统Java系统 开发语言&#xff1a;Java 框架&#xff1a;springboot&#xff08;可改ssm&#xff09; vue JDK版本&#xff1a;JDK1.8&#xff08;或11&#xff09; 服务器&#xff1a;tomcat 数据库&#xff1a;mysql 5.7…

docker: 搭建 harbor 镜像仓库

harbor 企业级内网镜像管理软件&#xff0c;加速拉取镜像速度&#xff0c;web 页面管理方便。 系统优化 systemctl stop NetworkManager systemctl disable NetworkManager iptables -F systemctl restart docker安装docker [roottest05 ~]# yum install -y docker-compose…

【GitHub项目推荐--不错的 Go 学习项目】【转载】

开源实时性能分析平台 Pyroscope 是基于 Go 的开源实时性能分析平台&#xff0c;在源码中添加几行代码 pyroscope 就能帮你找出源代码中的性能问题和瓶颈、CPU 利用率过高的原因&#xff0c;调用树展示帮助你理解程序&#xff0c;支持 Go、Python、Ruby 语言。 Pyroscope 可以…

nestjs之适配器模式的应用

NestJS 是一个用于构建高效、可靠和可扩展的服务器端应用程序的框架。在 NestJS 中&#xff0c;适配器模式&#xff08;Adapter Pattern&#xff09;主要体现在其对不同类型的 HTTP 服务端框架的适配上。NestJS 本身是建立在 Express 或者 Fastify 这样的底层 HTTP 框架之上的&…

openssl3.2/test/certs - 033 - time stamping certificates

文章目录 openssl3.2/test/certs - 033 - time stamping certificates概述笔记END openssl3.2/test/certs - 033 - time stamping certificates 概述 openssl3.2 - 官方demo学习 - test - certs 笔记 /*! \file my_openssl_linux_log_doc_033.txt\note openssl3.2/test/ce…

《解释与话语权》——西游真假美猴王是自导自演吗

解读与话语权 引言 一旦有人或者组织垄断了话语权&#xff0c;那么什么都可以被重新定义和解读&#xff0c;本篇旨在让读者有更多视角的思考。 为什么一个耶和华能分裂成天主教&#xff0c;东正教&#xff0c;新教等&#xff0c;都是对于圣经和圣经故事的不同解读。 红学那…

解析GPT-3、GPT-4和ChatGPT关系-迈向自然语言处理的新高度“

Hello&#xff0c;小索奇&#xff01;很高兴为你拓展关于GPT-3、GPT-4、ChatGPT之间关系的信息&#xff0c;以及解释自然语言模型和Transformer的区别。 首先&#xff0c;GPT-3、GPT-4、ChatGPT都是建立在GPT系列技术基础上的自然语言处理模型。它们在不同的代数、性能和应用场…

k8s的图形化工具---rancher

rancher是一个开源的企业级多集群的k8s管理平台。 rancher和k8s的区别&#xff1a;都是为了容器的调度和编排系统。但是rancher不仅可以调度还可以管理整个k8s集群。 rancher自带监控(普罗米修斯) 实验部署 master01 20.0.0.32 node01 20.0.0.34 node02 20.0.0.35 test …

linux系统mysql8单机多实例+主从复制部署

一、解压mysql压缩包 参考我的另一篇博文&#xff0c;tag.gz解压完并且配置完环境变量即可&#xff0c;暂时不要初始化 linux单机部署mysql(离线环境解压即可)-CSDN博客 二、修改配置文件 可能因为版本不同&#xff0c;我的这个配置可能不是通用的&#xff0c;我安装的是my…

【Java Kubernates】Java调用kubernates提交Yaml到SparkOperator

背景 目前查询框架使用的是trino&#xff0c;但是trino也有其局限性&#xff0c;需要准备一个备用的查询框架。考虑使用spark&#xff0c;spark operator也已经部署到k8s&#xff0c;现在需要定向提交spark sql到k8s的sparkoperator上&#xff0c;使用k8s资源执行sql。 对比 …

linux安装docker--更具官网教程

1.访问https://docs.docker.com/ 2.进入download 3输入cento 或者直接访问地址Install Docker Engine on CentOS | Docker Docs 4一步一步根据官网命令走 2安装 3 4 方式一&#xff1a; service docker start&#xff08;开启&#xff09; service docker status&#xff08…

ubuntu怎么安装docker

sudo apt-get update sudo apt-get install \ ca-certificates \ curl \ gnupg \ lsb-release 添加Docker官方的GPG密钥 curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gpg | sudo apt-key add -使用以下命令设置稳定存储库。要添加 夜间或测试存储库&…

使用PowerShell命令行,批量修改文件编码

目录 ■前言 ■PowerShell命令 ■效果 ■前言 今天统计修改代码量&#xff0c;使用工具时&#xff0c;发现有些代码无法统计。 原因时UTF-8中有某些特殊字符&#xff0c;工具不能识别。 但是&#xff0c;如果把代码转换为SJIS格式&#xff0c;就能正常统计了。 因此&…

听筒及麦克风电路时序分析

打电话的时候。当没有免提的时候&#xff0c;用的是mic1&#xff0c;麦克风1居然是在J7尾插座子上&#xff0c;所以要把手机的下面贴近嘴巴。mic1的信号给到音频编解码u21&#xff0c;u21通过i2s线给cpu, 然后给基带cpu,然后通过射频发射出去。当要听声音的时候&#xff0c;射频…

【数学建模】插值与拟合

文章目录 插值插值方法用Python解决插值问题 拟合最小二乘拟合数据拟合的Python实现 适用情况 处理由试验、测量得到的大量数据或一些过于复杂而不便于计算的函数表达式时&#xff0c;构造一个简单函数作为要考察数据或复杂函数的近似 定义 给定一组数据&#xff0c;需要确定满…

【软件测试】学习笔记-性能测试场景的分类

性能测试场景的重要程度类似于业务测试的 case&#xff0c;case 是你进行业务测试的指引&#xff0c;case 是否完善也直接决定了测试的覆盖率。同理&#xff0c;场景是传递执行性能测试的步骤和目的&#xff0c;关于这两点是你一定要清楚的。 首先认识下最重要的三个性能场景&…