Skip to content

Instantly share code, notes, and snippets.

@enachb
Created September 30, 2009 22:35
Show Gist options
  • Select an option

  • Save enachb/198524 to your computer and use it in GitHub Desktop.

Select an option

Save enachb/198524 to your computer and use it in GitHub Desktop.
package com.bebo.hadoop.cascading.phishing;
import java.io.IOException;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Vector;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
/**
 * Collects job input files from staging locations and moves them into a date-partitioned archive
 * directory on a Hadoop {@link FileSystem}. It also lists archive contents and purges old day
 * directories.
 *
 * <p>Archived files are placed under {@code <archiveDirectory>/yy/MM/dd/<fileName>}. The date is
 * the file's modification time, formatted by {@link SimpleDateFormat} in the JVM default time
 * zone.
 *
 * <p>Instances are not safe for concurrent use, because the per-instance
 * {@link SimpleDateFormat} is not thread-safe.
 */
public class RecordArchive {
    private static final Logger LOG = Logger.getLogger(RecordArchive.class);

    /** File names matching this pattern (ending in ".tmp") are never registered or archived. */
    private final static String TMP_REGEX = ".*\\.tmp";
    private static final Pattern TMP_PATTERN = Pattern.compile(TMP_REGEX);

    /** Matches day-level archive directories ({@code .../yy/MM/dd}) but not year/month levels. */
    private static final Pattern DAY_DIR_PATTERN =
            Pattern.compile(".*/[0-9][0-9]/[0-9][0-9]/[0-9][0-9]");

    // Relative pattern (no leading slash) so it can be resolved with new Path(parent, child).
    // Appending "/yy/MM/dd" to a root archive dir as a string produced "//yy/...", which Path
    // parses as an authority component.
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yy/MM/dd");
    private final FileSystem _fs;
    private final Path _archiveDirectory;
    private final List<FileStatus> _inputFiles = new Vector<FileStatus>();
    private int maxDays = 14;

    /**
     * Creates an archive rooted at {@code archiveDirectory}, creating the directory if needed.
     *
     * @param fs               file system holding both the input files and the archive
     * @param archiveDirectory root directory of the archive
     * @throws IOException if the archive directory cannot be created
     */
    public RecordArchive(FileSystem fs, String archiveDirectory) throws IOException {
        _fs = fs;
        _archiveDirectory = new Path(archiveDirectory);
        // mkdirs does not fail when the path already exists, so an error here is a real problem
        // and is left for the caller to handle.
        _fs.mkdirs(_archiveDirectory);
    }

    /**
     * Registers input files for the next job.
     *
     * <p>If {@code inputPath} is a directory (e.g. a staging directory), only its direct children
     * are considered; nested directories are not descended into. Regular files whose names end in
     * {@code .sq} or {@code .txt} are registered, except that zero-length ones are deleted instead.
     * If {@code inputPath} is a single file, it is registered unless its name ends in {@code .tmp}.
     *
     * @param inputPath a directory of input files or a single input file
     * @throws IOException if the path cannot be inspected or an empty file cannot be deleted
     */
    public void addJobPath(Path inputPath) throws IOException {
        FileStatus inputStatus = _fs.getFileStatus(inputPath);
        if (!inputStatus.isDir()) {
            if (!TMP_PATTERN.matcher(inputPath.getName()).matches()) {
                _inputFiles.add(inputStatus);
            }
            return;
        }
        for (FileStatus child : listChildren(_fs, inputPath)) {
            String name = child.getPath().getName();
            // Skip directories: a subdirectory named e.g. "x.txt" reports length 0 and would
            // otherwise be deleted (the old one-arg delete() was recursive).
            if (child.isDir() || !(name.endsWith(".sq") || name.endsWith(".txt"))) {
                continue;
            }
            if (child.getLen() > 0) {
                _inputFiles.add(child);
            } else {
                LOG.debug("Deleting empty input file: " + child.getPath());
                _fs.delete(child.getPath(), false);
            }
        }
    }

    /**
     * Returns a snapshot of the files registered so far.
     *
     * @return a new array of the registered file statuses, in registration order
     */
    public FileStatus[] getJobFiles() {
        return _inputFiles.toArray(new FileStatus[_inputFiles.size()]);
    }

    /**
     * Moves every registered file into its date directory inside the archive.
     *
     * <p>The registered list is not cleared afterwards.
     *
     * @throws IOException if a registered entry is a directory or a file-system call fails
     */
    public void archiveJobFiles() throws IOException {
        archive(getJobFiles());
    }

