Skip to content

Instantly share code, notes, and snippets.

@marblejenka
Created July 14, 2011 07:21
Show Gist options
  • Save marblejenka/1082056 to your computer and use it in GitHub Desktop.
MultipleOutputs
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.*;
/**
 * The MultipleOutputs class simplifies writing to additional outputs other
 * than the job default output via the <code>OutputCollector</code> passed to
 * the <code>map()</code> and <code>reduce()</code> methods of the
 * <code>Mapper</code> and <code>Reducer</code> implementations.
 * <p/>
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * <p/>
 * A named output can be a single file or a multi file. The latter is referred
 * to as a multi named output.
 * <p/>
 * A multi named output is an unbound set of files all sharing the same
 * <code>OutputFormat</code>, key class and value class configuration.
 * <p/>
 * When named outputs are used within a <code>Mapper</code> implementation,
 * key/values written to a named output are not part of the reduce phase, only
 * key/values written to the job <code>OutputCollector</code> are part of the
 * reduce phase.
 * <p/>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name.
 * </p>
 * The names of the counters are the same as the named outputs. For multi
 * named outputs the name of the counter is the concatenation of the named
 * output, an underscore '_' and the multiname.
 * <p/>
 * Job configuration usage pattern is:
 * <pre>
 *
 * JobConf conf = new JobConf();
 *
 * conf.setInputPath(inDir);
 * FileOutputFormat.setOutputPath(conf, outDir);
 *
 * conf.setMapperClass(MOMap.class);
 * conf.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text based output 'text' for the job
 * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
 * LongWritable.class, Text.class);
 *
 * // Defines additional multi sequencefile based output 'sequence' for the
 * // job
 * MultipleOutputs.addMultiNamedOutput(conf, "seq",
 * SequenceFileOutputFormat.class,
 * LongWritable.class, Text.class);
 * ...
 *
 * JobClient jc = new JobClient();
 * RunningJob job = jc.submitJob(conf);
 *
 * ...
 * </pre>
 * <p/>
 * Usage pattern in the Mapper/Reducer code is:
 * <pre>
 *
 * public class MOReduce implements
 * Reducer&lt;WritableComparable, Writable&gt; {
 * private MultipleOutputs mos;
 *
 * public void configure(JobConf conf) {
 * ...
 * mos = new MultipleOutputs(conf);
 * }
 *
 * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
 * OutputCollector output, Reporter reporter)
 * throws IOException {
 * ...
 * mos.getCollector("text", reporter).collect(key, new Text("Hello"));
 * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
 * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
 * ...
 * }
 *
 * public void close() throws IOException {
 * mos.close();
 * ...
 * }
 *
 * }
 * </pre>
 */
public class MultipleOutputs {

