55、Flink 中使用 Java Lambda 表达式详解

1)概述
1.注意

Flink 支持对 Java API 的所有算子使用 Lambda 表达式,但是,当 Lambda 表达式使用 Java 泛型时,需要 显式 地声明类型信息。

2.示例和限制

示例: map() 函数使用 Lambda 表达式计算输入值的平方。

不需要声明 map() 函数的输入 i 和输出参数的数据类型,因为 Java 编译器会对它们做出推断。

env.fromElements(1, 2, 3)
// 返回 i 的平方
.map(i -> i*i)
.print();

由于 OUTInteger 而不是泛型,所以 Flink 可以从方法签名 OUT map(IN value) 的实现中自动提取出结果的类型信息。

但像 flatMap() 这样的函数,它的签名 void flatMap(IN value, Collector out) 被 Java 编译器编译为 void flatMap(IN value, Collector out)。Flink 就无法自动推断输出的类型信息了。

Flink 很可能抛出如下异常:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.Otherwise the type has to be specified explicitly using type information.

此时需要 显式 指定类型信息,否则输出将被视为 Object 类型,这会导致低效的序列化。

DataStream<Integer> input = env.fromElements(1, 2, 3);// 必须声明 collector 类型
input.flatMap((Integer number, Collector<String> out) -> {StringBuilder builder = new StringBuilder();for(int i = 0; i < number; i++) {builder.append("a");out.collect(builder.toString());}
})
// 显式提供类型信息
.returns(Types.STRING)
// 打印 "a", "a", "aa", "a", "aa", "aaa"
.print();

当使用 map() 函数返回泛型类型的时候也会发生类似的问题。下面示例中的方法签名 Tuple2<Integer,Integer> map(Integer value) 被擦除为 Tuple2 map(Integer value)

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i))    // 没有关于 Tuple2 字段的信息.print();

解决方式如下

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;// 使用显式的 ".returns(...)"
env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i)).returns(Types.TUPLE(Types.INT, Types.INT)).print();// 使用类来替代
env.fromElements(1, 2, 3).map(new MyTuple2Mapper()).print();public static class MyTuple2Mapper extends MapFunction<Integer, Tuple2<Integer, Integer>> {@Overridepublic Tuple2<Integer, Integer> map(Integer i) {return Tuple2.of(i, i);}
}// 使用匿名类来替代
env.fromElements(1, 2, 3).map(new MapFunction<Integer, Tuple2<Integer, Integer>> {@Overridepublic Tuple2<Integer, Integer> map(Integer i) {return Tuple2.of(i, i);}}).print();// 也可以像这个示例中使用 Tuple 的子类来替代
env.fromElements(1, 2, 3).map(i -> new DoubleTuple(i, i)).print();public static class DoubleTuple extends Tuple2<Integer, Integer> {public DoubleTuple(int f0, int f1) {this.f0 = f0;this.f1 = f1;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/39672.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大学新生人工智能学习路线规划

1. 引言 七月来临&#xff0c;各省高考分数已揭榜完成。而高考的完结并不意味着学习的结束&#xff0c;而是新旅程的开始。对于有志于踏入IT领域的高考少年们&#xff0c;这个假期是开启探索IT世界的绝佳时机。作为该领域的前行者和经验前辈&#xff0c;我愿意为准新生们提供一…

基于Hadoop平台的电信客服数据的处理与分析③项目开发:搭建基于Hadoop的全分布式集群---任务10:Hive安装部署

任务描述 任务内容为安装并配置在Hadoop集群中使用Hive。 任务指导 Hive是一个基于Hadoop的数据仓库框架&#xff0c;在实际使用时需要将元数据存储在数据库中 具体安装步骤如下&#xff1a; 1. 安装MySQL数据库&#xff08;已安装&#xff09; 2. 解压缩Hive的压缩包 3…

剪映 v5.5 Pro Vip解锁版:使用指南与注意事项

摘要&#xff1a;本文介绍了剪映Pro VIP解锁版的使用方法&#xff0c;包括安装、测试和使用VIP素材的步骤&#xff0c;以及如何避免误报和保持解锁状态的建议。 正文&#xff1a; 剪映Pro是一款广受欢迎的视频编辑软件&#xff0c;提供了丰富的视频编辑功能和大量高质量的素材…

发送微信消息和文件

参考&#xff1a;https://www.bilibili.com/video/BV1S84y1m7xd 安装&#xff1a; pip install PyOfficeRobotimport PyOfficeRobotPyOfficeRobot.chat.send_message(who"文件传输助手", message"你好&#xff0c;我是PyOfficeRobot&#xff0c;有什么可以帮助…

RabbitMQ中java实现队列和交换机的声明

java实现队列和交换机的声明 在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时&#xff0c;队列和交换机是程序员定义的&#xff0c;将来项目上线&#xff0c;又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来&#xff0c;…

【PYG】 PyTorch中size方法和属性

在 PyTorch 中&#xff0c;size 方法和属性用于获取张量的维度信息。下面是它们的用法和区别&#xff1a; node_features.size&#xff1a; 这是一个属性&#xff08;attribute &#xff09;&#xff0c;返回一个 torch.Size 对象&#xff0c;表示张量的维度。这是不可调用的&a…

用MySQL+node+vue做一个学生信息管理系统(一):配置项目

先用npm init -y生成配置文件 在项目下新建src文件夹&#xff0c;app.js文件。src目录用来放静态资源文件&#xff0c;app.js是服务器文件&#xff0c;index.js是vue的入口文件 使用npm install express下载express框架 在app.js文件夹开启node服务&#xff0c;监听的端口为…

C++ //练习 14.29 为什么不定义const版本的递增和递减运算符?

C Primer&#xff08;第5版&#xff09; 练习 14.29 练习 14.29 为什么不定义const版本的递增和递减运算符&#xff1f; 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 解释&#xff1a; 递增和递减要改变对象本身&#xff0c;const类…

Go语言--运算符

算术运算符 关系运算符 不能写0<a<10&#xff0c;要判断必须0<a&&a<10。因为int和bool不兼容 逻辑运算符 位运算符 赋值运算符 其他 运算符的优先级

AcWing 1254:找树根和孩子

【题目来源】https://www.acwing.com/problem/content/1256/【题目描述】 给定一棵树&#xff0c;输出树的根root&#xff0c;孩子最多的结点max以及他的孩子。【输入格式】 第一行&#xff1a;n&#xff0c;m&#xff0c;表示树的节点数和边数。 以下m行&#xff1a;每行两个结…

浮点数在内存中的存储结构

浮点数在内存中的存储可以参考《IEEE754标准》https://people.eecs.berkeley.edu/~wkahan/ieee754status/IEEE754.PDF 参考博文&#xff1a;IEEE754详解&#xff08;最详细简单有趣味的介绍&#xff09;-CSDN博客 单精度float占内存4字节&#xff0c;最高位bit31表示符号位&…

国家海岸线变化评估:新英格兰和中大西洋沿岸海岸线的历史变化

National Assessment of Shoreline Change: Historical Shoreline Change along the New England and Mid-Atlantic Coasts 国家海岸线变化评估&#xff1a;新英格兰和中大西洋沿岸海岸线的历史变化 摘要 海滩侵蚀是美国许多公海沿岸的一个长期问题。随着沿岸人口的不断增加…

永辉超市购物卡有什么用?

感觉现在在超市买东西&#xff0c;还不如网购 这不&#xff0c;端午的时候&#xff0c;朋友送的永辉卡&#xff0c;一直没时间去用&#xff0c;我总担心过期 但是去了超市后&#xff0c;又不知道买什么&#xff0c;最后空手而归 还好收卡云可以回收永辉卡&#xff0c;两张三…

《C++20设计模式》适配器模式经验分享

文章目录 一、前言二、对于接口的讨论三、实现1、对象适配器1.1 UML类图1.2 实现 2、类适配器 四、最后 一、前言 从适配器模式开始就是类的组合聚合&#xff0c;类与类之间结构性的问题了。 适配器模式解决的问题&#xff1a; 适配器模式能够在不破坏现有系统结构的情况下&a…

mapreduce实现bean的序列化与反序列化

目录 序列化&#xff08;Serialization&#xff09; 反序列化&#xff08;Deserialization&#xff09; 事例操作 UserSale 重写序列化方法 重写反序列化 重写toString方法 SaleMapper SaleReducer SaleDriver 序列化&#xff08;Serialization&#xff09; 序列化是将…

【后端面试题】【中间件】【NoSQL】MongoDB的配置服务器、复制机制、写入语义和面试准备

MongoDB的配置服务器 引入了分片机制之后&#xff0c;MongoDB启用了配置服务器(config server) 来存储元数据&#xff0c;这些元数据包括分片信息、权限控制信息&#xff0c;用来控制分布式锁。其中分片信息还会被负责执行查询mongos使用。 MongoDB的配置服务器有一个很大的优…

WPF----自定义滚动条ScrollViewer

滚动条是项目当中经常用到的一个控件&#xff0c;大部分对外项目都有外观的需求&#xff0c;因此需要自定义&#xff0c;文中主要是针对一段动态的状态数据进行展示&#xff0c;并保证数据始终在最新一条&#xff0c;就是需要滚动条滚动到底部。 1&#xff0c;xaml中引入 <…

zxing-cpp+OpenCV根据字符串生成条形码

编译构建 需要使用到 CMake、Git、GCC 或 MSVC。 github 链接&#xff1a;https://github.com/zxing-cpp/zxing-cpp 编译之前请确保&#xff1a; 确保安装了 CMake 版本 3.15 或更高版本。 确保安装了与 C17 兼容的编译器(最低VS 2019 16.8 / gcc 7 / clang 5)。 编译构建…

Python面试宝典第4题:环形链表

题目 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。如果存在环 &#xff0c;则返回 true 。 否则&#xff0c;返回 false 。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xf…

重写父类方法、创建单例对象 题目

题目 JAVA27 重写父类方法分析&#xff1a;代码&#xff1a; JAVA28 创建单例对象分析&#xff1a;代码&#xff1a; JAVA27 重写父类方法 描述 父类Base中定义了若干get方法&#xff0c;以及一个sum方法&#xff0c;sum方法是对一组数字的求和。请在子类 Sub 中重写 getX() 方…