HDFS中的回收站

在Linux操作系统下面,如果用户删除了某一个文件或者是某一个文件目录,操作系统并不会把这文件从文件系统中真正删除,而是先把它放入回收站中,这样在用户误操作的情况下还能找回原文件,以防止给用户造成中大损失。实际上,HDFS也为用户提供了类似这样的一个功能,但是这样的功能只限于用户在客户端的脚本操作,也就是HDFS的shell命令,而当用户写程序调用HDFS的API时,NameNode并不会把删除的文件或目录放入回收站Trash中,所以这一点请用户们要注意了。下面就来具体的谈谈HDFS是如何来实现这个回收站的功能的吧。

           首先,HDFS在客户端为用户提供了对文件系统的命令行操作,这个命令行操作是通过FsShell来实现的。当用户通过命令-rm/-rmr删除文件系统中的一个文件或者是目录的时候,HDFS并没有真正的删除这个文件或目录,而是把这个文件/目录移动到HDFS的系统回收站目录下。和Linux系统的回收站设计一样,HDFS会为每一个用户创建一个回收站目录:/user/用户名/.Trash/。这里就有一个问题了,经过长时间之后,删除的文件/目录占用了该文件系统大量的存储空间,而这些删除文件/目录确实已经真的对用户来说没有任何用处,那么HDFS是如何处理这个问题的呢?

            对于上面的问题,HDFS给出的一个解决方案是:每一个被用户通过Shell删除的文件/目录,在系统回收站中都一个周期,也就是当系统回收站中的文件/目录在一段时间之后没有被用户回复的话,HDFS就会自动的把这个文件/目录彻底删除,之后,用户就永远也找不回这个文件/目录了。在HDFS内部的具体实现就是在NameNode中开启了一个后台线程Emptier,这个线程专门管理和监控系统回收站下面的所有文件/目录,对于已经超过生命周期的文件/目录,这个线程就会自动的删除它们,不过这个管理的粒度很大。另外,用户也可以手动清空回收站,清空回收站的操作和删除普通的文件目录是一样的,只不过HDFS会自动检测这个文件目录是不是回收站,如果是,HDFS当然不会再把它放入用户的回收站中了。

       在HDFS中,用户的回收站用类org.apache.Hadoop.fs.Trash来表示,这个类主要包含四个属性:

HDFS中的回收站

fs:当前用户使用的文件系统;

trash:用户的回收站目录(/user/用户名/.Trash);

current:存放被用户删除的文件/目录的路径(/user/用户名/.Trash/current);

interval:被用户删除的文件/目录在回收站中的生命期;

在默认的情况下,interval的被Hadoop设置为0,即关闭了用户的回收站,所以我强烈的建议用户在读完本文之后赶紧在配置文件中设置fs.trash.interval的值(单位是minute),同时这个设置只与客户端和NameNode节点都相关,但是在客户端设置这个值,只能说明是开启了用户的回收站功能,而不能决定删除的文件/目录在回收站中的生命期,在NameNode节点设置这个值只能说是开启了自动清空所有用户回收站的功能。所以当某一个用户删除一个文件/目录时,HDFS就把这个待删除的文件/目录移动到该用户对应的回收站目录/user/用户名/.Trash/current中。另外,如果用户的回收站中已经存在了用户当前删除的文件/目录,则HDFS会将这个当前被删除的文件/目录重命名,命名规则很简单就是在这个被删除的文件/目录名后面紧跟一个编号(从1开始知道没有重名为止)。

       还有,NameNode是通过后台线程来定时清空所有用户回收站中的文件/目录的,与之相对应的类是org.apache.hadoop.fs.Trash.Emptier,它每隔interval分钟就清空一次用户回收站。具体的操作步骤是,先检查用户回收站目录/user/用户名/.Trash下的所有yyMMddHHmm形式的目录,然后删除寿命超过interval的目录,最后将当前存放删除的文件/目录回收站目录/user/用户名/.Trash/current重命名为一个/user/用户名/.Trash/yyMMddHHmm,相关的源代码是:
  1. /** Create a trash checkpoint. */  
  2.   public void checkpoint() throws IOException {  
  3.     if (!fs.exists(current))                      // no trash, no checkpoint   
  4.       return;  
  5.   
  6.     Path checkpoint;  
  7.     synchronized (CHECKPOINT) {  
  8.       checkpoint = new Path(trash, CHECKPOINT.format(new Date()));  
  9.     }  
  10.   
  11.     if (fs.rename(current, checkpoint)) {  
  12.       LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());  
  13.     } else {  
  14.       throw new IOException("Failed to checkpoint trash: "+checkpoint);  
  15.     }  
  16.   }  
  17.   
  18.   /** Delete old checkpoints. */  
  19.   public void expunge() throws IOException {  
  20.     FileStatus[] dirs = fs.listStatus(trash);            // scan trash sub-directories   
  21.     if( dirs == nullreturn;  
  22.     long now = System.currentTimeMillis();  
  23.     for (int i = 0; i < dirs.length; i++) {  
  24.       Path path = dirs[i].getPath();  
  25.       String dir = path.toUri().getPath();  
  26.       String name = path.getName();  
  27.       if (name.equals(CURRENT.getName()))  continue;      // skip current           
  28.   
  29.       long time;  
  30.       try {  
  31.         synchronized (CHECKPOINT) {  
  32.           time = CHECKPOINT.parse(name).getTime();  
  33.         }  
  34.       } catch (ParseException e) {  
  35.         LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");  
  36.         continue;  
  37.       }  
  38.   
  39.       if ((now - interval) > time) {  
  40.         if (fs.delete(path, true)) LOG.info("Deleted trash checkpoint: "+dir);  
  41.         else LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");  
  42.       }  
  43.     }  
  44.   }  
  45.   
  46.   
  47. private static final PathHOMES = new Path("/user/");  
  48. public void run() {  
  49.       if (interval == 0)  return;    // trash disabled   
  50.         
  51.       long now = System.currentTimeMillis();  
  52.       long end;  
  53.       while (true) {  
  54.         end = ceiling(now, interval);  
  55.         try {                                     // sleep for interval   
  56.           Thread.sleep(end - now);  
  57.         } catch (InterruptedException e) {  
  58.           return;                                 // exit on interrupt   
  59.         }      
  60.         try {  
  61.           now = System.currentTimeMillis();  
  62.           if (now >= end) {  
  63.             FileStatus[] homes = null;  
  64.             try {  
  65.               homes = fs.listStatus(HOMES);         // list all home dirs   
  66.             } catch (IOException e) {  
  67.               LOG.warn("Trash can't list homes: "+e+" Sleeping.");  
  68.               continue;  
  69.             }  
  70.             if (homes == nullcontinue;  
  71.             for (FileStatus home : homes) {         // dump each trash   
  72.               if (!home.isDir()) continue;  
  73.               try {  
  74.                 Trash trash = new Trash(home.getPath(), conf);  
  75.                 trash.expunge();  
  76.                 trash.checkpoint();  
  77.               } catch (IOException e) {  
  78.                 LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");  
  79.               }  
  80.             }  
  81.           }  
  82.         } catch (Exception e) {  
  83.           LOG.warn("RuntimeException during Trash.Emptier.run() " + StringUtils.stringifyException(e));  
  84.         }  
  85.       }  
  86.     }  

相关推荐