@willb
Created September 30, 2016 19:38
k-means microbenchmark with text-file input
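
The gist consists of two standalone PySpark scripts: the benchmark itself and a companion generator that produces suitable input. The benchmark reads its input as plain text, one comma-separated vector per line; an illustrative (made-up) line for 4-dimensional data would look like:

0.12,0.98,0.44,0.07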

# Benchmark script: times repeated k-means training runs over text-file input.
import argparse
from functools import reduce
from time import perf_counter
from pyspark.sql import SparkSession
from pyspark.mllib.clustering import KMeans
from numpy import fromstring
parser = argparse.ArgumentParser()
parser.add_argument('--master', help='Spark master URL (default: "local[*]")', default="local[*]")
parser.add_argument('--infile', help='where to find input data')
parser.add_argument('--partitions', help='number of partitions to operate on (default=64)', type=int, default=64)
parser.add_argument('--iterations', help='number of iterations in each training run (default=32)', type=int, default=32)
parser.add_argument('--runs', help='number of training runs (default=10)', type=int, default=10)
parser.add_argument('--clusters', help='number of cluster centers to find (default=128)', type=int, default=128)
parser.add_argument('--config', metavar="KEY=VAL", help="add KEY=VAL to Spark's configuration", action='append', default=[], dest='config')
if __name__ == "__main__":
    args = parser.parse_args()
    print(args)
    # apply any extra KEY=VAL pairs from --config to the session builder
    protospark = SparkSession.builder.appName("k-means-app").master(args.master)
    spark = reduce(lambda x, y: x.config(*y.split("=")), args.config, protospark).getOrCreate()
    runs = args.runs
    iterations = args.iterations
    partitions = args.partitions
    clusters = args.clusters
    sc = spark.sparkContext
    # parse each input line of comma-separated floats into a NumPy vector
    rdd = sc.textFile(args.infile).map(lambda line: fromstring(line, sep=",")).repartition(partitions)
    # silence Spark's logging below ERROR level
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    # wall-clock time for all training runs
    start_time = perf_counter()
    for run in range(runs):
        KMeans.train(rdd, clusters, iterations)
    end_time = perf_counter()
    sc.stop()
    print("completed %d run%s in %f seconds" % (runs, "s" if runs > 1 else "", end_time - start_time))

# Generator script: writes random comma-separated feature vectors as text, for use as benchmark input.
import argparse
from numpy.random import rand as ra
from functools import reduce
from pyspark.sql import SparkSession
parser = argparse.ArgumentParser()
parser.add_argument('--master', help='Spark master URL (default: "local[*]")', default="local[*]")
parser.add_argument('--outfile', help='where to store example data')
parser.add_argument('--size', help='number of records to generate (default: 1000000)', default=1000000, type=int)
parser.add_argument('--dim', help='number of dimensions in each record (default=128)', type=int, default=128)
parser.add_argument('--config', metavar="KEY=VAL", help="add KEY=VAL to Spark's configuration", action='append', default=[], dest='config')
if __name__ == "__main__":
    args = parser.parse_args()
    print(args)
    # apply any extra KEY=VAL pairs from --config to the session builder
    protospark = SparkSession.builder.appName("k-means-gen").master(args.master)
    spark = reduce(lambda x, y: x.config(*y.split("=")), args.config, protospark).getOrCreate()
    # generate args.size random vectors of args.dim uniform values and save them as comma-separated text
    spark.sparkContext.parallelize(range(args.size)) \
        .map(lambda x: ",".join([str(e) for e in ra(args.dim)])) \
        .saveAsTextFile(args.outfile)
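
The generator could be run similarly to produce input for the benchmark above; again, the filename and output path are placeholders:

spark-submit kmeans_generate.py --outfile /path/to/vectors --size 1000000 --dim 128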