Skip to content

Instantly share code, notes, and snippets.

@tf0054
Created March 22, 2012 14:46
Show Gist options
  • Save tf0054/2158730 to your computer and use it in GitHub Desktop.
Save tf0054/2158730 to your computer and use it in GitHub Desktop.
Hack#14
import java.io.*;
import java.util.*;
import org.apache.hadoop.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.filecache.*;
/**
*
*job.txtとperson.txtを結合し、person.txt中の人物名と職業を出力する
*job.txtには職業ごとに、職業ID、職業名が、この順にカンマ区切りで記述されている。
*person.txtには人物ごとに、個人ID、名前、職業IDが、この順にカンマ区切りで記述されている。
*/
public class DistributedCacheJob extends Configured implements Tool{
public static void main(String[] args)throws Exception{
int returnCode = ToolRunner.run(new DistributedCacheJob() , args);
System.exit(returnCode);
}
public int run(String[] args)throws Exception{
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName("DistributedCacheJob");
job.setJarByClass(DistributedCacheJob.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//person.txtはMapReduceへの通常の入力とする。パスはプログラムの最初の引数とする。
//あらかじめHDFS上に配置しておく必要がある。
TextInputFormat.addInputPath(job , new Path(args[0]));
//プログラムの三番目の引数に、このMapReduceジョブのHDFS上の出力先を指定する。
TextOutputFormat.setOutputPath(job , new Path(args[2]));
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(String.class);
job.setNumReduceTasks(0);
job.setMapperClass(DistributedCacheMapper.class);
System.out.println(conf == job.getConfiguration());
//job.txtはDistributedCacheファイルとして扱う。
//HDFS上のjob.txtのパスをプログラムの2番目の引数で指定する。
//あらかじめjob.txtをHDFS上に配置しておく必要がある
Path path = new Path(args[1]);
//DistributedCacheファイル(job.txt)は、mylinkという名前のシンボリックリンクで参照できるようにする。
URI uri = new URI(path.toString() + "#" + "mylink");
DistributedCache.addCacheFile(uri , job.getConfiguration());
DistributedCache.createSymlink(job.getConfiguration());
return job.waitForCompletion(true)? 0 : 1;
}
private static class DistributedCacheMapper extends Mapper<LongWritable , Text , NullWritable , String>{
HashMap<String , String> masterTable = new HashMap<String , String>();
@Override
public void setup(Context context)throws IOException , InterruptedException{
//mylinkという名前のシンボリックリンクで、HDFS上のDistributedCacheファイル(job.txt)を参照する
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("mylink")));
String str;
while((str = reader.readLine()) != null){
String[] splitStr = str.split(",");
//job.txtの最初のフィールド(職業ID)をkey、職業名をvalueとし、masterTableに格納する
String key = splitStr[0];
String value = splitStr[1];
masterTable.put(key , value);
}
reader.close();
}
@Override
public void map(LongWritable key , Text value , Context context)throws IOException , InterruptedException{
String[] text = value.toString().split(",");
//person.txtの3番目のフィールド(職業ID)を結合キーとして扱う
String joinKey = text[2];
//masterTableから、職業IDに対応した職業名を取得する
String jobName = masterTable.get(joinKey);
context.write(NullWritable.get() , text[1] + "," + jobName);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment