Created July 2, 2011 09:14
Save allen501pc/1059884 to your computer and use it in GitHub Desktop.
Multi-Threaded Jobs in Hadoop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.util.*;
/*
MySQL DB Schema:
DROP TABLE IF EXISTS `WordCount`.`Counting`;
CREATE TABLE `WordCount`.`Counting` (
`name` char(48) default NULL,
`count` int(11) default NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Note: `name` holds at most 48 latin1 characters; longer or non-Latin-1 words
may be truncated or rejected depending on the server's SQL mode.
*/
public class DBWordCount extends Thread | |
{ | |
public void run() //throws Exception | |
{ | |
/* Start ! */ | |
try | |
{ | |
JobClient.runJob(conf); | |
} | |
catch(Exception e) | |
{ | |
// do nothing | |
} | |
} | |
public void fnSetJob1(String[] args) throws Exception | |
{ | |
conf.setJobName("MySQL DB Wordcount Job1"); | |
Class.forName("com.mysql.jdbc.Driver"); | |
conf.setInputFormat(TextInputFormat.class); | |
conf.setOutputFormat(DBOutputFormat.class); | |
FileInputFormat.setInputPaths(conf, new Path(args[0])); | |
// Set up your host name and account | |
String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"}; | |
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]); | |
// Setup Output MySQL Format | |
DBOutputFormat.setOutput(conf, "Counting","name", "count"); | |
// Set Mapper and Reducer Class | |
conf.setMapperClass(Map.class); | |
//conf.setCombinerClass(Reduce.class); | |
conf.setReducerClass(Reduce.class); | |
conf.setMapOutputKeyClass(Text.class); | |
conf.setMapOutputValueClass(IntWritable.class); | |
conf.setOutputKeyClass(WordCountInfoRecord.class); | |
conf.setOutputValueClass(NullWritable.class); | |
} | |
public void fnSetJob2(String[] args) throws Exception | |
{ | |
//JobConf conf = new JobConf(DBWordCount.class); | |
conf.setJobName("MySQL DB Wordcount Job2"); | |
Class.forName("com.mysql.jdbc.Driver"); | |
// Set up your host name and account | |
String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"}; | |
conf.setInputFormat(TextInputFormat.class); | |
conf.setOutputFormat(DBOutputFormat.class); | |
FileInputFormat.setInputPaths(conf, new Path(args[0])); | |
// Setup MySQL Connection , default account:root , no password | |
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]); | |
// Setup Output MySQL Format | |
DBOutputFormat.setOutput(conf, "Counting","name", "count"); | |
// Set Mapper and Reducer Class | |
conf.setMapperClass(Map.class); | |
//conf.setCombinerClass(Reduce.class); | |
conf.setReducerClass(Reduce2.class); | |
// I've tried all combinations , but the bug still happen. | |
conf.setMapOutputKeyClass(Text.class); | |
conf.setMapOutputValueClass(IntWritable.class); | |
conf.setOutputKeyClass(WordCountInfoRecord.class); | |
conf.setOutputValueClass(NullWritable.class); | |
} | |
JobConf conf = new JobConf(DBWordCount.class); | |
//JobConf conf2 = new JobConf(DBWordCount.class); | |
// Output Record Object | |
static class WordCountInfoRecord implements Writable, DBWritable | |
{ | |
public String name; | |
public int count; | |
public WordCountInfoRecord() { | |
} | |
public WordCountInfoRecord(String str, int c) | |
{ | |
this.name = str; | |
this.count = c; | |
} | |
public void readFields(DataInput in) throws IOException { | |
this.name = Text.readString(in); | |
this.count = in.readInt(); | |
} | |
public void write(DataOutput out) throws IOException { | |
Text.writeString(out, this.name); | |
out.writeInt(this.count); | |
} | |
public void readFields(ResultSet result) throws SQLException { | |
this.name = result.getString(1); | |
this.count = result.getInt(2); | |
} | |
public void write(PreparedStatement stmt) throws SQLException { | |
stmt.setString(1, this.name); | |
stmt.setInt(2, this.count); | |
} | |
public String toString() { | |
return new String(this.name + " " + this.count); | |
} | |
} | |
public static class Map extends MapReduceBase implements Mapper | |
{ | |
private final static IntWritable one = new IntWritable(1); | |
private Text word = new Text(); | |
public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException | |
{ | |
String line = value.toString(); | |
StringTokenizer tokenizer = new StringTokenizer(line); | |
while (tokenizer.hasMoreTokens()) { | |
word.set(tokenizer.nextToken()); | |
output.collect(word, one); | |
} | |
} | |
} | |
public static class Reduce extends MapReduceBase implements Reducer | |
{ | |
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException | |
{ | |
int sum = 0; | |
while (values.hasNext()) { | |
sum += values.next().get(); | |
} | |
// Output Data into MySQL | |
output.collect(new WordCountInfoRecord(key.toString(),sum), NullWritable.get()); | |
} | |
} | |
public static class Reduce2 extends MapReduceBase implements Reducer | |
{ | |
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException | |
{ | |
int sum = 0; | |
while (values.hasNext()) { | |
sum += values.next().get(); | |
} | |
// Output Data into MySQL | |
output.collect(new WordCountInfoRecord("Job2_"+key.toString(),sum), NullWritable.get()); | |
} | |
} | |
public static void main(String[] args) throws Exception | |
{ | |
DBWordCount thread1=new DBWordCount(); | |
// Set Thread1 | |
thread1.fnSetJob1(args); | |
DBWordCount thread2=new DBWordCount(); | |
// Set Thread2 | |
thread2.fnSetJob2(args); | |
// Thread 1 Start | |
thread1.start(); | |
// Thread 2 Start | |
thread2.start(); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.