Last active
December 16, 2015 07:09
-
-
Save mrorii/5396732 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hw8; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class KMeansDriver extends Configured implements Tool { | |
@Override | |
public int run(String[] arg0) throws Exception { | |
Configuration conf = getConf(); | |
conf.set("mapred.textoutputformat.separator", ","); | |
int iteration = 0; | |
Path in = new Path(conf.get("in")); | |
Path out = new Path(conf.get("out")); | |
Path centroidSeed = new Path(conf.get("centroid")); | |
Path centroidDir = new Path(out.toUri() + "/iter_" + iteration); | |
// Load initial K cluster centroids | |
DistributedCache.addCacheFile(centroidSeed.toUri(), conf); | |
Job job = new Job(conf, "KMeans_0"); | |
// Set jar, mapper, reducer, combiner, etc | |
// ... | |
FileInputFormat.addInputPath(job, in); | |
FileOutputFormat.setOutputPath(job, centroidDir); | |
job.waitForCompletion(true); | |
long counter = job.getCounters().findCounter(KMeansReducer.Counter.CONVERGED).getValue(); | |
while (counter > 0) { | |
conf = new Configuration(); | |
conf.set("mapred.textoutputformat.separator", ","); | |
// Load cluster centroids | |
FileSystem fs = centroidDir.getFileSystem(conf); | |
Path pattern = new Path(centroidDir, "part-r-[0-9]*"); | |
FileStatus[] fileStatuses = fs.globStatus(pattern); | |
for (FileStatus fileStatus : fileStatuses) { | |
DistributedCache.addCacheFile(fileStatus.getPath().toUri(), conf); | |
} | |
job = new Job(conf, "Kmeans_" + iteration); | |
// Set jar, mapper, reducer, combiner, etc. | |
// ... | |
Path centroidNewDir = new Path(out.toUri() + "/iter_" + (iteration + 1)); | |
FileInputFormat.addInputPath(job, in); | |
FileOutputFormat.setOutputPath(job, centroidNewDir); | |
// wait for completion and update the counter | |
job.waitForCompletion(true); | |
iteration++; | |
centroidDir = centroidNewDir; | |
counter = job.getCounters().findCounter(KMeansReducer.Counter.CONVERGED).getValue(); | |
} | |
return 0; | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new KMeansDriver(), args); | |
System.exit(exitCode); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment