3.1 客户端环境准备
1. 根据自己电脑的操作系统拷贝对应的编译后的hadoop jar包到非中文路径(例如:D:\Develop\hadoop-2.7.2)
2. 配置HADOOP_HOME环境变量
3. 配置Path环境变量
4. 创建一个Maven工程
5.导入相应的依赖坐标+日志添加
//pom.xml
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.2</version></dependency><dependency><groupId>jdk.tools</groupId><artifactId>jdk.tools</artifactId><version>1.8</version><scope>system</scope><systemPath>${JAVA_HOME}/lib/tools.jar</systemPath></dependency>
</dependencies>
#log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
3.2 HDFS的API操作
(1)文件上传
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {// 1 获取文件系统Configuration configuration = new Configuration();configuration.set("dfs.replication", "2");FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 上传文件fs.copyFromLocalFile(new Path("e:/test.txt"), new Path("/test.txt"));// 3 关闭资源fs.close();System.out.println("over");
}参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath下的用户自定义配置文件 >(3)然后是服务器的默认配置
(2)文件下载
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 执行下载操作// boolean delSrc 指是否将原文件删除// Path src 指要下载的文件路径// Path dst 指将文件下载到的路径// boolean useRawLocalFileSystem 是否开启文件校验fs.copyToLocalFile(false, new Path("/test.txt"), new Path("e:/test.txt"), true);// 3 关闭资源fs.close();
}
(3)创建文件夹
public void testMkdirs() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 创建目录fs.mkdirs(new Path("/xlz/20200719"));// 3 关闭资源fs.close();
}
(4)删除文件夹
public void testDelete() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 执行删除fs.delete(new Path("/xlz/20200719"), true);// 3 关闭资源fs.close();
}
(5)文件名更改
public void testRename() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root"); // 2 修改文件名称fs.rename(new Path("/test.txt"), new Path("/xlz.txt"));// 3 关闭资源fs.close();
}
(6)文件详情查看
public void testListFiles() throws IOException, InterruptedException, URISyntaxException{// 1获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root"); // 2 获取文件详情RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);while(listFiles.hasNext()){LocatedFileStatus status = listFiles.next();// 输出详情// 文件名称System.out.println(status.getPath().getName());// 长度System.out.println(status.getLen());// 权限System.out.println(status.getPermission());// 分组System.out.println(status.getGroup());// 获取存储的块信息BlockLocation[] blockLocations = status.getBlockLocations();for (BlockLocation blockLocation : blockLocations) {// 获取块存储的主机节点String[] hosts = blockLocation.getHosts();for (String host : hosts) {System.out.println(host);}}System.out.println("-----------分割线----------");}// 3 关闭资源fs.close();
}
(7)文件和文件夹判断
public void testListStatus() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件配置信息Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 判断是文件还是文件夹FileStatus[] listStatus = fs.listStatus(new Path("/"));for (FileStatus fileStatus : listStatus) {// 如果是文件if (fileStatus.isFile()) {System.out.println("f:"+fileStatus.getPath().getName());}else {System.out.println("d:"+fileStatus.getPath().getName());}}// 3 关闭资源fs.close();
}
3.3 HDFS的I/O流操作
(a)文件上传
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 创建输入流FileInputStream fis = new FileInputStream(new File("e:/upload.txt"));// 3 获取输出流FSDataOutputStream fos = fs.create(new Path("/upload.txt"));// 4 流对拷IOUtils.copyBytes(fis, fos, configuration);// 5 关闭资源IOUtils.closeStream(fos);IOUtils.closeStream(fis);fs.close();
}
(b)文件下载
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 获取输入流FSDataInputStream fis = fs.open(new Path("/upload.txt"));// 3 获取输出流FileOutputStream fos = new FileOutputStream(new File("e:/download.txt"));// 4 流的对拷IOUtils.copyBytes(fis, fos, configuration);// 5 关闭资源IOUtils.closeStream(fos);IOUtils.closeStream(fis);fs.close();
}
(c)定位文件读取
//下载文件第1块
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 获取输入流FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));// 3 创建输出流FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));// 4 流的拷贝byte[] buf = new byte[1024];for(int i =0 ; i < 1024 * 128; i++){fis.read(buf);fos.write(buf);}// 5关闭资源IOUtils.closeStream(fis);IOUtils.closeStream(fos);
fs.close();
}//下载文件第2块
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{// 1 获取文件系统Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), configuration, "root");// 2 打开输入流FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));// 3 定位输入数据位置fis.seek(1024*1024*128);// 4 创建输出流FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));// 5 流的对拷IOUtils.copyBytes(fis, fos, configuration);// 6 关闭资源IOUtils.closeStream(fis);IOUtils.closeStream(fos);
}
合并文件:
在Window命令窗口中进入到目录E:\,然后执行如下命令,对数据进行合并;
type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1;
合并完成后,将hadoop-2.7.2.tar.gz.part1重新命名为hadoop-2.7.2.tar.gz;
解压发现该tar包非常完整。