  /** Conf key holding the space-separated list of defined named outputs. */
  private static final String NAMED_OUTPUTS = "mo.namedOutputs";
  /** Prefix for the per-named-output conf keys below. */
  private static final String MO_PREFIX = "mo.namedOutput.";
  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String MULTI = ".multi";
  /** Conf key enabling the per-output record counters. */
  private static final String COUNTERS_ENABLED = "mo.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Checks the existence/non-existence of a named output.
   *
   * @param conf           job conf
   * @param namedOutput    named output name
   * @param alreadyDefined if <code>true</code>, fail when the output IS
   *                       defined; if <code>false</code>, fail when it is NOT
   * @throws IllegalArgumentException if the check fails
   */
  private static void checkNamedOutput(JobConf conf, String namedOutput,
                                       boolean alreadyDefined) {
    List<String> definedChannels = getNamedOutputsList(conf);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
          "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
          "' not defined");
    }
  }

  /**
   * Checks if a named output name is a valid token: non-empty, ASCII letters
   * and digits only (the name is embedded in conf keys and file names).
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
          "Name cannot be NULL or empty");
    }
    for (char ch : namedOutput.toCharArray()) {
      boolean validChar = (ch >= 'A' && ch <= 'Z')
          || (ch >= 'a' && ch <= 'z')
          || (ch >= '0' && ch <= '9');
      if (!validChar) {
        throw new IllegalArgumentException(
            "Name cannot have a '" + ch + "' char");
      }
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(String namedOutput) {
    checkTokenName(namedOutput);
    // name cannot be the name used for the default output
    if (namedOutput.equals("part")) {
      throw new IllegalArgumentException(
          "Named output name cannot be 'part'");
    }
  }

  /**
   * Returns list of channel names.
   *
   * @param conf job conf
   * @return List of channel Names
   */
  public static List<String> getNamedOutputsList(JobConf conf) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  /**
   * Returns if a named output is multiple.
   *
   * @param conf        job conf
   * @param namedOutput named output
   * @return <code>true</code> if the named output is multi, <code>false</code>
   *         if it is single. If the named output is not defined it throws
   *         an IllegalArgumentException
   */
  public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
    checkNamedOutput(conf, namedOutput, false);
    return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
  }

  /**
   * Returns the named output OutputFormat.
   *
   * @param conf        job conf
   * @param namedOutput named output
   * @return namedOutput OutputFormat
   */
  public static Class<? extends OutputFormat> getNamedOutputFormatClass(
      JobConf conf, String namedOutput) {
    checkNamedOutput(conf, namedOutput, false);
    return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
        OutputFormat.class);
  }

  /**
   * Returns the key class for a named output.
   *
   * @param conf        job conf
   * @param namedOutput named output
   * @return class for the named output key
   */
  public static Class<? extends WritableComparable> getNamedOutputKeyClass(
      JobConf conf, String namedOutput) {
    checkNamedOutput(conf, namedOutput, false);
    return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
        WritableComparable.class);
  }

  /**
   * Returns the value class for a named output.
   *
   * @param conf        job conf
   * @param namedOutput named output
   * @return class of named output value
   */
  public static Class<? extends Writable> getNamedOutputValueClass(
      JobConf conf, String namedOutput) {
    checkNamedOutput(conf, namedOutput, false);
    return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
        Writable.class);
  }

  /**
   * Adds a named output for the job.
   * <p/>
   *
   * @param conf              job conf to add the named output
   * @param namedOutput       named output name, it has to be a word, letters
   *                          and numbers only, cannot be the word 'part' as
   *                          that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  public static void addNamedOutput(JobConf conf, String namedOutput,
                                    Class<? extends OutputFormat> outputFormatClass,
                                    Class<?> keyClass, Class<?> valueClass) {
    addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass,
        valueClass);
  }

  /**
   * Adds a multi named output for the job.
   * <p/>
   *
   * @param conf              job conf to add the named output
   * @param namedOutput       named output name, it has to be a word, letters
   *                          and numbers only, cannot be the word 'part' as
   *                          that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  public static void addMultiNamedOutput(JobConf conf, String namedOutput,
                                         Class<? extends OutputFormat> outputFormatClass,
                                         Class<?> keyClass, Class<?> valueClass) {
    addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
        valueClass);
  }

  /**
   * Adds a named output for the job.
   * <p/>
   *
   * @param conf              job conf to add the named output
   * @param namedOutput       named output name, it has to be a word, letters
   *                          and numbers only, cannot be the word 'part' as
   *                          that is reserved for the default output.
   * @param multi             indicates if the named output is multi
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  private static void addNamedOutput(JobConf conf, String namedOutput,
                                     boolean multi,
                                     Class<? extends OutputFormat> outputFormatClass,
                                     Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(namedOutput);
    checkNamedOutput(conf, namedOutput, true);
    conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
        OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
    conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
  }

  /**
   * Enables or disables counters for the named outputs.
   * <p/>
   * By default these counters are disabled.
   * <p/>
   * The counters group is the {@link MultipleOutputs} class name.
   * </p>
   * The names of the counters are the same as the named outputs. For multi
   * named outputs the name of the counter is the concatenation of the named
   * output, an underscore '_' and the multiname.
   *
   * @param conf    job conf to enable the counters on.
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(JobConf conf, boolean enabled) {
    conf.setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns if the counters for the named outputs are enabled or not.
   * <p/>
   * By default these counters are disabled.
   * <p/>
   * The counters group is the {@link MultipleOutputs} class name.
   * </p>
   * The names of the counters are the same as the named outputs. For multi
   * named outputs the name of the counter is the concatenation of the named
   * output, an underscore '_' and the multiname.
   *
   * @param conf the job conf to query.
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobConf conf) {
    return conf.getBoolean(COUNTERS_ENABLED, false);
  }

  // instance code, to be used from Mapper/Reducer code

  private JobConf conf;
  private OutputFormat outputFormat;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple named outputs support, it should be
   * instantiated in the Mapper/Reducer configure method.
   *
   * @param job the job configuration object
   */
  public MultipleOutputs(JobConf job) {
    this.conf = job;
    outputFormat = new InternalFileOutputFormat();
    namedOutputs = Collections.unmodifiableSet(
        new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
    recordWriters = new HashMap<String, RecordWriter>();
    countersEnabled = getCountersEnabled(job);
  }

  /**
   * Returns iterator with the defined name outputs.
   *
   * @return iterator with the defined named outputs
   */
  public Iterator<String> getNamedOutputs() {
    return namedOutputs.iterator();
  }

  // By being synchronized MultipleOutputs can be used with a
  // MultithreadedMapRunner.
  private synchronized RecordWriter getRecordWriter(String namedOutput,
                                                    String baseFileName,
                                                    final Reporter reporter)
      throws IOException {
    RecordWriter writer = recordWriters.get(baseFileName);
    if (writer == null) {
      // Counters require a Reporter; checked once, before the writer is built.
      if (countersEnabled && reporter == null) {
        throw new IllegalArgumentException(
            "Counters are enabled, Reporter cannot be NULL");
      }
      JobConf jobConf = new JobConf(conf);
      jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
      FileSystem fs = FileSystem.get(conf);
      writer =
          outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
      }
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  /** Wraps a RecordWriter to increment a counter on every write. */
  private static class RecordWriterWithCounter implements RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private Reporter reporter;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   Reporter reporter) {
      this.writer = writer;
      this.counterName = counterName;
      this.reporter = reporter;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value) throws IOException {
      reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
      writer.write(key, value);
    }

    public void close(Reporter reporter) throws IOException {
      writer.close(reporter);
    }
  }

  /**
   * Gets the output collector for a named output.
   * <p/>
   *
   * @param namedOutput the named output name
   * @param reporter    the reporter
   * @return the output collector for the given named output
   * @throws IOException thrown if output collector could not be created
   */
  @SuppressWarnings({"unchecked"})
  public OutputCollector getCollector(String namedOutput, Reporter reporter)
      throws IOException {
    return getCollector(namedOutput, null, reporter);
  }

  /**
   * Gets the output collector for a multi named output.
   * <p/>
   *
   * @param namedOutput the named output name
   * @param multiName   the multi name part
   * @param reporter    the reporter
   * @return the output collector for the given named output
   * @throws IOException thrown if output collector could not be created
   */
  @SuppressWarnings({"unchecked"})
  public OutputCollector getCollector(String namedOutput, String multiName,
                                      Reporter reporter)
      throws IOException {
    checkNamedOutputName(namedOutput);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
          namedOutput + "'");
    }
    boolean multi = isMultiNamedOutput(conf, namedOutput);
    if (!multi && multiName != null) {
      throw new IllegalArgumentException("Name output '" + namedOutput +
          "' has not been defined as multi");
    }
    if (multi) {
      checkTokenName(multiName);
    }
    // Multi outputs write to <namedOutput>_<multiName>, single to <namedOutput>.
    String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
    final RecordWriter writer =
        getRecordWriter(namedOutput, baseFileName, reporter);
    return new OutputCollector() {
      @SuppressWarnings({"unchecked"})
      public void collect(Object key, Object value) throws IOException {
        writer.write(key, value);
      }
    };
  }

  /**
   * Closes all the opened named outputs.
   * <p/>
   * If overridden subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>
   *
   * @throws java.io.IOException thrown if any of the MultipleOutput files
   *                             could not be closed properly.
   */
  public void close() throws IOException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(null);
    }
  }

  /**
   * OutputFormat that delegates to the named output's configured
   * OutputFormat, key class and value class.
   */
  private static class InternalFileOutputFormat extends
      FileOutputFormat<Object, Object> {
    public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";

    @SuppressWarnings({"unchecked"})
    public RecordWriter<Object, Object> getRecordWriter(
        FileSystem fs, JobConf job, String baseFileName, Progressable progress)
        throws IOException {
      String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
      String fileName = getUniqueName(job, baseFileName);
      // The following trick leverages the instantiation of a record writer via
      // the job conf thus supporting arbitrary output formats.
      JobConf outputConf = new JobConf(job);
      outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
      outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
      outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
      OutputFormat outputFormat = outputConf.getOutputFormat();
      return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
    }
  }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.util.*;
