Skip to content

Instantly share code, notes, and snippets.

@jylock
Last active October 13, 2015 18:35
Show Gist options
  • Select an option

  • Save jylock/2170cdabdb339ebb4243 to your computer and use it in GitHub Desktop.

Select an option

Save jylock/2170cdabdb339ebb4243 to your computer and use it in GitHub Desktop.
Compile, ToolRunner, Configuration, etc
#TF-IDF
TF-IDF = tf * idf = tf * log(N/n)
tf: number of times a term appears in a document
N: total number of documents
n: number of documents that contain a term
TF-IDF stands for "Term Frequency, Inverse Document Frequency".
It is a way to score the importance of words (or "terms") in a
document based on how frequently they appear across multiple documents.
If a word appears frequently in a document, it's important. Give the word a high score.
But if a word appears in many documents, it's not a unique identifier. Give the word a low score.
Therefore, common words like "the" and "for", which appear in many documents, will be scaled down.
Words that appear frequently in a single document will be scaled up.
--------------------------------------------------------------------------------
#Hadoop streaming to compress file
hadoop jar contrib/streaming/hadoop-streaming-1.0.3.jar \
-Dmapred.reduce.tasks=0 \
-Dmapred.output.compress=true \
-Dmapred.compress.map.output=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-input filename \
-output /filename \
-mapper /bin/cat \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-outputformat org.apache.hadoop.mapred.TextOutputFormat
hadoop fs -cat /path/part* | hadoop fs -put - /path/compressed.gz
--------------------------------------------------------------------------------
#show file size in HDFS
hadoop fs -du -s -h /path/to/dir
--------------------------------------------------------------------------------
#Copy and uncompress a file to HDFS without unzipping it on the local filesystem
#If your file is several GB in size, this command helps avoid out-of-space errors, as
#there is no need to unzip the file on the local filesystem.
#put command in hadoop supports reading input from stdin. For reading the input from stdin use '-' as source file.
gunzip -c compressed.tar.gz | hadoop fs -put - /user/files/uncompressed_data
--------------------------------------------------------------------------------
#Compile the classes
javac -classpath `hadoop classpath` myPackage/*.java
#Create a .jar file
jar cf MyMR.jar myPackage/*.class
#Submit your job
hadoop jar MyMR.jar myPackage.MyDriver input_dir output_dir
---------------------------------------------------------------------------------
# LocalJobRunner mode
# set in driver
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "local");
conf.set("fs.default.name", "file:///");
# or set on command line
hadoop jar myjar.jar MyDriver -fs=file:/// -jt=local indir outdir
---------------------------------------------------------------------------------
# Logging
# instantiate
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
// Example class showing how to declare a log4j logger for a mapper.
class FooMapper implements Mapper {
// One static logger for the whole class, named after the class itself.
private static final Logger LOGGER =
Logger.getLogger (FooMapper.class.getName());
...
}
# send logging strings
# increasing threshold (severity) from top to bottom
LOGGER.trace("message");
LOGGER.debug("message");
LOGGER.info("message");
LOGGER.warn("message");
LOGGER.error("message");
# make it conditional
if (LOGGER.isDebugEnabled()){
LOGGER.debug("Account info:" + acc.getReport());
}
# using ToolRunner
hadoop jar myjob.jar MyDriver -Dmapred.map.child.log.level=DEBUG indir outdir
---------------------------------------------------------------------------------
# Counters
context.getCounter(group, name).increment(amount);
# retrieve counters in the driver
long typeARecords = job.getCounters().findCounter("RecordType", "A").getValue();
---------------------------------------------------------------------------------
# Map-only jobs
# to create a map-only job set the number of reducers to 0
job.setNumReduceTasks(0); // zero reducers makes this a map-only job
# specify the output types by calling
job.setOutputKeyClass()
job.setOutputValueClass()
---------------------------------------------------------------------------------
# get a list of job ids of all running jobs
mapred job -list
# tell the job tracker to kill the job
mapred job -kill jobid
---------------------------------------------------------------------------------
# Typical Import
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
---------------------------------------------------------------------------------
# To launch a streaming job, use command
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/\
streaming/hadoop-streaming*.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myMapScript.pl \
-reducer myReduceScript.pl \
-file mycode/myMapScript.pl \
-file mycode/myReduceScript.pl
---------------------------------------------------------------------------------
# MRUnit
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;
/**
 * MRUnit test for WordMapper: feeds one line of text to the mapper and
 * checks that it emits a (word, 1) pair for every word, in order.
 */
public class TestWordCount {
    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;

    /** Builds a fresh driver around a new WordMapper before each test. */
    @Before
    public void setUp() {
        mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
        mapDriver.setMapper(new WordMapper());
    }

    /** The line "cat dog" must produce ("cat", 1) followed by ("dog", 1). */
    @Test
    public void testMapper() {
        mapDriver
            .withInput(new LongWritable(1), new Text("cat dog"))
            .withOutput(new Text("cat"), new IntWritable(1))
            .withOutput(new Text("dog"), new IntWritable(1))
            .runTest();
    }
}
# Compiling and running unit tests from the command line
$ javac -classpath `hadoop classpath`:\
/home/training/lib/mrunit-0.9.0-incubating-hadoop2.jar:. *.java
$ java -cp `hadoop classpath`:/home/training/lib/\
mrunit-0.9.0-incubating-hadoop2.jar:. \
org.junit.runner.JUnitCore TestWordCount
---------------------------------------------------------------------------------
# Custom writable
class DateWritable implements Writable {
int month, day, year;
// Constructors omitted for brevity
public void readFields(DataInput in) throws IOException {
this.month = in.readInt();
this.day = in.readInt();
this.year = in.readInt();
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.month);
out.writeInt(this.day);
out.writeInt(this.year);
}
public boolean equals (Object o) {
if (o instanceof DateWritable) {
DateWritable other = (DateWritable) 0;
return this.year == other.year &&
this.month == other.month &&
this.day == other.day;
}
return false;
}
public int compareTo(DateWritable other) {
//Return -1 if this date is earlier
//Return 0 if dates are equal
//Return 1 if this date is later
if(this.year != other.year){
return(this.year < other.year ? -1:1);
} else if(this.month != other.month) {
return(this.month < other.month ? -1:1);
} else if(this.day != other.day) {
return(this.day < other.day ? -1:1);
}
return 0;
}
public int hashCode(){
int seed = 163;
return this.year*seed + this.month*seed + this.day*seed;
}
}
---------------------------------------------------------------------------------
# Sequencefiles
# Display content in terminal
hadoop fs -text filename.seq | head
# Read sequencefiles from a Java program
// Open the sequence file at 'path' and iterate over its key/value records.
Configuration config = new Configuration();
SequenceFile.Reader reader =
    new SequenceFile.Reader(FileSystem.get(config), path, config);
try {
    // Instantiate reusable key/value holders of the types recorded in the file header.
    Text key = (Text) reader.getKeyClass().newInstance();
    IntWritable value = (IntWritable) reader.getValueClass().newInstance();
    // next() refills key and value in place and returns false at end of file.
    while (reader.next(key, value)) {
        // do something here
    }
} finally {
    // Always release the underlying stream, even if reading fails part-way.
    reader.close();
}
---------------------------------------------------------------------------------
# Output compression with Snappy
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.SnappyCodec;
...
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setCompressOutput(job,true);
FileOutputFormat.setOutputCompressorClass(job,SnappyCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
---------------------------------------------------------------------------------
# driver
# set another input format
# default input format: TextInputFormat
job.setInputFormatClass(KeyValueTextInputFormat.class)
# specify the output format
job.setOutputFormatClass(SequenceFileOutputFormat.class)
# driver using ToolRunner
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(exitCode);
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf( "Usage: %s [generic options] <input dir> <output dir>\n”, getClass().getSimpleName());
return -1;
}
Job job = new Job(getConf());
job.setJarByClass(WordCount.class);
job.setJobName("Word Count");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(WordMapper.class);
job.setReducerClass(SumReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
}
# add files to the distributed cache directly from the command line
$ hadoop jar myjar.jar MyDriver \
-files path-to-file1,path-to-file2,... input output
---------------------------------------------------------------------------------
# Partitioner
# guarantees all pairs with the same key go to the same reducer
/**
 * Assigns each record to a reducer based solely on the key's hash code,
 * so equal keys always map to the same partition number.
 */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    /**
     * @param key            record key; only its hashCode() is used
     * @param value          record value; ignored
     * @param numReduceTasks number of partitions to spread keys over
     * @return the partition index computed from the key's hash
     */
    public int getPartition(K key, V value, int numReduceTasks) {
        // Clear the sign bit so the modulo result is never negative.
        int nonNegativeHash = key.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numReduceTasks;
    }
}
---------------------------------------------------------------------------------
Possible Hardware Component Failures in big data center:
Loss of a single node (disk crash)
Loss of an entire rack (network connection)
Solutions:
store files redundantly -> DFS
divide computations into tasks -> mapreduce
Speculative execution
if a Map task appears to be running slowly, a new instance will be started on another machine
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment