Run a Scikit-Learn algorithm on top of Spark with PySpark
from pyspark import SparkConf, SparkContext
from sklearn.datasets import make_classification
from sklearn.ensemble import ExtraTreesClassifier
import pandas as pd
import numpy as np
import copy  # stdlib; the original `from sklearn.base import copy` only worked because sklearn.base happens to import it

conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)

# Build a classification task using 3 informative features
X, y = make_classification(n_samples=12000,
                           n_features=10,
                           n_informative=3,
                           n_redundant=0,
                           n_repeated=0,
                           n_classes=2,
                           random_state=0,
                           shuffle=False)

# or read from a file (for instance)
#df = pd.read_csv('data.csv', sep=' ', header=None)
#X = df[[1,2,3,4,5,6,7,8,9,10]].values
#y = df[0].tolist()

# Partition data: slice rows [start, stop) into a dict a worker can train on
def dataPart(X, y, start, stop):
    return dict(X=X[start:stop, :], y=y[start:stop])

# Train one forest per partition, on the workers
def train(data):
    X = data['X']
    y = data['y']
    return ExtraTreesClassifier(n_estimators=100, random_state=0).fit(X, y)

# Merge 2 models: since each tree in a bagged ensemble votes independently,
# concatenating the fitted trees of two forests yields a valid, larger forest
def merge(left, right):
    new = copy.deepcopy(left)
    new.estimators_ += right.estimators_
    new.n_estimators = len(new.estimators_)
    return new

data = [dataPart(X, y, 0, 4000), dataPart(X, y, 4000, 8000), dataPart(X, y, 8000, 12000)]
forest = sc.parallelize(data).map(train).reduce(merge)

importances = forest.feature_importances_
std = np.std([tree.feature_importances_ for tree in forest.estimators_],
             axis=0)
indices = np.argsort(importances)[::-1]

# Print the feature ranking
print("Feature ranking:")
for f in range(10):
    print("%d. feature %d (%f)" % (f + 1, indices[f], importances[indices[f]]))
I would like to run an SGD classifier using scikit-learn + Spark. Please post some examples.
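Not the author, but one common approach for linear models is parameter averaging: train an SGDClassifier per partition, then average the learned weights. A minimal sketch reusing the `sc` and `data` from above, assuming every partition contains samples of both classes (shuffle first if needed); note the average only approximates training on the full data:

from sklearn.linear_model import SGDClassifier
import copy
import numpy as np

def train_sgd(part):
    # One independently trained linear model per partition
    return SGDClassifier(random_state=0).fit(part['X'], part['y'])

models = sc.parallelize(data).map(train_sgd).collect()

# Parameter averaging: combine the linear models by averaging their weights
avg = copy.deepcopy(models[0])
avg.coef_ = np.mean([m.coef_ for m in models], axis=0)
avg.intercept_ = np.mean([m.intercept_ for m in models], axis=0)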
Works great! One small improvement is to allow the above code to be run safely either from spark-submit or from inside IPython via %run -i sklearn-pyspark.py:
if 'sc' not in locals():
    conf = (SparkConf()
            .setMaster("local[*]")
            .setAppName("My app")
            .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf=conf)
Great! This helped me parallelize an XGBoost model, thanks man.
sagnik-rzt, I don't think you can parallelize an XGBoost model this way: boosting builds trees sequentially, each one correcting the previous ones, so independently trained boosters can't simply be concatenated the way the bagged ExtraTrees estimators are merged above.
Any scikit-image integration example?