Skip to content

Instantly share code, notes, and snippets.

@tspannhw
Forked from kkravik/randomforest.py
Created July 24, 2018 16:14
Show Gist options
  • Select an option

  • Save tspannhw/438ae3f3de4d26c68e688cbddf8ca247 to your computer and use it in GitHub Desktop.

Select an option

Save tspannhw/438ae3f3de4d26c68e688cbddf8ca247 to your computer and use it in GitHub Desktop.
Training Random Forest Model in Spark, Exporting to PMML Using JPMML-SparkML and Evaluating Using Openscoring
# Import packages
from pyspark import SparkContext  # FIX: was missing — setSystemProperty below would raise NameError
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
# Creating Spark SQL environment
from pyspark.sql import SparkSession, HiveContext
# Point the Hive metastore client at the cluster's thrift service BEFORE the
# session is created so enableHiveSupport() picks the property up.
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# For simplicity, Titanic.csv is expected to sit in the current working directory.
train = spark.read.csv("Titanic.csv", header = True)
# train.show(10)  # uncomment to inspect the raw frame on stdout
# CSV columns arrive as strings: keep the categorical ones as-is and cast the
# numeric ones to double, producing a new DataFrame with this column order.
categorical_cols = ["Survived", "Sex", "Embarked"]
numeric_cols = ["Pclass", "Age", "SibSp", "Fare"]
train = train.select(
    *[col(name) for name in categorical_cols],
    *[col(name).cast("double") for name in numeric_cols]
)
# Drop rows containing any null value.
train = train.dropna()
# Split into train and test sets (70/30). Beware: this reshuffles the dataset.
(traindf, testdf) = train.randomSplit([0.7, 0.3])
# String-index the categorical columns (each indexer is fitted later, on the
# training split, when the pipeline itself is fitted).
surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")
# One-hot encode the indexed categorical features into sparse vectors.
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")
# Assemble all predictors into a single "features" vector column.
feature_cols = ["Pclass", "sexVec", "Age", "SibSp", "Fare", "embarkedVec"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# The Random Forest classifier predicts the indexed survival label.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")
# Chain indexers, encoders, assembler and forest into one Pipeline
# (stage order matters: each stage consumes columns produced by earlier ones).
stages = [surviveIndexer, genderIndexer, embarkIndexer,
          genderEncoder, embarkEncoder, assembler, rf]
pipeline = Pipeline(stages=stages)
# Fit the whole pipeline on the training split; this also fits the indexers.
model = pipeline.fit(traindf)
from jpmml_sparkml import toPMMLBytes
# Convert the fitted pipeline to PMML bytes.
# FIX: the original passed an undefined name `sc`; the active SparkContext
# lives on the session as spark.sparkContext.
pmmlBytes = toPMMLBytes(spark.sparkContext, traindf, model)
pmml = pmmlBytes.decode("UTF-8")
# print(pmml)  # uncomment to see the actual PMML
# Write the PMML to disk for later deployment; `with` guarantees the file
# handle is closed even if the write fails (original leaked it on error).
with open("Titanic.pmml", "w") as pmml_file:
    pmml_file.write(pmml)
from openscoring import Openscoring
# FIX: the original bound the client to the name `os`, shadowing the stdlib
# `os` module; use a distinct name instead.
client = Openscoring("http://0.0.0.0:8080/openscoring")
kwargs = {"auth": ("admin", "adminadmin")}
# Deploy the PMML model to the Openscoring web service under the id "Titanic".
client.deployFile("Titanic", "Titanic.pmml", **kwargs)
# Two smoke-test evaluations with different passenger profiles.
test_passengers = [
    {"Fare": 7.25, "Sex": "male", "Age": 45, "SibSp": 1, "Pclass": 3, "Embarked": "S"},
    {"Fare": 7.25, "Sex": "female", "Age": 20, "SibSp": 1, "Pclass": 1, "Embarked": "Q"},
]
for arguments in test_passengers:
    result = client.evaluate("Titanic", arguments)
    print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment