Gist by @aialenti, created February 12, 2020.
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"hello\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sc.list_packages()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sc.install_pypi_package(\"pandas\")\n",
"sc.install_pypi_package(\"xgboost\")\n",
"sc.install_pypi_package(\"interruptingcow\")\n",
"sc.install_pypi_package(\"scikit-learn\")\n",
"sc.install_pypi_package(\"joblibspark\")\n",
"sc.install_pypi_package(\"pymongo\")\n",
"sc.install_pypi_package(\"tqdm\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import itertools\n",
"import json\n",
"import math\n",
"import random\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import pymongo\n",
"import pyspark\n",
"import xgboost as xgb\n",
"from interruptingcow import timeout\n",
"from itertools import islice\n",
"from joblib import parallel_backend, Parallel, delayed\n",
"from joblibspark import register_spark\n",
"from pprint import pprint\n",
"from pyspark import SparkContext\n",
"from pyspark.sql import SparkSession\n",
"from sklearn.metrics import r2_score\n",
"from sklearn.model_selection import KFold\n",
"from tqdm import tqdm\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cluster size and storage settings (the placeholders below must be filled in)\n",
"NODES = 3    # number of worker nodes; used as n_jobs for the Spark joblib backend\n",
"CORES = 4    # cores per node (not used directly below)\n",
"MONGO_COLLECTION = \"test_collection_random\"\n",
"MONGO_DATABASE = \"test\"\n",
"MONGO_IP = \"<mongo ip>\"\n",
"MONGO_PORT = 27017\n",
"DATA_LOCATION = '<data location>'  # path to the training CSV, e.g. an S3 URI"
]
},
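{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check (a minimal added sketch, not part of the original flow): verify\n",
"# that the MongoDB endpoint is reachable from the driver before starting a long search.\n",
"# Assumes the MONGO_IP / MONGO_PORT placeholders above have been filled in.\n",
"_client = pymongo.MongoClient(MONGO_IP, MONGO_PORT, serverSelectionTimeoutMS=5000)\n",
"print(_client.admin.command(\"ping\"))\n",
"_client.close()"
]
},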
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def generate_random_configuration():\n",
"    params = {'learning_rate': random.uniform(0.01, 0.25),\n",
"              'colsample_bytree': random.uniform(0.8, 1.0),\n",
"              'subsample': random.uniform(0.5, 1.0),\n",
"              'n_estimators': int(math.floor(random.uniform(100, 3000))),\n",
"              'reg_alpha': random.uniform(0.01, 0.5),\n",
"              'max_depth': int(math.floor(random.uniform(3, 15))),\n",
"              'gamma': int(math.floor(random.uniform(0, 10))),\n",
"              'nthread': 1\n",
"              }\n",
"    return params"
]
},
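{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick usage check (added sketch): draw a few configurations to eyeball the sampled ranges.\n",
"# Note that 'n_estimators' follows the scikit-learn naming; with xgb.train below the number\n",
"# of trees is actually controlled by the boosting_rounds argument.\n",
"for _ in range(3):\n",
"    pprint(generate_random_configuration())"
]
},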
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def xgb_r2_score(preds, dtrain):\n",
"    labels = dtrain.get_label()\n",
"    return 'r2', r2_score(labels, preds)\n",
"print(\"xgb_r2_score created\")\n",
"\n",
"def preprocessing():\n",
"    # Read input data\n",
"    train = spark.read.format(\"csv\").option(\"header\", \"true\").load(DATA_LOCATION).toPandas()\n",
"    train[\"y\"] = pd.to_numeric(train[\"y\"])\n",
"    categorical = [\"X0\", \"X1\", \"X2\", \"X3\", \"X4\", \"X5\", \"X6\", \"X8\"]\n",
"\n",
"    # Convert categorical data: replace each column with the mean of y per category level\n",
"    for c in categorical:\n",
"        group_by = train.groupby(by=c)[\"y\"].mean().reset_index().rename(columns={\"y\": \"{}_converted\".format(c)})\n",
"        train = pd.merge(train, group_by, how='inner', on=c)\n",
"\n",
"    train = train.drop(categorical, axis=1).apply(pd.to_numeric)\n",
"\n",
"    X = train.drop(\"ID\", axis=1).drop(\"y\", axis=1)\n",
"    y = train[\"y\"]\n",
"    return X, y\n",
"print(\"preprocessing created\")\n"
]
},
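{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustration only (added sketch on toy data, not the real dataset): the loop in\n",
"# preprocessing() applies mean target encoding, i.e. each categorical column is replaced\n",
"# by the average of y per category level. The same steps on a tiny frame:\n",
"toy = pd.DataFrame({\"X0\": [\"a\", \"a\", \"b\", \"b\"], \"y\": [1.0, 3.0, 10.0, 20.0]})\n",
"encoded = toy.groupby(by=\"X0\")[\"y\"].mean().reset_index().rename(columns={\"y\": \"X0_converted\"})\n",
"print(pd.merge(toy, encoded, how='inner', on=\"X0\").drop(\"X0\", axis=1))"
]
},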
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"define the evaluate function (to be distributed)\")\n",
"def evaluate(X, y, kf, boosting_rounds):\n",
"    def evaluate_inner(X, y, kf, boosting_rounds):\n",
"        # Sample one random configuration and cross-validate it\n",
"        score = []\n",
"        params = generate_random_configuration()\n",
"        params[\"objective\"] = \"reg:squarederror\"\n",
"        print(params)\n",
"        for train_index, test_index in kf.split(X.values):\n",
"            X_train, X_test = X.iloc[train_index], X.iloc[test_index]\n",
"            y_train, y_test = y.iloc[train_index], y.iloc[test_index]\n",
"            d_train = xgb.DMatrix(X_train, label=y_train)\n",
"            d_valid = xgb.DMatrix(X_test, label=y_test)\n",
"            d_test = xgb.DMatrix(X_test)\n",
"            watchlist = [(d_train, 'train'), (d_valid, 'valid')]\n",
"            clf = xgb.train(params, d_train, boosting_rounds, watchlist, early_stopping_rounds=50,\n",
"                            feval=xgb_r2_score, maximize=True, verbose_eval=25)\n",
"            y_hat = clf.predict(d_test)\n",
"            score.append(r2_score(y_test, y_hat))\n",
"\n",
"        output_dict = {\n",
"            'min_score': np.min(score),\n",
"            'max_score': np.max(score),\n",
"            'average_score': np.mean(score),\n",
"            'median_score': np.median(score),\n",
"            'params': params\n",
"        }\n",
"\n",
"        # Persist the cross-validated scores of this configuration\n",
"        client = pymongo.MongoClient(MONGO_IP, MONGO_PORT)\n",
"        db = client[MONGO_DATABASE]\n",
"        collection = db[MONGO_COLLECTION]\n",
"        collection.insert_one(output_dict)\n",
"        client.close()\n",
"        return score\n",
"\n",
"    # Keep sampling and evaluating configurations; the driver-side timeout ends the search\n",
"    Parallel()(delayed(evaluate_inner)(X, y, kf, boosting_rounds) for i in range(0, 100000))\n"
]
},
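{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional smoke test (added sketch on synthetic data; touches neither the real dataset\n",
"# nor MongoDB): train a single booster with one sampled configuration to confirm that the\n",
"# parameters and the custom xgb_r2_score feval are accepted by xgb.train.\n",
"X_demo = pd.DataFrame(np.random.rand(200, 5), columns=[\"f{}\".format(i) for i in range(5)])\n",
"y_demo = pd.Series(np.random.rand(200))\n",
"demo_params = generate_random_configuration()\n",
"demo_params[\"objective\"] = \"reg:squarederror\"\n",
"d_demo = xgb.DMatrix(X_demo, label=y_demo)\n",
"xgb.train(demo_params, d_demo, 10, [(d_demo, 'train')], feval=xgb_r2_score, maximize=True, verbose_eval=5)"
]
},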
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"define the random search\")\n",
"def random_search_spark(timeout_seconds, cv_splits, boosting_rounds):\n",
"    # Read and preprocess the input data on the driver\n",
"    X, y = preprocessing()\n",
"    register_spark()  # register the joblib Spark backend\n",
"\n",
"    kf = KFold(n_splits=cv_splits)\n",
"\n",
"    # Start from an empty results collection\n",
"    client = pymongo.MongoClient(MONGO_IP, MONGO_PORT)\n",
"    db = client[MONGO_DATABASE]\n",
"    collection = db[MONGO_COLLECTION]\n",
"    collection.drop()\n",
"\n",
"    # Dispatch one long-running evaluate() task per node and stop after timeout_seconds\n",
"    with timeout(timeout_seconds, exception=RuntimeError):\n",
"        try:\n",
"            Parallel(backend=\"spark\", n_jobs=NODES)(delayed(evaluate)(X, y, kf, boosting_rounds) for p in range(0, NODES))\n",
"        except RuntimeError:\n",
"            print(\"Runtime\")\n",
"\n",
"    # Print every configuration evaluated so far\n",
"    db = client[MONGO_DATABASE]\n",
"    collection = db[MONGO_COLLECTION]\n",
"    cursor = collection.find({})\n",
"    for document in cursor:\n",
"        pprint(document)\n",
"    client.close()\n",
"    sc.stop()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 1 hour budget, 4-fold cross-validation, up to 500 boosting rounds per model\n",
"random_search_spark(3600, 4, 500)\n"
]
},
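{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Inspecting results (added sketch): every evaluation writes its cross-validated scores to\n",
"# MongoDB, so progress can be checked at any time, e.g. from another session while the\n",
"# search is still running. This prints the five configurations with the best average R2 so far.\n",
"client = pymongo.MongoClient(MONGO_IP, MONGO_PORT)\n",
"collection = client[MONGO_DATABASE][MONGO_COLLECTION]\n",
"for doc in collection.find({}).sort(\"average_score\", pymongo.DESCENDING).limit(5):\n",
"    pprint(doc)\n",
"client.close()"
]
}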
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
naadvar commented Feb 23, 2022:

@aialenti : How can I go about printing the intermediate outputs from this? I am running this on Databricks.