2018年第23周-大数据的HDFS数据流及操作例子

上周已经把Hadoop的HDFS的架构和设计大概说了下,也有部署过程。在这周讲的HDFS的数据流及操作例子

HDFS数据流

HDFS系统依赖于以下服务
1.NameNode
2.DataNode
3.JournalNode
4.zkfc

其中JournalNode和zkfc是用来做高可用的。
那么数据流将在客户端、NameNode和DataNode之间进行流转。
HDFS可通过以下接口进行操作:
1.HTTP
2.C
3.NFS
4.FUSE
5.Java接口
本篇文章着重讲的是Java接口——FileSystem类。

读数据流

FileSystem类是Hadoop提供操作HDFS的Java类,通过这类,我们就可以作为客户端进行操作HDFS(除了自己写的服务是客户端,在节点上运行的MR程序(MapReduce)也是客户端,而且还是主要且常用的)。
以下是客户端、NameNode和DataNode之间的(读)数据流图:
2018年第23周-大数据的HDFS数据流及操作例子

1.客户端通过FileSystem对象的open()方法
2.open()方法通过DistributeFileSystem对象通过RPC调用NameNode,获取文件起始块的位置及其副本的DataNode地址,并返回FSDataInputStream对象。

DataNode是根据DataNode与客户端的距离进行排序,如果客户端本身就是一个DataNode,那么客户端将会从保存有相应数据块副本的本地DataNode读取数据。

3.调用FSDataInputStream对象的read()方法时,它会调用其自身的read方法将数据从DataNode传输到客户端。
4.到达块的末端时,DFSInputStream关闭与该DataNode的链接,然后寻找下一块数据的最佳DataNode。这些对于客户端都是透明的,在客户端看来它是一直在读取一个连续的流。

如果DFSInputStream在与DataNode通信时遇到错误,会尝试从这个块的另一个最邻近的DataNode读取数据。DFSDFSInputStream会记住故障的DataNode,以保证不会反复读取该节点上后续的块。DFSInputStream也会通过 校验和 确认从DataNode读取的数据是否完整。如果发现有损坏的块,DFSInputStream会试图从其他DataNode读取其副本,也会将损坏的块通知给NameNode。
5.客户端读取完后,会调用close()方法

写数据流

以下是客户端、NameNode和DataNode之间的(写)数据流图:
2018年第23周-大数据的HDFS数据流及操作例子

1.客户端通过FileSystem对象的create()方法
2.create()方法通过DistributeFileSystem对象通过RPC调用NameNode,在文件系统的命名空间新建一个文件,此时该文件中还没有相应的数据块,,并返回FSDataOutputStream对象。

NameNode执行各种不同的检查以确保这个文件不存在以及客户端有新建该文件的权限,如果通过检查,NameNode会创建新文件,否则 ,文件创建失败并向客户端抛出IOException异常。

3.调用FSDataOutputStream对象的write()方法时,它会使用DFSOutputStream对象进行写入数据(DFSOutputStream是封装在FSDataOutputStream)。

在客户端写入数据时,DFSOutputStream将它分成一个个的数据包(DFSPacket),并写入内部队列,称为“数据队列(dataQueue,是其内部成员变量LinkedList)”。DataStreamer处理数据队列,它的职责时挑选出适合存储数据副本的一组DataNode,并根据此要求NameNode分配新的数据块。这一组DataNode构成一个管线,如果副本数是3个,则管线中有3个DataNode节点。DataStreamer将数据包流式传输到管线中第1个DataNode,第1个DataNode保存数据包并将数据包继续发送到管线的第2个DataNode,如此类推到第3个DataNode节点。
DFSOutputStream其成员变量ackQueue“确认队列”,维护者一个内部数据包队列来等待DataNode的确认回执。,收到管线中所有DataNode节点的确认信息后,该数据包才会从ackQueue删除。
异常情况,如果任意DataNode在数据包写入期间失败,则执行以下操作:首先关闭管线,会从ackQueue把所有数据包都添加回dataQueue的最前端,以保证故障节点下游的DataNode不会漏掉任何一个数据包。并将标识传给NameNode,以便故障DataNode在恢复后可以删除存储部分的数据块。从管线删除故障DataNode后,基于正常DataNode构建一条新的管线,继续写数据。

4.客户端完成数据的写入后,对数据流调用close()方法,该操作等待NameNode返回确认写入完成。

HDFS操作例子

命令行

管理命令参考:http://hadoop.apache.org/docs...
文件操作命令参考:http://hadoop.apache.org/docs... 不过这文档里的hadoop fs 要改为hdfs dfs

  • 查看版本hdfs version
[jevoncode@s1 ~]$ hdfs version
Hadoop 2.7.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r baa91f7c6bc9cb92be5982de4719c1c8af91ccff
Compiled by root on 2016-08-18T01:41Z
Compiled with protoc 2.5.0
From source with checksum 2e4ce5f957ea4db193bce3734ff29ff4
This command was run using /mydata1/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jar
  • 查看文件系统的统计信息hdfs dfsadmin -report
[jevoncode@s1 ~]$hdfs dfsadmin -report
Configured Capacity: 158127783936 (147.27 GB)
Present Capacity: 148158701568 (137.98 GB)
DFS Remaining: 148158615552 (137.98 GB)
DFS Used: 86016 (84 KB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

-------------------------------------------------
Live datanodes (3):

Name: 192.168.31.181:50010 (s6.jevoncode.com)
Hostname: s6.jevoncode.com
Decommission Status : Normal
Configured Capacity: 52709261312 (49.09 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3323027456 (3.09 GB)
DFS Remaining: 49386205184 (45.99 GB)
DFS Used%: 0.00%
DFS Remaining%: 93.70%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sun Jun 10 14:00:14 CST 2018

...
  • 创建和目录
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/command/
[jevoncode@s1 ~]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - jevoncode supergroup          0 2018-06-10 14:05 /opt
  • 复制文本文件到HDFS
hdfs dfs -put sougouword.txt /opt/command/word.txt
  • 复制HDFS的文件到本地
hdfs dfs -get /opt/command/word.txt sougouword2.txt
  • 删除文件
hdfs dfs -rm /opt/command/word.txt

Java接口

  • 通过URL对象获取文件内容
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URL;

/**
 * 前期准备:
 * [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/
 * [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/command/
 * [jevoncode@s1 ~]# hdfs dfs -put sougouword.txt /opt/command/word.txt
 * <p>
 * <p>
 * 方法一:动态参数
 * 命令如下:上传至hadoop服务器
 * [jevoncode@s1 ~]# export HADOOP_CLASSPATH=jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar
 * [jevoncode@s1 ~]# hadoop com.jc.demo.hadoop.hdfs.URLCat hdfs://ns/opt/command/word.txt
 * 其中ns是hdfs-site.xml配置的主机名,用于高可用
 *
 * <p>
 * 方法二:远程访问
 * 直接执行main方法,使用hdfsHost做参数,可远程访问
 */
public class URLCat {

    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        InputStream in = null;
        try {
//            in = new URL(args[0]).openStream();  //方法一:动态参数
            in = new URL(hdfsHost).openStream();    //方法二:远程访问
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
  • 通过FileSystem获取文件内容
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URI;

/**
 * 使用FileSystem获取文件内容
 * Configuration在这例子中仅仅做个参数而已,没啥用,还是需要在代码里指定url
 *
 */
public class FileSystemCat {

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        String uri = hdfsHost;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
  • FSDataInputStream还支持随机读
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

/**
 * 使用FSDataInputStream随机读
 */
public class FileSystemDoubleCat {

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        String uri = hdfsHost;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0); // go back to the start of the file
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
  • 使用FileSystem复制文件(写入),会自动创建目录
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

/**
 * 使用FileSystem复制文件(写入),会自动创建目录
 * FileSystem也是可以操作本地文件的,所以没有指定协议,就会操作本地文件目录/opt/java
 */
public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = "/home/cherry/Downloads/斗破苍穹.txt";
        String dst = "hdfs://s1.jevoncode.com:9000/opt/java/斗破苍穹.txt";
//        String dst = "/opt/java/"; //FileSystem也是可以操作本地文件的,所以没有指定协议,就会操作本地文件目录/opt/java
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
  • 遍历目录(仅一层)
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * 获取当前目录的所有文件及目录信息(仅一层)
 */
public class ListStatus {

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/";
        String uri = hdfsHost;
        args = new String[]{"/opt/", "/dir"};
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);       //将文件状态FileStatus数组转为Path数组
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
}
/**
 * output:
 * 06-10 14:29:16 [main] DEBUG o.a.h.s.a.util.KerberosName - Kerberos krb5 configuration not found, setting default realm to empty
 * 06-10 14:29:16 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Falling back to shell based
 * 06-10 14:29:18 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Both short-circuit local reads and UNIX domain socket are disabled.
 * 06-10 14:29:18 [main] DEBUG o.a.h.h.p.d.s.DataTransferSaslUtil - DataTransferProtocol not using SaslPropertiesResolver, no QOP found in configuration for dfs.data.transfer.protection
 * hdfs://s1.jevoncode.com:9000/opt/command
 * hdfs://s1.jevoncode.com:9000/opt/java
 */
  • 获取文件信息(创建时间,创建者等)
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/**
 * 获取文件状态
 */
public class ShowFileStatusTest {
    private FileSystem fs;

    @Before
    public void setUp() throws IOException {

        String hdfsHost = "hdfs://s1.jevoncode.com:9000/";
        String uri = hdfsHost;
        Configuration conf = new Configuration();
        fs = FileSystem.get(URI.create(uri), conf);

        OutputStream out = fs.create(new Path("/dir/file"));
        out.write("content".getBytes("UTF-8"));
        out.close();
    }

    @After
    public void tearDown() throws IOException {
        if (fs != null) {
            fs.close();
        }
    }

    @Test(expected = FileNotFoundException.class)
    public void throwsFileNotFoundForNonExistentFile() throws IOException {
        fs.getFileStatus(new Path("no-such-file"));
    }

    /**
     * 测试文件状态
     * @throws IOException
     * @throws InterruptedException
     */
    @Test
    public void fileStatusForFile() throws IOException, InterruptedException {
        Path file = new Path("/dir/file");
        FileStatus stat = fs.getFileStatus(file);
        assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));                      //路径应为/dir/file
        assertThat(stat.isDirectory(), is(false));                                          //不是目录
        assertThat(stat.getLen(), is(7L));                                                  //文件大小
        Thread.sleep(3000);                                                                 //避免创建时间大于测试时间
        assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));//创建时间应该小于测试时间
        assertThat(stat.getReplication(), is((short) 3));                                         //副本个数
        assertThat(stat.getBlockSize(), is(128 * 1024 * 1024L));                            //块大小
        assertThat(stat.getOwner(), is(System.getProperty("user.name")));                         //当前用户是其创建者
        assertThat(stat.getGroup(), is("supergroup"));                                      //文件的用户组校验
        assertThat(stat.getPermission().toString(), is("rw-r--r--"));                       //文件权限教研
    }

    @Test
    public void fileStatusForDirectory() throws IOException {
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().toUri().getPath(), is("/dir"));
        assertThat(stat.isDirectory(), is(true));
        assertThat(stat.getLen(), is(0L));
        assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 0));
        assertThat(stat.getBlockSize(), is(0L));
        assertThat(stat.getOwner(), is(System.getProperty("user.name")));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
    }

}

相关推荐