在Hadoop框架源代码org.apache.hadoop.fs包中,都是关于Hadoop文件系统实现的相关类,主要包括文件系统模型的建立,及其在该文件系统定义、实现基本的文件操作。例如给出文件系统抽象,对文件系统上存储的文件执行基本操作进行抽象。
FileSystem类:
(1)实现了文件系统的抽象类,继承自org.apache.hadoop.conf.Configured,并实现Closeable接口,可以适用于多种 文件系统,如本地文件系统file://,ftp,hdfs等。如果要自己实现一个系统可以通过继承这个类(hadoop中 DistributeFileSystem就是这样的),做相应的配置,并实现相应的抽象方法。
(2)抽象类定义了文件系统所具有的基本特征和基本操作。首先从该抽象类的属性定义来看,这些属性描述了文件系统的静态特性。该类中定义了如下属性:
/**************************************************************** * An abstract base class for a fairly generic filesystem. It * may be implemented as a distributed filesystem, or as a "local" * one that reflects the locally-connected disk. The local version * exists for small Hadoop instances and for testing. * * <p> * * All user code that may potentially use the Hadoop Distributed * File System should be written to use a FileSystem object. The * Hadoop DFS is a multi-machine system that appears as a single * disk. It's useful because of its fault tolerance and potentially * very large capacity. * * <p> * The local implementation is {@link LocalFileSystem} and distributed * implementation is DistributedFileSystem. * * * FileSystem抽象类继承自org.apache.hadoop.conf.Configured配置基类,实现了java.io.Closeable接口,通过这一点,可以了解到, * FileSystem抽象类作为一个文件系统的抽象定义,它是可配置的,也就是说可以通过指定的配置文件中的一些配置项来描述一个文件系统, * 实际上,最重要的配置类是org.apache.hadoop.conf.Configuration,org.apache.hadoop.conf.Configured中定义的方法就是对 * org.apache.hadoop.conf.Configuration配置类进行设置或获取,满足一个基于org.apache.hadoop.conf.Configuration配置类的其它类的需要。 * FileSystem抽象类定义了文件系统所具有的基本特征和基本操作 * * *****************************************************************/ public abstract class FileSystem extends Configured implements Closeable { public static final String FS_DEFAULT_NAME_KEY = "fs.default.name"; public static final Log LOG = LogFactory.getLog(FileSystem.class); /** FileSystem cache *//** 文件系统缓存 */ private static final Cache CACHE = new Cache(); /** The key this instance is stored under in the cache. 该文件系统(this)在缓存中的键实例 */ private Cache.Key key; /** Recording statistics per a FileSystem class 记录文件系统类的统计信息的Map * * 上面statisticsTable是一个IdentityHashMap<Class<? extends FileSystem>, Statistics>,键是继承自FileSystem的Class,值是统计信息Statistics类。 * 为了在一个并行计算环境中进行安全的计算,Statistics类使用了java.util.concurrent.atomic包中的原子变量属性,保证线程安全的原子读写操作的同时,提高并行性能 * * */ private static final Map<Class<? extends FileSystem>, Statistics> statisticsTable = new IdentityHashMap<Class<? extends FileSystem>, Statistics>(); /** * The statistics for this file system.该文件系统(this)的统计信息的实例 * 是对当前(this)的FileSystem的统计信息实例。该属性是在该文件系统(this)的实例被构造完成之后被初始化的,通过调用initialize方法实现统计信息初始化 */ protected Statistics statistics; /** * A cache of files that should be deleted when filsystem is closed * or the JVM is exited.当文件系统关闭或者JVM退出以后,需要将缓存中的文件清空。该Set<Path>中的内容是,对缓存中文件的Path,并且是排好序的 */ private Set<Path> deleteOnExit = new TreeSet<Path>(); ...... }
2.1 cache机制
在FileSystem中有一个静态cache类,内含一个 final Map<Key, FileSystem> map(为了能够快速获取FileSystem,这个map采用的是HashMap),用于存储和管理由{scheme,authority,username}作为Key,而以FileSystem引用作为值的键值对。当给定uri调用get获取指定的 FileSystem时,最终还是调用cache.get。cache.get会查找相应的键值对,不存在时调用createFileSystem新建一 个FileSystem并将其插入map中。当本FS关闭时,要将CACHE中对应的键值对删除。由于同一时刻可能要多个client请求访问map,所以需要对访问map的操作进行同步 synchronized(这些都要保证是thread-safe)。
当关闭FS的时候(用户手动关闭或JVM在程序运行结束,ClientFinalizer对文件系统进行关闭),首先会调用cache的closeAll 方法,对map进行清空(先清空cache中其他FS的键值对,再清空本FS的键值对);然后再调用FS的processDeleteOnExit方法对 一些temp目录进行清空。
Cache类
用来缓存文件系统对象。也就是可能存在多个文件系统对象,从而可知,每个文件系统除了管理基于其上的内容之外,还可能要管理缓存的一组文件系统实例,这要看具体的文件系统是如何实现的。当然,也可能是在分布式环境中,一个文件系统管理远程的和本地的文件系统实例。
为了能够快速获取到一个存在于缓存中的文件系统对象,Hadoop采用了Hash算法,将文件系统对象以键值对的方式存储到HashMap中,也就org.apache.hadoop.fs.FileSystem.Cache缓存类定义的map属性
Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
Key是通过一个合法的URI信息与用户名快速获取到缓存中存在的一个文件系统的对象,从而能够获取到指定文件系统中文件信息。
该缓存类提供了3个基本的操作,如下所示:
//根据URI与Configuration,从缓存中取出一个FileSystem实例,要求同步缓存操作 FileSystem get(URI uri, Configuration conf) throws IOException //根据指定的缓存Key实例,从缓存中删除该Key对应的FileSystem实例,要求同步缓存操作。 synchronized void remove(Key key, FileSystem fs) //迭代缓存Map,删除缓存中的缓存的全部文件系统实例,要求同步缓存操作。 synchronized void closeAll() throws IOException
2.2 statisticsTable统计信息管理映射表
statisticsTable是一个IdentityHashMap<Class<? extends FileSystem>, Statistics>,用于保存并管理每个文件系统对应的统计信息Statistics。
Statistics类使用了java.util.concurrent.atomic包中的原子变量属性,保证线程安全的原子读写操作的同时,提高并行性能。如下所示:
private final String scheme; private AtomicLong bytesRead = new AtomicLong(); private AtomicLong bytesWritten = new AtomicLong(); private AtomicInteger readOps = new AtomicInteger(); private AtomicInteger largeReadOps = new AtomicInteger(); private AtomicInteger writeOps = new AtomicInteger();
其中,bytesRead是从统计数据中读取指定数量的字节,加到当前读取字节数上。同理,bytesRead是基于原子写操作的。
2.3 deleteOnExit临时文件集合
FileSystem中deleteOnExit用于保存所有需要在本FS关闭时或JVM退出时需要删掉的文件集合,通过调用processDeleteOnExit将这些路径进行清空。
/** * A cache of files that should be deleted when filsystem is closed * or the JVM is exited.当文件系统关闭或者JVM退出以后,需要将缓存中的文件清空。该Set<Path>中的内容是,对缓存中文件的Path,并且是排好序的 */ private Set<Path> deleteOnExit = new TreeSet<Path>();
for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) { Path path = iter.next(); delete(path, true); iter.remove(); }
2.4 文件系统抽象
从FileSystem抽象类“抽象”的切面横向了解一个FileSystem定义了哪些基于文件系统的操作,使我们能够知道如果实现一个基于文件系统,需要实现哪些基本操作。如下所示:
/**获取能够唯一标识一个FileSystem的URI**/ public abstract URI getUri(); /**根据给定的Path f,打开一个文件的FSDataInputStream输入流.f 待打开的文件,bufferSize缓冲区大小**/ public abstract FSDataInputStream open(Path f, int bufferSize) /** * 为写入进程打开一个FSDataOutputStream。 * @param f 待写入的文件 * @param permission 权限 * @param overwrite 是否重写 * @param bufferSize 缓冲区大小 * @param replication 文件的块副本数量 * @param blockSize 块大小 * @param progress 用于报告Hadoop框架工作状况的进程 * @throws IOException */ public abstract FSDataOutputStream create(Path f,FsPermission permission,boolean overwrite,int bufferSize,short replication,long blockSize,Progressable progress) throws IOException; /** * 向一个已经存在的文件中执行追加操作 * @param f 存在的文件 * @param bufferSize 缓冲区大小 * @param progress 报告进程 * @throws IOException */ public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; /** * 重命名文件src为dst */ public abstract boolean rename(Path src, Path dst) throws IOException; /** * 删除文件 */ public abstract boolean delete(Path f, boolean recursive) throws IOException; /** * 如果f是一个目录,列出该目录中的文件 */ public abstract FileStatus[] listStatus(Path f) throws IOException; /** * 为给定的文件系统设置当前工作目录 */ public abstract void setWorkingDirectory(Path new_dir); /** * 获取文件系统的当前工作目录 */ public abstract Path getWorkingDirectory(); /** * 创建一个目录 */ public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException;
2.5 文件操作
主要包括文件的创建、读写、重命名、拷贝、删除这几个基本操作:
(1)目录创建
//本地文件系统 默认权限 public boolean mkdirs(Path f) throws IOException { return mkdirs(f, FsPermission.getDefault()); } //分布式文件系统 需指定权限 public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) throws IOException { boolean result = fs.mkdirs(dir); fs.setPermission(dir, permission); return result; }
(2)文件创建
文件的创建,主要是为了读或写操作而打开一个文件,返回文件的流对象,可以进行流式读写与追加。
public static FSDataOutputStream create(FileSystem fs, Path file, FsPermission permission) throws IOException { // create the file with default permission FSDataOutputStream out = fs.create(file); // set its permission to the supplied one fs.setPermission(file, permission); return out; }
所有重载 create的方法均最终调用了publicabstract FSDataOutputStream create。其中提供了多个create的重载函数。值得注意的是这个函数返回的是一个输出流 FSDataOutputStream(用于将本地的输出到FS(FileSystem,下同)(即 FileSystem) 上),这个流继承自DataOutputStream,可以对文件进行写入。同时其内部还包含了一个静态类 PositionCache(用于保存一个文件的已写入数据的统计类和写入的位置)。
(3)读取文件
public FSDataInputStream open(Path f) throws IOException { return open(f, getConf().getInt("io.file.buffer.size", 4096)); }
FileSystem的open方法返回一个FSDataInputStream,实现了 Seekable、PositionedReadable这两个接口,支持随机访问读取文件,可以通过指定开始读取的位置。
(4)文件的复制、移动
FileSystem中有两种复制:copyFromLocalFile和copyToLocalFile。文件的移动可以看成是文件的复制再将源文件删除。故FileSystem中存在两种相应的文件移动操作moveFromLocalFile和moveToLocalFile。
(a)copyFromLocalFile
(b)copyToLocalFile
(c)moveFromLocalFile
(d)moveToLocalFile
最终调用:
/** Copy files between FileSystems. */ public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, Configuration conf) throws IOException { return copy(srcFS, src, dstFS, dst, deleteSource, true, conf); }
FileUtil类 将会在下面的专题中详细讲解。
(5)获取块信息对于块,块是组成文件的基本单位,那么给定一个文件,它就应该具有一个块的列表,可以通过getFileBlockLocations方法获取到一个文件对应的块所在主机的列表、所在文件中的偏移位置等信息,返回一个BlockLocation[],它包含了主机名列表、偏移位置、文件大小的信息。
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { ….............. if (file.getLen() < start) { return new BlockLocation[0]; } String[] name = { "renrenzhuhui:50010" }; String[] host = { "renrenzhuhui" }; return new BlockLocation[] { new BlockLocation(name, host, 0,file.getLen()) }; }
2.6 重要数据结构
(1)BlockLocation
private String[] hosts; // hostnames of datanodes private String[] names; // hostname:portNumber of datanodes private String[] topologyPaths; // full path name in network topology private long offset; // offset of the of the block in the file private long length;
一个BlockLocation包含了一个文件的一个块的详细信息,包括这个块对应的全部副本(包含它本身) ,比如上述定义的有:所在主机、所在主机及其端口号、在网络拓扑结构中的全路径名称、块在文件中的偏移位置、块长度。显然,这些块副本长度和在文件中的偏移位置都是相同的,可以共享(分别对应length和offset属性),其他三个属性的信息就不相同了(可能存在某两个相同的情况)。
Hadoop文件系统中,一个文件对应多个块(Block),每个块默认大小设置为64M。那么,对于由多个块组成的文件来说,如果想要获取到该文件的全部块及其块副本的信息,就需要通过文件系统中文件的统计信息FileStatus来获取到一个BlockLocation[],该数组中对应的全部快就能够构成完整的该文件。
下面通过形式化语言来表达一下上面的含义:
假设一个文件F由n个块组成,则分别为: B(1),B(2),……,B(n)
假设默认块的大小为BS,那么B(1)~B(n-1)一定是大小相同的块,大小都等于BS,而B(i)<=BS,这是显而易见的。
文件F的每个块B(i)都被存储在指定主机的文件系统中,假设存储到了主机H(i)上。为了快速计算,需要快速定位到文件F的Bi块上,也就是需要进行流式读取获取到,那么F的块B(i)需要有一个记录其详细信息的结构,也就是Hadoop定义的BlockLocation。
假设Bi对应的描述信息对象为BL(i),那么BL(i)就包含了与块B(i)相关的全部块副本的信息,当然每个块副本同样包含与BL(i),相同的描述信息的属性,只是属性值不同而已。
假设文件F对应的块B(i)一共具有m个副本:
BR1(i),BR2(i),……,BRm(i)
这些块副本分别存储在对应如下的主机上:
H1(i),H2(i),……,Hm(i)
这些块副本分别对应指定主机的端口号分别如下:
H1(i):P1(i),H2(i):P2(i),……,Hm(i):Pm(i)
这些块副本对应的拓扑网络中的完整路径分别为:
U1(i),U2(i),……,Um(i)
假设块Bi的长度为LENGTH(i),偏移位置为OFFSET(i),那么,通过该文件的FileStatus获取的BlockLocation[i]的内容,形式化的可以描述为:
new BlockLocation[]{ new String[m]{H1(i), H2(i), ……, Hm(i)}, new String[m]{H1(i):P1(i), H2(i):P2(i), ……, Hm(i):Pm(i)}, new String[m]{U1(i), U2(i), ……, Um(i)}, LENGTH(i), OFFSET(i) }
(2)ContentSummary
对文件、目录统计的实体,记录文件或是目录的元信息。
(3)FileChecksum
对文件进行校验和,实现类MD5MD5CRC32FileChecksum
/** Return true if both the algorithms and the values are the same. */ public boolean equals(Object other) { …........... return this.getAlgorithmName().equals(that.getAlgorithmName()) && Arrays.equals(this.getBytes(), that.getBytes()); }
(4)FileStatus
系统文件信息实体:
private Path path; // 文件路径 private long length; // 文件长度 private boolean isdir; // 是否是目录 private short block_replication; // 块副本因子 private long blocksize; // 块大小 private long modification_time; // 修改时间 private long access_time; // 访问时间 private FsPermission permission; //在指定文件系统中的操作权限 private String owner; // 文件属主 private String group; // 所属组
(5)PositionedReadable
PositionedReadable提高了从某个位置开始读的方法(一个read方法和两个readFully方法)
(6)Seekable
Seekable提供了可以在流中定位的能力(seek,getPos和seekToNewSource)
(7)FSDataInputStream、FSDataOutputStream
FSDataInputStream类:实现了Seekable与PositionedReadable接口,赋予了Hadoop文件系统中的文件输入流分别能够进行流式搜索与定位流式读取的语义。
FSDataOutputStream:创建一个FSDataOutputStream.PositionCache缓冲流对象以后,可以向该文件输出缓冲流中写入相关的数据,作为缓冲使用,其中相关数据包括:文件系统(FileSystem)统计信息 FileSystem.Statistics、当前待写入流的位置。每当需要向文件系统中写入数据,都会从 PositionCache缓冲流中获取到一个写入位置(也就是,要从流中的该位置开始写入)。 FSDataOutputStream输出流类的通过一个PositionCache缓冲流来构造一个 FSDataOutputStream输出流对象.
(8)FSInputChecker、FSInputStream
FSInputChecker在FSInputStream的基础上,加入了HDFS中需要的校验功能。校验在readChecksumChunk中实现,并在内部的read1方法中调用。所有的read调用,最终都是使用read1读数据并做校验。如果校验出错,抛出异常ChecksumException。它将接口seekable和PositionedReadable混插到类中。seekable提供了可以在流中定位的能力(seek,getPos和seekToNewSource),而PositionedReadable提高了从某个位置开始读的方法(一个read方法和两个readFully方法)。
(9)Trash
回收站,提供moveToTrash(Path path),并通过内部Emptier线程实时清理