@datitran, forked from 0asa/sklearn-pyspark.py (created November 15, 2015)
Run a Scikit-Learn algorithm on top of Spark with PySpark
from pyspark import SparkConf, SparkContext
from sklearn.datasets import make_classification
from sklearn.ensemble import ExtraTreesClassifier
import pandas as pd
import numpy as np

conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
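# Note: "local[*]" runs Spark in-process with one worker thread per core;
# point setMaster at a cluster URL to distribute the training across executors.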
# Build a classification task using 3 informative features
X, y = make_classification(n_samples=12000,
                           n_features=10,
                           n_informative=3,
                           n_redundant=0,
                           n_repeated=0,
                           n_classes=2,
                           random_state=0,
                           shuffle=False)
# or read from a file (for instance)
#df = pd.read_csv('data.csv', sep=' ', header=None)
#X = df[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]].to_numpy()
#y = df[0].tolist()
# Partition the data into chunks of features and labels
def dataPart(X, y, start, stop):
    return dict(X=X[start:stop, :], y=y[start:stop])

# Train an independent forest on a single partition
def train(data):
    X = data['X']
    y = data['y']
    return ExtraTreesClassifier(n_estimators=100, random_state=0).fit(X, y)
# Merge 2 Models
from sklearn.base import copy
def merge(left,right):
new = copy.deepcopy(left)
new.estimators_ += right.estimators_
new.n_estimators = len(new.estimators_)
return new
data = [dataPart(X, y, 0, 4000), dataPart(X, y, 4000, 8000), dataPart(X, y, 8000, 12000)]
forest = sc.parallelize(data).map(train).reduce(merge)
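# reduce() merges the per-partition forests pairwise, so `forest` ends up
# with 3 x 100 = 300 trees while still behaving as a single classifier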
importances = forest.feature_importances_
std = np.std([tree.feature_importances_ for tree in forest.estimators_],
             axis=0)
indices = np.argsort(importances)[::-1]
# Print the feature ranking
print("Feature ranking:")
for f in range(10):
    print("%d. feature %d (%f)" % (f + 1, indices[f], importances[indices[f]]))