Created February 12, 2020 21:55
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"hello\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the Python packages already available on the cluster (EMR Notebooks API)\n",
    "sc.list_packages()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install the libraries used by this notebook on the cluster (EMR Notebooks API)\n",
    "sc.install_pypi_package(\"pandas\")\n",
    "sc.install_pypi_package(\"xgboost\")\n",
    "sc.install_pypi_package(\"interruptingcow\")\n",
    "sc.install_pypi_package(\"scikit-learn\")\n",
    "sc.install_pypi_package(\"joblibspark\")\n",
    "sc.install_pypi_package(\"pymongo\")\n",
    "sc.install_pypi_package(\"tqdm\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import itertools\n",
    "import json\n",
    "import math\n",
    "import random\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import pymongo\n",
    "import pyspark\n",
    "import xgboost as xgb\n",
    "from interruptingcow import timeout\n",
    "from itertools import islice\n",
    "from joblib import parallel_backend, Parallel, delayed\n",
    "from joblibspark import register_spark\n",
    "from pprint import pprint\n",
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession\n",
    "from sklearn.metrics import r2_score\n",
    "from sklearn.model_selection import KFold\n",
    "from tqdm import tqdm\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Cluster size and MongoDB / data-location settings\n",
    "NODES = 3\n",
    "CORES = 4\n",
    "MONGO_COLLECTION = \"test_collection_random\"\n",
    "MONGO_DATABASE = \"test\"\n",
    "MONGO_IP = \"<mongo ip>\"\n",
    "MONGO_PORT = 27017\n",
    "DATA_LOCATION = '<data location>'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_random_configuration():\n",
    "    # Draw one random XGBoost hyper-parameter configuration\n",
    "    params = {'learning_rate': random.uniform(0.01, 0.25),\n",
    "              'colsample_bytree': random.uniform(0.8, 1.0),\n",
    "              'subsample': random.uniform(0.5, 1.0),\n",
    "              'n_estimators': int(math.floor(random.uniform(100, 3000))),\n",
    "              'reg_alpha': random.uniform(0.01, 0.5),\n",
    "              'max_depth': int(math.floor(random.uniform(3, 15))),\n",
    "              'gamma': int(math.floor(random.uniform(0, 10))),\n",
    "              'nthread': 1\n",
    "              }\n",
    "    return params"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def xgb_r2_score(preds, dtrain):\n",
    "    # Custom evaluation metric: R^2 on the validation fold\n",
    "    labels = dtrain.get_label()\n",
    "    return 'r2', r2_score(labels, preds)\n",
    "print(\"xgb_r2_score created\")\n",
    "\n",
    "def preprocessing():\n",
    "    # Read input data\n",
    "    train = spark.read.format(\"csv\").option(\"header\", \"true\").load(DATA_LOCATION).toPandas()\n",
    "    train[\"y\"] = pd.to_numeric(train[\"y\"])\n",
    "    categorical = [\"X0\", \"X1\", \"X2\", \"X3\", \"X4\", \"X5\", \"X6\", \"X8\"]\n",
    "\n",
    "    # Target-encode each categorical column with the mean of y\n",
    "    for c in categorical:\n",
    "        group_by = train.groupby(by=c)[\"y\"].mean().reset_index().rename(columns={\"y\": \"{}_converted\".format(c)})\n",
    "        train = pd.merge(train, group_by, how='inner', on=c)\n",
    "\n",
    "    train = train.drop(categorical, axis=1).apply(pd.to_numeric)\n",
    "\n",
    "    X = train.drop(\"ID\", axis=1).drop(\"y\", axis=1)\n",
    "    y = train[\"y\"]\n",
    "    return X, y\n",
    "print(\"preprocessing created\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"define the evaluate function (to be distributed)\")\n",
    "def evaluate(X, y, kf, boosting_rounds):\n",
    "    def evaluate_inner(X, y, kf, boosting_rounds):\n",
    "        # Sample one configuration and cross-validate it\n",
    "        score = []\n",
    "        params = generate_random_configuration()\n",
    "        params[\"objective\"] = \"reg:squarederror\"\n",
    "        print(params)\n",
    "        for train_index, test_index in kf.split(X.values):\n",
    "            X_train, X_test = X.iloc[train_index], X.iloc[test_index]\n",
    "            y_train, y_test = y.iloc[train_index], y.iloc[test_index]\n",
    "            d_train = xgb.DMatrix(X_train, label=y_train)\n",
    "            d_valid = xgb.DMatrix(X_test, label=y_test)\n",
    "            d_test = xgb.DMatrix(X_test)\n",
    "            watchlist = [(d_train, 'train'), (d_valid, 'valid')]\n",
    "            clf = xgb.train(params, d_train, boosting_rounds, watchlist, early_stopping_rounds=50,\n",
    "                            feval=xgb_r2_score, maximize=True, verbose_eval=25)\n",
    "            y_hat = clf.predict(d_test)\n",
    "            score.append(r2_score(y_test, y_hat))\n",
    "\n",
    "        # Cast numpy scalars to plain floats so they are BSON-serializable\n",
    "        output_dict = {\n",
    "            'min_score': float(np.min(score)),\n",
    "            'max_score': float(np.max(score)),\n",
    "            'average_score': float(np.mean(score)),\n",
    "            'median_score': float(np.median(score)),\n",
    "            'params': params\n",
    "        }\n",
    "\n",
    "        # Store the result so it can be inspected while the search is still running\n",
    "        client = pymongo.MongoClient(MONGO_IP, MONGO_PORT)\n",
    "        db = client[MONGO_DATABASE]\n",
    "        collection = db[MONGO_COLLECTION]\n",
    "        collection.insert_one(output_dict)\n",
    "        client.close()\n",
    "        return score\n",
    "\n",
    "    # Keep evaluating random configurations until the driver-side timeout stops the job\n",
    "    Parallel()(delayed(evaluate_inner)(X, y, kf, boosting_rounds) for i in range(0, 100000))\n"
   ]
  },
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"print(\"define the grid search\")\n", | |
"def random_search_spark(timeout_seconds, cv_splits, boosting_rounds):\n", | |
" # Read input data\n", | |
" X, y = preprocessing()\n", | |
" register_spark() # register spark backend\n", | |
"\n", | |
" kf = KFold(n_splits=cv_splits)\n", | |
" \n", | |
" client = pymongo.MongoClient(MONGO_IP, 27017)\n", | |
" db = client[MONGO_DATABASE]\n", | |
" collection = db[MONGO_COLLECTION]\n", | |
" collection.drop()\n", | |
"\n", | |
" with timeout(timeout_seconds, exception=RuntimeError):\n", | |
" try:\n", | |
" Parallel(backend=\"spark\", n_jobs=NODES)(delayed(evaluate)(X, y, kf, boosting_rounds) for p in range(0, NODES))\n", | |
" except RuntimeError:\n", | |
" print(\"Runtime\")\n", | |
" db = client[MONGO_DATABASE]\n", | |
" collection = db[MONGO_COLLECTION]\n", | |
"\n", | |
" cursor = collection.find({})\n", | |
" for document in cursor:\n", | |
" pprint(document)\n", | |
" client.close()\n", | |
" sc.stop()" | |
] | |
}, | |
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 1-hour random search, 4-fold CV, up to 500 boosting rounds per model\n",
    "random_search_spark(3600, 4, 500)\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
@aialenti: How can I go about printing the intermediate outputs from this? I am running this on Databricks.
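One way to look at intermediate outputs: every call to `evaluate_inner` writes its cross-validation scores and parameters to the MongoDB collection configured at the top of the notebook, so you can poll that collection from a separate cell or script while `random_search_spark` is still running (the `print` calls inside `evaluate` execute on the executors, so they typically end up in the executor logs rather than in the notebook output). A minimal sketch, assuming the same `MONGO_*` settings as in the notebook:

```python
# Sketch: poll the results collection that evaluate() writes to.
# Run from a separate cell/script while the search is running.
import pymongo

MONGO_IP = "<mongo ip>"  # same placeholder as in the notebook
MONGO_PORT = 27017
MONGO_DATABASE = "test"
MONGO_COLLECTION = "test_collection_random"

client = pymongo.MongoClient(MONGO_IP, MONGO_PORT)
collection = client[MONGO_DATABASE][MONGO_COLLECTION]

# Best configurations found so far, ordered by median cross-validation R^2
for doc in collection.find({}, {"_id": 0}).sort("median_score", pymongo.DESCENDING).limit(10):
    print(doc)

client.close()
```

Sorting on `median_score` surfaces the best configurations found so far; rerunning the query gives a live view of the search as it progresses.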