Created
July 14, 2011 07:21
-
-
Save marblejenka/1082056 to your computer and use it in GitHub Desktop.
MultipleOutputs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.hadoop.mapred.lib; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.io.WritableComparable; | |
import org.apache.hadoop.mapred.*; | |
import org.apache.hadoop.util.Progressable; | |
import java.io.IOException; | |
import java.util.*; | |
/** | |
* The MultipleOutputs class simplifies writing to additional outputs other
* than the job default output via the <code>OutputCollector</code> passed to | |
* the <code>map()</code> and <code>reduce()</code> methods of the | |
* <code>Mapper</code> and <code>Reducer</code> implementations. | |
* <p/> | |
* Each additional output, or named output, may be configured with its own | |
* <code>OutputFormat</code>, with its own key class and with its own value | |
* class. | |
* <p/> | |
* A named output can be a single file or a multi file. The latter is referred
* to as a multi named output.
* <p/> | |
* A multi named output is an unbound set of files all sharing the same | |
* <code>OutputFormat</code>, key class and value class configuration. | |
* <p/> | |
* When named outputs are used within a <code>Mapper</code> implementation, | |
* key/values written to a name output are not part of the reduce phase, only | |
* key/values written to the job <code>OutputCollector</code> are part of the | |
* reduce phase. | |
* <p/> | |
* MultipleOutputs supports counters; by default they are disabled. The
* counters group is the {@link MultipleOutputs} class name.
* </p> | |
* The names of the counters are the same as the named outputs. For multi | |
* named outputs the name of the counter is the concatenation of the named | |
* output, and underscore '_' and the multiname. | |
* <p/> | |
* Job configuration usage pattern is: | |
* <pre> | |
* | |
* JobConf conf = new JobConf(); | |
* | |
* conf.setInputPath(inDir); | |
* FileOutputFormat.setOutputPath(conf, outDir); | |
* | |
* conf.setMapperClass(MOMap.class); | |
* conf.setReducerClass(MOReduce.class); | |
* ... | |
* | |
* // Defines additional single text based output 'text' for the job | |
* MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, | |
* LongWritable.class, Text.class); | |
* | |
* // Defines additional multi sequencefile based output 'sequence' for the | |
* // job | |
* MultipleOutputs.addMultiNamedOutput(conf, "seq", | |
* SequenceFileOutputFormat.class, | |
* LongWritable.class, Text.class); | |
* ... | |
* | |
* JobClient jc = new JobClient(); | |
* RunningJob job = jc.submitJob(conf); | |
* | |
* ... | |
* </pre> | |
* <p/> | |
* Usage pattern within a Reducer implementation is:
* <pre> | |
* | |
* public class MOReduce implements | |
* Reducer<WritableComparable, Writable> { | |
* private MultipleOutputs mos; | |
* | |
* public void configure(JobConf conf) { | |
* ... | |
* mos = new MultipleOutputs(conf); | |
* } | |
* | |
* public void reduce(WritableComparable key, Iterator<Writable> values, | |
* OutputCollector output, Reporter reporter) | |
* throws IOException { | |
* ... | |
* mos.getCollector("text", reporter).collect(key, new Text("Hello")); | |
* mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); | |
* mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); | |
* ... | |
* } | |
* | |
* public void close() throws IOException { | |
* mos.close(); | |
* ... | |
* } | |
* | |
* } | |
* </pre> | |
*/ | |
public class MultipleOutputs { | |
private static final String NAMED_OUTPUTS = "mo.namedOutputs"; | |
private static final String MO_PREFIX = "mo.namedOutput."; | |
private static final String FORMAT = ".format"; | |
private static final String KEY = ".key"; | |
private static final String VALUE = ".value"; | |
private static final String MULTI = ".multi"; | |
private static final String COUNTERS_ENABLED = "mo.counters"; | |
/** | |
* Counters group used by the counters of MultipleOutputs. | |
*/ | |
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName(); | |
/** | |
* Checks if a named output is alreadyDefined or not. | |
* | |
* @param conf job conf | |
* @param namedOutput named output names | |
* @param alreadyDefined whether the existence/non-existence of | |
* the named output is to be checked | |
* @throws IllegalArgumentException if the output name is alreadyDefined or | |
* not depending on the value of the | |
* 'alreadyDefined' parameter | |
*/ | |
private static void checkNamedOutput(JobConf conf, String namedOutput, | |
boolean alreadyDefined) { | |
List<String> definedChannels = getNamedOutputsList(conf); | |
if (alreadyDefined && definedChannels.contains(namedOutput)) { | |
throw new IllegalArgumentException("Named output '" + namedOutput + | |
"' already alreadyDefined"); | |
} else if (!alreadyDefined && !definedChannels.contains(namedOutput)) { | |
throw new IllegalArgumentException("Named output '" + namedOutput + | |
"' not defined"); | |
} | |
} | |
/** | |
* Checks if a named output name is valid token. | |
* | |
* @param namedOutput named output Name | |
* @throws IllegalArgumentException if the output name is not valid. | |
*/ | |
private static void checkTokenName(String namedOutput) { | |
if (namedOutput == null || namedOutput.length() == 0) { | |
throw new IllegalArgumentException( | |
"Name cannot be NULL or emtpy"); | |
} | |
for (char ch : namedOutput.toCharArray()) { | |
if ((ch >= 'A') && (ch <= 'Z')) { | |
continue; | |
} | |
if ((ch >= 'a') && (ch <= 'z')) { | |
continue; | |
} | |
if ((ch >= '0') && (ch <= '9')) { | |
continue; | |
} | |
throw new IllegalArgumentException( | |
"Name cannot be have a '" + ch + "' char"); | |
} | |
} | |
/** | |
* Checks if a named output name is valid. | |
* | |
* @param namedOutput named output Name | |
* @throws IllegalArgumentException if the output name is not valid. | |
*/ | |
private static void checkNamedOutputName(String namedOutput) { | |
checkTokenName(namedOutput); | |
// name cannot be the name used for the default output | |
if (namedOutput.equals("part")) { | |
throw new IllegalArgumentException( | |
"Named output name cannot be 'part'"); | |
} | |
} | |
/** | |
* Returns list of channel names. | |
* | |
* @param conf job conf | |
* @return List of channel Names | |
*/ | |
public static List<String> getNamedOutputsList(JobConf conf) { | |
List<String> names = new ArrayList<String>(); | |
StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " "); | |
while (st.hasMoreTokens()) { | |
names.add(st.nextToken()); | |
} | |
return names; | |
} | |
/** | |
* Returns if a named output is multiple. | |
* | |
* @param conf job conf | |
* @param namedOutput named output | |
* @return <code>true</code> if the name output is multi, <code>false</code> | |
* if it is single. If the name output is not defined it returns | |
* <code>false</code> | |
*/ | |
public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) { | |
checkNamedOutput(conf, namedOutput, false); | |
return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false); | |
} | |
/** | |
* Returns the named output OutputFormat. | |
* | |
* @param conf job conf | |
* @param namedOutput named output | |
* @return namedOutput OutputFormat | |
*/ | |
public static Class<? extends OutputFormat> getNamedOutputFormatClass( | |
JobConf conf, String namedOutput) { | |
checkNamedOutput(conf, namedOutput, false); | |
return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null, | |
OutputFormat.class); | |
} | |
/** | |
* Returns the key class for a named output. | |
* | |
* @param conf job conf | |
* @param namedOutput named output | |
* @return class for the named output key | |
*/ | |
public static Class<? extends WritableComparable> getNamedOutputKeyClass(JobConf conf, | |
String namedOutput) { | |
checkNamedOutput(conf, namedOutput, false); | |
return conf.getClass(MO_PREFIX + namedOutput + KEY, null, | |
WritableComparable.class); | |
} | |
/** | |
* Returns the value class for a named output. | |
* | |
* @param conf job conf | |
* @param namedOutput named output | |
* @return class of named output value | |
*/ | |
public static Class<? extends Writable> getNamedOutputValueClass(JobConf conf, | |
String namedOutput) { | |
checkNamedOutput(conf, namedOutput, false); | |
return conf.getClass(MO_PREFIX + namedOutput + VALUE, null, | |
Writable.class); | |
} | |
/** | |
* Adds a named output for the job. | |
* <p/> | |
* | |
* @param conf job conf to add the named output | |
* @param namedOutput named output name, it has to be a word, letters | |
* and numbers only, cannot be the word 'part' as | |
* that is reserved for the | |
* default output. | |
* @param outputFormatClass OutputFormat class. | |
* @param keyClass key class | |
* @param valueClass value class | |
*/ | |
public static void addNamedOutput(JobConf conf, String namedOutput, | |
Class<? extends OutputFormat> outputFormatClass, | |
Class<?> keyClass, Class<?> valueClass) { | |
addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass, | |
valueClass); | |
} | |
/** | |
* Adds a multi named output for the job. | |
* <p/> | |
* | |
* @param conf job conf to add the named output | |
* @param namedOutput named output name, it has to be a word, letters | |
* and numbers only, cannot be the word 'part' as | |
* that is reserved for the | |
* default output. | |
* @param outputFormatClass OutputFormat class. | |
* @param keyClass key class | |
* @param valueClass value class | |
*/ | |
public static void addMultiNamedOutput(JobConf conf, String namedOutput, | |
Class<? extends OutputFormat> outputFormatClass, | |
Class<?> keyClass, Class<?> valueClass) { | |
addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass, | |
valueClass); | |
} | |
/** | |
* Adds a named output for the job. | |
* <p/> | |
* | |
* @param conf job conf to add the named output | |
* @param namedOutput named output name, it has to be a word, letters | |
* and numbers only, cannot be the word 'part' as | |
* that is reserved for the | |
* default output. | |
* @param multi indicates if the named output is multi | |
* @param outputFormatClass OutputFormat class. | |
* @param keyClass key class | |
* @param valueClass value class | |
*/ | |
private static void addNamedOutput(JobConf conf, String namedOutput, | |
boolean multi, | |
Class<? extends OutputFormat> outputFormatClass, | |
Class<?> keyClass, Class<?> valueClass) { | |
checkNamedOutputName(namedOutput); | |
checkNamedOutput(conf, namedOutput, true); | |
conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput); | |
conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, | |
OutputFormat.class); | |
conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class); | |
conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class); | |
conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi); | |
} | |
/** | |
* Enables or disables counters for the named outputs. | |
* <p/> | |
* By default these counters are disabled. | |
* <p/> | |
* MultipleOutputs supports counters, by default the are disabled. | |
* The counters group is the {@link MultipleOutputs} class name. | |
* </p> | |
* The names of the counters are the same as the named outputs. For multi | |
* named outputs the name of the counter is the concatenation of the named | |
* output, and underscore '_' and the multiname. | |
* | |
* @param conf job conf to enableadd the named output. | |
* @param enabled indicates if the counters will be enabled or not. | |
*/ | |
public static void setCountersEnabled(JobConf conf, boolean enabled) { | |
conf.setBoolean(COUNTERS_ENABLED, enabled); | |
} | |
/** | |
* Returns if the counters for the named outputs are enabled or not. | |
* <p/> | |
* By default these counters are disabled. | |
* <p/> | |
* MultipleOutputs supports counters, by default the are disabled. | |
* The counters group is the {@link MultipleOutputs} class name. | |
* </p> | |
* The names of the counters are the same as the named outputs. For multi | |
* named outputs the name of the counter is the concatenation of the named | |
* output, and underscore '_' and the multiname. | |
* | |
* | |
* @param conf job conf to enableadd the named output. | |
* @return TRUE if the counters are enabled, FALSE if they are disabled. | |
*/ | |
public static boolean getCountersEnabled(JobConf conf) { | |
return conf.getBoolean(COUNTERS_ENABLED, false); | |
} | |
// instance code, to be used from Mapper/Reducer code

