Last active
February 26, 2016 08:28
-
-
Save apivovarov/4de9a82e467401d005f1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.hadoop.mapred; | |
import java.io.IOException; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.mapred.JobContext; | |
import org.apache.hadoop.mapred.JobStatus; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.mapred.OutputCommitter; | |
import org.apache.hadoop.mapred.TaskAttemptContext; | |
import org.apache.hadoop.mapred.TaskAttemptID; | |
/** | |
* OutputCommitter suitable for S3 workloads. Unlike the usual FileOutputCommitter, which | |
* writes files to a _temporary/ directory before renaming them to their final location, this | |
* simply writes directly to the final location. | |
* | |
* The FileOutputCommitter is required for HDFS + speculation, which allows only one writer at | |
* a time for a file (so two people racing to write the same file would not work). However, S3 | |
* supports multiple writers outputting to the same file, where visibility is guaranteed to be | |
* atomic. This is a monotonic operation: all writers should be writing the same data, so which | |
* one wins is immaterial. | |
* | |
* Code adapted from Ian Hummel's code from this PR: | |
* https://github.com/themodernlife/spark/commit/4359664b1d557d55b0579023df809542386d5b8c | |
* | |
* Add the following settings to mapred-site.xml | |
* | |
* <property> | |
* <name>mapred.output.committer.class</name> | |
* <value>org.apache.hadoop.mapred.DirectOutputCommitter</value> | |
* </property> | |
*/ | |
public class DirectOutputCommitter extends OutputCommitter { | |
@Override | |
public void setupJob(JobContext jobContext) throws IOException { | |
} | |
@Override | |
public void setupTask(TaskAttemptContext taskContext) throws IOException { | |
} | |
@Override | |
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { | |
// We return true here to guard against implementations that do not handle false correctly. | |
// The meaning of returning false is not entirely clear, so it's possible to be interpreted | |
// as an error. Returning true just means that commitTask() will be called, which is a no-op. | |
return true; | |
} | |
@Override | |
public void commitTask(TaskAttemptContext taskContext) throws IOException { | |
} | |
@Override | |
public void abortTask(TaskAttemptContext taskContext) throws IOException { | |
} | |
/** | |
* Creates a _SUCCESS file to indicate the entire job was successful. | |
* This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option. | |
*/ | |
@Override | |
public void commitJob(JobContext context) throws IOException { | |
JobConf conf = context.getJobConf(); | |
if (shouldCreateSuccessFile(conf)) { | |
Path outputPath = FileOutputFormat.getOutputPath(conf); | |
if (outputPath != null) { | |
FileSystem fileSys = outputPath.getFileSystem(conf); | |
Path filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME); | |
fileSys.create(filePath).close(); | |
} | |
} | |
} | |
/** By default, we do create the _SUCCESS file, but we allow it to be turned off. */ | |
private boolean shouldCreateSuccessFile(JobConf conf) { | |
return conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment