Hadoop Distribute File System HDFS( 三 )


五、HDFS的应用操作 1、HDFS的操作
1.1搭建开发环境
clouderahttps://repository.cloudera.com/artifactory/cloudera-repos/org.apache.hadoophadoop-client2.6.0-mr1-cdh5.14.0org.apache.hadoophadoop-common2.6.0-cdh5.14.0org.apache.hadoophadoop-hdfs2.6.0-cdh5.14.0org.apache.hadoophadoop-mapreduce-client-core2.6.0-cdh5.14.0
1.原始流对接操作--繁琐但灵活
【Hadoop Distribute File SystemHDFS】/*** @ClassName StreamAccess* @Description stream流对接的方式上传下载数据*/public class StreamAccess {//创建客户端对象FileSystem fs = null;//创建fs的4种方式@Beforepublic void createFS() throws IOException, URISyntaxException, InterruptedException {Configuration conf = new Configuration();//第一种方式 如果什么参数都不给 , 创建的是本地文件系统 file:///hdfs50070 9000// 9000是用来操作的 50070是用来看的//优先级当前的配置> 项目的classpath > 用户设置的hadoop/etc/hadoop/core-site.xml > 默认的core-default.xmlconf.set("fs.defaultFS","hdfs://node01:9000");System.setProperty("HADOOP_USER_NAME","root");fs = FileSystem.get(conf);//第二种方式(用的最多)fs = FileSystem.get(new URI("hdfs://node01:9000"),conf,"root");//第三种方式conf.set("fs.defaultFS","hdfs://node01:9000");fs = FileSystem.newInstance(conf);//第四种方式fs = FileSystem.newInstance(new URI("hdfs://node01:9000"),conf);}//上传@Testpublic void streamToHdfs() throws Exception {//得到本地的输入流FileInputStream inputStream = new FileInputStream(new File("d:/pom.xml"));//得到hdfs的输出流 要写文件FSDataOutputStream outputStream = fs.create(new Path("/hadoop32/1.txt"));//流对接IOUtils.copy(inputStream,outputStream);}//下载@Testpublic void streamDownLoad() throws Exception {//hdfs得到输入流FSDataInputStream inputStream = fs.open(new Path("/hadoop32/1.txt"));//得到本地文件的输出流inputStream.seek(346); // 从哪个位置开始下载FileOutputStream outputStream = new FileOutputStream(new File("d:/3.txt"));IOUtils.copy(inputStream,outputStream);}//获取hdfs上的所有文件及路径hadoop fs -ls -R /@Testpublic void findAllFiles() throws Exception {RemoteIterator iterator = fs.listFiles(new Path("/"), true);// true为是否进行递归查询while (iterator.hasNext()) {//fileStatus是对文件或文件夹的所属数据信息LocatedFileStatus fileStatus = iterator.next();System.out.println(fileStatus.getPath());}}//遍历hdfs上所有的文件和文件夹@Testpublic void findAll() throws Exception { findFiles("/");}private void findFiles(String path) throws IOException {FileStatus[] fileStatuses = fs.listStatus(new Path(path));for (FileStatus fileStatus : fileStatuses) {//每一个文件或文件夹的所有数据if(fs.isDirectory(fileStatus.getPath())){//得到的就是文件夹System.out.println("d----"+fileStatus.getPath());findFiles(fileStatus.getPath().toString());}else{//得到的就是文件System.out.println("f----"+fileStatus.getPath());}}}//获取指定的block块@Testpublic void getBlock0() throws Exception {FSDataInputStream in = fs.open(new Path("/hadoop32/y.zip"));//如果path指定的是一个文件 , 那么数组返回的大小就是1FileStatus[] fileStatuses = fs.listStatus(new Path("/hadoop32/y.zip"));FileStatus fileStatus = fileStatuses[0];//返回当前文件所有block块的地址列表BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());/* for (BlockLocation fileBlockLocation : fileBlockLocations) {//得到的是每个块的信息System.out.println( fileBlockLocation.getLength()+" offset :"+fileBlockLocation.getOffset());String[] hosts = fileBlockLocation.getHosts();//当前块hostfor (String host : hosts) {System.out.println("host"+host);}}*/BlockLocation fileBlockLocation = fileBlockLocations[1];//第二个block块的偏移量long offset = fileBlockLocations[1].getOffset();//注意长度的问题:第一个块的长度是128M 第二个块的长度是111M 所以就是从128M向后读了111M的数据 , 也就是从128M读取到239M的位置 , 这个长度是指239这个数值long length = offset+fileBlockLocations[1].getLength();byte[] b = new byte[4096];FileOutputStream os = new FileOutputStream(new File("d:/block1"));while(in.read(offset, b, 0, 4096)!=-1){os.write(b);offset += 4096;if(offset>length) {return;}};}@Afterpublic void close() throws Exception {//通常如果是get方式的话不建议关闭 , 如果是newInstance的话 建议关闭fs.close();}}