@Ben-Epstein
Created July 24, 2020 00:35
{"cells":[{"metadata":{},"cell_type":"markdown","source":"# Machine Learning Model Creation in Splice Machine\n#### Starting the Spark Session"},{"metadata":{"trusted":true},"cell_type":"code","source":"# Setup\nfrom pyspark.sql import SparkSession\nfrom splicemachine.spark import PySpliceContext\nfrom splicemachine.mlflow_support.utilities import get_user\n\nspark = SparkSession.builder.getOrCreate()\nsplice = PySpliceContext(spark)\nschema = get_user()","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Importing MLflow Support\n<blockquote><p class='quotation'><span style='font-size:15px'> As explained in <a href='./7.2 Splice MLflow Support.ipynb'>7.2 Splice MLflow Support</a>, using MLflow on Splice Machine is extremely easy. Check out our <a href='https://pysplice.readthedocs.io/en/latest/splicemachine.mlflow_support.html'>documentation</a> for the available functionality.<br><footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"# MLflow setup\nfrom splicemachine.mlflow_support import *\nmlflow.register_splice_context(splice)","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Starting an experiment\n<blockquote><p class='quotation'><span style='font-size:15px'> Here we'll begin an experiment to keep track of our modeling efforts for this prediction task.<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"mlflow.set_experiment('model_creation_demo')","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Starting a run\n<blockquote><p class='quotation'><span style='font-size:15px'> Here we'll start a run within that experiment to keep track of the modeling work in this notebook specifically.<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"# Start our first MLflow run\nfrom datetime import 
datetime\n\ntags = {'team': 'Splice Machine', 'purpose': 'fraud DEMO'}\nmlflow.start_run(tags=tags, run_name=\"RF_run\")","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Ingesting Data\n<blockquote><p class='quotation'><span style='font-size:15px'> Using the table created in <a href='./7.3 Data Exploration.ipynb'>7.3 Data Exploration</a>, we will build a very simple machine learning model. <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"sql_query = f\"SELECT * FROM {schema}.cc_fraud_data\"\ndf = splice.df(sql_query)","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Logging our first Parameter \n<blockquote><p class='quotation'><span style='font-size:15px'> We use MLflow to keep track of the query we used to ingest the data for this modeling effort. <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"# Logging our first parameter: the query we used to ingest our data\nmlflow.log_param(\"ingest_query\", sql_query)","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Selecting Our Features\n<blockquote>Here we'll select only the features most strongly correlated with our target<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"scrolled":true,"trusted":true},"cell_type":"code","source":"import pandas as pd\n\n# Sample 900 non-fraud and 100 fraud rows (DataFrame.append was removed in pandas 2.0, so use pd.concat)\npdf = pd.concat([df.filter(df.CLASS_RESULT == 0).limit(900).toPandas(),\n                 df.filter(df.CLASS_RESULT == 1).limit(100).toPandas()], ignore_index=True)\npdf = pdf.apply(pd.to_numeric)\ncorr = pdf.corr()\n\nmost_correlated = corr.abs()['CLASS_RESULT'].sort_values(ascending=False).reset_index()\n# Drop the first row: the target's correlation with itself (1.0)\nmost_correlated = most_correlated.iloc[1:].rename({\"index\":\"feature\",\"CLASS_RESULT\":\"correlation_to_target\"}, axis = 
1)\nprint(most_correlated)","execution_count":null,"outputs":[]},{"metadata":{"trusted":true},"cell_type":"code","source":"CORRELATION_CUTOFF = 0.05\n# Log the cutoff to MLflow\nmlflow.log_param(\"correlation_cutoff\", CORRELATION_CUTOFF)\n\nfeature_cols = list(most_correlated[most_correlated['correlation_to_target']>CORRELATION_CUTOFF]['feature'])\nprint(feature_cols)","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Defining a Machine Learning Pipeline\n\n<blockquote>We'll use Spark's <code>Pipeline</code> class to define a sequence of stages (<code>Transformers</code> and <code>Estimators</code>) that prepare the dataset and fit the model<br>\nWe'll then use <code>mlflow</code> to <code>log</code> our Pipeline stages. Both <code>log_pipeline_stages</code> and <code>log_feature_transformations</code> are custom Splice Machine functions for tracking Spark Pipelines. </blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"%%time\nfrom pyspark.ml.feature import StandardScaler, VectorAssembler\nfrom pyspark.ml import Pipeline, PipelineModel\nfrom pyspark.ml.classification import RandomForestClassifier, MultilayerPerceptronClassifier\n\n\"\"\"\nThe preprocessing stages for this example are:\n1) Vector assembling the feature columns\n2) Standardizing our feature columns\n\"\"\"\nmax_depth = 5\nnum_trees = 20\n\nassembler = VectorAssembler(inputCols=feature_cols, outputCol='features')\nscaler = StandardScaler(inputCol=\"features\", outputCol='scaledFeatures')\nrf = RandomForestClassifier(featuresCol = 'scaledFeatures', labelCol = 'CLASS_RESULT', maxDepth = max_depth, numTrees = num_trees)\n\n# Pipeline to preprocess and model our data\nmlpipe = Pipeline(stages=[assembler, scaler, rf])\n\n# Custom Splice functions to add granularity and governance to your Spark Pipeline Models\nmlflow.log_pipeline_stages(mlpipe)\nmlflow.log_feature_transformations(mlpipe)","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Separating our 
data for performance evaluation \n<blockquote> We use a single, simple train/test split to assess the performance of our model. Note that we have not investigated the class balance, and we are using untuned hyperparameters to predict the target variable; both can be adjusted as an exercise. <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator\nfrom pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel\nfrom splicemachine.stats import *\n\n# Split our data into a training and testing set\ntrain, test = df.randomSplit([0.8, 0.2])\n\n# mlflow.lp is Splice's shorthand for mlflow.log_param\nmlflow.lp(\"train_ratio\", 0.80)","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Fitting our model \n<blockquote> We train the model and log its execution time using Splice's custom <code>with mlflow.timer('timer_name')</code> context manager. Everything inside the block is timed, and the elapsed time is logged to MLflow under the timer name provided. 
<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"scrolled":true,"trusted":true},"cell_type":"code","source":"with mlflow.timer('training'):\n    fitted_model = mlpipe.fit(train)\n# Log the parameters of the fitted model\nmlflow.log_model_params(fitted_model)\n","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Assessing our Model Performance\n<blockquote> Making predictions on the test set, evaluating performance, and logging the results to MLflow <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"scrolled":true,"trusted":true},"cell_type":"code","source":"# Inference\npredictions = fitted_model.transform(test)\n\n# Performance evaluation\nbinary_evaluator = SpliceBinaryClassificationEvaluator(spark, labelCol = \"CLASS_RESULT\")\nbinary_evaluator.input(predictions)\nperformance_metrics = binary_evaluator.get_results(as_dict = True)\n\n# Logging performance\nmlflow.log_metrics(performance_metrics)","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Logging Artifacts of this Run\n<blockquote> We can store the notebook associated with a particular run, as well as the fitted model created by this run. <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"# Store the notebook for easy retrieval\nmlflow.log_artifact('7.5 Model Creation.ipynb', 'training_notebook')\n# Log the fitted pipeline model\nmlflow.log_model(fitted_model, 'rf_model')","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Finish our run\n<blockquote>Now we'll view the results in the <a href=\"/mlflow\">MLflow UI</a> and end our run. We can look at our different runs, the parameters, metrics, tags, and artifacts logged, and download our notebook directly. 
You'll know the run is complete from the small green check mark at the far left of the run's row.</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"from splicemachine.notebook import get_mlflow_ui\nget_mlflow_ui(mlflow.current_exp_id(), mlflow.current_run_id())","execution_count":null,"outputs":[]},{"metadata":{"trusted":true},"cell_type":"code","source":"mlflow.end_run()","execution_count":null,"outputs":[]},{"metadata":{"trusted":true},"cell_type":"code","source":"spark.stop()","execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"# Fantastic!\n<blockquote> \nThis notebook showed how our platform can be used to train, evaluate, and track machine learning models! <br>\n Next Up: <a href='./7.6 Model Deployment.ipynb'>Using MLManager to Deploy Machine Learning Models</a>\n<footer>Splice Machine</footer>\n</blockquote>"}],"metadata":{"kernelspec":{"name":"python3","display_name":"Python 3","language":"python"},"language_info":{"name":"python","version":"3.7.6","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"},"toc":{"nav_menu":{},"number_sections":false,"sideBar":true,"skip_h1_title":false,"base_numbering":1,"title_cell":"Table of Contents","title_sidebar":"Contents","toc_cell":false,"toc_position":{"height":"calc(100% - 180px)","left":"10px","top":"150px","width":"220px"},"toc_section_display":true,"toc_window_display":false}},"nbformat":4,"nbformat_minor":4}
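The feature-selection cells in the notebook above compute pandas' default (Pearson) correlation between every column and `CLASS_RESULT`. They take absolute values so that strongly negative relationships also count, then keep the columns above `CORRELATION_CUTOFF`. The following is a minimal plain-Python sketch of that rule. It assumes the sampled data fits in memory as lists of numbers. `pearson`, `select_features`, and the toy columns `V1` to `V4` are hypothetical and are not part of pandas or Splice Machine.

```python
import math
from typing import Dict, List, Sequence


def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return 0.0  # a constant column has no defined correlation; treat it as no signal
    return cov / math.sqrt(var_x * var_y)


def select_features(columns: Dict[str, Sequence[float]], target: Sequence[float],
                    cutoff: float) -> List[str]:
    """Names of features whose |correlation| with the target exceeds `cutoff`,
    ordered from most to least correlated."""
    scores = {name: abs(pearson(values, target)) for name, values in columns.items()}
    ranked = sorted(scores, key=lambda name: scores[name], reverse=True)
    return [name for name in ranked if scores[name] > cutoff]


# Hypothetical toy data: 1 = fraud, 0 = not fraud
target = [0, 0, 0, 1, 1]
columns = {
    "V1": [0.1, 0.2, 0.1, 0.9, 0.8],  # rises with the label
    "V2": [5.0, 5.0, 5.0, 5.0, 5.0],  # constant
    "V3": [1.0, 2.0, 3.0, 2.0, 2.0],  # uncorrelated with the label
    "V4": [0.5, 0.6, 0.4, 0.1, 0.2],  # falls with the label
}
print(select_features(columns, target, cutoff=0.05))  # ['V1', 'V4']
```

pandas returns `NaN` for a constant column, and `NaN > cutoff` evaluates to `False`, so the notebook also drops such columns. The sketch returns 0.0 to get the same effect.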
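Two details of the train/test split cell are easy to miss. First, `randomSplit` assigns each row independently at random, so the resulting sizes are only approximately 80/20. Second, without a seed the split changes every time the notebook runs. PySpark's `DataFrame.randomSplit` accepts an optional `seed` argument for this reason. The plain-Python sketch below illustrates both points. `random_split` is a hypothetical helper that is conceptually similar to Spark's per-row sampling; it is not Spark's implementation.

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], train_ratio: float, seed: int) -> Tuple[List[T], List[T]]:
    """Assign each row independently to train (with probability train_ratio) or to test."""
    rng = random.Random(seed)  # private generator, so the same seed gives the same split
    train: List[T] = []
    test: List[T] = []
    for row in rows:
        (train if rng.random() < train_ratio else test).append(row)
    return train, test


rows = list(range(1_000))
train, test = random_split(rows, 0.8, seed=42)
print(len(train), len(test))  # roughly 800 and 200; the exact counts depend on the seed
```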
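The fitting cell wraps `mlpipe.fit(train)` in Splice's `mlflow.timer('training')`. Its implementation isn't shown in the notebook, so the following is only a sketch of how a timing context manager of this kind can work. It reads `time.perf_counter()` before and after the block and passes the elapsed seconds to a callback. `timer`, `record`, and the `logged` dictionary are hypothetical stand-ins, not the Splice API.

```python
import time
from contextlib import contextmanager
from typing import Callable, Dict, Iterator


@contextmanager
def timer(name: str, log: Callable[[str, float], None]) -> Iterator[None]:
    """Time the enclosed block and report the elapsed seconds via log(name, seconds)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # The finally clause runs even if the block raises, so failed fits are timed too
        log(name, time.perf_counter() - start)


logged: Dict[str, float] = {}  # hypothetical stand-in for values logged to an MLflow run


def record(key: str, value: float) -> None:
    logged[key] = value


with timer("training", record):
    total = sum(i * i for i in range(100_000))  # placeholder for mlpipe.fit(train)

print(f"training took {logged['training']:.4f} s")
```

The notebook doesn't say whether Splice's timer also records a time when the block raises. Logging in `finally` is a design choice in this sketch.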
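In the evaluation cell, `get_results(as_dict = True)` produces a dictionary of metrics that is passed straight to `mlflow.log_metrics`. The exact metrics the Splice evaluator computes aren't listed in the notebook. The following sketch therefore only shows how common confusion-matrix metrics are derived from true and predicted labels. `binary_metrics` and its dictionary keys are hypothetical.

```python
from typing import Dict, Sequence


def binary_metrics(labels: Sequence[int], preds: Sequence[int]) -> Dict[str, float]:
    """Confusion-matrix metrics for a binary task where 1 is the positive (fraud) class."""
    tp = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 1)
    tn = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 0)
    fp = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 0)

    def ratio(num: int, den: int) -> float:
        return num / den if den else 0.0  # avoid division by zero

    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "TPR": recall,               # true positive rate, also called recall
        "FPR": ratio(fp, fp + tn),   # false positive rate
        "precision": precision,
        "F1": f1,
        "accuracy": ratio(tp + tn, len(labels)),
    }


# Hypothetical imbalanced test set: 8 legitimate transactions, 2 fraudulent
labels = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
preds = [0, 0, 0, 0, 0, 0, 0, 1, 1, 0]
print(binary_metrics(labels, preds))
```

With only two fraud cases in this toy set, a model that predicts 0 for every row would still score 0.8 accuracy, but its TPR would be 0.0. This is why the class-balance caveat in the train/test section matters for fraud data.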