Created
May 2, 2014 22:09
-
-
Save agibsonccc/8e0c4bc63c69cfc9ea4c to your computer and use it in GitHub Desktop.
HdfsUtil
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedReader; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.InputStreamReader; | |
import java.io.OutputStream; | |
import java.net.InetAddress; | |
import java.net.URL; | |
import java.net.URLDecoder; | |
import java.util.ArrayList; | |
import java.util.Enumeration; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.Random; | |
import java.util.Set; | |
import org.apache.commons.io.IOUtils; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.filecache.TrackerDistributedCacheManager; | |
import org.apache.hadoop.fs.FSDataOutputStream; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.FileUtil; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.zookeeper.KeeperException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class HdfsUtil { | |
private HdfsUtil(){} | |
private static Logger log = LoggerFactory.getLogger(HdfsUtil.class); | |
private static Map<Configuration,FileSystem> systems = new HashMap<Configuration,FileSystem>(); | |
public final static String HDFS_HOST = "hdfs.host"; | |
public static void setRunLocal(Configuration conf) { | |
conf.set("fs.default.name","file:///"); | |
conf.set("mapred.job.tracker","local"); | |
conf.set("mapred.system.dir","/tmp/mapred/system"); | |
conf.set("mapred.local.dir","/tmp/mapred"); | |
conf.set("hadoop.tmp.dir","/tmp"); | |
} | |
public static void setJarFileFor(Configuration conf,Class<?> jarClass) { | |
String jar = findJar(jarClass); | |
conf.setClassLoader(Thread.currentThread().getContextClassLoader()); | |
conf.set("mapred.jar",jar); | |
} | |
public static String findJar(Class<?> my_class) { | |
ClassLoader loader = my_class.getClassLoader(); | |
String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; | |
try { | |
for(Enumeration<?> itr = loader.getResources(class_file); | |
itr.hasMoreElements();) { | |
URL url = (URL) itr.nextElement(); | |
if ("jar".equals(url.getProtocol())) { | |
String toReturn = url.getPath(); | |
if (toReturn.startsWith("file:")) { | |
toReturn = toReturn.substring("file:".length()); | |
} | |
//URLDecoder is a misnamed class, since it actually decodes | |
// x-www-form-urlencoded MIME type rather than actual | |
// URL encoding (which the file path has). Therefore it would | |
// decode +s to ' 's which is incorrect (spaces are actually | |
// either unencoded or encoded as "%20"). Replace +s first, so | |
// that they are kept sacred during the decoding process. | |
toReturn = toReturn.replaceAll("\\+", "%2B"); | |
toReturn = URLDecoder.decode(toReturn, "UTF-8"); | |
return toReturn.replaceAll("!.*$", ""); | |
} | |
} | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
return null; | |
} | |
/** | |
* Returns the default file system used to reach hdfs | |
* @param conf the configuration to use | |
* @return conf.get(fs.defaultFS) | |
*/ | |
public static String getHost(Configuration conf) { | |
if(conf.get("hdfs.host")==null) { | |
try { | |
HdfsUtil.setHostForConf(conf); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
return conf.get("hdfs.host"); | |
} | |
/** | |
/** | |
* Returns the default file system used to reach hdfs | |
* @param conf the configuration to use | |
* @return conf.get(fs.defaultFS) | |
*/ | |
public static String getHdfs(Configuration conf) { | |
return conf.get("fs.defaultFS"); | |
} | |
/** | |
* Set the name node and job tracker for a given host | |
* based on an autodiscovered file: host.properties | |
* @param conf the configuration to set | |
* @throws IOException | |
*/ | |
public static void setHostForConf(Configuration conf) throws IOException { | |
Properties prop = new Properties(); | |
InputStream in = HdfsUtil.class.getResourceAsStream("/host.properties"); | |
if(in==null) { | |
throw new IOException("No host.properties found"); | |
} | |
prop.load(in); | |
in.close(); | |
String host = prop.getProperty("hdfs.host","localhost"); | |
boolean reachable = InetAddress.getByName(host).isReachable(1000); | |
if(!reachable) { | |
log.warn("Host " + host + " was not reachable! Falling back to localhost"); | |
host = "localhost"; | |
} | |
log.info("Using host " + host); | |
conf.set("hdfs.host",host); | |
conf.set("fs.defaultFS", String.format("hdfs://%s:8020",host)); | |
conf.set("mapred.job.tracker", String.format("hdfs://%s:8021",host)); | |
} | |
public static void cleanup(Configuration conf) throws Exception { | |
if(conf.get(HDFS_HOST) !=null) { | |
String hdfs = getHost(conf); | |
HdfsLock lock = new HdfsLock(hdfs,2181); | |
if(lock.isLocked()) { | |
if(log.isDebugEnabled()) | |
log.debug("Returning paths; already found host"); | |
List<Path> paths = lock.getPaths(); | |
FileSystem system = FileSystem.get(conf); | |
for(Path path : paths) | |
system.delete(path,true); | |
lock.delete(); | |
} | |
lock.close(); | |
} | |
} | |
/** | |
* Adapted from | |
* http://terrier.org/docs/v3.5/javadoc/org/terrier/utility/io/HadoopUtility.html#saveClassPathToJob%28org.apache.hadoop.mapred.JobConf%29 | |
* @param jobConf | |
* @throws IOException | |
*/ | |
public static List<Path> saveClassPathToJob(Configuration jobConf) throws Exception { | |
String hdfs = getHost(jobConf); | |
HdfsLock lock = new HdfsLock(hdfs); | |
String hdfs2 = getHdfs(jobConf); | |
if(jobConf.get(HDFS_HOST) !=null) { | |
if(lock.isLocked()) { | |
List<Path> ret = lock.getPaths(); | |
StringBuffer files = new StringBuffer(); | |
StringBuffer classPath = new StringBuffer(); | |
for(Path path : ret) { | |
files.append(hdfs2 + path.toString()); | |
files.append(","); | |
classPath.append(hdfs2 + path.toString()); | |
classPath.append(":"); | |
DistributedCache.addArchiveToClassPath(path, jobConf); | |
} | |
String classPathToSet = classPath.toString().substring(0, classPath.lastIndexOf(":")); | |
String filesToSet = files.toString().substring(0,files.lastIndexOf(",")); | |
log.info("Setting class path " + classPathToSet); | |
log.info("Using files " + filesToSet); | |
jobConf.set("mapred.cache.files",filesToSet); | |
jobConf.set("mapred.job.classpath.files",classPathToSet); | |
return ret; | |
} | |
} | |
List<Path> paths = new ArrayList<Path>(); | |
log.info("Copying classpath to job"); | |
final String[] jars = findJarFiles(new String[]{ | |
System.getenv().get("CLASSPATH"), | |
System.getProperty("java.class.path"), | |
System.getProperty("surefire.test.class.path") | |
}); | |
final FileSystem defFS = FileSystem.get(jobConf); | |
int numFilesWritten = 0; | |
for (String jarFile : jars) { | |
//class path issues | |
if(jarFile.contains("hadoop-client")) { | |
log.info("Skipping hadoop-client"); | |
continue; | |
} | |
else if(jarFile.contains("mapreduce-run")) { | |
log.info("Skipping map reduce run"); | |
continue; | |
} | |
Path srcJarFilePath = new Path("file:///"+jarFile); | |
String filename = srcJarFilePath.getName(); | |
Path tmpJarFilePath = makeFile(jobConf, filename); | |
log.info("Uploading " + jarFile + " to " + tmpJarFilePath.toString()); | |
try { | |
defFS.copyFromLocalFile(srcJarFilePath, tmpJarFilePath); | |
DistributedCache.addArchiveToClassPath(tmpJarFilePath, jobConf); | |
paths.add(tmpJarFilePath); | |
numFilesWritten++; | |
}catch(Exception e) { | |
for(Path path : paths) { | |
if(defFS.exists(path)) | |
defFS.delete(path,true); | |
} | |
lock.close(); | |
log.error(String.format("Exception writing to hdfs; rolling back %d jar files ",numFilesWritten),e); | |
throw new IOException("Couldn't write jar file " + jarFile); | |
} | |
} | |
try { | |
lock.create(paths); | |
}catch(KeeperException.SessionExpiredException e) { | |
lock = new HdfsLock(hdfs); | |
lock.create(paths); | |
} | |
DistributedCache.createSymlink(jobConf); | |
lock.close(); | |
//resolve any differences by removing clashing names in the files (archives are removed from files) | |
jobConf.set(DistributedCache.CACHE_FILES,""); | |
TrackerDistributedCacheManager.validate(jobConf); | |
Set<Path> remove = new HashSet<Path>(); | |
for(Path path : paths) { | |
boolean exists = false; | |
try { | |
exists = defFS.exists(path); | |
}catch(IllegalArgumentException e) {exists = false;} | |
if(!exists) | |
remove.add(path); | |
} | |
paths.removeAll(remove); | |
return paths; | |
} | |
// System-property name prefixes of interest. NOTE(review): not referenced
// anywhere in this file — presumably used by callers or another chunk of the
// original source; confirm before removing.
protected static final String[] checkSystemProperties = {"file", "java", "line", "os", "path", "sun", "user"};
// Shared RNG used to derive the per-configuration temp-file id in
// makeFile/makeTemporaryFile.
protected static final Random random = new Random();
public static Path makeTemporaryFile(Configuration jobConf, String filename) throws IOException | |
{ | |
final int randomKey = jobConf.getInt("terrier.tempfile.id", random.nextInt()); | |
jobConf.setInt("terrier.tempfile.id", randomKey); | |
FileSystem defFS = FileSystem.get(jobConf); | |
final Path tempFile = new Path("/tmp/"+(randomKey)+"-"+filename); | |
defFS.deleteOnExit(tempFile); | |
return tempFile; | |
} | |
public static Path makeFile(Configuration jobConf, String filename) throws IOException | |
{ | |
final int randomKey = jobConf.getInt("terrier.tempfile.id", random.nextInt()); | |
jobConf.setInt("terrier.tempfile.id", randomKey); | |
final Path tempFile = new Path("/tmp/"+(randomKey)+"-"+filename); | |
return tempFile; | |
} | |
protected static String[] findJarFiles(String [] classPathLines) | |
{ | |
Set<String> jars = new HashSet<String>(); | |
for (String locationsLine : classPathLines) | |
{ | |
if (locationsLine == null) | |
continue; | |
for (String CPentry : locationsLine.split(":")) | |
{ | |
if (CPentry.endsWith(".jar")) | |
jars.add(new File(CPentry).getAbsoluteFile().toString()); | |
} | |
} | |
return jars.toArray(new String[0]); | |
} | |
/**
 * Copies every jar on the local java.class.path into HDFS under /tmp and
 * adds it to the job classpath via the distributed cache.
 *
 * NOTE(review): cp can be null (default is null), which would NPE on split();
 * the ':' separator is Unix-only; and endsWith("jar") (no dot) also matches
 * names like "foojar". Left untouched because this method is deprecated.
 *
 * @deprecated superseded by {@link #saveClassPathToJob(Configuration)}
 * @param conf the configuration to mutate
 * @throws Exception on filesystem failures
 */
@Deprecated
public static void addToClassPath(Configuration conf) throws Exception {
    String cp = System.getProperty("java.class.path", null);
    String[] jars = cp.split(":");
    for( String jar : jars ) {
        if(jar.endsWith("jar")) {
            // NOTE(review): 'jar' is a full local path, so this yields
            // /tmp/<absolute-path> on HDFS; presumably intentional — confirm.
            String path = getHdfsUri(conf) + "/tmp/" + jar;
            if(!pathExists(conf,path)) {
                FileSystem fs = FileSystem.get(conf);
                fs.copyFromLocalFile( new Path(jar), new Path(path));
            }
            //already exists
            DistributedCache.addArchiveToClassPath(new Path(path), conf);
        }
    }
}
/**
 * JobConf overload of {@code addToClassPath(Configuration)}: copies every jar
 * on the local java.class.path into HDFS under /tmp and adds it to the job
 * classpath via the distributed cache.
 *
 * NOTE(review): shares the same caveats as the Configuration overload —
 * possible NPE on a null classpath, Unix-only ':' separator, and the dot-less
 * endsWith("jar") check. Left untouched because this method is deprecated.
 *
 * @deprecated superseded by {@link #saveClassPathToJob(Configuration)}
 * @param conf the job configuration to mutate
 * @throws Exception on filesystem failures
 */
@Deprecated
public static void addToClassPath(JobConf conf) throws Exception {
    String cp = System.getProperty("java.class.path", null);
    String[] jars = cp.split(":");
    for( String jar : jars ) {
        if(jar.endsWith("jar")) {
            String path = getHdfsUri(conf) + "/tmp/" + jar;
            if(!pathExists(conf,path)) {
                FileSystem fs = FileSystem.get(conf);
                fs.copyFromLocalFile( new Path(jar), new Path(path));
            }
            //already exists
            DistributedCache.addArchiveToClassPath(new Path(path), conf);
        }
    }
}
public static void close(Configuration conf) throws Exception { | |
FileSystem system = systems.get(conf); | |
if(system!=null) { | |
system.close(); | |
systems.remove(conf); | |
} | |
} | |
public static String getHdfsUri(Configuration conf) { | |
return conf.get("fs.default.name","127.0.0.1:8020"); | |
} | |
private static FileSystem getFileSystem(Configuration conf) throws Exception{ | |
FileSystem ret = systems.get(conf); | |
if(ret == null) { | |
ret = FileSystem.get(conf); | |
systems.put(conf,ret); | |
} | |
return ret; | |
} | |
public static void ensureUserDirExists(Configuration conf) throws Exception { | |
FileSystem fs = getFileSystem(conf); | |
if(!fs.exists(new Path(prependUserPath("")))) { | |
boolean dirs = fs.mkdirs( new Path(prependUserPath(""))); | |
if(!dirs) | |
throw new IllegalStateException("Couldn't make " + prependUserPath("")); | |
FileUtil.chmod(prependUserPath(""), "777"); | |
} | |
} | |
public static void createFile(String path,Configuration conf) throws Exception { | |
createFile(path,conf,true); | |
} | |
public static void ensureParentDirectoriesExist(String basePath,Configuration conf) throws Exception { | |
File f = new File(basePath); | |
if(!pathExists(conf,f.getParent())) | |
mkdir(f.getParent(),conf,false); | |
} | |
/**
 * Creates an empty file at the given path if it does not already exist.
 *
 * NOTE(review): the 'prepend' flag behaves asymmetrically — prepend == false
 * targets the raw path, while the else branch (prepend == true) targets the
 * user-prefixed path; yet the parent-directory check below always uses the
 * raw path. Preserved as-is; confirm against callers.
 *
 * @param path    the file to create
 * @param conf    the configuration identifying the file system
 * @param prepend whether to prefix the path with /user/&lt;user&gt;/
 * @throws Exception on filesystem failures
 */
public static void createFile(String path,Configuration conf,boolean prepend) throws Exception {
    FileSystem fs = getFileSystem(conf);
    ensureParentDirectoriesExist(path,conf);
    //returns an output stream, ensure it's closed
    if(!prepend && !pathExists(conf,path)) {
        OutputStream os = fs.create(new Path(path));
        IOUtils.closeQuietly(os);
    }
    else if(!pathExists(conf,prependUserPath( path ) ) ) {
        OutputStream os = fs.create(new Path( prependUserPath( path ) ) ) ;
        IOUtils.closeQuietly(os);
    }
}
public static boolean pathExists(Configuration conf,String path) throws Exception { | |
FileSystem fs = getFileSystem(conf); | |
boolean ret = fs.exists( new Path(path)); | |
return ret; | |
} | |
public static String getUser() { | |
String name; | |
name = System.getProperty("user.name"); | |
if(name==null) | |
name="hdfs"; | |
return name; | |
} | |
public static String getUserPath() { | |
return "/user/" + getUser() + "/"; | |
} | |
public static String prependUserPath(String path) { | |
return "/user/" + getUser() + "/" + path; | |
} | |
public static void deleteUserDir(Configuration conf) throws Exception { | |
FileSystem fs = getFileSystem(conf); | |
if(fs.exists( new Path( prependUserPath( "" ) ) ) ) { | |
boolean delete = fs.delete(new Path( prependUserPath( "" ) ), true); | |
if(!delete) | |
throw new RuntimeException("Couldn't delete file " + prependUserPath("") ); | |
} | |
} | |
public static void writeText(String text,Configuration conf,String file) throws Exception { | |
FileSystem fs = getFileSystem(conf); | |
if(pathExists(conf,file)) | |
deletePath(file, conf, false); | |
ensureParentDirectoriesExist(file,conf); | |
FSDataOutputStream fdos = fs.create(new Path(file), true); | |
fdos.writeBytes(text); | |
fdos.flush(); | |
fdos.close(); | |
} | |
public static String getContents(String text,Configuration conf,String file) throws Exception { | |
FileSystem fs = getFileSystem(conf); | |
if(!pathExists(conf,file)) | |
return ""; | |
InputStream is = fs.open(new Path(file)); | |
StringBuffer sb = new StringBuffer(); | |
BufferedReader reader = new BufferedReader( new InputStreamReader(is)); | |
String line = null; | |
while((line = reader.readLine()) != null ) | |
sb.append(line); | |
reader.close(); | |
is.close(); | |
return sb.toString(); | |
} | |
/**
 * Deletes the given path recursively.
 *
 * NOTE(review): when prependUserPath is false but the raw path does not
 * exist, control falls through to the else-if and may delete the
 * user-prefixed path instead — this looks unintended but is preserved
 * as-is; verify against callers.
 *
 * @param path            the path to delete
 * @param conf            the configuration identifying the file system
 * @param prependUserPath whether to target /user/&lt;user&gt;/&lt;path&gt;
 * @throws Exception on filesystem failures
 */
public static void deletePath(String path,Configuration conf,boolean prependUserPath) throws Exception {
    FileSystem fs = getFileSystem(conf);
    if(!prependUserPath && pathExists( conf,path ) )
        fs.delete( new Path(path),true);
    else if(pathExists(conf,prependUserPath( path ) ) )
        fs.delete( new Path( prependUserPath( path ) ),true);
}
/**
 * Creates the given directory.
 *
 * NOTE(review): when prependUserPath is true, the second 'if' below is not
 * an 'else' — the raw, un-prefixed path is also created, so both directories
 * end up existing. That looks like a missing 'else'; preserved as-is,
 * confirm against callers.
 *
 * @param path            the directory to create
 * @param conf            the configuration identifying the file system
 * @param prependUserPath whether to (also) target /user/&lt;user&gt;/&lt;path&gt;
 * @throws Exception on filesystem failures
 */
public static void mkdir(String path,Configuration conf,boolean prependUserPath) throws Exception {
    if(prependUserPath)
        ensureUserDirExists(conf);
    FileSystem fs = getFileSystem(conf);
    if(prependUserPath && !fs.exists(new Path( prependUserPath( path ) ) ) )
        fs.mkdirs(new Path( prependUserPath( path ) ) );
    if(!fs.exists(new Path( path ) ) )
        fs.mkdirs( new Path(path ) );
}
public static void deleteDir(String path,Configuration conf) throws Exception { | |
deletePath(path,conf,true); | |
} | |
public static void mkdir(String path,Configuration conf) throws Exception { | |
deletePath(path,conf,true); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment