
@lucidfrontier45
Created January 22, 2016 08:27
Use a trained scikit-learn model with PySpark
from pyspark import SparkContext
import numpy as np
from sklearn import ensemble

def batch(xs):
    # Materialize a whole partition as one list so the model can
    # predict on a batch instead of a single row at a time.
    yield list(xs)

# Train a toy random forest on random data.
N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.5, N)
model = ensemble.RandomForestClassifier(n_estimators=10).fit(train_x, train_y)

test_x = np.random.randn(N * 100, 10)

sc = SparkContext()
n_partitions = 10

# Pair each test row with its original index so predictions
# can be traced back to their rows after shuffling.
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()

# Broadcast the trained model once to all executors.
b_model = sc.broadcast(model)

# Batch each partition, split it into (rows, indices), predict on the
# whole batch, and emit (index, prediction) pairs.
result = rdd.mapPartitions(batch) \
    .map(lambda xs: ([x[0] for x in xs], [x[1] for x in xs])) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))

print(result.take(100))
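The batching trick above can be checked without a Spark cluster. Below is a minimal pure-Python sketch of the same pipeline, where `fake_predict` is a hypothetical stand-in for `b_model.value.predict`, showing that each original index stays paired with the prediction for its row:

```python
# Simulate zipWithIndex -> mapPartitions(batch) -> flatMap in plain Python.
# A "partition" here is just an iterator over (row, index) pairs.

def batch(xs):
    # Collect a whole partition into one list, as in the Spark job.
    yield list(xs)

def fake_predict(rows):
    # Hypothetical toy model: label each row by the sign of its
    # first feature (stands in for the broadcast sklearn model).
    return [1 if row[0] > 0 else 0 for row in rows]

data = [[0.5], [-1.2], [2.0], [-0.1]]        # four "rows"
indexed = list(zip(data, range(len(data))))  # zipWithIndex
partitions = [indexed[:2], indexed[2:]]      # two partitions

result = []
for part in partitions:
    for xs in batch(iter(part)):             # mapPartitions(batch)
        rows = [x[0] for x in xs]            # feature batch
        idxs = [x[1] for x in xs]            # original indices
        result.extend(zip(idxs, fake_predict(rows)))  # flatMap

print(result)  # [(0, 1), (1, 0), (2, 1), (3, 0)]
```

Because indices travel through the pipeline alongside the features, the output can be re-sorted into the original row order even though Spark processes partitions independently.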