Created
March 22, 2012 14:46
-
-
Save tf0054/2158730 to your computer and use it in GitHub Desktop.
Hack#14
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.*; | |
| import java.util.*; | |
| import org.apache.hadoop.*; | |
| import org.apache.hadoop.util.*; | |
| import org.apache.hadoop.conf.*; | |
| import org.apache.hadoop.io.*; | |
| import org.apache.hadoop.mapreduce.*; | |
| import org.apache.hadoop.mapreduce.lib.input.*; | |
| import org.apache.hadoop.mapreduce.lib.output.*; | |
| import java.net.*; | |
| import org.apache.hadoop.fs.*; | |
| import org.apache.hadoop.filecache.*; | |
| /** | |
| * | |
| *job.txtとperson.txtを結合し、person.txt中の人物名と職業を出力する | |
| *job.txtには職業ごとに、職業ID、職業名が、この順にカンマ区切りで記述されている。 | |
| *person.txtには人物ごとに、個人ID、名前、職業IDが、この順にカンマ区切りで記述されている。 | |
| */ | |
| public class DistributedCacheJob extends Configured implements Tool{ | |
| public static void main(String[] args)throws Exception{ | |
| int returnCode = ToolRunner.run(new DistributedCacheJob() , args); | |
| System.exit(returnCode); | |
| } | |
| public int run(String[] args)throws Exception{ | |
| Configuration conf = new Configuration(); | |
| Job job = new Job(conf); | |
| job.setJobName("DistributedCacheJob"); | |
| job.setJarByClass(DistributedCacheJob.class); | |
| job.setInputFormatClass(TextInputFormat.class); | |
| job.setOutputFormatClass(TextOutputFormat.class); | |
| //person.txtはMapReduceへの通常の入力とする。パスはプログラムの最初の引数とする。 | |
| //あらかじめHDFS上に配置しておく必要がある。 | |
| TextInputFormat.addInputPath(job , new Path(args[0])); | |
| //プログラムの三番目の引数に、このMapReduceジョブのHDFS上の出力先を指定する。 | |
| TextOutputFormat.setOutputPath(job , new Path(args[2])); | |
| job.setOutputKeyClass(NullWritable.class); | |
| job.setOutputValueClass(String.class); | |
| job.setNumReduceTasks(0); | |
| job.setMapperClass(DistributedCacheMapper.class); | |
| System.out.println(conf == job.getConfiguration()); | |
| //job.txtはDistributedCacheファイルとして扱う。 | |
| //HDFS上のjob.txtのパスをプログラムの2番目の引数で指定する。 | |
| //あらかじめjob.txtをHDFS上に配置しておく必要がある | |
| Path path = new Path(args[1]); | |
| //DistributedCacheファイル(job.txt)は、mylinkという名前のシンボリックリンクで参照できるようにする。 | |
| URI uri = new URI(path.toString() + "#" + "mylink"); | |
| DistributedCache.addCacheFile(uri , job.getConfiguration()); | |
| DistributedCache.createSymlink(job.getConfiguration()); | |
| return job.waitForCompletion(true)? 0 : 1; | |
| } | |
| private static class DistributedCacheMapper extends Mapper<LongWritable , Text , NullWritable , String>{ | |
| HashMap<String , String> masterTable = new HashMap<String , String>(); | |
| @Override | |
| public void setup(Context context)throws IOException , InterruptedException{ | |
| //mylinkという名前のシンボリックリンクで、HDFS上のDistributedCacheファイル(job.txt)を参照する | |
| BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("mylink"))); | |
| String str; | |
| while((str = reader.readLine()) != null){ | |
| String[] splitStr = str.split(","); | |
| //job.txtの最初のフィールド(職業ID)をkey、職業名をvalueとし、masterTableに格納する | |
| String key = splitStr[0]; | |
| String value = splitStr[1]; | |
| masterTable.put(key , value); | |
| } | |
| reader.close(); | |
| } | |
| @Override | |
| public void map(LongWritable key , Text value , Context context)throws IOException , InterruptedException{ | |
| String[] text = value.toString().split(","); | |
| //person.txtの3番目のフィールド(職業ID)を結合キーとして扱う | |
| String joinKey = text[2]; | |
| //masterTableから、職業IDに対応した職業名を取得する | |
| String jobName = masterTable.get(joinKey); | |
| context.write(NullWritable.get() , text[1] + "," + jobName); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment