Created October 2, 2012 02:49
Save geofferyzh/3815898 to your computer and use it in GitHub Desktop.
Hadoop 101 - Side Data Distribution using Job Configuration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Driver: set the side-data value in the job configuration
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
public class SideDataDistribution extends Configured implements Tool { | |
private static final Logger sLogger = Logger.getLogger(SideDataDistribution.class); | |
// specify the path of the side data source on local filesystem ( | |
public static final String sideData= ".../Initial_Activation_List.txt"; | |
String sideDataLine; | |
public void loadSideData(String sideData, int linenum_index) throws IOException { | |
String line; | |
int linenum = 0; | |
BufferedReader bufferedReader = new BufferedReader(new FileReader(sideData)); | |
while ((line = bufferedReader.readLine()) != null) { | |
linenum = linenum + 1; | |
if (linenum == linenum_index) { | |
sideDataLine= line.toString(); | |
break; | |
} | |
} | |
bufferedReader.close(); | |
} | |
//---------------------------------------------------------------------- | |
@Override | |
public int run(String[] arg0) throws Exception { | |
Configuration conf = getConf(); | |
// Add custom config variables to configuration | |
conf.addResource("extraconfiguration.xml"); | |
FileSystem hdfs = FileSystem.get(conf); | |
// load the side data and pass one line each time to mappers/reducers through jobconf | |
loadSideData(sideData, n_lines); | |
conf.setInt("passsidedata", Integer.parseInt(sideDataLine)); | |
// Job | |
Job job = new Job(conf, "load side data"); | |
job.setJarByClass(SideDataDistribution.class); | |
... // detailed configuration omitted | |
return 0; | |
} | |
//---------------------------------------------------------------------- | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new Configuration(), new SideDataDistribution(), args); | |
System.exit(exitCode); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// In mapper/reducer tasks, retrieve the side data from the job configuration
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.log4j.Logger; | |
public class SideDataDistributionMapperextends Mapper<Text, VertexInfo, Text, VertexInfo> { | |
int sideDataLine; | |
// Get Configuration and Retrieve side data | |
@Override | |
public void setup(Context context) throws IOException { | |
// Get configuration | |
Configuration conf = context.getConfiguration(); | |
sideDataLine= conf.getInt("passsidedata",0); | |
} | |
// Map Task --------------------------------------------------------------------------- | |
public void map(Text key, VertexInfo value, Context context) throws IOException, InterruptedException { | |
... // map tasks omitted | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment