Skip to content

Instantly share code, notes, and snippets.

@adamv
Last active August 29, 2015 14:24
Show Gist options
  • Save adamv/af40115b086b2345e069 to your computer and use it in GitHub Desktop.
Save adamv/af40115b086b2345e069 to your computer and use it in GitHub Desktop.
// Subclass TextOutputFormat so output work files use a base ".json" extension
// (followed by the codec's default extension when output compression is enabled).
// NOTE: the body of getRecordWriter mirrors TextOutputFormat#getRecordWriter; only
// the extension handling differs. Keep it in sync when upgrading Hadoop.
static class JsonTextOutputFormat extends TextOutputFormat<LongWritable, Text> {

  /** Base extension placed before any codec-specific extension. */
  private static final String JSON_EXTENSION = ".json";

  /**
   * Creates a line-oriented record writer whose work file extension is ".json",
   * or ".json" plus the codec's default extension when output compression is on.
   *
   * <p>The key/value separator is read from the {@code SEPERATOR} configuration
   * key and defaults to a tab. When compression is enabled but no codec is
   * configured, {@link GzipCodec} is used.
   *
   * @param job the task attempt context supplying configuration and output path
   * @return a record writer that writes to a newly created (non-overwriting) file
   * @throws IOException if the output file or compressed stream cannot be created
   * @throws InterruptedException propagated from the framework
   */
  @Override
  public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    String keyValueSeparator = conf.get(SEPERATOR, "\t");

    CompressionCodec codec = null;
    String extension = JSON_EXTENSION;
    if (getCompressOutput(job)) {
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, GzipCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, conf);
      extension += codec.getDefaultExtension();
    }

    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    // 'false': never overwrite an existing work file.
    FSDataOutputStream fileOut = fs.create(file, false);

    if (codec == null) {
      return new LineRecordWriter<>(fileOut, keyValueSeparator);
    }
    try {
      return new LineRecordWriter<>(
          new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
    } catch (IOException | RuntimeException e) {
      // The file was already created; close it rather than leak the handle
      // if the codec fails to wrap the stream.
      try {
        fileOut.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.