Skip to content

Instantly share code, notes, and snippets.

@mrorii
Last active December 16, 2015 07:09
Show Gist options
  • Save mrorii/5396732 to your computer and use it in GitHub Desktop.
Save mrorii/5396732 to your computer and use it in GitHub Desktop.
package hw8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver that runs a chain of K-Means MapReduce jobs.
 *
 * <p>The paths are read from the tool configuration (for example via
 * {@code -D in=... -D out=... -D centroid=...}):
 * <ul>
 *   <li>{@code in} - input path handed to every job;</li>
 *   <li>{@code out} - parent directory; job {@code N} writes to {@code out/iter_N};</li>
 *   <li>{@code centroid} - seed centroid file placed in the distributed cache
 *       of the first job.</li>
 * </ul>
 *
 * <p>After each job the {@code KMeansReducer.Counter.CONVERGED} counter is read and
 * another job is launched while its value is greater than zero. Each follow-up job
 * receives the {@code part-r-*} files of the previous job through the distributed cache.
 */
public class KMeansDriver extends Configured implements Tool {

    /**
     * Runs K-Means jobs until the reducer counter drops to zero or a job fails.
     *
     * @param arg0 remaining command-line arguments (unused; all settings come from the
     *             configuration)
     * @return {@code 0} when the last job succeeded and the counter reached zero,
     *         {@code 1} as soon as any job fails
     * @throws IllegalArgumentException if {@code in}, {@code out} or {@code centroid}
     *                                  is not set
     * @throws Exception                on any error raised while configuring or running a job
     */
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration baseConf = getConf();
        baseConf.set("mapred.textoutputformat.separator", ",");

        Path in = requiredPath(baseConf, "in");
        Path out = requiredPath(baseConf, "out");
        Path centroidSeed = requiredPath(baseConf, "centroid");

        int iteration = 0;
        Path centroidDir = new Path(out, "iter_" + iteration);

        // Every job gets its own copy of the tool configuration so that options
        // supplied through ToolRunner reach all iterations, while the cache-file
        // list of one iteration does not leak into the next one.
        Configuration conf = new Configuration(baseConf);
        // Load initial K cluster centroids
        DistributedCache.addCacheFile(centroidSeed.toUri(), conf);

        while (true) {
            Job job = createJob(conf, "KMeans_" + iteration, in, centroidDir);
            if (!job.waitForCompletion(true)) {
                // Counters of a failed job are meaningless; stop and report failure.
                return 1;
            }

            // NOTE(review): the loop continues while this counter is positive, so it
            // presumably counts centroids that have NOT converged yet - verify
            // against KMeansReducer.
            long counter =
                    job.getCounters().findCounter(KMeansReducer.Counter.CONVERGED).getValue();
            if (counter <= 0) {
                return 0;
            }

            // Prepare the next iteration: its centroids are the output just written.
            iteration++;
            conf = new Configuration(baseConf);
            addCentroidFiles(centroidDir, conf);
            centroidDir = new Path(out, "iter_" + iteration);
        }
    }

    /**
     * Reads a mandatory path-valued property from the configuration.
     *
     * @param conf configuration to read from
     * @param key  property name
     * @return the property value as a {@link Path}
     * @throws IllegalArgumentException if the property is missing or empty
     */
    private static Path requiredPath(Configuration conf, String key) {
        String value = conf.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required configuration property '" + key
                            + "' (pass it with -D " + key + "=<path>)");
        }
        return new Path(value);
    }

    /**
     * Registers every {@code part-r-*} file found in {@code centroidDir} as a
     * distributed-cache file on {@code conf}.
     *
     * @param centroidDir output directory of the previous iteration
     * @param conf        configuration of the job about to be created
     * @throws java.io.IOException if the file system cannot be accessed
     */
    private static void addCentroidFiles(Path centroidDir, Configuration conf)
            throws java.io.IOException {
        FileSystem fs = centroidDir.getFileSystem(conf);
        Path pattern = new Path(centroidDir, "part-r-[0-9]*");
        for (FileStatus fileStatus : fs.globStatus(pattern)) {
            DistributedCache.addCacheFile(fileStatus.getPath().toUri(), conf);
        }
    }

    /**
     * Creates one K-Means job reading {@code in} and writing to {@code outputDir}.
     * Cache files must already be registered on {@code conf} when this is called,
     * as in the original flow where they are added before the job is constructed.
     *
     * @param conf      fully prepared configuration for the job
     * @param name      job name
     * @param in        input path
     * @param outputDir output directory for the new centroids
     * @return the configured, not yet submitted job
     * @throws java.io.IOException if the job cannot be created or its input path set
     */
    private static Job createJob(Configuration conf, String name, Path in, Path outputDir)
            throws java.io.IOException {
        Job job = new Job(conf, name);
        // Set jar, mapper, reducer, combiner, etc.
        // ...
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, outputDir);
        return job;
    }

    /**
     * Command-line entry point; delegates to {@link ToolRunner} so that generic
     * Hadoop options are parsed into the configuration, then exits with the
     * code returned by {@link #run(String[])}.
     *
     * @param args command-line arguments
     * @throws Exception if the tool fails with an exception
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new KMeansDriver(), args);
        System.exit(exitCode);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment