Created
May 17, 2012 22:31
-
-
Save benhardy/2721999 to your computer and use it in GitHub Desktop.
Perform Spring-based DI in the Hadoop 0.18.x API without XML, using the supplied JobConf.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * say you'd like to have a Hadoop Mapper inject its dependencies via a constructor | |
| * instead of that super annoying post-construction configure call. e.g. | |
| * | |
| * Obviously Hadoop won't be able to instantiate this by itself, but we'll | |
| * help it out below. | |
| * | |
| */ | |
| public class TestMapper implements Mapper<IntWritable, Text, IntWritable, Text> { | |
| private final NumberService numbers; // immutable goodness | |
| public TestMapper(NumberService numbers) { // constructor injection | |
| this.numbers = numbers; | |
| } | |
| @Override | |
| public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> collect, Reporter reporter) throws IOException { | |
| collect.collect(new IntWritable(numbers.next() + key.get()), value); | |
| } | |
| @Override public void configure(JobConf entries) { } | |
| @Override public void close() throws IOException { } | |
| } | |
| /** | |
| * make JobConf an autowired field in app configuration, define mapper. | |
| */ | |
| @Configuration | |
| public class AppConfiguration { | |
| @Autowired | |
| private JobConf jobConf; | |
| @Bean | |
| public TestMapper testMapper(NumberService ns) { | |
| return new TestMapper(numberService()); | |
| } | |
| /** example service */ | |
| @Bean | |
| public NumberService numberService() { | |
| return new NumberServiceImpl(jobConf); // note this implementation's setup depends on JobConf | |
| } | |
| } | |
| /** | |
| * define a base mapper which after being called by Hadoop does IOC. | |
| * In job setup, call JobConf.setClass("internal.mapper.class", x, x); with the class of the DI-enabled mapper to use, | |
| * but always call JobConf.setMapperClass(DiMapper.class) when you want DI enabled. | |
| * | |
| * you could generalize this to be usable for both mappers and reducers | |
| */ | |
| public class DiMapper extends MapReduceBase { | |
| protected Mapper worker; | |
| protected ApplicationContext context; | |
| /** | |
| * DiMapper delegates standard hadoop map operation to the instantiated worker. | |
| */ | |
| @Override | |
| public void map(Object key, Object value, OutputCollector outputCollector, Reporter reporter) throws IOException { | |
| worker.map(key, value, outputCollector, reporter); | |
| } | |
| @Override | |
| public final void configure(JobConf conf) { | |
| context = createContext(conf); | |
| // | |
| worker = context.getBean(conf.getClass("internal.mapper.class", null)); | |
| } | |
| /** | |
| * Create a Spring Application context suitable for usage in Hadoop. | |
| * <p/> | |
| * Pre-registers extant jobConf as an available bean for all other beans, useful for | |
| * when services need to configure themselves with data in JobConf. This is useful | |
| * because by default Spring will usually try to create a new JobConf and we dont | |
| * want that. | |
| * | |
| * @param conf | |
| * @return | |
| */ | |
| @VisibleForTesting | |
| ApplicationContext createContext(JobConf conf) { | |
| // start with empty context | |
| final AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); | |
| // register the extant JobConf as a bean before anything else | |
| // must match field name in AppConfiguration | |
| ctx.getBeanFactory().registerSingleton("jobConf", conf); | |
| // load in the rest of the config | |
| ctx.register(AppConfiguration.class); | |
| ctx.refresh(); | |
| return ctx; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Nicely done. Thanks.