/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 *
 * <p>
 * Case two: to write data to different files provided by user
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters, by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output name. These count the number records
 * written to each output name.
 * </p>
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = new Job();
 *
 * FileInputFormat.setInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 * LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'sequence' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 * SequenceFileOutputFormat.class,
 * LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * &lt;K, V&gt; String generateFileName(K k, V v) {
 * return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 * Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
 * private MultipleOutputs mos;
 * public void setup(Context context) {
 * ...
 * mos = new MultipleOutputs(context);
 * }
 *
 * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
 * Context context)
 * throws IOException {
 * ...
 * mos.write("text", key, new Text("Hello"));
 * mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 * mos.write("seq", new LongWritable(2), key, new Text("Chau"), "seq_b");
 * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 * ...
 * }
 *
 * public void cleanup(Context) throws IOException {
 * mos.close();
 * ...
 * }
 *
 * }
 * </pre>
 */
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  /** Conf key holding the space-separated list of defined named outputs. */
  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
  /** Prefix for the per-named-output conf keys below. */
  private static final String MO_PREFIX =
      "mapreduce.multipleoutputs.namedOutput.";
  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  /** Conf key enabling the per-output record counters. */
  private static final String COUNTERS_ENABLED =
      "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts
   */
  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();

  /**
   * Checks if a named output name is a valid token: non-empty, ASCII letters
   * and digits only (the name is embedded in conf keys and file names).
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
          "Name cannot be NULL or empty");
    }
    for (char ch : namedOutput.toCharArray()) {
      boolean validChar = (ch >= 'A' && ch <= 'Z')
          || (ch >= 'a' && ch <= 'z')
          || (ch >= '0' && ch <= '9');
      if (!validChar) {
        throw new IllegalArgumentException(
            "Name cannot have a '" + ch + "' char");
      }
    }
  }

  /**
   * Checks if output name is valid.
   *
   * name cannot be the name used for the default output
   * @param outputPath base output Name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid and checks its
   * existence/non-existence in the job.
   *
   * @param job            the job context
   * @param namedOutput    named output name
   * @param alreadyDefined if <code>true</code>, fail when the output IS
   *                       defined; if <code>false</code>, fail when it is NOT
   * @throws IllegalArgumentException if the output name is not valid or the
   *                                  existence check fails
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
          "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
          "' not defined");
    }
  }

  // Returns list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
        job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
      JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
        job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
            OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
      String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
        WritableComparable.class);
  }

  // Returns the value class for a named output.
  private static Class<? extends Writable> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
        null, Writable.class);
  }

  /**
   * Adds a named output for the job.
   * <p/>
   *
   * @param job               job to add the named output
   * @param namedOutput       named output name, it has to be a word, letters
   *                          and numbers only, cannot be the word 'part' as
   *                          that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
        conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
        OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number records written to each output name.
   * By default these counters are disabled.
   *
   * @param job     job to enable counters
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns if the counters for the named outputs are enabled or not.
   * By default these counters are disabled.
   *
   * @param job the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  /**
   * Wraps RecordWriter to increment counters.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support,
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
        new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * Output path is a unique file generated for the namedOutput.
   * For example, {namedOutput}-(m|r)-{part-number}
   *
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   *                       Note: Framework will generate unique filename for
   *                       the baseOutputPath
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
          namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key value to an output file name.
   *
   * Gets the record writer from job's output format.
   * Job's output format should be a FileOutputFormat.
   *
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   *                       Note: Framework will generate unique filename for
   *                       the baseOutputPath
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    TaskAttemptContext taskContext = new TaskAttemptContext(
        context.getConfiguration(), context.getTaskAttemptID());
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  // By being synchronized MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {
    // look for record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);
    // If not in cache, create a new one
    if (writer == null) {
      // get the record writer from context output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
            taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
            .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }
      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }
      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with
  // output format and output key/value types put in the context
  private TaskAttemptContext getContext(String nameOutput) throws IOException {
    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
    if (taskContext != null) {
      return taskContext;
    }
    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = new Job(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContext(
        job.getConfiguration(), context.getTaskAttemptID());
    taskContexts.put(nameOutput, taskContext);
    return taskContext;
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from cleanup method of map/reduce task.
   * If overridden subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>
   *
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
public class TestMultipleOutputs extends HadoopTestCase {
public TestMultipleOutputs() throws IOException {
super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
}
public void testWithoutCounters() throws Exception {
_testMultipleOutputs(false);
}
public void testWithCounters() throws Exception {
_testMultipleOutputs(true);
}
private static final Path ROOT_DIR = new Path("testing/mo");
private static final Path IN_DIR = new Path(ROOT_DIR, "input");
private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
private Path getDir(Path dir) {
// Hack for local FS that does not have the concept of a 'mounting point'
if (isLocalFS()) {
String localPathRoot = System.getProperty("test.build.data", "/tmp")
.replace(' ', '+');
dir = new Path(localPathRoot, dir);
}
return dir;
}
public void setUp() throws Exception {
super.setUp();
Path rootDir = getDir(ROOT_DIR);
Path inDir = getDir(IN_DIR);
JobConf conf = createJobConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(rootDir, true);
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
}
public void tearDown() throws Exception {
Path rootDir = getDir(ROOT_DIR);
JobConf conf = createJobConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(rootDir, true);
super.tearDown();
}
protected void _testMultipleOutputs(boolean withCounters) throws Exception {
Path inDir = getDir(IN_DIR);
Path outDir = getDir(OUT_DIR);
JobConf conf = createJobConf();
FileSystem fs = FileSystem.get(conf);
DataOutputStream file = fs.create(new Path(inDir, "part-0"));
file.writeBytes("a\nb\n\nc\nd\ne");
file.close();
file = fs.create(new Path(inDir, "part-1"));
file.writeBytes("a\nb\n\nc\nd\ne");
file.close();
conf.setJobName("mo");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
LongWritable.class, Text.class);
MultipleOutputs.addMultiNamedOutput(conf, "sequence",
SequenceFileOutputFormat.class, LongWritable.class, Text.class);
MultipleOutputs.setCountersEnabled(conf, withCounters);
conf.setMapperClass(MOMap.class);
conf.setReducerClass(MOReduce.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
JobClient jc = new JobClient(conf);
RunningJob job = jc.submitJob(conf);
while (!job.isComplete()) {
Thread.sleep(100);
}
// assert number of named output part files
int namedOutputCount = 0;
FileStatus[] statuses = fs.listStatus(outDir);
for (FileStatus status : statuses) {
if (status.getPath().getName().equals("text-m-00000") ||
status.getPath().getName().equals("text-m-00001") ||
status.getPath().getName().equals("text-r-00000") ||
status.getPath().getName().equals("sequence_A-m-00000") ||
status.getPath().getName().equals("sequence_A-m-00001") ||
status.getPath().getName().equals("sequence_B-m-00000") ||
status.getPath().getName().equals("sequence_B-m-00001") ||
status.getPath().getName().equals("sequence_B-r-00000") ||
status.getPath().getName().equals("sequence_C-r-00000")) {
namedOutputCount++;
}
}
assertEquals(9, namedOutputCount);
// assert TextOutputFormat files correctness
BufferedReader reader = new BufferedReader(
new InputStreamReader(fs.open(
new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
int count = 0;
String line = reader.readLine();
while (line != null) {
assertTrue(line.endsWith("text"));
line = reader.readLine();
count++;
}
reader.close();
assertFalse(count == 0);
// assert SequenceOutputFormat files correctness
SequenceFile.Reader seqReader =
new SequenceFile.Reader(fs, new Path(FileOutputFormat.getOutputPath(conf),
"sequence_B-r-00000"), conf);
assertEquals(LongWritable.class, seqReader.getKeyClass());
assertEquals(Text.class, seqReader.getValueClass());
count = 0;
LongWritable key = new LongWritable();
Text value = new Text();
while (seqReader.next(key, value)) {
assertEquals("sequence", value.toString());
count++;
}
seqReader.close();
assertFalse(count == 0);
Counters.Group counters =
job.getCounters().getGroup(MultipleOutputs.class.getName());
if (!withCounters) {
assertEquals(0, counters.size());
}
else {
assertEquals(4, counters.size());
assertEquals(4, counters.getCounter("text"));
assertEquals(2, counters.getCounter("sequence_A"));
assertEquals(4, counters.getCounter("sequence_B"));
assertEquals(2, counters.getCounter("sequence_C"));
}
}
@SuppressWarnings({"unchecked"})
public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
Text> {
private MultipleOutputs mos;
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
}
public void map(LongWritable key, Text value,
OutputCollector<LongWritable, Text> output,
Reporter reporter)
throws IOException {
if (!value.toString().equals("a")) {
output.collect(key, value);
} else {
mos.getCollector("text", reporter).collect(key, new Text("text"));
mos.getCollector("sequence", "A", reporter).collect(key,
new Text("sequence"));
mos.getCollector("sequence", "B", reporter).collect(key,
new Text("sequence"));
}
}
public void close() throws IOException {
mos.close();
}
}
@SuppressWarnings({"unchecked"})
public static class MOReduce implements Reducer<LongWritable, Text,
LongWritable, Text> {
private MultipleOutputs mos;
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
}
public void reduce(LongWritable key, Iterator<Text> values,
OutputCollector<LongWritable, Text> output,
Reporter reporter)
throws IOException {
while (values.hasNext()) {
Text value = values.next();
if (!value.toString().equals("b")) {
output.collect(key, value);
} else {
mos.getCollector("text", reporter).collect(key, new Text("text"));
mos.getCollector("sequence", "B", reporter).collect(key,
new Text("sequence"));
mos.getCollector("sequence", "C", reporter).collect(key,
new Text("sequence"));
}
}
}
public void close() throws IOException {
mos.close();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment