Spring SpEL在Flink中的应用-与FlatMap结合实现数据动态计算

文章目录

  • 前言
  • 一、POM依赖
  • 二、主函数代码示例
  • 三、RichFlatMapFunction实现
  • 总结


前言

SpEL表达式与Flink FlatMapFunction或MapFunction结合可以实现基于表达式的简单动态计算。有关SpEL表达式的使用请参考Spring SpEL在Flink中的应用-SpEL详解
可以将计算表达式放入数据库,对数据进行计算处理,从而实现只需修改表达式不用修改Flink代码的效果。


一、POM依赖

首先在 pom.xml 中加入依赖:

<dependency><groupId>org.springframework</groupId><artifactId>spring-expression</artifactId><version>5.2.0.RELEASE</version>
</dependency>

二、主函数代码示例


import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;import java.text.SimpleDateFormat;public class FlinkSpelFilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Row row=Row.of("张三","001",getTimestamp("2016-10-24 21:59:06"),23);Row row2=Row.of("张三","002",getTimestamp("2016-10-24 21:50:06"),33);Row row3=Row.of("张三","003",getTimestamp("2016-10-24 21:51:06"),43);Row row4=Row.of("李四","004",getTimestamp("2016-10-24 21:50:56"),13);Row row5=Row.of("李四","005",getTimestamp("2016-10-24 00:48:36"),53);Row row6=Row.of("李四","006",getTimestamp("2016-10-24 00:48:36"),34);Row row7=Row.of("李四","007",getTimestamp("2016-10-24 00:48:36"),23);Row row8=Row.of("李四","008",getTimestamp("2016-10-24 00:48:36"),26);Row row9=Row.of("李四","009",getTimestamp("2016-10-24 00:48:36"),63);DataStreamSource<Row> source =env.fromElements(row,row2,row3,row4,row5,row6,row7,row8,row9);//spel表达式 //json串数据略...................JSONObject spelConfig=".................................";SingleOutputStreamOperator<Row> stream = source.flatmap(new FlatMapExprFunction (spelConfig));stream .print();env.execute();}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
//		String string = "2016-10-24 21:59:06";SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");java.util.Date date=sdf.parse(str);java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());return s;}

三、RichFlatMapFunction实现


import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @author gaowc* 基于表达式计算的flatmap*/
public class FlatMapExprFunction extends RichFlatMapFunction<Row, Row> {private static final Logger logger = LoggerFactory.getLogger(FlatMapExprFunction.class);/*** 列名和列的索引map key:列名 value:列索引*/private Map<String, Integer> columnIndexMap;/*** key:输出字段名 value:表达式对象*/private transient Map<String,Expression> expMap;private Integer size;private List<JSONObject> outputColumnList;public FlatMapExprFunction(JSONObject conf){List<JSONObject > columnList = conf.getList(Constants.COLUMN);columnIndexMap = TransformUtil.getColumnIndexMap(columnList);List<JSONObject > outputColumns = conf.getList(Constants.OUTPUT_COLUMN);//将表达式中的占位符替换为row.getField(x)size = outputColumns.size();outputColumnList = new ArrayList<>();for (JSONObject col:outputColumns) {String expr = col.getString("expr");if(StringUtils.isNotBlank(expr)){ExprTokenParser tokenParser = new ExprTokenParser("#{","}",new ColumnTokenHandler(columnIndexMap));String newExpr=tokenParser.parse(expr);logger.info("expr: {} newExpr {}",expr,newExpr);col.set("expr",newExpr);}outputColumnList.add(col);}}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);expMap = new HashMap<>();//初始化expMapfor (JSONObject col:outputColumnList) {logger.info("open col:{}",col);String expr = col.getString("expr");if(StringUtils.isNotBlank(expr)){SpelExpressionParser parser = new SpelExpressionParser();Expression expression = parser.parseExpression(expr);expMap.put(col.getString(Constants.COLUMN_NAME),expression);}}}@Overridepublic void flatMap(Row row, Collector<Row> collector) throws Exception {Row outputRow=new Row(size);//注册自定义函数StandardEvaluationContext conetxt = new StandardEvaluationContext(new SpelMethodUtil());conetxt.setVariable("row",row);for (int i = 0; i < size; i++) {JSONObject col = outputColumnList.get(i);String colName = col.getString(Constants.COLUMN_NAME);Expression expression = expMap.get(colName);Object value = null;if(expression!=null){value = expression.getValue(conetxt);if(value!=null){logger.info("expression.getValue :{}  class {}",value,value.getClass() );}}else{value=row.getField(columnIndexMap.get(colName));}outputRow.setField(i,value);}collector.collect(outputRow);}}

自定义函数类


import org.apache.commons.lang3.StringUtils;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;public class SpelMethodUtil {public static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATE_FORMAT = "yyyy-MM-dd";public static final String TIME_FORMAT = "HH:mm:ss";public static Integer compareDate(Date date, String strDate){Integer result;if(date==null&& StringUtils.isBlank(strDate)){return 0;}else{if(date==null || StringUtils.isBlank(strDate)){return -2;}}String trimDate=strDate.trim();String format = findFormat(trimDate);Date date2 = stringToDate(trimDate, format);result=date.compareTo(date2);return result;}public static Integer compareDate(Date first, Date second){if(first==null&& second==null){return 0;}else{if(first==null || second==null){return -2;}}return first.compareTo(second);}public static Date stringToDate(String dateStr,String format){SimpleDateFormat sdf = new SimpleDateFormat(format);Date date=null;try {date= sdf.parse(dateStr);} catch (ParseException e) {e.printStackTrace();}return date;}/*** 查找与输入的字符型日期相匹配的format* @param strDate* @return*/public static String findFormat(String strDate){String result=null;String trimDate=strDate.trim();int len=trimDate.length();String dateRegex = "";if(len==TIMESTAMP_FORMAT.length()){dateRegex = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$";if(trimDate.matches(dateRegex)){result=TIMESTAMP_FORMAT;}}else if(len==DATE_FORMAT.length()){dateRegex = "^\\d{4}-\\d{2}-\\d{2}$";if(trimDate.matches(dateRegex)){result=DATE_FORMAT;}}else if(len==TIME_FORMAT.length()){dateRegex = "^\\d{2}:\\d{2}:\\d{2}$";if(trimDate.matches(dateRegex)){result=TIME_FORMAT;}}else{throw  new RuntimeException("不可识别的日期格式!"+strDate);}return result;}public static Integer addAge(Integer age){return age+4;}
}

总结

以上只是简单的示例,在实际应用中可以将运算表达式放到数据库,将计算规则放入缓存定时刷新。大家可以根据实际需求进行扩展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/647408.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMQ工作模式 - 简单模式和work工作模式多个竞争的消费者

RabbitMQ 是一个消息队列中间件&#xff0c;用于在分布式系统中进行消息传递。在 RabbitMQ 中&#xff0c;有几种工作模式&#xff0c;其中简单模式和工作模式是其中两种基本的模式之一。 简单模式&#xff08;Simple Mode&#xff09;&#xff1a; 在简单模式中&#xff0c;有…

【github】github打开慢或者打不开解决方案

目录 1、打开hosts文件&#xff08;C:\Windows\System32\drivers\etc&#xff09; 2、然末尾放入一下两个 IP 地址&#xff1a; 3、替换覆盖原文件 最近github老是打不开&#xff0c;找了一个方法试了一下管用 github网址查询&#xff1a;https://ipaddress.com/website/git…

css 中 flex 布局最后一行实现左对齐

问题 flex 布局最后一行没有进行左对齐显示&#xff1a; <div classparent><div classchild></div><div classchild></div><div classchild></div><div classchild></div><div classchild></div><div…

2022年至2023年广东省职业院校技能大赛高职组“信息安全管理与评估”赛项样题

2022 年至 2023 年广东省职业院校技能大赛高职组“信息安全管理与评估”赛项样题 一、 第一阶段竞赛项目试题 本文件为信息安全管理与评估项目竞赛第一阶段试题&#xff0c;第一阶段内容包 括&#xff1a;网络平台搭建、网络安全设备配置与防护。 本阶段比赛时间为 180 分钟…

VirtualBox如何复制虚拟机

对于vmware或virtual pc虚拟机&#xff0c;要想快速复制几个虚拟机&#xff0c;以便集群使用&#xff0c;方法比较简单&#xff0c;例如直接复制其虚拟机相应的磁盘文件和配置文件即可&#xff0c;例如对于vmware&#xff0c;修改vmx文本文件中的内容如虚拟机名称、磁盘文件路径…

#Uniapp: uni.previewImage(OBJECT) 预览图片

uni.previewImage(OBJECT) 预览图片。 api地址 媒体-图片 示例 handlePreviewImg(current) {const urls this.rightList.map(x > x.icon)uni.previewImage({urls,current})}OBJECT 参数说明 参数名类型必填说明平台差异说明countNumber否最多可以选择的图片张数&#…

【知识---ubuntu和debian之间的关系】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、ubuntu和debian之间的关系衍生关系&#xff1a;Debian作为上游源&#xff1a;软件包管理&#xff1a;版本发布&#xff1a;社区和支持&#xff1a; 二、ubu…

华为数通方向HCIP-DataCom H12-831题库(判断题:121-140)

第121题 BGP/MPLS IP VPN内层采用MP-BGP分配的标签区分不同的VPN实例,外层可采用多种隧道类型,例如GRE隧道。 正确 错误 答案: 错误 解析: VPN业务的转发需要隧道来承载,隧道类型包括GRE隧道、LSP隧道、TE隧道(即CR-LSP)。 如果网络边缘的PE设备具备MPLS功能,但骨干网核…

林浩然与Hadoop的奇幻数据之旅

林浩然与Hadoop的奇幻数据之旅 Lin Haoran and the Enchanting Data Journey with Hadoop 在一个名为“比特村”的地方&#xff0c;住着一位名叫林浩然的程序员大侠。他并非江湖上常见的武艺高强之人&#xff0c;而是凭借一把键盘、一支鼠标&#xff0c;纵横在大数据的海洋里。…

UI跟随物体的关键是什么?重要吗?

引言 UI的跟随效果 在游戏开发中&#xff0c;UI的跟随效果是提高用户体验和交互性的重要组成部分。 本文将深入介绍如何创建一个高效且可定制的UI跟随目标组件&#xff0c;并分享一些最佳实践。 本文源工程在文末获取&#xff0c;小伙伴们自行前往。 UI跟随物体的关键 UI…

MQ回顾之kafka速通

不定期更新 官网概念自查 官网&#xff1a;Apache Kafka kafka结构 和kafka相关的关键名词有&#xff1a;Producer、Broker、Topic、Partition、Replication、Message、Consumer、Consumer Group、Zookeeper。 各名词解释已经泛滥&#xff0c;如果你想看点不一样的&#xf…

如何设计一个可靠UDP

背景 通信领域存在制约三角&#xff1a;时延&#xff0c;成本和质量。TCP是增大时延和成本来保证通信质量&#xff0c;UDP牺牲了质量保证了时延和成本。一定场景使用RDP可以找到这三之间的平衡点。实现可靠UDP主要有三种方式&#xff1a; 尽力可靠&#xff1a;接收方要求发送…

A 股承担着一个什么功能?

​A 股&#xff1a;中国资本市场的核心角色 A 股&#xff0c;即人民币普通股票&#xff0c;在中国资本市场中扮演着至关重要的角色。它不仅是投资者买卖交易的场所&#xff0c;更是中国经济发展的重要引擎。 首先&#xff0c;A 股为中国的企业提供了融资平台。中国有着庞大的…

从Elasticsearch来看分布式系统架构设计

从Elasticsearch来看分布式系统架构设计 - 知乎 分布式系统类型多&#xff0c;涉及面非常广&#xff0c;不同类型的系统有不同的特点&#xff0c;批量计算和实时计算就差别非常大。这篇文章中&#xff0c;重点会讨论下分布式数据系统的设计&#xff0c;比如分布式存储系统&…

Zookeeper3.5.7源码分析

文章目录 一、Zookeeper算法一致性1、Paxos 算法1.1 概述1.2 算法流程1.3 算法缺陷 2、ZAB 协议2.1 概述2.2 Zab 协议内容 3、CAP理论 二、源码详解1、辅助源码1.1 持久化源码(了解)1.2 序列化源码 2、ZK 服务端初始化源码解析2.1 启用脚本分析2.2 ZK 服务端启动入口2.3 解析参…

鸿蒙入门学习的一些总结

前言 刚开始接触鸿蒙是从2023年开始的&#xff0c;当时公司在调研鸿蒙开发板能否在实际项目中使用。我们当时使用的是OpenHarmony的&#xff0c;基于DAYU/rk3568开发板&#xff0c;最开始系统是3.2的&#xff0c;API最高是API9&#xff0c;DevecoStudio 版本3.1的。 鸿…

List, Set, Queue, Map 四者的区别

List、Set、Queue、Map 是 Java 中常用的集合类型&#xff0c;它们的主要区别如下&#xff1a; List List 是可重复有序的集合。可以通过索引访问 List 中的元素。可以添加、删除、修改 List 中的元素。常用的实现类有 ArrayList 和 LinkedList。 Set Set 是不可重复的无序…

excel统计分析——Duncan法多重比较

参考资料&#xff1a;生物统计学 Duncan法又称新复极差检验法&#xff0c;是对S-N-K法的改进&#xff0c;根据秩次距m对临界值的显著水平α进行调整&#xff0c;是最常用的多重比较方法。最小显著极差表示如下&#xff1a; 其中&#xff0c;m为秩次距&#xff0c;df为方差分析中…

【软件测试】学习笔记-制定一份有效的性能测试方案

什么是性能测试方案&#xff1f; 性能测试方案&#xff0c;通俗一点说就是指导你进行性能测试的文档&#xff0c;包含测试目的、测试方法、测试场景、环境配置、测试排期、测试资源、风险分析等内容。一份详细的性能测试方案可以帮助项目成员明确测试计划和手段&#xff0c;更…

CentOS服务器拒绝SSH登录

当CentOS服务器拒绝SSH登录时&#xff0c;有几个可能的原因和解决方法&#xff1a; 检查网络连接&#xff1a;确保服务器与您的计算机之间的网络连接是正常的。您可以尝试使用其他网络连接或ping服务器以检查是否能够访问。 确认SSH服务正在运行&#xff1a;在服务器上确认SSH…