读取hdfs指定目录下的gz文件,并读取gz文件里面的文本信息
package com.bw.credit.hdfs; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.zip.GZIPInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import com.google.common.base.Strings; public class HdfsReader { private static Configuration conf = new Configuration(); private static String hdfspath = "hdfs://cluster-01:8020"; public static void main(String[] args) throws IOException { HdfsReader ope = new HdfsReader(); String path = "hdfs://cluster-01:8020/ads/cdrlogs_s/dt=20190320"; List<String> arr = ope.getFileList(path); int lineNum =0; int num19 = 0; int num00 = 0; for (String filepath : arr) { System.out.println(filepath); InputStream inputStream = null; GZIPInputStream gzInputStream = null; InputStreamReader reader = null; BufferedReader buff = null; try { FileSystem system = FileSystem.get(conf); inputStream = system.open(new Path(hdfspath+filepath)); gzInputStream = new GZIPInputStream(inputStream); reader = new InputStreamReader(gzInputStream); buff = new BufferedReader(reader); String line = null; while((line = buff.readLine())!=null){ if(line.contains("2019-03-19")) { num19++; } if(line.contains(",,0")) { num00++; } lineNum++; } } catch (IOException e) { e.printStackTrace(); } } System.out.println(lineNum+","+num19+","+num00); System.out.println(lineNum-num19-num00); } /** * 获取hdfs路径下的文件列表 * * @param srcpath * @return */ public List<String> getFileList(String srcpath) { List<String> files = new ArrayList<String>(); try { Path path = new Path(srcpath); FileSystem fs = path.getFileSystem(conf); if (fs.isDirectory(path)) { for (FileStatus status : fs.listStatus(path)) { List<String> arr = getFileList(status.getPath().toUri().getPath()); files.addAll(arr); } } else { files.add(path.toUri().getPath()); } return files; } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return null; } /** * 给定文件名和文件内容,创建hdfs文件 * * @param dst * @param contents * @throws IOException */ public void createFile(String dst, byte[] contents) throws IOException { Configuration conf = new Configuration(); Path dstPath = new Path(dst); FileSystem fs = dstPath.getFileSystem(conf); FSDataOutputStream outputStream = fs.create(dstPath); outputStream.write(contents); outputStream.close(); System.out.println("create file " + dst + " success!"); // fs.close(); } /** * 删除hdfs文件 * * @param filePath * @throws IOException */ public void delete(String filePath) throws IOException { Configuration conf = new Configuration(); Path path = new Path(filePath); FileSystem fs = path.getFileSystem(conf); boolean isok = fs.deleteOnExit(path); if (isok) { System.out.println("delete file " + filePath + " success!"); } else { System.out.println("delete file " + filePath + " failure"); } // fs.close(); } /** * 创建hdfs目录 * * @param path * @throws IOException */ public void mkdir(String path) throws IOException { Configuration conf = new Configuration(); Path srcPath = new Path(path); FileSystem fs = srcPath.getFileSystem(conf); boolean isok = fs.mkdirs(srcPath); if (isok) { System.out.println("create dir ok!"); } else { System.out.println("create dir failure"); } // fs.close(); } /** * 读取hdfs文件内容,并在控制台打印出来 * * @param filePath * @throws IOException */ public void readFile(String filePath) throws IOException { Configuration conf = new Configuration(); Path srcPath = new Path(filePath); FileSystem fs = null; URI uri; try { uri = new URI(filePath); fs = FileSystem.get(uri, conf); } catch (URISyntaxException e) { e.printStackTrace(); } InputStream in = null; try { in = fs.open(srcPath); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } /** * 下载hdfs文件到本地目录 * * @param dstPath * @param srcPath * @throws Exception */ public void downloadFile(String dstPath, String srcPath) throws Exception { Path path = new Path(srcPath); Configuration conf = new Configuration(); FileSystem hdfs = path.getFileSystem(conf); File rootfile = new File(dstPath); if (!rootfile.exists()) { rootfile.mkdirs(); } if (hdfs.isFile(path)) { // 只下载非txt文件 String fileName = path.getName(); if (!fileName.toLowerCase().endsWith("txt")) { FSDataInputStream in = null; FileOutputStream out = null; try { in = hdfs.open(path); File srcfile = new File(rootfile, path.getName()); if (!srcfile.exists()) srcfile.createNewFile(); out = new FileOutputStream(srcfile); IOUtils.copyBytes(in, out, 4096, false); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } // 下载完后,在hdfs上将原文件删除 this.delete(path.toString()); } } else if (hdfs.isDirectory(path)) { File dstDir = new File(dstPath); if (!dstDir.exists()) { dstDir.mkdirs(); } // 在本地目录上加一层子目录 String filePath = path.toString();// 目录 String subPath[] = filePath.split("/"); String newdstPath = dstPath + subPath[subPath.length - 1] + "/"; System.out.println("newdstPath=======" + newdstPath); if (hdfs.exists(path) && hdfs.isDirectory(path)) { FileStatus[] srcFileStatus = hdfs.listStatus(path); if (srcFileStatus != null) { for (FileStatus status : hdfs.listStatus(path)) { // 下载子目录下文件 downloadFile(newdstPath, status.getPath().toString()); } } } } } // /** // * 下载hdfs文件内容,保存到内存对象中 // * // * @param srcPath // * @throws Exception // */ // public void downloadFileByte(String srcPath) throws Exception { // Path path = new Path(srcPath); // FileSystem hdfs = null; // Configuration conf = new Configuration(); // hdfs = FileSystem.get(URI.create(srcPath), conf); // if (hdfs.exists(path)) { // if (hdfs.isFile(path)) { // //如果是文件 // FSDataInputStream in = null; // FileOutputStream out = null; // try { // in = hdfs.open(new Path(srcPath)); // byte[] t = new byte[in.available()]; // in.read(t); // hdfsfiles.add(new HdfsFile(path.getName(), srcPath, t)); // } finally { // IOUtils.closeStream(in); // IOUtils.closeStream(out); // } // } else { // //如果是目录 // FileStatus[] srcFileStatus = hdfs.listStatus(new Path(srcPath)); // for (int i = 0; i < srcFileStatus.length; i++) { // String srcFile = srcFileStatus[i].getPath().toString(); // downloadFileByte(srcFile); // } // } // } // } // // public ArrayList<HdfsFile> getHdfsfiles() { // return hdfsfiles; // } /** * 将本地目录或文件上传的hdfs * * @param localSrc * @param dst * @throws Exception */ public void uploadFile(String localSrc, String dst) throws Exception { Configuration conf = new Configuration(); File srcFile = new File(localSrc); if (srcFile.isDirectory()) { copyDirectory(localSrc, dst, conf); } else { copyFile(localSrc, dst, conf); } } /** * 拷贝本地文件hdfs目录下 * * @param localSrc * @param dst * @param conf * @return * @throws Exception */ private boolean copyFile(String localSrc, String dst, Configuration conf) throws Exception { File file = new File(localSrc); dst = dst + file.getName(); Path path = new Path(dst); FileSystem fs = path.getFileSystem(conf);// FileSystem.get(conf); fs.exists(path); InputStream in = new BufferedInputStream(new FileInputStream(file)); OutputStream out = fs.create(new Path(dst)); IOUtils.copyBytes(in, out, 4096, true); in.close(); return true; } /** * 拷贝本地目录到hdfs * * @param src * @param dst * @param conf * @return * @throws Exception */ private boolean copyDirectory(String src, String dst, Configuration conf) throws Exception { Path path = new Path(dst); FileSystem fs = path.getFileSystem(conf); if (!fs.exists(path)) { fs.mkdirs(path); } File file = new File(src); File[] files = file.listFiles(); for (int i = 0; i < files.length; i++) { File f = files[i]; if (f.isDirectory()) { String fname = f.getName(); if (dst.endsWith("/")) { copyDirectory(f.getPath(), dst + fname + "/", conf); } else { copyDirectory(f.getPath(), dst + "/" + fname + "/", conf); } } else { copyFile(f.getPath(), dst, conf); } } return true; } }
原文:https://www.cnblogs.com/deebo/p/10689012.html