    /**
     * Moves one file to {@code <archive>/yy/MM/dd/<name>}, dated by its modification time.
     * Files whose names end in {@code .tmp} are left in place.
     */
    private void archive(FileStatus inputFile) throws IOException {
        if (inputFile.isDir()) {
            throw new IOException("Can't archive directories. Please call me with a file list.");
        }
        Path source = inputFile.getPath();
        String name = source.getName();
        LOG.debug("name: " + name);
        if (TMP_PATTERN.matcher(name).matches()) {
            return;
        }
        Date modDate = new Date(inputFile.getModificationTime());
        Path dstDir = new Path(_archiveDirectory, dateFormat.format(modDate));
        // String concatenation (not new Path(dstDir, name)) so a ':' in the file name is not
        // parsed as a URI scheme.
        Path target = new Path(dstDir + "/" + name);
        LOG.debug("source: " + source + " target:" + target);
        _fs.mkdirs(dstDir);
        // rename reports failure through its return value rather than an exception.
        if (!_fs.rename(source, target)) {
            LOG.warn("Failed to archive " + source + " to " + target);
        }
    }

    /** Archives each file in turn; stops at the first IOException. */
    private void archive(FileStatus[] inputFiles) throws IOException {
        for (FileStatus srcFile : inputFiles) {
            archive(srcFile);
        }
    }

    /** @return the archive root directory */
    public Path getArchivePath() {
        return _archiveDirectory;
    }

    /**
     * Returns the registered files' paths as a comma-separated string.
     *
     * @return e.g. {@code "a,b,c"}, or {@code ""} when nothing is registered
     */
    public String getJobFilesAsString() {
        List<String> paths = new ArrayList<String>();
        for (FileStatus file : getJobFiles()) {
            paths.add(file.getPath().toString());
        }
        return joinWithCommas(paths);
    }

    /**
     * Recursively lists files under {@code path} whose stripped path fully matches
     * {@code includeRegexPattern}.
     *
     * @param fs                  file system to search
     * @param path                root to search; a non-existent root yields an empty list
     * @param includeRegexPattern regular expression matched against each whole path string
     * @return matching paths, with the file system URI prefix removed
     * @throws IOException if listing fails
     * @throws java.util.regex.PatternSyntaxException if the pattern is invalid
     */
    static public List<String> createFileListAsList(FileSystem fs, Path path, String includeRegexPattern) throws IOException {
        // Compile once instead of per element via String.matches.
        Pattern include = Pattern.compile(includeRegexPattern);
        List<String> matching = new ArrayList<String>();
        for (String file : createFileListAsList(fs, path)) {
            if (include.matcher(file).matches()) {
                matching.add(file);
            }
        }
        return matching;
    }

    /**
     * Recursively lists all files (not directories) under {@code path}.
     *
     * @param fs   file system to search
     * @param path root to search; a non-existent root yields an empty list
     * @return file paths with the file system URI prefix removed
     * @throws IOException if listing fails
     */
    static public List<String> createFileListAsList(FileSystem fs, Path path) throws IOException {
        // Collected directly; the former join-then-split round trip broke names containing ','.
        List<String> files = new ArrayList<String>();
        collectFiles(fs, path, files);
        return files;
    }

    /**
     * Recursively collects files under {@code path} as a comma-separated string.
     *
     * <p>The original author marked this method "not thread safe". The implementation keeps no
     * shared state of its own, so any such concern presumably lies with the supplied FileSystem.
     * Verify before relying on concurrent use.
     *
     * @param fs   file system to search
     * @param path root to search
     * @return file paths (file system URI prefix removed) joined by commas, or {@code ""}
     * @throws IOException if listing fails
     * @author erich
     */
    static public String createFileList(FileSystem fs, Path path) throws IOException {
        return joinWithCommas(createFileListAsList(fs, path));
    }

    /**
     * Depth-first walk that appends each file's path to {@code out}. A leading file system URI
     * is stripped as a literal prefix; the old replaceFirst treated the URI as a regex.
     */
    static private void collectFiles(FileSystem fs, Path path, List<String> out) throws IOException {
        if (!fs.exists(path)) {
            return;
        }
        String fsUri = fs.getUri().toString();
        for (FileStatus child : listChildren(fs, path)) {
            Path childPath = child.getPath();
            if (child.isDir()) {
                collectFiles(fs, childPath, out);
            } else {
                String full = childPath.toString();
                out.add(full.startsWith(fsUri) ? full.substring(fsUri.length()) : full);
                LOG.debug("file: " + full);
            }
        }
    }

    /**
     * Recursively collects the statuses of all files (not directories) under {@code path}.
     *
     * @param fs   file system to search
     * @param path root to search; a non-existent root yields an empty array
     * @return file statuses in depth-first order
     * @throws IOException if listing fails
     */
    public FileStatus[] getArchiveStatie(FileSystem fs, Path path) throws IOException {
        List<FileStatus> statuses = new ArrayList<FileStatus>();
        getArchiveStatieRecursive(fs, path, statuses);
        return statuses.toArray(new FileStatus[statuses.size()]);
    }

