Spark内置图像数据源初探

概述

    在Apache Spark 2.4中引入了一个新的内置数据源, 图像数据源.用户可以通过DataFrame API加载指定目录的中图像文件,生成一个DataFrame对象.通过该DataFrame对象,用户可以对图像数据进行简单的处理,然后使用MLlib进行特定的训练和分类计算.
    本文将介绍图像数据源的实现细节和使用方法.

简单使用

    先通过一个例子来简单的了解下图像数据源使用方法. 本例设定有一组图像文件存放在阿里云的OSS上, 需要对这组图像加水印,并压缩存储到parquet文件中. 废话不说,先上代码:

  // 为了突出重点,代码简化图像格式相关的处理逻辑def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir")imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data").map(row => {val origin = row.getAs[String]("origin")val width = row.getAs[Int]("width")val height = row.getAs[Int]("height")val mode = row.getAs[Int]("mode")val nChannels = row.getAs[Int]("nChannels")val data = row.getAs[Array[Byte]]("data")Row(Row(origin, height, width, nChannels, mode,markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR")))}).write.format("parquet").save("oss://<bucket>/path/to/dst/dir")}def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = {val image = new BufferedImage(width, height, imageType)val raster = image.getData.asInstanceOf[WritableRaster]val pixels = data.map(_.toInt)raster.setPixels(0, 0, width, height, pixels)image.setData(raster)val buffImg = new BufferedImage(width, height, imageType)val g = buffImg.createGraphicsg.drawImage(image, 0, 0, null)g.setColor(Color.red)g.setFont(new Font("宋体", Font.BOLD, 30))g.drawString(text, width/2, height/2)g.dispose()val buffer = new ByteArrayOutputStreamImageIO.write(buffImg, "JPG", buffer)buffer.toByteArray}

从生成的parquet文件中抽取一条图像二进制数据,保存为本地jpg,效果如下:

图1 左图为原始图像,右图为处理后的图像

你可能注意到两个图像到颜色并不相同,这是因为Spark的图像数据将图像解码为BGR顺序的数据,而示例程序在保存的时候,没有处理这个变换,导致颜色出现了反差.

实现初窥

下面我们深入到spark源码中来看一下实现细节.Apache Spark内置图像数据源的实现代码在spark-mllib这个模块中.主要包括两个类:

  • org.apache.spark.ml.image.ImageSchema
  • org.apache.spark.ml.source.image.ImageFileFormat

其中,ImageSchema定义了图像文件加载为DataFrame的Row的格式和解码方法.ImageFileFormat提供了面向存储层的读写接口.

格式定义

一个图像文件被加载为DataFrame后,对应的如下:

  val columnSchema = StructType(StructField("origin", StringType, true) ::StructField("height", IntegerType, false) ::StructField("width", IntegerType, false) ::StructField("nChannels", IntegerType, false) ::// OpenCV-compatible type: CV_8UC3 in most casesStructField("mode", IntegerType, false) ::// Bytes in OpenCV-compatible order: row-wise BGR in most casesStructField("data", BinaryType, false) :: Nil)val imageFields: Array[String] = columnSchema.fieldNamesval imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)

如果将该DataFrame打印出来,可以得到如下形式的表:

+--------------------+-----------+------------+---------------+----------+-------------------+
|image.origin        |image.width|image.height|image.nChannels|image.mode|image.data         |
+--------------------+-----------+------------+---------------+----------+-------------------+
|oss://.../dir/1.jpg |600        |343         |3              |16        |55 45 21 56  ...   |
+--------------------+-----------+------------+---------------+----------+-------------------+

其中:

  • origin: 原始图像文件的路径
  • width: 图像的宽度,单位像素
  • height: 图像的高度,单位像素
  • nChannels: 图像的通道数, 如常见的RGB位图为通道数为3
  • mode: 像素矩阵(data)中元素的数值类型和通道顺序, 与OpenCV的类型兼容
  • data: 解码后的像素矩阵

提示: 关于图像的基础支持,可以参考如下文档: Image file reading and writing

加载和解码

图像文件通过ImageFileFormat加载为一个Row对象.

// 文件: ImageFileFormat.scala
// 为了简化说明起见,代码有删减和改动
private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {......override def prepareWrite(sparkSession: SparkSession,job: Job,options: Map[String, String],dataSchema: StructType): OutputWriterFactory = {throw new UnsupportedOperationException("Write is not supported for image data source")}override protected def buildReader(sparkSession: SparkSession,dataSchema: StructType,partitionSchema: StructType,requiredSchema: StructType,filters: Seq[Filter],options: Map[String, String],hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {    ......(file: PartitionedFile) => {......val path = new Path(origin)val stream = fs.open(path)val bytes = ByteStreams.toByteArray(stream)val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解码 val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))......val converter = RowEncoder(requiredSchema)filteredResult.map(row => converter.toRow(row))......}}}
}

从上可以看出:

  • 当前的图像数据源实现并不支持保存操作;
  • 图像数据的解码工作在ImageSchema中完成.

下面来看一下具体的解码过程:

// 文件: ImageSchema.scala
// 为了简化说明起见,代码有删减和改动
private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {// 使用ImageIO加载原始图像数据val img = ImageIO.read(new ByteArrayInputStream(bytes))if (img != null) {// 获取图像的基本属性val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAYval hasAlpha = img.getColorModel.hasAlphaval height = img.getHeightval width = img.getWidth// ImageIO::ImageType -> OpenCV Typeval (nChannels, mode) = if (isGray) {(1, ocvTypes("CV_8UC1"))} else if (hasAlpha) {(4, ocvTypes("CV_8UC4"))} else {(3, ocvTypes("CV_8UC3"))}// 解码val imageSize = height * width * nChannels// 用于存储解码后的像素矩阵val decoded = Array.ofDim[Byte](imageSize)if (isGray) {// 处理单通道图像...} else {// 处理多通道图像var offset = 0for (h <- 0 until height) {for (w <- 0 until width) {val color = new Color(img.getRGB(w, h), hasAlpha)// 解码后的通道顺序为BGR(A)decoded(offset) = color.getBlue.toBytedecoded(offset + 1) = color.getGreen.toBytedecoded(offset + 2) = color.getRed.toByteif (hasAlpha) {decoded(offset + 3) = color.getAlpha.toByte}offset += nChannels}}}// 转换为一行数据Some(Row(Row(origin, height, width, nChannels, mode, decoded)))}}

从上可以看出:

  • 本数据源在实现上使用javax的ImageIO库实现各类格式的图像文件的解码.ImageIO虽然是一个十分强大和专业的java图像处理库,但是和更专业的CV库(如OpenCV)比起来,性能上和功能上差距还是非常大的;
  • 解码后的图像通道顺序和像素数值类型是固定的, 顺序固定为BGR(A), 像素数值类型为8U;
  • 最多支持4个通道,因此像多光谱遥感图像这类可能包含数十个波段信息的图像就无法支持了;
  • 解码后输出的信息仅包含基本的长宽、通道数和模式等字段,如果需要获取更为详细元数据,如exif,GPS坐标等就爱莫能助了;
  • 数据源在生成DataFrame时执行了图像的解码操作,并且解码后的数据存储在Java堆内内存中.这在实际项目应该是一个比较粗放的实现方式,会占用大量的资源,包括内存和带宽(如果发生shuffle的话,可以考虑参考同一个图像文件保存为BMP和JPG的大小差别).

编码和存储

从上分析可以看出,当前图像数据源并不支持对处理后的像素矩阵进行编码并保存为指定格式的图像文件.

图像处理能力

当前版本Apache Spark并没有提供面向图像数据的UDF,图像数据的处理需要借助ImageIO库或其他更专业的CV库.

小结

当前Apache Spark的内置图像数据源可以较为方便的加载图像文件进行分析.不过,当前的实现还十分简陋,性能和资源消耗应该都不会太乐观.并且,当前版本仅提供了图像数据的加载能力,并没有提供常用处理算法的封装和实现,也不能很好的支持更为专业的CV垂直领域的分析业务.当然,这和图像数据源在Spark中的定位有关(将图像数据作为输入用于训练DL模型,这类任务对图像的处理本身要求并不多).如果希望使用Spark框架完成更实际的图像处理任务,还有很多工作要做,比如:

  • 支持更加丰富的元数据模型
  • 使用更专业的编解码库和更灵活编解码流程控制
  • 封装面向CV的算子和函数
  • 更高效的内存管理
  • 支持GPU

等等诸如此类的工作,限于篇幅,这里就不展开了.
好了,再多说一句,现在Spark已经支持处理图像数据了(虽然支持有限),那么,视频流数据还会远吗?


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518677.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Knative开发应用

title: &#xff08;三&#xff09;基于Knative开发应用 目录 安装 Istio安装 Knative玩转 helloworld-goWordPress 实战 创建 Kubernetes 集群 确保 Kubernetes 集群创建的时候已经选择了启用日志服务确保 Kubernetes 集群和 OSS 在一个 regionKubernetes 集群创建的时候需…

jdk1.6 + jsch End of IO Stream Read Algorithm negotiation fail

文章目录方案一&#xff08;jdk升级到jdk8,openSSH配置文件不变,亲测可用&#xff09;方案二&#xff08;保持jdk1.6,修改openSSH配置文件,亲测可用&#xff09;问题描述 环境&#xff1a;jdk1.6 jsch-0.1.52.jar openSSH7.4 升级openSSH到7.4后jsch报错&#xff1a;End of I…

Java-可变参数

public class Demo04 {public static void main(String[] args) {// 调用可变参数的方法printMax(34, 3, 3, 2, 56.5);printMax(new double[]{1, 2,4, 3});}public static void printMax(double... numbers) {if (numbers.length 0){System.out.println("没有传递参数&qu…

Oracle应用迁移到AnalyticDB for PostgreSQL指导

AnalyticDB for PostgreSQL&#xff08;简称&#xff1a;ADB for PG&#xff09;对Oracle语法有着较好的兼容&#xff0c;本文介绍如何将Oracle应用迁移到AnalyticDB for PostgreSQL。 1 PL/SQL PL/SQL&#xff08;Procedural Language/SQL&#xff09;是一种过程化的SQL语言…

生产环境使用HBase,你必须知道的最佳实践

来源 | 阿丸笔记封图| CSDN 下载于视觉中国前面&#xff0c;我们已经打下了很多关于HBase的理论基础&#xff0c;今天&#xff0c;我们主要聊聊在实际开发使用HBase中&#xff0c;需要关注的一些最佳实践经验。Schema设计七大原则1&#xff09;每个region的大小应该控制在10G到…

消息点击率翻倍的背后——闲鱼无侵入可扩展IFTTT系统

作者&#xff1a;闲鱼技术-剑辛 一、面临问题 在闲鱼生态里&#xff0c;用户之间会有很多种关系。其中大部分关系是由买家触发&#xff0c;联系到卖家&#xff0c;比如买家通过搜索、收藏、聊天等动作与卖家产生联系&#xff1b;另外一部分是平台与用户之间的关系。对这些关系…

2019阿里云618大促主会场全攻略

2019阿里云618大促活动已经于6月16日正式开启&#xff0c;从已开放的活动页面来看&#xff0c;整场大促活动由爆款拼团、满额最高返6000、上云接力赛分享集赞赢6.18万大奖三大活动组成。 在618这个年中的大幅度优惠促销日&#xff0c;怎样才能花最少的钱配置最特惠的云服务&am…

Redis-6.2.5 安装 Linux环境(单机)

文章目录1. 安装依赖环境2. 升级GCC3. 在线下载4. 解压5. 编译6. 安装7. 前台启动8. 后台启动9. 配置开机启动10. 常用命令11. 评析1. 安装依赖环境 yum install -y gcc-c autoconf automaker2. 升级GCC 这里说明一下&#xff0c;在编译之前&#xff1a;在编译之前需要升级gcc…

Java-递归

public class Demo05 {public static void main(String[] args) {System.out.println(f(5));}// 5! 5*4*3*2*1 阶乘public static int f(int n){if (n1){return 1;} else {return n*f(n-1);}} }递归特别消耗资源&#xff0c;如果嵌套太多层就不建议使用了 https://www.bilibi…

为什么说优秀架构师往往是一个悲观主义者?

阿里妹导读&#xff1a;18年前&#xff0c;200家企业由于在事故中信息系统遭到严重破坏而永远地关闭了。这样的事故引发了后人深思&#xff0c;对于工程师而言&#xff0c;不仅要求设计的系统足够强壮&#xff0c;还需要具备考虑失败的能力&#xff0c;当失败场景悉数被考虑周全…

石锤!今年Python要过苦日子了? 程序员:我疯了!

Python的好日子到头了&#xff1f;Python终于要回归现实了&#xff1f;所有程序员&#xff0c;刚刚一份报告把Python的真相撕开了&#xff01;不信你看&#xff1a;Python今年要跑路&#xff1f;三份报告炸出真相....「人生苦短&#xff0c;钱多事少&#xff0c;快用Python」&a…

Redis 基本数据类型试炼

文章目录1. String 类型2. 散列hashes3. 列表lists&#xff08;双向链表&#xff09;4. 集合set(自动去重)5. 有序集合sorted(自动去重)1. String 类型 # 设置单个值 set key value# 获取单个值 get key# 设置多个值 mset key1 value1 key2 value2 。。。# 获取多个值 mget …

ECS事件通知之创建失败事件

ECS提供了批量实例创建接口&#xff0c;可以一次调用创建最多100台实例。批量创建接口可以完成批量实例的创建、启动、IP分配等流程&#xff0c;可以快速完成实例资源的扩容。 在实例的创建过程中&#xff08;实际后台异步创建&#xff09;&#xff0c;库存和VSwitch中私网IP的…

安装 kivy

系统&#xff1a;Windows10 64位 python 3.7.6 最好管理员身份打开 命令行窗口pip安装 kivy 依赖 ——在 windows 命令行中&#xff0c;执行以下命令 pip3 install docutils pygments pypiwin32 kivy.deps.sdl2 kivy.deps.glew -i http://pypi.douban.com/simple --trusted-h…

HSF/Dubbo序列化时的LocalDateTime, Instant的性能问题

来源 在对Dubbo新版本做性能压测时&#xff0c;无意中发现对用例中某个TO&#xff08;Transfer Object&#xff09;类的一属性字段稍作修改&#xff0c;由Date变成LocalDateTime&#xff0c;结果是吞吐量由近5w变成了2w&#xff0c;RT由9ms升指90ms。 在线的系统&#xff0c;拼…

Java-数组的声明与创建

public class ArrayDemo01 {// 变量的类型 变量的名字 变量的值&#xff1b;public static void main(String[] args) {// 数组定义方式有两种&#xff0c;但是推荐第一个int[] nums; // 1. 声明一个数组 // int nums1[];nums new int[10]; // 2. 创建一个数组// …

云上快速搭建Serverless AI实验室

Serverless Kubernetes和ACK虚拟节点都已基于ECI提供GPU容器实例功能&#xff0c;让用户在云上低成本快速搭建serverless AI实验室&#xff0c;用户无需维护服务器和GPU基础运行环境&#xff0c;极大降低AI平台运维的负担&#xff0c;显著提升整体计算效率。 如何使用GPU容器实…

Vim快速移动光标至行首和行尾

1、 需要按行快速移动光标时&#xff0c;可以使用键盘上的编辑键Home&#xff0c;快速将光标移动至当前行的行首。除此之外&#xff0c;也可以在命令模式中使用快捷键"^"&#xff08;即Shift6&#xff09;或0&#xff08;数字0&#xff09;。 2、 如果要快速移动光标…

十分钟上手 ES 2020 新特性

作者 | 浪里行舟责编 | 郭芮ES2020 是 ECMAScript 对应 2020 年的版本。这个版本不像 ES6 (ES2015)那样包含大量新特性。但也添加了许多有趣且有用的特性。本文的代码地址&#xff1a;https://github.com/ljianshu/Blog本文以简单的代码示例来介绍 ES2020新特性。这样&#xff…