Training Random Forest Model in Spark, Exporting to PMML Using JPMML-SparkML and Evaluating Using Openscoring
# Import packages
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Create the Spark SQL environment
from pyspark.sql import SparkSession
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# For the sake of simplicity, Titanic.csv is expected in the same folder
train = spark.read.csv("Titanic.csv", header = True)

# Display the content of the DataFrame on stdout
# train.show(10)

# Cast some string columns to double; this creates a new DataFrame
train = train.select(col("Survived"), col("Sex"), col("Embarked"),
                     col("Pclass").cast("double"), col("Age").cast("double"),
                     col("SibSp").cast("double"), col("Fare").cast("double"))

# Drop rows containing null values
train = train.dropna()

# Split into train and test sets. Beware: this sorts the dataset.
(traindf, testdf) = train.randomSplit([0.7, 0.3])

# Index labels, adding metadata to the label columns.
# Fit on the whole dataset to include all labels in the index.
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")
surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")

# One-hot encode the indexed categorical features
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")

# Assemble the feature columns into a single vector column (label, features)
assembler = VectorAssembler(inputCols=["Pclass", "sexVec", "Age", "SibSp", "Fare", "embarkedVec"], outputCol="features")

# Train a RandomForest model
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")

# Chain indexers, encoders, assembler and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, embarkIndexer, genderEncoder, embarkEncoder, assembler, rf])

# Train the model. This also runs the indexers.
model = pipeline.fit(traindf)
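
# Optional sanity check (not in the original gist): score the held-out test set
# with the MulticlassClassificationEvaluator imported above, before exporting
# the model. The "accuracy" metric name assumes Spark 2.0+.
predictions = model.transform(testdf)
evaluator = MulticlassClassificationEvaluator(labelCol="indexedSurvived",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)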

# Convert the fitted pipeline to PMML; toPMMLBytes needs the underlying SparkContext
from jpmml_sparkml import toPMMLBytes
sc = spark.sparkContext
pmmlBytes = toPMMLBytes(sc, traindf, model)

# Uncomment this line if you want to see the actual PMML
# print(pmmlBytes.decode("UTF-8"))
pmml = pmmlBytes.decode("UTF-8")

# Write the PMML to a file for later use
with open("Titanic.pmml", "w") as pmmlFile:
    pmmlFile.write(pmml)
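
# Optional check (not in the original gist): parse the exported file with the
# standard-library ElementTree to confirm the PMML document is well-formed
# before deploying it.
import xml.etree.ElementTree as ET
root = ET.parse("Titanic.pmml").getroot()
print("PMML root element:", root.tag)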

# Deploy the PMML model to an Openscoring web service
from openscoring import Openscoring
os = Openscoring("http://0.0.0.0:8080/openscoring")
kwargs = {"auth": ("admin", "adminadmin")}
os.deployFile("Titanic", "Titanic.pmml", **kwargs)

# Two test evaluations with different arguments
arguments = {"Fare": 7.25, "Sex": "male", "Age": 45, "SibSp": 1, "Pclass": 3, "Embarked": "S"}
result = os.evaluate("Titanic", arguments)
print(result)

arguments = {"Fare": 7.25, "Sex": "female", "Age": 20, "SibSp": 1, "Pclass": 1, "Embarked": "Q"}
result = os.evaluate("Titanic", arguments)
print(result)
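
# Clean-up sketch (not in the original gist): remove the model from the
# Openscoring server once testing is done. The undeploy() method and its
# acceptance of the same auth kwargs are assumptions about this version of
# the openscoring Python client.
os.undeploy("Titanic", **kwargs)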