// All fields are assigned exactly once, in the constructor. Declaring them
// final documents that and makes them safely visible to every thread that
// later calls into this instance.
private final JobConf conf;
private final OutputFormat outputFormat;
private final Set<String> namedOutputs;
// Cache of opened record writers, keyed by base file name.
private final Map<String, RecordWriter> recordWriters;
private final boolean countersEnabled;

/**
 * Creates and initializes multiple named outputs support, it should be
 * instantiated in the Mapper/Reducer configure method.
 * <p/>
 * The set of named outputs and the counters flag are read from the job
 * configuration once, at construction time.
 *
 * @param job the job configuration object
 */
public MultipleOutputs(JobConf job) {
  this.conf = job;
  this.outputFormat = new InternalFileOutputFormat();
  this.namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
  this.recordWriters = new HashMap<String, RecordWriter>();
  this.countersEnabled = getCountersEnabled(job);
}
/** | |
* Returns iterator with the defined name outputs. | |
* | |
* @return iterator with the defined named outputs | |
*/ | |
public Iterator<String> getNamedOutputs() { | |
return namedOutputs.iterator(); | |
} | |
// by being synchronized MultipleOutputTask can be use with a | |
// MultithreaderMapRunner. | |
private synchronized RecordWriter getRecordWriter(String namedOutput, | |
String baseFileName, | |
final Reporter reporter) | |
throws IOException { | |
RecordWriter writer = recordWriters.get(baseFileName); | |
if (writer == null) { | |
if (countersEnabled && reporter == null) { | |
throw new IllegalArgumentException( | |
"Counters are enabled, Reporter cannot be NULL"); | |
} | |
JobConf jobConf = new JobConf(conf); | |
jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput); | |
FileSystem fs = FileSystem.get(conf); | |
writer = | |
outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter); | |
if (countersEnabled) { | |
if (reporter == null) { | |
throw new IllegalArgumentException( | |
"Counters are enabled, Reporter cannot be NULL"); | |
} | |
writer = new RecordWriterWithCounter(writer, baseFileName, reporter); | |
} | |
recordWriters.put(baseFileName, writer); | |
} | |
return writer; | |
} | |
private static class RecordWriterWithCounter implements RecordWriter { | |
private RecordWriter writer; | |
private String counterName; | |
private Reporter reporter; | |
public RecordWriterWithCounter(RecordWriter writer, String counterName, | |
Reporter reporter) { | |
this.writer = writer; | |
this.counterName = counterName; | |
this.reporter = reporter; | |
} | |
@SuppressWarnings({"unchecked"}) | |
public void write(Object key, Object value) throws IOException { | |
reporter.incrCounter(COUNTERS_GROUP, counterName, 1); | |
writer.write(key, value); | |
} | |
public void close(Reporter reporter) throws IOException { | |
writer.close(reporter); | |
} | |
} | |
/** | |
* Gets the output collector for a named output. | |
* <p/> | |
* | |
* @param namedOutput the named output name | |
* @param reporter the reporter | |
* @return the output collector for the given named output | |
* @throws IOException thrown if output collector could not be created | |
*/ | |
@SuppressWarnings({"unchecked"}) | |
public OutputCollector getCollector(String namedOutput, Reporter reporter) | |
throws IOException { | |
return getCollector(namedOutput, null, reporter); | |
} | |
/** | |
* Gets the output collector for a multi named output. | |
* <p/> | |
* | |
* @param namedOutput the named output name | |
* @param multiName the multi name part | |
* @param reporter the reporter | |
* @return the output collector for the given named output | |
* @throws IOException thrown if output collector could not be created | |
*/ | |
@SuppressWarnings({"unchecked"}) | |
public OutputCollector getCollector(String namedOutput, String multiName, | |
Reporter reporter) | |
throws IOException { | |
checkNamedOutputName(namedOutput); | |
if (!namedOutputs.contains(namedOutput)) { | |
throw new IllegalArgumentException("Undefined named output '" + | |
namedOutput + "'"); | |
} | |
boolean multi = isMultiNamedOutput(conf, namedOutput); | |
if (!multi && multiName != null) { | |
throw new IllegalArgumentException("Name output '" + namedOutput + | |
"' has not been defined as multi"); | |
} | |
if (multi) { | |
checkTokenName(multiName); | |
} | |
String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput; | |
final RecordWriter writer = | |
getRecordWriter(namedOutput, baseFileName, reporter); | |
return new OutputCollector() { | |
@SuppressWarnings({"unchecked"}) | |
public void collect(Object key, Object value) throws IOException { | |
writer.write(key, value); | |
} | |
}; | |
} | |
/** | |
* Closes all the opened named outputs. | |
* <p/> | |
* If overriden subclasses must invoke <code>super.close()</code> at the | |
* end of their <code>close()</code> | |
* | |
* @throws java.io.IOException thrown if any of the MultipleOutput files | |
* could not be closed properly. | |
*/ | |
public void close() throws IOException { | |
for (RecordWriter writer : recordWriters.values()) { | |
writer.close(null); | |
} | |
} | |
private static class InternalFileOutputFormat extends | |
FileOutputFormat<Object, Object> { | |
public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput"; | |
@SuppressWarnings({"unchecked"}) | |
public RecordWriter<Object, Object> getRecordWriter( | |
FileSystem fs, JobConf job, String baseFileName, Progressable progress) | |
throws IOException { | |
String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null); | |
String fileName = getUniqueName(job, baseFileName); | |
// The following trick leverages the instantiation of a record writer via | |
// the job conf thus supporting arbitrary output formats. | |
JobConf outputConf = new JobConf(job); | |
outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput)); | |
outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput)); | |
outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput)); | |
OutputFormat outputFormat = outputConf.getOutputFormat(); | |
return outputFormat.getRecordWriter(fs, outputConf, fileName, progress); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.hadoop.mapreduce.lib.output; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.io.WritableComparable; | |
import org.apache.hadoop.mapreduce.*; | |
import org.apache.hadoop.util.ReflectionUtils; | |
import java.io.IOException; | |
import java.util.*; | |
/** | |
* The MultipleOutputs class simplifies writing output data | |
* to multiple outputs | |
* | |
* <p> | |
* Case one: writing to additional outputs other than the job default output. | |
* | |
* Each additional output, or named output, may be configured with its own | |
* <code>OutputFormat</code>, with its own key class and with its own value | |
* class. | |
* | |
* <p> | |
* Case two: to write data to different files provided by user | |
* </p> | |
* | |
* <p> | |
* MultipleOutputs supports counters, by default they are disabled. The | |
* counters group is the {@link MultipleOutputs} class name. The names of the | |
* counters are the same as the output name. These count the number of records
* written to each output name.
* </p> | |
* | |
* Usage pattern for job submission: | |
* <pre> | |
* | |
* Job job = new Job(); | |
* | |
* FileInputFormat.setInputPath(job, inDir); | |
* FileOutputFormat.setOutputPath(job, outDir); | |
* | |
* job.setMapperClass(MOMap.class); | |
* job.setReducerClass(MOReduce.class); | |
* ... | |
* | |
* // Defines additional single text based output 'text' for the job | |
* MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, | |
* LongWritable.class, Text.class); | |
* | |
* // Defines additional sequence-file based output 'sequence' for the job | |
* MultipleOutputs.addNamedOutput(job, "seq", | |
* SequenceFileOutputFormat.class, | |
* LongWritable.class, Text.class); | |
* ... | |
* | |
* job.waitForCompletion(true); | |
* ... | |
* </pre> | |
* <p> | |
* Usage in Reducer: | |
* <pre> | |
* <K, V> String generateFileName(K k, V v) { | |
* return k.toString() + "_" + v.toString(); | |
* } | |
* | |
* public class MOReduce extends | |
* Reducer<WritableComparable, Writable,WritableComparable, Writable> { | |
* private MultipleOutputs mos; | |
* public void setup(Context context) { | |
* ... | |
* mos = new MultipleOutputs(context); | |
* } | |
* | |
* public void reduce(WritableComparable key, Iterator<Writable> values, | |
* Context context) | |
* throws IOException { | |
* ... | |
* mos.write("text", key, new Text("Hello"));
* mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
* mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
* mos.write(key, new Text("value"), generateFileName(key, new Text("value"))); | |
* ... | |
* } | |
* | |
* public void cleanup(Context context) throws IOException {
* mos.close(); | |
* ... | |
* } | |
* | |
* } | |
* </pre> | |
*/ | |
public class MultipleOutputs<KEYOUT, VALUEOUT> { | |
private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs"; | |
private static final String MO_PREFIX = | |
"mapreduce.multipleoutputs.namedOutput."; | |
private static final String FORMAT = ".format"; | |
private static final String KEY = ".key"; | |
private static final String VALUE = ".value"; | |
private static final String COUNTERS_ENABLED = | |
"mapreduce.multipleoutputs.counters"; | |
/** | |
* Counters group used by the counters of MultipleOutputs. | |
*/ | |
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName(); | |
/** | |
* Cache for the taskContexts | |
*/ | |
private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>(); | |
/** | |
* Checks if a named output name is valid token. | |
* | |
* @param namedOutput named output Name | |
* @throws IllegalArgumentException if the output name is not valid. | |
*/ | |
private static void checkTokenName(String namedOutput) { | |
if (namedOutput == null || namedOutput.length() == 0) { | |
throw new IllegalArgumentException( | |
"Name cannot be NULL or emtpy"); | |
} | |
for (char ch : namedOutput.toCharArray()) { | |
if ((ch >= 'A') && (ch <= 'Z')) { | |
continue; | |
} | |
if ((ch >= 'a') && (ch <= 'z')) { | |
continue; | |
} | |
if ((ch >= '0') && (ch <= '9')) { | |
continue; | |
} | |
throw new IllegalArgumentException( | |
"Name cannot be have a '" + ch + "' char"); | |
} | |
} | |
/** | |
* Checks if output name is valid. | |
* | |
* name cannot be the name used for the default output | |
* @param outputPath base output Name | |
* @throws IllegalArgumentException if the output name is not valid. | |
*/ | |
private static void checkBaseOutputPath(String outputPath) { | |
if (outputPath.equals(FileOutputFormat.PART)) { | |
throw new IllegalArgumentException("output name cannot be 'part'"); | |
} | |
} | |
/** | |
* Checks if a named output name is valid. | |
* | |
* @param namedOutput named output Name | |
* @throws IllegalArgumentException if the output name is not valid. | |
*/ | |
private static void checkNamedOutputName(JobContext job, | |
String namedOutput, boolean alreadyDefined) { | |
checkTokenName(namedOutput); | |
checkBaseOutputPath(namedOutput); | |
List<String> definedChannels = getNamedOutputsList(job); | |
if (alreadyDefined && definedChannels.contains(namedOutput)) { | |
throw new IllegalArgumentException("Named output '" + namedOutput + | |
"' already alreadyDefined"); | |
} else if (!alreadyDefined && !definedChannels.contains(namedOutput)) { | |
throw new IllegalArgumentException("Named output '" + namedOutput + | |
"' not defined"); | |
} | |
} | |
// Returns list of channel names. | |
private static List<String> getNamedOutputsList(JobContext job) { | |
List<String> names = new ArrayList<String>(); | |
StringTokenizer st = new StringTokenizer( | |
job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " "); | |
while (st.hasMoreTokens()) { | |
names.add(st.nextToken()); | |
} | |
return names; | |
} | |
// Returns the named output OutputFormat. | |
@SuppressWarnings("unchecked") | |
private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass( | |
JobContext job, String namedOutput) { | |
return (Class<? extends OutputFormat<?, ?>>) | |
job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null, | |
OutputFormat.class); | |
} | |
// Returns the key class for a named output. | |
private static Class<?> getNamedOutputKeyClass(JobContext job, | |
String namedOutput) { | |
return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null, | |
WritableComparable.class); | |
} | |
// Returns the value class for a named output. | |
private static Class<? extends Writable> getNamedOutputValueClass( | |
JobContext job, String namedOutput) { | |
return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE, | |
null, Writable.class); | |
} | |
/** | |
* Adds a named output for the job. | |
* <p/> | |
* | |
* @param job job to add the named output | |
* @param namedOutput named output name, it has to be a word, letters | |
* and numbers only, cannot be the word 'part' as | |
* that is reserved for the default output. | |
* @param outputFormatClass OutputFormat class. | |
* @param keyClass key class | |
* @param valueClass value class | |
*/ | |
@SuppressWarnings("unchecked") | |
public static void addNamedOutput(Job job, String namedOutput, | |
Class<? extends OutputFormat> outputFormatClass, | |
Class<?> keyClass, Class<?> valueClass) { | |
checkNamedOutputName(job, namedOutput, true); | |
Configuration conf = job.getConfiguration(); | |
conf.set(MULTIPLE_OUTPUTS, | |
conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput); | |
conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, | |
OutputFormat.class); | |
conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class); | |
conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class); | |
} | |
/** | |
* Enables or disables counters for the named outputs. | |
* | |
* The counters group is the {@link MultipleOutputs} class name. | |
* The names of the counters are the same as the named outputs. These | |
* counters count the number records written to each output name. | |
* By default these counters are disabled. | |
* | |
* @param job job to enable counters | |
* @param enabled indicates if the counters will be enabled or not. | |
*/ | |
public static void setCountersEnabled(Job job, boolean enabled) { | |
job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled); | |
} | |
/** | |
* Returns if the counters for the named outputs are enabled or not. | |
* By default these counters are disabled. | |
* | |
* @param job the job | |
* @return TRUE if the counters are enabled, FALSE if they are disabled. | |
*/ | |
public static boolean getCountersEnabled(JobContext job) { | |
return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false); | |
} | |
/** | |
* Wraps RecordWriter to increment counters. | |
*/ | |
@SuppressWarnings("unchecked") | |
private static class RecordWriterWithCounter extends RecordWriter { | |
private RecordWriter writer; | |
private String counterName; | |
private TaskInputOutputContext context; | |
public RecordWriterWithCounter(RecordWriter writer, String counterName, | |
TaskInputOutputContext context) { | |
this.writer = writer; | |
this.counterName = counterName; | |
this.context = context; | |
} | |
@SuppressWarnings({"unchecked"}) | |
public void write(Object key, Object value) | |
throws IOException, InterruptedException { | |
context.getCounter(COUNTERS_GROUP, counterName).increment(1); | |
writer.write(key, value); | |
} | |
public void close(TaskAttemptContext context) | |
throws IOException, InterruptedException { | |
writer.close(context); | |
} | |
} | |
// instance code, to be used from Mapper/Reducer code

// Context of the task this instance writes for.
private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
// Names of the named outputs declared in the job configuration.
private Set<String> namedOutputs;
// Cache of opened record writers, keyed by base file name.
private Map<String, RecordWriter<?, ?>> recordWriters;
// Counters flag, read from the configuration at construction time.
private boolean countersEnabled;

/**
 * Creates and initializes multiple outputs support,
 * it should be instantiated in the Mapper/Reducer setup method.
 * <p/>
 * The set of named outputs and the counters flag are read from the context's
 * configuration once, here.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
    TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
  this.context = context;
  List<String> declared = MultipleOutputs.getNamedOutputsList(context);
  this.namedOutputs =
      Collections.unmodifiableSet(new HashSet<String>(declared));
  this.recordWriters = new HashMap<String, RecordWriter<?, ?>>();
  this.countersEnabled = getCountersEnabled(context);
}
/** | |
* Write key and value to the namedOutput. | |
* | |
* Output path is a unique file generated for the namedOutput. | |
* For example, {namedOutput}-(m|r)-{part-number} | |
* | |
* @param namedOutput the named output name | |
* @param key the key | |
* @param value the value | |
*/ | |
@SuppressWarnings("unchecked") | |
public <K, V> void write(String namedOutput, K key, V value) | |
throws IOException, InterruptedException { | |
write(namedOutput, key, value, namedOutput); | |
} | |
/** | |
* Write key and value to baseOutputPath using the namedOutput. | |
* | |
* @param namedOutput the named output name | |
* @param key the key | |
* @param value the value | |
* @param baseOutputPath base-output path to write the record to. | |
* Note: Framework will generate unique filename for the baseOutputPath | |
*/ | |
@SuppressWarnings("unchecked") | |
public <K, V> void write(String namedOutput, K key, V value, | |
String baseOutputPath) throws IOException, InterruptedException { | |
checkNamedOutputName(context, namedOutput, false); | |
checkBaseOutputPath(baseOutputPath); | |
if (!namedOutputs.contains(namedOutput)) { | |
throw new IllegalArgumentException("Undefined named output '" + | |
namedOutput + "'"); | |
} | |
TaskAttemptContext taskContext = getContext(namedOutput); | |
getRecordWriter(taskContext, baseOutputPath).write(key, value); | |
} | |
/** | |
* Write key value to an output file name. | |
* | |
* Gets the record writer from job's output format. | |
* Job's output format should be a FileOutputFormat. | |
* | |
* @param key the key | |
* @param value the value | |
* @param baseOutputPath base-output path to write the record to. | |
* Note: Framework will generate unique filename for the baseOutputPath | |
*/ | |
@SuppressWarnings("unchecked") | |
public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) | |
throws IOException, InterruptedException { | |
checkBaseOutputPath(baseOutputPath); | |
TaskAttemptContext taskContext = new TaskAttemptContext( | |
context.getConfiguration(), context.getTaskAttemptID()); | |
getRecordWriter(taskContext, baseOutputPath).write(key, value); | |
} | |
// by being synchronized MultipleOutputTask can be use with a | |
// MultithreadedMapper. | |
@SuppressWarnings("unchecked") | |
private synchronized RecordWriter getRecordWriter( | |
TaskAttemptContext taskContext, String baseFileName) | |
throws IOException, InterruptedException { | |
// look for record-writer in the cache | |
RecordWriter writer = recordWriters.get(baseFileName); | |
// If not in cache, create a new one | |
if (writer == null) { | |
// get the record writer from context output format | |
FileOutputFormat.setOutputName(taskContext, baseFileName); | |
try { | |
writer = ((OutputFormat) ReflectionUtils.newInstance( | |
taskContext.getOutputFormatClass(), taskContext.getConfiguration())) | |
.getRecordWriter(taskContext); | |
} catch (ClassNotFoundException e) { | |
throw new IOException(e); | |
} | |
// if counters are enabled, wrap the writer with context | |
// to increment counters | |
if (countersEnabled) { | |
writer = new RecordWriterWithCounter(writer, baseFileName, context); | |
} | |
// add the record-writer to the cache | |
recordWriters.put(baseFileName, writer); | |
} | |
return writer; | |
} | |
// Create a taskAttemptContext for the named output with | |
// output format and output key/value types put in the context | |
private TaskAttemptContext getContext(String nameOutput) throws IOException { | |
TaskAttemptContext taskContext = taskContexts.get(nameOutput); | |
if (taskContext != null) { | |
return taskContext; | |
} | |
// The following trick leverages the instantiation of a record writer via | |
// the job thus supporting arbitrary output formats. | |
Job job = new Job(context.getConfiguration()); | |
job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput)); | |
job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput)); | |
job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput)); | |
taskContext = new TaskAttemptContext( | |
job.getConfiguration(), context.getTaskAttemptID()); | |
taskContexts.put(nameOutput, taskContext); | |
return taskContext; | |
} | |
/** | |
* Closes all the opened outputs. | |
* | |
* This should be called from cleanup method of map/reduce task. | |
* If overridden subclasses must invoke <code>super.close()</code> at the | |
* end of their <code>close()</code> | |
* | |
*/ | |
@SuppressWarnings("unchecked") | |
public void close() throws IOException, InterruptedException { | |
for (RecordWriter writer : recordWriters.values()) { | |
writer.close(context); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.hadoop.mapred.lib; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.SequenceFile; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapred.*; | |
import java.io.BufferedReader; | |
import java.io.DataOutputStream; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.util.Iterator; | |
public class TestMultipleOutputs extends HadoopTestCase { | |
public TestMultipleOutputs() throws IOException { | |
super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1); | |
} | |
public void testWithoutCounters() throws Exception { | |
_testMultipleOutputs(false); | |
} | |
public void testWithCounters() throws Exception { | |
_testMultipleOutputs(true); | |
} | |
private static final Path ROOT_DIR = new Path("testing/mo"); | |
private static final Path IN_DIR = new Path(ROOT_DIR, "input"); | |
private static final Path OUT_DIR = new Path(ROOT_DIR, "output"); | |
private Path getDir(Path dir) { | |
// Hack for local FS that does not have the concept of a 'mounting point' | |
if (isLocalFS()) { | |
String localPathRoot = System.getProperty("test.build.data", "/tmp") | |
.replace(' ', '+'); | |
dir = new Path(localPathRoot, dir); | |
} | |
return dir; | |
} | |
public void setUp() throws Exception { | |
super.setUp(); | |
Path rootDir = getDir(ROOT_DIR); | |
Path inDir = getDir(IN_DIR); | |
JobConf conf = createJobConf(); | |
FileSystem fs = FileSystem.get(conf); | |
fs.delete(rootDir, true); | |
if (!fs.mkdirs(inDir)) { | |
throw new IOException("Mkdirs failed to create " + inDir.toString()); | |
} | |
} | |
public void tearDown() throws Exception { | |
Path rootDir = getDir(ROOT_DIR); | |
JobConf conf = createJobConf(); | |
FileSystem fs = FileSystem.get(conf); | |
fs.delete(rootDir, true); | |
super.tearDown(); | |
} | |
protected void _testMultipleOutputs(boolean withCounters) throws Exception { | |
Path inDir = getDir(IN_DIR); | |
Path outDir = getDir(OUT_DIR); | |
JobConf conf = createJobConf(); | |
FileSystem fs = FileSystem.get(conf); | |
DataOutputStream file = fs.create(new Path(inDir, "part-0")); | |
file.writeBytes("a\nb\n\nc\nd\ne"); | |
file.close(); | |
file = fs.create(new Path(inDir, "part-1")); | |
file.writeBytes("a\nb\n\nc\nd\ne"); | |
file.close(); | |
conf.setJobName("mo"); | |
conf.setInputFormat(TextInputFormat.class); | |
conf.setOutputKeyClass(LongWritable.class); | |
conf.setOutputValueClass(Text.class); | |
conf.setMapOutputKeyClass(LongWritable.class); | |
conf.setMapOutputValueClass(Text.class); | |
conf.setOutputFormat(TextOutputFormat.class); | |
conf.setOutputKeyClass(LongWritable.class); | |
conf.setOutputValueClass(Text.class); | |
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, | |
LongWritable.class, Text.class); | |
MultipleOutputs.addMultiNamedOutput(conf, "sequence", | |
SequenceFileOutputFormat.class, LongWritable.class, Text.class); | |
MultipleOutputs.setCountersEnabled(conf, withCounters); | |
conf.setMapperClass(MOMap.class); | |
conf.setReducerClass(MOReduce.class); | |
FileInputFormat.setInputPaths(conf, inDir); | |
FileOutputFormat.setOutputPath(conf, outDir); | |
JobClient jc = new JobClient(conf); | |
RunningJob job = jc.submitJob(conf); | |
while (!job.isComplete()) { | |
Thread.sleep(100); | |
} | |
// assert number of named output part files | |
int namedOutputCount = 0; | |
FileStatus[] statuses = fs.listStatus(outDir); | |
for (FileStatus status : statuses) { | |
if (status.getPath().getName().equals("text-m-00000") || | |
status.getPath().getName().equals("text-m-00001") || | |
status.getPath().getName().equals("text-r-00000") || | |
status.getPath().getName().equals("sequence_A-m-00000") || | |
status.getPath().getName().equals("sequence_A-m-00001") || | |
status.getPath().getName().equals("sequence_B-m-00000") || | |
status.getPath().getName().equals("sequence_B-m-00001") || | |
status.getPath().getName().equals("sequence_B-r-00000") || | |
status.getPath().getName().equals("sequence_C-r-00000")) { | |
namedOutputCount++; | |
} | |
} | |
assertEquals(9, namedOutputCount); | |
// assert TextOutputFormat files correctness | |
BufferedReader reader = new BufferedReader( | |
new InputStreamReader(fs.open( | |
new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000")))); | |
int count = 0; | |
String line = reader.readLine(); | |
while (line != null) { | |
assertTrue(line.endsWith("text")); | |
line = reader.readLine(); | |
count++; | |
} | |
reader.close(); | |
assertFalse(count == 0); | |
// assert SequenceOutputFormat files correctness | |
SequenceFile.Reader seqReader = | |
new SequenceFile.Reader(fs, new Path(FileOutputFormat.getOutputPath(conf), | |
"sequence_B-r-00000"), conf); | |
assertEquals(LongWritable.class, seqReader.getKeyClass()); | |
assertEquals(Text.class, seqReader.getValueClass()); | |
count = 0; | |
LongWritable key = new LongWritable(); | |
Text value = new Text(); | |
while (seqReader.next(key, value)) { | |
assertEquals("sequence", value.toString()); | |
count++; | |
} | |
seqReader.close(); | |
assertFalse(count == 0); | |
Counters.Group counters = | |
job.getCounters().getGroup(MultipleOutputs.class.getName()); | |
if (!withCounters) { | |
assertEquals(0, counters.size()); | |
} | |
else { | |
assertEquals(4, counters.size()); | |
assertEquals(4, counters.getCounter("text")); | |
assertEquals(2, counters.getCounter("sequence_A")); | |
assertEquals(4, counters.getCounter("sequence_B")); | |
assertEquals(2, counters.getCounter("sequence_C")); | |
} | |
} | |
@SuppressWarnings({"unchecked"}) | |
public static class MOMap implements Mapper<LongWritable, Text, LongWritable, | |
Text> { | |
private MultipleOutputs mos; | |
public void configure(JobConf conf) { | |
mos = new MultipleOutputs(conf); | |
} | |
public void map(LongWritable key, Text value, | |
OutputCollector<LongWritable, Text> output, | |
Reporter reporter) | |
throws IOException { | |
if (!value.toString().equals("a")) { | |
output.collect(key, value); | |
} else { | |
mos.getCollector("text", reporter).collect(key, new Text("text")); | |
mos.getCollector("sequence", "A", reporter).collect(key, | |
new Text("sequence")); | |
mos.getCollector("sequence", "B", reporter).collect(key, | |
new Text("sequence")); | |
} | |
} | |
public void close() throws IOException { | |
mos.close(); | |
} | |
} | |
@SuppressWarnings({"unchecked"}) | |
public static class MOReduce implements Reducer<LongWritable, Text, | |
LongWritable, Text> { | |
private MultipleOutputs mos; | |
public void configure(JobConf conf) { | |
mos = new MultipleOutputs(conf); | |
} | |
public void reduce(LongWritable key, Iterator<Text> values, | |
OutputCollector<LongWritable, Text> output, | |
Reporter reporter) | |
throws IOException { | |
while (values.hasNext()) { | |
Text value = values.next(); | |
if (!value.toString().equals("b")) { | |
output.collect(key, value); | |
} else { | |
mos.getCollector("text", reporter).collect(key, new Text("text")); | |
mos.getCollector("sequence", "B", reporter).collect(key, | |
new Text("sequence")); | |
mos.getCollector("sequence", "C", reporter).collect(key, | |
new Text("sequence")); | |
} | |
} | |
} | |
public void close() throws IOException { | |
mos.close(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment