好好排版
import com.jianwei.collection.entity.qo.ZhudanSourceQO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import java.io.*;
/**
* 拷贝文件
* @param src
* @param dst
* @param conf
* @return
* @throws Exception
*/
public static boolean copyFile(String src , String dst , Configuration conf) throws Exception{
FileSystem fs = FileSystem.get(conf);
fs.exists(new Path(dst));
//FileStatus status = fs.getFileStatus(new Path(dst));
File file = new File(src);
InputStream in = new BufferedInputStream(new FileInputStream(file));
/**
* FieSystem的create方法可以为文件不存在的父目录进行创建,
*/
OutputStream out = fs.create(new Path(dst) , new Progressable() {
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
return true;
}
上传单个文件的时候,最需要注意的地方就是路径的问题,要注意使用 / ,hdfs 根据斜杠区分文件夹层级
/**
* 将单个文件上传到hdfs服务器
* @param zhudanSourceQO
* @throws IOException
*/
public static void uploadSingleToHdfs(ZhudanSourceQO zhudanSourceQO) throws Exception {
//1 创建连接
Configuration conf = new Configuration();
//2 连接端口
conf.set("fs.defaultFS", "hdfs://hcluster");
copyFile(zhudanSourceQO.getFileUrl(),zhudanSourceQO.getPathSelection() + "/"+new File(zhudanSourceQO.getFileUrl()).getName(),conf);
}
/**
* 拷贝文件夹
* @param src
* @param dst
* @return
* @throws Exception
*/
public static boolean copyDirectory(String src , String dst) throws IOException {
FileSystem fs =null;
try {
//1 创建连接
Configuration conf = new Configuration();
//2 连接端口
conf.set("fs.defaultFS", "hdfs://hcluster");
fs = FileSystem.get(conf);
if (!fs.exists(new Path(dst))) {
fs.mkdirs(new Path(dst));
}
System.out.println("copyDirectory:" + dst);
FileStatus status = fs.getFileStatus(new Path(dst));
File file = new File(src);
if (status.isFile()) {
System.exit(2);
System.out.println("You put in the " + dst + "is file !");
} else {
dst = cutDir(dst);
}
File[] files = file.listFiles();
for (int i = 0; i < files.length; i++) {
File f = files[i];
if (f.isDirectory()) {
copyDirectory(f.getPath(), dst);
} else {
copyFile(f.getPath(), dst + files[i].getName(), conf);
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
fs.close();
}
return true;
}
public static String cutDir(String str){
String[] strs = str.split(File.pathSeparator);
String result = "";
if("hdfs"==strs[0]){
result += "hdfs://";
for(int i = 1 ; i < strs.length ; i++){
result += strs[i] + File.separator;
}
}else{
for(int i = 0 ; i < strs.length ; i++){
result += strs[i] + File.separator;
}
}
return result;
}
/**
* 拷贝文件
* @param src
* @param dst
* @param conf
* @return
* @throws Exception
*/
public static boolean copyFile(String src , String dst , Configuration conf) throws Exception{
FileSystem fs = FileSystem.get(conf);
fs.exists(new Path(dst));
//FileStatus status = fs.getFileStatus(new Path(dst));
File file = new File(src);
InputStream in = new BufferedInputStream(new FileInputStream(file));
/**
* FieSystem的create方法可以为文件不存在的父目录进行创建,
*/
OutputStream out = fs.create(new Path(dst) , new Progressable() {
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
return true;
}
copyDirectory(fileParentPath + "/parquet","/tw/parquet/" + templateId);
上传单个文件与上传整个文件夹这两个方法,共同点也是核心的地方,就是
有一个如果所指路径不存在,hdfs 就会创建出来?
/**
* FieSystem的create方法可以为文件不存在的父目录进行创建,
*/
OutputStream out = fs.create(new Path(dst) , new Progressable() {
public void progress() {
System.out.print(".");
}
});