Skip to content

Instantly share code, notes, and snippets.

@jonathanmv
Created April 5, 2016 22:45
Show Gist options
  • Select an option

  • Save jonathanmv/da260b7afea1b2ca55f6b1bf997e595a to your computer and use it in GitHub Desktop.

Select an option

Save jonathanmv/da260b7afea1b2ca55f6b1bf997e595a to your computer and use it in GitHub Desktop.
Configures a Hadoop job that reads JSON files and stores the records in a Parquet file.
package jonathanmv.storage;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.thrift.ParquetThriftOutputFormat;
// Based on http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/
public class Runner extends Configured implements Tool {

  /** Input path used when no input path is supplied on the command line. */
  private static final String DEFAULT_INPUT = "input";

  /** Output path used when no output path is supplied on the command line. */
  private static final String DEFAULT_OUTPUT = "output";

  /**
   * Command-line entry point. Delegates to {@link ToolRunner} so Hadoop generic
   * options are parsed before {@link #run(String[])} is invoked, then exits the
   * JVM with the job's status code.
   *
   * @param args command-line arguments
   * @throws Exception if the job cannot be configured or submitted
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Runner(), args);
    System.exit(exitCode);
  }

  /**
   * Configures and runs a map-only job: files under the input path are fed to
   * {@code JsonToThriftMapper}, and its output is written as {@code FriendsEdge}
   * Thrift records to a Snappy-compressed Parquet output.
   *
   * <p>The optional first and second arguments override the input and output
   * paths; when they are absent the relative paths {@code input} and
   * {@code output} are used. Any existing output path is deleted recursively
   * before the job is submitted.
   *
   * @param args remaining arguments after generic-option parsing:
   *     {@code [inputPath [outputPath]]}
   * @return 0 if the job completed successfully, 1 otherwise
   * @throws IOException if an existing output path cannot be deleted
   * @throws Exception if the job cannot be configured or submitted
   */
  @Override
  public int run(String[] args) throws Exception {
    final String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
    final String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
    Path inputPath = new Path(input);
    Path outputPath = new Path(output);

    // Job.getInstance replaces the deprecated Job(Configuration) constructor.
    Job job = Job.getInstance(getConf());
    job.setJarByClass(getClass());
    Configuration conf = job.getConfiguration();

    // Resolve the file system from the output path itself (not the default FS),
    // so a fully qualified output path is cleaned up on the file system it
    // actually lives on.
    final FileSystem fs = outputPath.getFileSystem(conf);
    if (fs.exists(outputPath) && !fs.delete(outputPath, true)) {
      // Fail fast with a clear message instead of a later output-exists error.
      throw new IOException("Could not delete existing output path " + outputPath);
    }

    // No InputFormat is set explicitly, so Hadoop's default input format applies.
    FileInputFormat.addInputPath(job, inputPath);

    job.setOutputFormatClass(ParquetThriftOutputFormat.class);
    ParquetThriftOutputFormat.setOutputPath(job, outputPath);
    ParquetThriftOutputFormat.setThriftClass(job, FriendsEdge.class);
    ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetThriftOutputFormat.setCompressOutput(job, true);

    job.setMapperClass(JsonToThriftMapper.class);
    // Map-only job: mapper output goes straight to the output format.
    job.setNumReduceTasks(0);

    return job.waitForCompletion(true) ? 0 : 1;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment