java上传文件或文件夹到hdfs

Updated on in Java是世界上最好的语言 with 0 views and 0 comments

  

  好好排版

  • hadoop-hdfs 2.7.3
  • JDK 1.8
需要引入的包?
import com.jianwei.collection.entity.qo.ZhudanSourceQO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.*;

上传单个文件?

/**
     * 拷贝文件
     * @param src
     * @param dst
     * @param conf
     * @return
     * @throws Exception
     */
    public static boolean copyFile(String src , String dst , Configuration conf) throws Exception{
        FileSystem fs = FileSystem.get(conf);
        fs.exists(new Path(dst));
        //FileStatus status = fs.getFileStatus(new Path(dst));
        File file = new File(src);

        InputStream in = new BufferedInputStream(new FileInputStream(file));
        /**
         * FieSystem的create方法可以为文件不存在的父目录进行创建,
         */
        OutputStream out = fs.create(new Path(dst) , new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);

        return true;
    }

上传单个文件方法的调用示例?

上传单个文件的时候,最需要注意的地方就是路径的问题,要注意使用 / ,hdfs 根据斜杠区分文件夹层级

/**
     * 将单个文件上传到hdfs服务器
     * @param zhudanSourceQO
     * @throws IOException
     */
    public static void uploadSingleToHdfs(ZhudanSourceQO zhudanSourceQO) throws Exception {
        //1 创建连接
        Configuration conf = new Configuration();
        //2 连接端口
        conf.set("fs.defaultFS", "hdfs://hcluster");

        copyFile(zhudanSourceQO.getFileUrl(),zhudanSourceQO.getPathSelection() + "/"+new File(zhudanSourceQO.getFileUrl()).getName(),conf);

    }

上传整个文件夹到 hdfs?

/**
     * 拷贝文件夹
     * @param src
     * @param dst
     * @return
     * @throws Exception
     */
    public static boolean copyDirectory(String src , String dst) throws IOException {

        FileSystem fs =null;
        try {
            //1 创建连接
            Configuration conf = new Configuration();
            //2 连接端口
            conf.set("fs.defaultFS", "hdfs://hcluster");
            fs = FileSystem.get(conf);
            if (!fs.exists(new Path(dst))) {
                fs.mkdirs(new Path(dst));
            }
            System.out.println("copyDirectory:" + dst);
            FileStatus status = fs.getFileStatus(new Path(dst));
            File file = new File(src);

            if (status.isFile()) {
                System.exit(2);
                System.out.println("You put in the " + dst + "is file !");
            } else {
                dst = cutDir(dst);
            }
            File[] files = file.listFiles();
            for (int i = 0; i < files.length; i++) {
                File f = files[i];
                if (f.isDirectory()) {
                    copyDirectory(f.getPath(), dst);
                } else {
                    copyFile(f.getPath(), dst + files[i].getName(), conf);
                }

            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            fs.close();
        }
        return true;
    }

    public static String cutDir(String str){
        String[] strs = str.split(File.pathSeparator);
        String result = "";
        if("hdfs"==strs[0]){
            result += "hdfs://";
            for(int i = 1 ; i < strs.length  ; i++){
                result += strs[i] + File.separator;
            }
        }else{
            for(int i = 0 ; i < strs.length  ; i++){
                result += strs[i] + File.separator;
            }
        }
        return result;
    }


    /**
     * 拷贝文件
     * @param src
     * @param dst
     * @param conf
     * @return
     * @throws Exception
     */
    public static boolean copyFile(String src , String dst , Configuration conf) throws Exception{
        FileSystem fs = FileSystem.get(conf);
        fs.exists(new Path(dst));
        //FileStatus status = fs.getFileStatus(new Path(dst));
        File file = new File(src);

        InputStream in = new BufferedInputStream(new FileInputStream(file));
        /**
         * FieSystem的create方法可以为文件不存在的父目录进行创建,
         */
        OutputStream out = fs.create(new Path(dst) , new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);

        return true;
    }

上传整个文件夹方法调用示例?

copyDirectory(fileParentPath + "/parquet","/tw/parquet/" + templateId);

  上传单个文件与上传整个文件夹这两个方法,共同点也是核心的地方,就是
有一个如果所指路径不存在,hdfs 就会创建出来?

/**
         * FieSystem的create方法可以为文件不存在的父目录进行创建,
         */
        OutputStream out = fs.create(new Path(dst) , new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });