Last active
February 25, 2016 23:09
-
-
Save apivovarov/45f629de10e890f156ac to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.hadoop.mapreduce.lib.output; | |
import java.io.IOException; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapreduce.JobContext; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; | |
/** | |
* OutputCommitter suitable for S3 workloads. Unlike the usual FileOutputCommitter, which | |
* writes files to a _temporary/ directory before renaming them to their final location, this | |
* simply writes directly to the final location. | |
* | |
* The FileOutputCommitter is required for HDFS + speculation, which allows only one writer at | |
* a time for a file (so two people racing to write the same file would not work). However, S3 | |
* supports multiple writers outputting to the same file, where visibility is guaranteed to be | |
* atomic. This is a monotonic operation: all writers should be writing the same data, so which | |
* one wins is immaterial. | |
* | |
* Put jar file with the class to /usr/lib/hadoop/client, /usr/lib/spark/lib | |
* Add the following settings to mapred-site.xml | |
* <property> | |
* <name>mapred.output.direct.EmrFileSystem</name> | |
* <value>true</value> | |
* </property> | |
* <property> | |
* <name>mapred.output.direct.NativeS3FileSystem</name> | |
* <value>true</value> | |
* </property> | |
* <property> | |
* <name>mapred.output.committer.class</name> | |
* <value>org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter</value> | |
* </property> | |
*/ | |
public class DirectFileOutputCommitter extends FileOutputCommitter { | |
private static final Log LOG = LogFactory.getLog(DirectFileOutputCommitter.class); | |
private Path outputPath = null; | |
private final boolean directWrite; | |
public DirectFileOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { | |
super(outputPath, context); | |
if(outputPath != null && context != null) { | |
this.outputPath = outputPath; | |
this.directWrite = isDirectWrite(context.getConfiguration(), outputPath.getFileSystem(context.getConfiguration())); | |
} else { | |
this.directWrite = false; | |
} | |
} | |
public DirectFileOutputCommitter(Path outputPath, JobContext context) throws IOException { | |
super(outputPath, context); | |
if(outputPath != null && context != null) { | |
this.outputPath = outputPath; | |
this.directWrite = isDirectWrite(context.getConfiguration(), outputPath.getFileSystem(context.getConfiguration())); | |
} else { | |
this.directWrite = false; | |
} | |
} | |
public void setupJob(JobContext context) throws IOException { | |
if(this.directWrite) { | |
LOG.info("Nothing to setup since the outputs are written directly."); | |
} else { | |
super.setupJob(context); | |
} | |
} | |
public void cleanupJob(JobContext context) throws IOException { | |
if(this.directWrite) { | |
LOG.info("Nothing to clean up since no temporary files were written."); | |
} else { | |
super.cleanupJob(context); | |
} | |
} | |
public void setupTask(TaskAttemptContext context) throws IOException { | |
if(!this.directWrite) { | |
super.setupTask(context); | |
} | |
} | |
public void commitTask(TaskAttemptContext context) throws IOException { | |
if(this.directWrite) { | |
LOG.info("Commit should not be called since this task doesnt have any commitable files. Also needsTaskCommit returns false"); | |
} else { | |
super.commitTask(context); | |
} | |
} | |
public void commitJob(JobContext context) throws IOException { | |
if(this.directWrite) { | |
if(this.hasOutputPath()) { | |
this.cleanupJob(context); | |
if(context.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) { | |
Path markerPath = new Path(this.outputPath, "_SUCCESS"); | |
FileSystem fs = markerPath.getFileSystem(context.getConfiguration()); | |
fs.create(markerPath).close(); | |
} | |
} | |
} else { | |
super.commitJob(context); | |
} | |
} | |
public void abortTask(TaskAttemptContext context) throws IOException { | |
if(this.directWrite) { | |
LOG.info("Nothing to clean up on abort since there are no temporary files written"); | |
} else { | |
super.abortTask(context); | |
} | |
} | |
public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { | |
return this.directWrite?false:super.needsTaskCommit(context); | |
} | |
public Path getWorkPath() throws IOException { | |
return this.directWrite?this.outputPath:super.getWorkPath(); | |
} | |
private static boolean isDirectWrite(Configuration c, FileSystem fs) { | |
return c.getBoolean("mapred.output.direct." + fs.getClass().getSimpleName(), false); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment