Last active
October 13, 2015 18:35
-
-
Save jylock/2170cdabdb339ebb4243 to your computer and use it in GitHub Desktop.
Compile, ToolRunner, Configuration, etc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #TF-IDF | |
| TF-IDF = tf * idf = tf * log(N/n) | |
| tf: number of times a term appears in a document | |
| N: total number of documents | |
| n: number of documents that contain a term | |
| TF-IDF stands for "Term Frequency, Inverse Document Frequency". | |
| It is a way to score the importance of words (or "terms") in a | |
| document based on how frequently they appear across multiple documents. | |
| If a word appears frequently in a document, it's important. Give the word a high score. | |
| But if a word appears in many documents, it's not a unique identifier. Give the word a low score. | |
| Therefore, common words like "the" and "for", which appear in many documents, will be scaled down. | |
| Words that appear frequently in a single document will be scaled up. | |
| -------------------------------------------------------------------------------- | |
| #Hadoop streaming to compress file | |
| hadoop jar contrib/streaming/hadoop-streaming-1.0.3.jar \ | |
| -Dmapred.reduce.tasks=0 \ | |
| -Dmapred.output.compress=true \ | |
| -Dmapred.compress.map.output=true \ | |
| -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \ | |
| -input filename \ | |
| -output /filename \ | |
| -mapper /bin/cat \ | |
| -inputformat org.apache.hadoop.mapred.TextInputFormat \ | |
| -outputformat org.apache.hadoop.mapred.TextOutputFormat | |
| hadoop fs -cat /path/part* | hadoop fs -put - /path/compressed.gz | |
| -------------------------------------------------------------------------------- | |
| #show file size in HDFS | |
| hadoop fs -du -s -h /path/to/dir | |
| -------------------------------------------------------------------------------- | |
| #Copy and uncompress a file into HDFS without unzipping it on the local filesystem. | |
| #If the file is several GB in size, this helps avoid out-of-space errors, because | |
| #the uncompressed data is never written to the local filesystem. | |
| #The hadoop fs -put command can read its input from stdin: use '-' as the source file. | |
| #Note: gunzip on a .tar.gz yields a single tar archive, not the individual files inside it. | |
| gunzip -c compressed.tar.gz | hadoop fs -put - /user/files/uncompressed_data | |
| -------------------------------------------------------------------------------- | |
| #Compile the classes | |
| javac -classpath `hadoop classpath` myPackage/*.java | |
| #Create a .jar file | |
| jar cf MyMR.jar myPackage/*.class | |
| #Submit your job | |
| hadoop jar MyMR.jar myPackage.MyDriver input_dir output_dir | |
| --------------------------------------------------------------------------------- | |
| # LocalJobRunner mode | |
| # set in driver | |
| Configuration conf = new Configuration(); | |
| conf.set("mapred.job.tracker", "local"); | |
| conf.set("fs.default.name", "file:///"); | |
| # or set on command line | |
| hadoop jar myjar.jar MyDriver -fs=file:/// -jt=local indir outdir | |
| --------------------------------------------------------------------------------- | |
| # Logging | |
| # instantiate | |
| import org.apache.log4j.Level; | |
| import org.apache.log4j.Logger; | |
| class FooMapper implements Mapper { | |
| private static final Logger LOGGER = | |
| Logger.getLogger (FooMapper.class.getName()); | |
| ... | |
| } | |
| # send logging strings | |
| # listed in order of increasing severity (log level), from top to bottom | |
| LOGGER.trace("message"); | |
| LOGGER.debug("message"); | |
| LOGGER.info("message"); | |
| LOGGER.warn("message"); | |
| LOGGER.error("message"); | |
| # make it conditional | |
| if (LOGGER.isDebugEnabled()){ | |
| LOGGER.debug("Account info:" + acc.getReport()); | |
| } | |
| # or set the log level on the command line with -D (the driver must use ToolRunner) | |
| hadoop jar myjob.jar MyDriver -Dmapred.map.child.log.level=DEBUG indir outdir | |
| --------------------------------------------------------------------------------- | |
| # Counters | |
| context.getCounter(group, name).increment(amount); | |
| # retrieve counters in the driver | |
| long typeARecords = job.getCounters().findCounter("RecordType", "A").getValue(); | |
| --------------------------------------------------------------------------------- | |
| # Map-only jobs | |
| # to create a map-only job set the number of reducers to 0 | |
| job.setNumRedueTasks(0); | |
| # specify the output types by calling | |
| job.setOutputKeyClass() | |
| job.setOutputValueClass() | |
| --------------------------------------------------------------------------------- | |
| # get a list of job ids of all running jobs | |
| mapred job -list | |
| # tell the job tracker to kill the job | |
| mapred job -kill jobid | |
| --------------------------------------------------------------------------------- | |
| # Typical Import | |
| import org.apache.hadoop.fs.Path; | |
| import org.apache.hadoop.io.IntWritable; | |
| import org.apache.hadoop.io.Text; | |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
| import org.apache.hadoop.mapreduce.Job; | |
| --------------------------------------------------------------------------------- | |
| # To launch a streaming job, use command | |
| hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/\ | |
| streaming/hadoop-streaming*.jar \ | |
| -input myInputDirs \ | |
| -output myOutputDir \ | |
| -mapper myMapScript.pl \ | |
| -reducer myReduceScript.pl \ | |
| -file mycode/myMapScript.pl \ | |
| -file mycode/myReduceScript.pl | |
| --------------------------------------------------------------------------------- | |
| # MRUnit | |
| import org.apache.hadoop.io.IntWritable; | |
| import org.apache.hadoop.io.LongWritable; | |
| import org.apache.hadoop.io.Text; | |
| import org.apache.hadoop.mrunit.mapreduce.MapDriver; | |
| import org.junit.Before; | |
| import org.junit.Test; | |
| public class TestWordCount { | |
| MapDriver<LongWritable, Text, Text, IntWritable> mapDriver; | |
| @Before | |
| public void setUp() { | |
| WordMapper mapper = new WordMapper(); | |
| mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>(); | |
| mapDriver.setMapper(mapper); | |
| } | |
| @Test | |
| public void testMapper() { | |
| mapDriver.withInput(new LongWritable(1), new Text("cat dog")); | |
| mapDriver.withOutput(new Text("cat"), new IntWritable(1)); | |
| mapDriver.withOutput(new Text("dog"), new IntWritable(1)); | |
| mapDriver.runTest(); | |
| } | |
| } | |
| # Compiling and running unit tests from the command line | |
| $ javac -classpath `hadoop classpath`:\ | |
| /home/training/lib/mrunit-0.9.0-incubating-hadoop2.jar:. *.java | |
| $ java -cp `hadoop classpath`:/home/training/lib/\ | |
| mrunit-0.9.0-incubating-hadoop2.jar:. \ | |
| org.junit.runner.JUnitCore TestWordCount | |
| --------------------------------------------------------------------------------- | |
| # Custom writable | |
| class DateWritable implements Writable { | |
| int month, day, year; | |
| // Constructors omitted for brevity | |
| public void readFields(DataInput in) throws IOException { | |
| this.month = in.readInt(); | |
| this.day = in.readInt(); | |
| this.year = in.readInt(); | |
| } | |
| public void write(DataOutput out) throws IOException { | |
| out.writeInt(this.month); | |
| out.writeInt(this.day); | |
| out.writeInt(this.year); | |
| } | |
| public boolean equals (Object o) { | |
| if (o instanceof DateWritable) { | |
| DateWritable other = (DateWritable) 0; | |
| return this.year == other.year && | |
| this.month == other.month && | |
| this.day == other.day; | |
| } | |
| return false; | |
| } | |
| public int compareTo(DateWritable other) { | |
| //Return -1 if this date is earlier | |
| //Return 0 if dates are equal | |
| //Return 1 if this date is later | |
| if(this.year != other.year){ | |
| return(this.year < other.year ? -1:1); | |
| } else if(this.month != other.month) { | |
| return(this.month < other.month ? -1:1); | |
| } else if(this.day != other.day) { | |
| return(this.day < other.day ? -1:1); | |
| } | |
| return 0; | |
| } | |
| public int hashCode(){ | |
| int seed = 163; | |
| return this.year*seed + this.month*seed + this.day*seed; | |
| } | |
| } | |
| --------------------------------------------------------------------------------- | |
| # Sequencefiles | |
| # Display content in terminal | |
| hadoop fs -text filename.seq | head | |
| # Read sequencefiles from a Java program | |
| Configuration config = new Configuration(); | |
| SequenceFile.Reader reader = | |
| new SequenceFile.Reader(FileSystem.get(config),path, config); | |
| Text key = (Text) reader.getKeyClass().newInstance(); | |
| IntWritable value = (IntWritable) reader.getValueClass().newInstance(); | |
| while (reader.next(key, value)) { | |
| // do something here | |
| } | |
| reader.close(); | |
| --------------------------------------------------------------------------------- | |
| # Output compression with Snappy | |
| import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; | |
| import org.apache.hadoop.io.SequenceFile.CompressionType; | |
| import org.apache.hadoop.io.compress.SnappyCodec; | |
| ... | |
| job.setOutputFormatClass(SequenceFileOutputFormat.class); | |
| FileOutputFormat.setCompressOutput(job,true); | |
| FileOutputFormat.setOutputCompressorClass(job,SnappyCodec.class); | |
| SequenceFileOutputFormat.setOutputCompressionType(job, | |
| CompressionType.BLOCK); | |
| --------------------------------------------------------------------------------- | |
| # driver | |
// Choose a different input format (the default input format is TextInputFormat).
job.setInputFormatClass(KeyValueTextInputFormat.class);
// Specify the output format.
job.setOutputFormatClass(SequenceFileOutputFormat.class);
| # driver using ToolRunner | |
| import org.apache.hadoop.fs.Path; | |
| import org.apache.hadoop.io.IntWritable; | |
| import org.apache.hadoop.io.Text; | |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
| import org.apache.hadoop.mapreduce.Job; | |
| import org.apache.hadoop.conf.Configured; | |
| import org.apache.hadoop.conf.Configuration; | |
| import org.apache.hadoop.util.Tool; | |
| import org.apache.hadoop.util.ToolRunner; | |
| public class WordCount extends Configured implements Tool { | |
| public static void main(String[] args) throws Exception { | |
| int exitCode = ToolRunner.run(new Configuration(), new WordCount(), args); | |
| System.exit(exitCode); | |
| } | |
| public int run(String[] args) throws Exception { | |
| if (args.length != 2) { | |
| System.out.printf( "Usage: %s [generic options] <input dir> <output dir>\n”, getClass().getSimpleName()); | |
| return -1; | |
| } | |
| Job job = new Job(getConf()); | |
| job.setJarByClass(WordCount.class); | |
| job.setJobName("Word Count"); | |
| FileInputFormat.setInputPaths(job, new Path(args[0])); | |
| FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
| job.setMapperClass(WordMapper.class); | |
| job.setReducerClass(SumReducer.class); | |
| job.setMapOutputKeyClass(Text.class); | |
| job.setMapOutputValueClass(IntWritable.class); | |
| job.setOutputKeyClass(Text.class); | |
| job.setOutputValueClass(IntWritable.class); | |
| boolean success = job.waitForCompletion(true); | |
| return success ? 0 : 1; | |
| } | |
| } | |
| # add files to the distributed cache directly from the command line | |
| $ hadoop jar myjar.jar MyDriver \ | |
| -files path-to-file1,path-to-file2,... input output | |
| --------------------------------------------------------------------------------- | |
| # Partitioner | |
| # guarantees all pairs with the same key go to the same reducer | |
| public class HashPartitioner<K, V> extends Partitioner<K, V> { | |
| public int getPartition(K key, V value, int numReduceTasks) { | |
| return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; | |
| } | |
| } | |
| --------------------------------------------------------------------------------- | |
| Possible hardware component failures in a big data center: | |
| Loss of a single node (e.g. disk crash) | |
| Loss of an entire rack (e.g. network connection failure) | |
| Solutions: | |
| store files redundantly -> DFS | |
| divide computations into tasks -> MapReduce | |
| Speculative execution | |
| if a map task appears to be running slowly, a duplicate instance is started on another machine, and the output of whichever copy finishes first is used |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment