Created September 30, 2016 19:38
k-means microbenchmark with text-file input
import argparse
from functools import reduce
from time import perf_counter
from pyspark.sql import SparkSession
from pyspark.mllib.clustering import KMeans
from numpy import fromstring

parser = argparse.ArgumentParser()
parser.add_argument('--master', help='Spark master URL (default: "local[*]")', default="local[*]")
parser.add_argument('--infile', help='where to find input data')
parser.add_argument('--partitions', help='number of partitions to operate on (default: 64)', type=int, default=64)
parser.add_argument('--iterations', help='number of iterations in each training run (default: 32)', type=int, default=32)
parser.add_argument('--runs', help='number of training runs (default: 10)', type=int, default=10)
parser.add_argument('--clusters', help='number of cluster centers to find (default: 128)', type=int, default=128)
parser.add_argument('--config', metavar="KEY=VAL", help="add KEY=VAL to Spark's configuration", action='append', default=[], dest='config')

if __name__ == "__main__":
    args = parser.parse_args()
    print(args)

    # fold every --config KEY=VAL pair into the session builder before creating the session
    protospark = SparkSession.builder.appName("k-means-app").master(args.master)
    spark = reduce(lambda builder, kv: builder.config(*kv.split("=", 1)), args.config, protospark).getOrCreate()

    runs = args.runs
    iterations = args.iterations
    partitions = args.partitions
    clusters = args.clusters

    sc = spark.sparkContext

    # parse each comma-separated text line into a NumPy vector
    rdd = sc.textFile(args.infile).map(lambda line: fromstring(line, sep=",")).repartition(partitions)

    # quiet Spark's console logging so the timing output is easy to spot
    log4j = sc._jvm.org.apache.log4j
    log4j.LogManager.getLogger("org").setLevel(log4j.Level.ERROR)

    start_time = perf_counter()
    for run in range(runs):
        KMeans.train(rdd, clusters, iterations)
    end_time = perf_counter()

    sc.stop()

    print("completed %d run%s in %f seconds" % (runs, "s" if runs > 1 else "", end_time - start_time))
import argparse
from functools import reduce
from numpy.random import rand
from pyspark.sql import SparkSession

parser = argparse.ArgumentParser()
parser.add_argument('--master', help='Spark master URL (default: "local[*]")', default="local[*]")
parser.add_argument('--outfile', help='where to store example data')
parser.add_argument('--size', help='number of records to generate (default: 1000000)', default=1000000, type=int)
parser.add_argument('--dim', help='number of dimensions in each record (default: 128)', type=int, default=128)
parser.add_argument('--config', metavar="KEY=VAL", help="add KEY=VAL to Spark's configuration", action='append', default=[], dest='config')

if __name__ == "__main__":
    args = parser.parse_args()
    print(args)

    # fold every --config KEY=VAL pair into the session builder before creating the session
    protospark = SparkSession.builder.appName("k-means-gen").master(args.master)
    spark = reduce(lambda builder, kv: builder.config(*kv.split("=", 1)), args.config, protospark).getOrCreate()

    # write one line of comma-separated random floats per record
    spark.sparkContext.parallelize(range(args.size)) \
        .map(lambda x: ",".join(str(e) for e in rand(args.dim))) \
        .saveAsTextFile(args.outfile)
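For anyone wiring the two scripts together, this small standalone sketch (no Spark needed) shows the record format they share: the generator emits one comma-separated line of floats per record, and the benchmark parses each line back into a NumPy vector with fromstring. The dimension used here (4) is arbitrary and chosen only for illustration.

from numpy import fromstring, allclose
from numpy.random import rand

vec = rand(4)                          # one synthetic record, as the generator would draw it
line = ",".join(str(e) for e in vec)   # the text-file representation written by the generator
parsed = fromstring(line, sep=",")     # how the benchmark turns the line back into a vector
assert allclose(vec, parsed)           # the round trip preserves the values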