MaxCompute(ODPS)上处理非结构化数据的Best Practice

摘要: 随着MaxCompute(ODPS)2.0的上线,新增的非结构化数据处理框架也推出一系列的介绍文章,包括 MaxCompute上如何访问OSS数据, 基本功能用法和整体介绍,侧重介绍读取OSS数据进行计算处理; 本文:MaxCompute(ODPS)上处理非结构化数据的Best Practice。

随着MaxCompute(ODPS)2.0的上线,新增的非结构化数据处理框架也推出一系列的介绍文章,包括

1、MaxCompute上如何访问OSS数据, 基本功能用法和整体介绍,侧重介绍读取OSS数据进行计算处理;

2、MaxCompute上处理非结构化数据的Best Practice。 基于非结构化框架实现原理,提供一些最佳实践总结;

3、MaxCompute访问TableStore(OTS) 数据, 着重介绍通过非结构化框架来访问计算KV(TableStore/OTS)数据;

4、MaxCompute到OSS的非结构化数据输出(及图像处理实例):介绍了非结构化输出功能,并通过图像处理等范例,说明怎样通过MaxCompute的计算能力,打通整个OSS -> MaxCompute -> OSS的数据处理闭环;

5、如何在MaxCompute上处理存储在OSS上的开源格式数据, 介绍对于存储在OSS上的常见开源数据(ORC, PARQUET, AVRO等)格式,如何通过非结构化框架进行处理。

本文是这系列中的第【2】篇。

 前言

随着MaxCompute(原ODPS)非结构化数据处理框架的推出,在SQL线上打通了MaxCompute与OSS数据之间的计算数据连接生态,我们看到了视频,图像,音频以及基因,气象等各种各种各样数据在MaxCompute平台上实现了与传统结构化数据的无缝融合。之前我们提供了在MaxCompute非结构化框架处理OSS上数据的整体介绍,在基本功能实现后,我们收到用户许多关于优化和怎样最好的使用非结构化功能的问题。 这里通过分析非结构化框架底层的一些实现原理以及我们看到的一些使用场景,提供一些关于Best Practice的总结,方便大家更有效的在MaxCompute中处理各种数据。

1. 数据在OSS上的存储

1.1 OSS LOCATION 的选择

MaxCompute通过在EXTERNAL TABLE上的LOCATION cluase来指定需要处理的OSS数据地址【注:本文假设用户对于非结构化框架,包括EXTERNABLE TABLE, StorageHanlder等的定义等都有比较好的了解,相关细节这里不再具体说明。 有疑问可以先参考之前的基本功能介绍】。其中LOCATION将指向一个OSS的一个目录(或者更准确的说,是一个以‘/’结尾的地址),其中LOCATION为标准URI格式:

LOCATION 'oss://${endpoint}/${bucket}/${userPath}/'

 对于数据安全比较敏感的场景,比如在多用户场景或者公共云上,则推荐采用上述方式,不再LOCATION上使用AK,而是通过STS/RAM体系事先进行鉴权(参见基本功能介绍)。

LOCATION的选择有几点要注意:

  • 不允许使用oss的root bucket作为LOCATION, 也就是说${userPath}不可以为空,这个要求源自OSS对root bucket下存放内容的一些限制。
  • LOCATION不能指向一个单独文件,也就是说,类似oss://oss-cn-hangzhou.aliyuncs.com/mybucket/directory/data.csv 这种LOCATION是无效的。 如果只有一个文件要处理,则应该提供该文件的父目录。

1.2 数据文件的存储和处理:小文件和大文件

在分布式计算系统中,文件的大小对于整个系统的运行效率,性能等都有比较大的相关性。 这里对MaxCompute对非结构化数据的相关处理机制做一个介绍,并分析几种有代表性的场景(e.g., 小文件和大文件),总结了几个针对MaxCompute计算场景中,比较好的OSS文件存储建议。

  • 小文件:通常小文件往往伴随着超大的文件数目,这对于分布式计算系统来说,有两个问题:

    1. 大的文件数,会导致在进行文件分片时, 获取文件宏信息的overhead较大,导致planning和分片比较耗时,比如一个100万个文件的oss LOCATION, planning的耗时可能在分钟以上的量级。
    2. 打开每个OSS文件是有ovehead的,碎片化的小文件会带来额外的读取开销。 比如从OSS读取1000个10KB大小的文件,相比读取一个10MB的的文件,耗时可能在10倍以上。 对大量小文件的访问将带来整个分布式系统更多的网络开销,降低实际上有效的IO throughput。

    所以总体上不推荐在一个OSS目录中存放过多的文件。 可以从另一个方面,考虑将Externable Table做partition,尽量在partition的子粒度上进行数据处理。 另外,在适用的场景下,可以考虑使用tar文件,比如把多个图像文件打在一个tar文件中再保存到OSS上面。 如果是文本文件,MaxCompute的built-in StorageHandler (比如com.aliyun.odps.CsvStorageHandler或者com.aliyun.odps.TsvStorageHandler) 是能自动从tar文件中读取数据的。 如果用户自己定义的StorageHandler/Extractor,也可以在用户代码中使用Java中的tar处理类,比如直接使用Apache common 的TarArchiveInputStream来访问。

  • 大文件:与小文件相对的,是另外一个极端: 超大文件。 分布式系统的精髓是分而治之的思想:对数据进行分片,通过并发处理多个分片来加快海量数据的处理。 在极限情况下,如果海量数据存在一个无法被切割处理的单个文件中,那并发度就被降成为1,这样子的“分布式系统”就失去了意义。 即使没有那么极端,多个超大文件(比如每个几十GB),对分布式系统也是不友好的:大的文件处理可能需要单独占用大量系统资源,给资源调度带来困难,另外还容易造成长尾,失败重跑代价过高等问题。 所以从MaxCompute处理计算的角度,也不推荐在OSS上使用超大文件保存数据。

总结一下, 作为一个整体上的指导原则,MaxCompute非结构框架推荐如下比较理想的OSS数据存储方案:

  1. 数据文件根据应用特性,分文件夹存储,不推荐一个文件夹中存储10万以上个文件。 可以考虑使用tar打包多个文件来作为降低物理文件数目的方法。

  2. 比较适中的文件大小以及均匀分布的数据文件,能更合理的使用各种系统资源, 从而提高分布式处理效率。 对MaxCompute非结构化框架而言,单个文件大小在1MB-2GB是比较理想的情况。

1.3 MaxCompute访问OSS的网络连通以及速度

MaxComput和OSS作为独立的分布式计算和存储服务,在不同的部署集群上的网络连通性有可能影响MaxCompute访问OSS的数据的可达性。 网络的连通性整体服从七网隔离的原则,具体一点来说有几点:

  1. MaxCompute的公共云集群上的计算应该访问OSS的外部集群,另外推荐需要访问的OSS集群与MaxCompute计算集群在物理上尽量靠近。关于OSS公共云上的访问域名以及对应数据中心可以参考OSS文档。

在MaxCompute并发访问OSS的情况下,一个需要特别注意的是OSS具有限流机制,默认情况下一个OSS账号的访问流量是限制在5Gb/s,也就是600MB/s左右。 在MaxComput的高并发度下(比如1000个以上的计算节点),OSS数据下载的速度可能将不再受限于单机网络速度,而取决与OSS的总体流量限速。 在这种情况下,完全可能出现单个计算节点的下载速度低于1MB/s。 当然OSS的限流是可以特别配置的,如果有超大量的数据计算需求,可以联系OSS团队调高对应账户的具体的限流上限。

2. 在用户自定义StorageHandler/Extractor中对输入数据的处理

除了提供几个内置的StorageHandler用来处理CSV, TSV以及Apache ORC文件以外,MaxCompute同时开发了非结构化Java SDK来方便用户对数据进行解析和处理。 通过这样的方法,扩展整个非结构化数据处理的生态,对接视频,图像,音频,基因,气象等数据处理的能力。 简单的来说, MaxCompute封装了分布式系统的细节,使用Java InputStream 的一个增强子类来将做输入数据与用户代码的对接。 这样的接口设计区别于Hive的SerDe, RowFormatter等多层封装,提供了更自然的完全非结构化数据入口, 用户能获得原始数据流,用类似单机程序相似的逻辑进行处理。 当然,基于分布式系统的处理原则,还是有一些Best Practice推荐用户遵守。

2.1 输入数据流的处理模式

对于输入数据流(InputStream),推荐在获取数据bytes后能直接在内存中直接处理。 最理想的情况是,能针对输入数据做流式的“边读边计算”的处理。 当然,对于某些数据格式,由于数据本身的特性,很难做到完全的流式处理:比如对于某些图片/音频数据格式,一张文件必须完全读入才能获得正确的编码信息以及其他特性,那这种情况下,在文件本身不是很大的情况下,可以把文件完全读入本地内存,再行处理。 效率比较低的一种方式是把数据文件下载到本地,然后再通过FileStream读取本地文件进行处理,这样的处理模式有两个问题:

  1. 作为分布式系统,为了实现资源隔离和保护计算节点的健康度,一般不推荐往本地磁盘写文件(尤其是大文件)。在MaxCompue计算系统上,用户的Java代码对本地文件近些读写操作需要另外申请权限,或者打开隔离选项(总体计算性能会下降)。
  2. 数据写入到本地落盘,再读取,性能上有额外的损耗。
  3. 对于比较大的数据(比如10GB或更大的文件),运算节点的磁盘空间无法做保证,存在磁盘被写爆的可能

2.2 三方库使用

在非结构化数据的处理线上,经常遇到的一个需求是把单机的数据处理机制,通过MaxCompute非结构化数据框架,迁移到分布式系统上执行。 比如希望同过ffmpeg来直接读取视频数据,或者希望通过Netcdf-Java来直接处理气象的netcdf/grib格式数据。 而这些三方库往往有一些共同的特性/局限性,比如

  • 可能是基于C/C++,所以需要通过JNI来运行native代码
  • 可能是面对单机实现,所以数据的入口经常是一个本地的文件地址

在这些情况下,非结构化框架均有对应的方式来支持。 比如在隔离打开的情况下允许JNI的使用,以及通过权限审批允许数据下载到本机临时文件等等。 从长期来讲,MaxCompute框架本身也认同使用native C/C++代码库,来处理各种特定的数据格式,将是无法避免的,所以会从框架本身安全等方面来解决这个问题,但是对于读取数据到本地再做处理,从本质上是一种比较大的额外消耗,还是推荐通过直接处理输入数据的方式来做,比如改动NETCDF-JAVA的实现,把输入接口通过FilePath->FileStream改成直接使用InputStream等。

3. 结语

MaxCompute非结构化框架是随着MaxCompute2.0推出的新功能,除了处理OSS上面的非结构化数据之外,最近也打通了与TableStore(OTS)的数据链路。 框架本身也还在不断的发展和完善,包括和MaxCompute优化器以及和整个UDF框架更紧密的结合和扩展等等。 在这里先从现有系统的实现和我们收到的一些反馈,总结提炼了一些处理非结构化数据的最佳实践,也希望得到更多的反馈,把框架功能做到更优。 后继我们也会结合具体的使用场景,比如城市大脑上的离线视频图像处理等,来提供一些更具体的使用范例。

原文链接

相关推荐