package com.thinkaurelius.faunus.formats.titan;

import com.thinkaurelius.faunus.FaunusVertex;
import com.thinkaurelius.faunus.Holder;
import com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce;
import com.thinkaurelius.faunus.formats.JobConfigurationFormat;
import com.thinkaurelius.faunus.formats.MapReduceFormat;
import com.thinkaurelius.faunus.formats.noop.NoOpOutputFormat;
import com.thinkaurelius.faunus.hdfs.HDFSTools;
import com.thinkaurelius.faunus.mapreduce.FaunusCompiler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public abstract class TitanOutputFormat extends NoOpOutputFormat implements MapReduceFormat, JobConfigurationFormat {

    public static final String FAUNUS_GRAPH_OUTPUT_TITAN = "faunus.graph.output.titan";
    public static final String FAUNUS_GRAPH_OUTPUT_TITAN_INFER_SCHEMA = "faunus.graph.output.titan.infer-schema";
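
    // Appends the MapReduce jobs required to load the graph into Titan: an optional
    // schema-inference stage followed by the Blueprints write stage. Schema inference
    // is enabled by default via FAUNUS_GRAPH_OUTPUT_TITAN_INFER_SCHEMA.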
    @Override
    public void addMapReduceJobs(final FaunusCompiler compiler) {
        if (compiler.getConf().getBoolean(FAUNUS_GRAPH_OUTPUT_TITAN_INFER_SCHEMA, true)) {
            compiler.addMapReduce(SchemaInferencerMapReduce.Map.class,
                    null,
                    SchemaInferencerMapReduce.Reduce.class,
                    LongWritable.class,
                    FaunusVertex.class,
                    NullWritable.class,
                    FaunusVertex.class,
                    SchemaInferencerMapReduce.createConfiguration());
        }
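        // The Blueprints job performs the actual vertex and edge writes into the target graph.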
        compiler.addMapReduce(BlueprintsGraphOutputMapReduce.Map.class,
                null,
                BlueprintsGraphOutputMapReduce.Reduce.class,
                LongWritable.class,
                Holder.class,
                NullWritable.class,
                FaunusVertex.class,
                BlueprintsGraphOutputMapReduce.createConfiguration());
    }
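
    // Sizes the number of reduce tasks to the total input size divided by the
    // maximum split size, so write parallelism roughly tracks the input volume.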
    @Override
    public void updateJob(final Job job) throws InterruptedException, IOException {
        try {
            final Configuration configuration = job.getConfiguration();
            if (FileInputFormat.class.isAssignableFrom(job.getInputFormatClass())) {
                final long splitSize = configuration.getLong("mapred.max.split.size", -1);
                final Path[] paths = FileInputFormat.getInputPaths(job);
                final PathFilter filter = FileInputFormat.getInputPathFilter(job);
                final FileSystem fs = FileSystem.get(configuration);
                long totalSize = 0L;
                for (final Path path : paths) {
                    totalSize = totalSize + HDFSTools.getFileSize(fs, path, filter);
                }
                // Only override the reducer count when a maximum split size is configured;
                // the default of -1 would otherwise yield a negative task count.
                if (splitSize > 0) {
                    job.setNumReduceTasks((int) Math.round((double) totalSize / (double) splitSize));
                }
            }
        } catch (final ClassNotFoundException e) {
            throw new InterruptedException(e.getMessage());
        }
    }
}