package com.bebo.hadoop.cascading.phishing;

import java.io.IOException;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Vector;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
public class RecordArchive {

    private static final Logger LOG = Logger.getLogger(RecordArchive.class);

    private final static String TMP_REGEX = ".*\\.tmp";

    private SimpleDateFormat dateFormat = new SimpleDateFormat("/yy/MM/dd");
    private FileSystem _fs = null;
    private Path _archiveDirectory = null;
    private List<FileStatus> _inputFiles = new Vector<FileStatus>();
    private int maxDays = 14;
    public RecordArchive(FileSystem fs, String archiveDirectory) throws IOException {
        _fs = fs;
        _archiveDirectory = new Path(archiveDirectory);
        // mkdirs() does not fail when the path already exists, so any error here
        // is a real problem and should be handled by the caller.
        _fs.mkdirs(_archiveDirectory);
    }
    public void addJobPath(Path inputPath) throws IOException {
        // Note: this method is meant to deal with a flat staging directory;
        // it does not descend into a nested directory structure.
        if (_fs.getFileStatus(inputPath).isDir()) {
            // list all entries in the directory and keep only non-empty .sq and .txt files
            FileStatus iDir[] = _fs.globStatus(new Path(inputPath + "/*"));
            for (FileStatus ifs : iDir) {
                if (ifs.getPath().getName().endsWith(".sq") || ifs.getPath().getName().endsWith(".txt")) {
                    if (ifs.getLen() > 0) {
                        _inputFiles.add(ifs);
                    } else { // delete the empty file
                        _fs.delete(ifs.getPath(), false);
                    }
                }
            }
        } else {
            // have an individual file - just add it if it isn't a tmp file
            if (!inputPath.getName().matches(TMP_REGEX)) {
                _inputFiles.add(_fs.getFileStatus(inputPath));
            }
        }
    }
    public FileStatus[] getJobFiles() {
        FileStatus[] files = new FileStatus[_inputFiles.size()];
        _inputFiles.toArray(files);
        return files;
    }

    public void archiveJobFiles() throws IOException {
        archive(getJobFiles());
    }
    private void archive(FileStatus inputFile) throws IOException {
        // This method can only deal with individual files; by the time it is
        // called, that should be the case.
        if (inputFile.isDir()) {
            throw new IOException("Can't archive directories. Please call me with a file list.");
        }
        // move only non-tmp files
        LOG.debug("name: " + inputFile.getPath().getName());
        if (!inputFile.getPath().getName().matches(TMP_REGEX)) {
            Date modDate = new Date(inputFile.getModificationTime());
            Path dstDir = new Path(_archiveDirectory + dateFormat.format(modDate));
            LOG.debug("source: " + inputFile.getPath() + " target: " + dstDir + "/" + inputFile.getPath().getName());
            _fs.mkdirs(dstDir);
            _fs.rename(inputFile.getPath(), new Path(dstDir, inputFile.getPath().getName()));
        }
    }

    private void archive(FileStatus[] inputFiles) throws IOException {
        for (FileStatus srcFile : inputFiles) {
            archive(srcFile);
        }
    }
    public Path getArchivePath() {
        return _archiveDirectory;
    }

    public String getJobFilesAsString() {
        StringBuffer sb = new StringBuffer(100);
        if (getJobFiles().length > 0) {
            for (FileStatus file : getJobFiles()) {
                sb.append(file.getPath().toString()).append(",");
            }
            // drop the trailing comma
            sb.delete(sb.length() - 1, sb.length());
        }
        return sb.toString();
    }
    static public List<String> createFileListAsList(FileSystem fs, Path path, String includeRegexPattern) throws IOException {
        ArrayList<String> fileList = new ArrayList<String>();
        for (String s : createFileListAsList(fs, path)) {
            if (s.matches(includeRegexPattern)) {
                fileList.add(s);
            }
        }
        return fileList;
    }

    static public List<String> createFileListAsList(FileSystem fs, Path path) throws IOException {
        ArrayList<String> fileList = new ArrayList<String>();
        StringBuffer commaSB = recurseFileList(fs, path);
        if (commaSB.length() == 0) {
            return fileList;
        } else {
            for (String file : commaSB.toString().replaceFirst(",", "").split(",")) {
                fileList.add(file);
            }
            return fileList;
        }
    }

    /**
     * Recursively collect files from a given path as a comma-separated string.
     * Note: this method is not thread safe!
     *
     * @author erich
     */
    static public String createFileList(FileSystem fs, Path path) throws IOException {
        StringBuffer commaSB = recurseFileList(fs, path);
        return commaSB.toString().replaceFirst(",", "");
    }
    static private StringBuffer recurseFileList(FileSystem fs, Path path) throws IOException {
        StringBuffer sb = new StringBuffer();
        if (fs.exists(path)) {
            FileStatus[] files = fs.globStatus(new Path(path + "/*"));
            for (int i = 0; i < files.length; i++) {
                if (fs.isDirectory(files[i].getPath())) {
                    sb.append(recurseFileList(fs, files[i].getPath()));
                } else {
                    sb.append(",");
                    sb.append(files[i].getPath().toString().replaceFirst(fs.getUri().toString(), ""));
                    LOG.debug("file: " + files[i].getPath().toString());
                }
            }
        }
        return sb;
    }
    public FileStatus[] getArchiveStatuses(FileSystem fs, Path path) throws IOException {
        ArrayList<FileStatus> list = new ArrayList<FileStatus>();
        getArchiveStatusesRecursive(fs, path, list);
        return list.toArray(new FileStatus[list.size()]);
    }

    private void getArchiveStatusesRecursive(FileSystem fs, Path path, ArrayList<FileStatus> list) throws IOException {
        if (fs.exists(path)) {
            FileStatus[] files = fs.globStatus(new Path(path + "/*"));
            for (int i = 0; i < files.length; i++) {
                if (fs.isDirectory(files[i].getPath())) {
                    getArchiveStatusesRecursive(fs, files[i].getPath(), list);
                } else {
                    list.add(files[i]);
                }
            }
        }
    }
    static public Vector<Path> createDirList(FileSystem fs, Path path) throws IOException {
        Vector<Path> pathList = new Vector<Path>();
        // add the calling dir too
        if (fs.isDirectory(path)) {
            pathList.add(path);
        }
        recurseDirList(fs, path, pathList);
        return pathList;
    }

    static private void recurseDirList(FileSystem fs, Path path, Vector<Path> pathList) throws IOException {
        if (fs.exists(path)) {
            FileStatus[] files = fs.globStatus(new Path(path + "/*"));
            for (int i = 0; i < files.length; i++) {
                if (fs.isDirectory(files[i].getPath())) {
                    pathList.add(files[i].getPath());
                    recurseDirList(fs, files[i].getPath(), pathList);
                }
            }
        }
    }
    public void setMaxDays(int maxDays) {
        this.maxDays = maxDays;
    }

    public void runArchivePurge() throws URISyntaxException, IOException {
        runArchivePurge(maxDays);
    }

    public void runArchivePurge(int maxDaysToKeep) throws URISyntaxException, IOException {
        List<Path> archiveDirs = new ArrayList<Path>();
        // collect the day directories that make up the archive
        for (Path currDir : createDirList(_fs, _archiveDirectory)) {
            // only include day dirs - exclude year-only and month-only dirs
            if (currDir.toString().matches(".*/[0-9][0-9]/[0-9][0-9]/[0-9][0-9]")) {
                LOG.debug("Including: " + currDir);
                archiveDirs.add(currDir);
            } else {
                LOG.debug("Excluding: " + currDir);
            }
        }
        // check if cleanup is required
        if (archiveDirs.size() > maxDaysToKeep) {
            LOG.info("Have " + archiveDirs.size() + " days in archive. Cleanup required...");
            // sort so that the oldest directories come first
            Collections.sort(archiveDirs);
            // delete the oldest directories until only maxDaysToKeep remain
            for (int i = 0; i < archiveDirs.size() - maxDaysToKeep; i++) {
                LOG.info("Deleting: " + archiveDirs.get(i));
                // TODO reset maxDays to the correct value
                _fs.delete(archiveDirs.get(i), true);
            }
        } else {
            LOG.info("No archive cleanup required. Have only " + archiveDirs.size() + " days in archive.");
        }
    }
}
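
Not part of the original gist: a minimal usage sketch of how the class might be driven from a job launcher. It assumes a separate file in the same package, the default Hadoop FileSystem obtained from a fresh Configuration, and hypothetical /staging and /archive paths and retention settings; adjust these to your setup.

// Usage sketch only - the paths and retention window below are hypothetical placeholders.
package com.bebo.hadoop.cascading.phishing;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RecordArchiveExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);                       // default file system (e.g. HDFS)

        RecordArchive archive = new RecordArchive(fs, "/archive");  // hypothetical archive root
        archive.addJobPath(new Path("/staging"));                   // collect non-empty .sq/.txt files

        String inputs = archive.getJobFilesAsString();              // comma-separated input list for a job
        // ... run the MapReduce/Cascading job over 'inputs' here ...

        archive.archiveJobFiles();                                  // move processed files under /archive/yy/MM/dd
        archive.setMaxDays(14);                                     // hypothetical retention window in days
        archive.runArchivePurge();                                  // drop day directories beyond the window
    }
}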