    /** Depth-first helper for {@link #getArchiveStatie}; appends file statuses to {@code list}. */
    private void getArchiveStatieRecursive(FileSystem fs, Path path, List<FileStatus> list) throws IOException {
        if (!fs.exists(path)) {
            return;
        }
        for (FileStatus child : listChildren(fs, path)) {
            if (child.isDir()) {
                getArchiveStatieRecursive(fs, child.getPath(), list);
            } else {
                list.add(child);
            }
        }
    }

    /**
     * Lists {@code path} itself (when it is a directory) followed by all directories beneath it.
     *
     * @param fs   file system to search
     * @param path root directory
     * @return directories in pre-order: each directory before its subdirectories
     * @throws IOException if listing fails
     */
    static public Vector<Path> createDirList(FileSystem fs, Path path) throws IOException {
        Vector<Path> pathList = new Vector<Path>();
        if (fs.isDirectory(path)) {
            pathList.add(path);
        }
        recurseDirList(fs, path, pathList);
        return pathList;
    }

    /** Pre-order walk appending every subdirectory of {@code path} to {@code pathList}. */
    static private void recurseDirList(FileSystem fs, Path path, Vector<Path> pathList) throws IOException {
        if (!fs.exists(path)) {
            return;
        }
        for (FileStatus child : listChildren(fs, path)) {
            if (child.isDir()) {
                pathList.add(child.getPath());
                recurseDirList(fs, child.getPath(), pathList);
            }
        }
    }

    /**
     * Sets how many day directories {@link #runArchivePurge()} keeps.
     *
     * @param maxDays number of most recent day directories to retain; must be non-negative by the
     *                time the purge runs
     */
    public void setMaxDays(int maxDays) {
        this.maxDays = maxDays;
    }

    /**
     * Purges old day directories, keeping the number configured via {@link #setMaxDays(int)}
     * (default 14).
     *
     * @throws URISyntaxException declared for backward compatibility; not thrown
     * @throws IOException        if listing or deleting fails
     */
    public void runArchivePurge() throws URISyntaxException, IOException {
        runArchivePurge(maxDays);
    }

    /**
     * Deletes the oldest day directories ({@code yy/MM/dd}) so that at most
     * {@code maxDaysToKeep} remain. "Oldest" is determined by sorting the directory paths, which
     * is chronological for the zero-padded two-digit layout within a single century.
     *
     * @param maxDaysToKeep number of most recent day directories to retain
     * @throws IllegalArgumentException if {@code maxDaysToKeep} is negative (previously this
     *                                  deleted every day directory and then threw
     *                                  IndexOutOfBoundsException)
     * @throws URISyntaxException       declared for backward compatibility; not thrown
     * @throws IOException              if listing or deleting fails
     */
    public void runArchivePurge(int maxDaysToKeep) throws URISyntaxException, IOException {
        if (maxDaysToKeep < 0) {
            throw new IllegalArgumentException("maxDaysToKeep must be >= 0 but was " + maxDaysToKeep);
        }
        List<Path> dayDirs = new ArrayList<Path>();
        for (Path currDir : createDirList(_fs, _archiveDirectory)) {
            // Only day-level directories count; year and month levels are excluded.
            if (DAY_DIR_PATTERN.matcher(currDir.toString()).matches()) {
                LOG.debug("Including: " + currDir);
                dayDirs.add(currDir);
            } else {
                LOG.debug(" Excluding: " + currDir);
            }
        }
        int excess = dayDirs.size() - maxDaysToKeep;
        if (excess <= 0) {
            LOG.info("No archive cleanup required. Have only " + dayDirs.size() + " days in archive.");
            return;
        }
        LOG.info("Have " + dayDirs.size() + " days in archive. Cleanup required...");
        Collections.sort(dayDirs); // oldest first
        for (int i = 0; i < excess; i++) {
            LOG.info("Deleting: " + dayDirs.get(i));
            // TODO (from original author): reset maxdays to correct value
            _fs.delete(dayDirs.get(i), true);
        }
    }

    /**
     * Returns the direct children of {@code dir} matched by the glob {@code dir/*}, or an empty
     * array if {@code globStatus} returns {@code null}.
     */
    static private FileStatus[] listChildren(FileSystem fs, Path dir) throws IOException {
        FileStatus[] children = fs.globStatus(new Path(dir, "*"));
        return children == null ? new FileStatus[0] : children;
    }

    /** Joins {@code parts} with ',' and no leading or trailing separator. */
    static private String joinWithCommas(List<String> parts) {
        StringBuilder sb = new StringBuilder(100);
        for (int i = 0; i < parts.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(parts.get(i));
        }
        return sb.toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment