HDFS-java客户端操作API

    技术2025-01-02  22

    文章目录

    客户端操作环境搭建常见错误:客户端常用API1.文件常见API操作2.流操作-分块下载

    客户端操作环境搭建

    1.环境准备工作:

    在windows使用java客户端调用HDFS集群时,会初始化一个本地的文件系统对象,期间会用到hadoop编译包的一些文件。所以我们需要将Hadoop安装包解压到某个目录,并配置HADOOP_HOME 环境变量同时在path中添加%HADOOP_HOME%/bin

    需要注意,要选择对应版本的windows编译包。如果用Linux包(即集群中安装的hadoop包)解压,则需要手动添加winutils.exe和hadoop.dll到bin目录。

    winutils对应版本下载链接如下,我用的是Hadoop3.1.3,版本列表没有,使用的3.1.2代替也生效。

    https://github.com/cdarlint/winutils

    2.新建maven工程:

    添加依赖:依赖的版本可以和集群的hadoop版本不一致。

    <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency>--> <!--<groupId>org.apache.logging.log4j</groupId>--> <!--<artifactId>log4j-core</artifactId>--> <!--<version>2.8.2</version>--> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs-client</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.2.0</version> </dependency> </dependencies>

    添加一个日志配置文件log4j.properties到resources目录:

    log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    3.新建测试类:

    package com.zd.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HadfsClient { @Test public void testHdfs(){ Configuration conf= new Configuration(); conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); conf.set("fs.defaultFS", "hdfs://192.168.43.11:9000"); try { FileSystem fs = FileSystem.get(conf); // 2 创建目录 fs.mkdirs(new Path("/zx/6222")); // 3 关闭资源 fs.close(); } catch (Exception e) { e.printStackTrace(); } } }

    执行后,在集群的hdfs便可以生成一个目录:

    常见错误:

    缺少wintuils.exe 和hadoop.dll Exception in thread "main" java.lang.RuntimeException: java.io.FileNotFoundException: Could not locate Hadoop executable: 权限错误: org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/zx":xiaomao:supergroup:drwxr-xr-x

    这是因为调用用户使用了windows的administrator,要采用集群中的用户,所以通过下面方式调用:指明用户是 xiaomao

    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.43.11:9000"), conf, "xiaomao");

    3.hadoop环境配置错误

    java.io.FileNotFoundException:HADOOP_HOME and hadoop.home.dir are unset

    该错误是因为没有再windows本地配置hadoop,参加文章开头。

    4.main方法错误

    Exception in thread "main" org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "hdfs"

    因为依赖的范围是provided,所以运行main方法,报错。

    该错误还有一个可能的原因是,如果配置了maven-assembly-plugin插件,描述如下

    Different JARs (hadoop-commons for LocalFileSystem, hadoop-hdfs for DistributedFileSystem) each contain a different file called org.apache.hadoop.fs.FileSystem in their META-INFO/services directory. This file lists the canonical classnames of the filesystem implementations they want to declare (This is called a Service Provider Interface implemented via java.util.ServiceLoader, see org.apache.hadoop.FileSystem line 2622). When we use maven-assembly-plugin, it merges all our JARs into one, and all META-INFO/services/org.apache.hadoop.fs.FileSystem overwrite each-other. Only one of these files remains (the last one that was added). In this case, the FileSystem list from hadoop-commons overwrites the list from hadoop-hdfs, so DistributedFileSystem was no longer declared.

    这是需要这样修复:

    hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()

    或者在core-site.xml 添加如下:

    <property> <name>fs.file.impl</name> <value>org.apache.hadoop.fs.LocalFileSystem</value> <description>The FileSystem for file: uris.</description> </property> <property> <name>fs.hdfs.impl</name> <value>org.apache.hadoop.hdfs.DistributedFileSystem</value> <description>The FileSystem for hdfs: uris.</description> </property>

    客户端常用API

    完整代码github

    1.文件常见API操作
    package com.zd.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.junit.Test; import java.net.URI; /** * HDFS客户端文件操作API */ public class HDFSClientAPI { private String url = "hdfs://192.168.43.11:9000"; private String user = "xiaomao"; /** * 获取一个只有基本配置FileSystem对象 * @return */ public FileSystem createCommonFS() { try { Configuration conf= new Configuration(); //如果对接的文件系统 FileSystem fs = FileSystem.get(new URI(url), conf, user); return fs; } catch (Exception e) { e.printStackTrace(); } return null; } /** * 创建目录 * @throws Exception */ @Test public void mkdirs() throws Exception{ FileSystem fs = createCommonFS(); // 2 创建目录 fs.mkdirs(new Path("/zx/7117")); // 3 关闭资源 fs.close(); } /** * 删除文件或文件夹 * @throws Exception */ @Test public void delete() throws Exception{ FileSystem fs = createCommonFS(); //如果删除是目录recursive必须设置为true,表示递归删除。如果是文件,true和false都可以。 fs.delete(new Path("/zx/7117"),true); fs.close(); } /** * 上传文件 * @throws Exception */ @Test public void copyFromLocalFile() throws Exception{ FileSystem fs = createCommonFS(); // 如果参数为 /zx/file/a.txt 表示重命名为a.txt fs.copyFromLocalFile(new Path("f:/hdfs.txt"),new Path("/zx/file/")); } /** * 下载文件 * @throws Exception */ @Test public void copyToLocalFile() throws Exception{ FileSystem fs = createCommonFS(); // 表示将file目下的文件拷贝到当前f盘,如果指定文件名,则是拷贝指定文件到本地 fs.copyToLocalFile(new Path("/zx/file/"),new Path("f:/")); } /** * 修改文件夹或文件名 * @throws Exception */ @Test public void rename() throws Exception{ FileSystem fs = createCommonFS(); fs.rename(new Path("/zx/file"),new Path("/zx/files")); } /** * 查看文件详情 * @throws Exception */ @Test public void listFiles() throws Exception{ FileSystem fs = createCommonFS(); RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"),true); while(listFiles.hasNext()){ LocatedFileStatus fileStatus = listFiles.next(); System.out.println(fileStatus.isFile()?"文件":"文件夹");//判断文件类型 System.out.println(fileStatus.getPath().getName());// 文件名称 System.out.println(fileStatus.getPermission());// 文件权限 System.out.println(fileStatus.getLen());// 文件长度 BlockLocation[] blockLocations = fileStatus.getBlockLocations(); for(BlockLocation blockLocation:blockLocations){ String[] hosts = blockLocation.getHosts(); for(String host:hosts){ System.out.println(host); } } } fs.close(); } /** * 判断是文件夹还是文件 * @throws Exception */ @Test public void isFile() throws Exception{ FileSystem fs = createCommonFS(); // 2 判断操作 FileStatus[] listStatus = fs.listStatus(new Path("/")); for (FileStatus fileStatus : listStatus) { if (fileStatus.isFile()) { // 文件 System.out.println("f:"+fileStatus.getPath().getName()); }else{ // 文件夹 System.out.println("d:"+fileStatus.getPath().getName()); } } // 3 关闭资源 fs.close(); } /** * 检测环境变量 */ @Test public void testEnv(){ System.out.println(System.getenv("HADOOP_HOME")); } }
    2.流操作-分块下载
    package com.zd.hadoop; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.Test; /** * HDFS IO流相关操作 */ public class HDFSIO { /** * 通过流 上传文件到HDFS * @throws IOException * @throws InterruptedException * @throws URISyntaxException */ @Test public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException{ // 1 获取对象 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf , "xiaomao"); // 2 获取输入流 FileInputStream fis = new FileInputStream(new File("f:/测试文件.docx")); // 3 获取输出流 FSDataOutputStream fos = fs.create(new Path("/zx/622/测试文件.docx")); // 4 流的对拷 IOUtils.copyBytes(fis, fos, conf); // 5 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); } /** * 通过流下载hdfs上的文件到本地 * @throws IOException * @throws InterruptedException * @throws URISyntaxException */ @Test public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{ // 1 获取对象 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf , "xiaomao"); // 2 获取输入流 FSDataInputStream fis = fs.open(new Path("/zx/622/测试文件.docx")); // 3 获取输出流 FileOutputStream fos = new FileOutputStream(new File("f:/download.docx")); // 4 流的对拷 IOUtils.copyBytes(fis, fos, conf); // 5 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); } //下载第一块 @Test public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{ // 1 获取对象 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf , "xiaomao"); // 2 获取输入流 FSDataInputStream fis = fs.open(new Path("/zx/622/测试文件.docx")); // 3 获取输出流 FileOutputStream fos = new FileOutputStream(new File("f:/测试文件.docx.part1")); // 4 流的对拷(只拷贝20m) byte[] buf = new byte[1024]; for (int i = 0; i < 1024 * 20; i++) { fis.read(buf); fos.write(buf); } // 5 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); } // 下载第二块 @SuppressWarnings("resource") @Test public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{ // 1 获取对象 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf , "xiaomao"); // 2 获取输入流 FSDataInputStream fis = fs.open(new Path("/zx/622/测试文件.docx")); // 3 设置指定读取的起点 fis.seek(1024*1024*20); // 4 获取输出流 FileOutputStream fos = new FileOutputStream(new File("f:/测试文件.docx.part2")); // 5 流的对拷 IOUtils.copyBytes(fis, fos, conf); // 6 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }

    对分块下载的文件,在windows下,进入命令行窗口,用下面的命令合并

    type 测试文件.docx.part2>>测试文件.docx.part1

    修改后缀后,便可以正常打开。

    附录:

    scope依赖范围:

    compile (编译范围)

    compile是默认的范围;如果没有提供一个范围,那该依赖的范围就是编译范围。编译范围依赖在所有的classpath 中可用,同时它们也会被打包。

    provided (已提供范围)

    provided 依赖只有在当JDK 或者一个容器已提供该依赖之后才使用。例如, 如果你开发了一个web 应用,你可能在编译 classpath 中需要可用的Servlet API 来编译一个servlet,但是你不会想要在打包好的WAR 中包含这个Servlet API;这个Servlet API JAR 由你的应用服务器或者servlet 容器提供。已提供范围的依赖在编译classpath (不是运行时)可用。它们不是传递性的,也不会被打包。

    runtime (运行时范围)

    runtime 依赖在运行和测试系统的时候需要,但在编译的时候不需要。比如,你可能在编译的时候只需要JDBC API JAR,而只有在运行的时候才需要JDBC 驱动实现。

    test (测试范围)

    test范围依赖 在一般的编译和运行时都不需要,它们只有在测试编译和测试运行阶段可用。

    system (系统范围)

    system范围依赖与provided 类似,但是你必须显式的提供一个对于本地系统中JAR 文件的路径。这么做是为了允许基于本地对象编译,而这些对象是系统类库的一部分。这样的构件应该是一直可用的,Maven 也不会在仓库中去寻找它。如果你将一个依赖范围设置成系统范围,你必须同时提供一个 systemPath 元素。注意该范围是不推荐使用的(你应该一直尽量去从公共或定制的 Maven 仓库中引用依赖)。

    Processed: 0.011, SQL: 9