Skip to content

Instantly share code, notes, and snippets.

View sllynn's full-sized avatar

Stuart Lynn sllynn

View GitHub Profile
@sllynn
sllynn / pyspark-melt.py
Created September 17, 2019 13:52
melt a pyspark dataframe
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format.

    Mirrors ``pandas.melt``: the ``id_vars`` columns are repeated on every
    output row, while each column listed in ``value_vars`` is unpivoted into
    a (``var_name``, ``value_name``) pair of columns.

    :param df: input wide-format dataframe
    :param id_vars: column names to keep fixed as identifiers
    :param value_vars: column names to unpivot
    :param var_name: output column holding the source column name
    :param value_name: output column holding the source column value
    :return: long-format dataframe with ``len(id_vars) + 2`` columns

    NOTE(review): the gist preview was truncated after the docstring; this
    body is the canonical implementation implied by the imports
    (array/struct/lit/col/explode) -- confirm against the full gist.
    """
    # One struct per melted column: (column-name literal, column value).
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    # explode() fans each input row out to one row per (variable, value).
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)
@sllynn
sllynn / pandas-dateadd
Created September 17, 2019 13:58
add days to dates in pandas dataframe / series
import pandas as pd
def create_date_window(in_date, window_size):
    """Return (lower, upper) bounds ``window_size`` days either side of *in_date*.

    :param in_date: anything ``pd.to_datetime`` accepts (e.g. "2019-09-17")
    :param window_size: half-width of the window, in days
    :return: tuple of (in_date - window_size days, in_date + window_size days)
    """
    anchor = pd.to_datetime(in_date)
    offset = pd.DateOffset(days=window_size)
    return anchor - offset, anchor + offset
if __name__ == "__main__":
    # Demo: a 28-day window either side of 2019-09-17.
    bounds = create_date_window("2019-09-17", 28)
    print(bounds)
@sllynn
sllynn / snowflake-sparklyr.R
Created September 17, 2019 14:05
connect to snowflake using sparklyr
library(sparklyr)
# Start a Spark session via SparkR, then attach sparklyr to the same
# Databricks-managed cluster.
SparkR::sparkR.session()
sc <- spark_connect(method="databricks")
# Read the "adult" table through the Snowflake Spark connector.
# NOTE(review): gist preview is truncated here -- the options list
# (presumably account URL, credentials, database/schema) is cut off
# mid-expression; confirm against the full gist before reuse.
snow.df.sparklyr <- spark_read_source(
sc=sc,
name = "adult",
source = "snowflake",
options = list(
@sllynn
sllynn / sparklyr-display.R
Created September 17, 2019 14:05
equivalent to `display()` for sparklyr dataframes
library(dplyr)
# Render a Spark dataframe in a notebook the way `display()` does,
# but on a bounded random sample so collect() stays cheap.
sdisplay <- function(x) {
  sampled <- sample_n(x, 1000)
  display(collect(sampled))
}
@sllynn
sllynn / parallel-notebooks.py
Created September 19, 2019 08:29
Run multiple notebooks in parallel as ephemeral jobs using python threads
from threading import Thread
# Run the Kinesis producer notebook as an ephemeral job via Databricks
# dbutils, parameterised with stream/region/data-path globals.
# NOTE(review): gist preview is truncated here -- the arguments dict and
# the rest of the function body are cut off mid-call; confirm against
# the full gist before reuse.
def producer_method():
dbutils.notebook.run(
path="./kinesis-producer",
timeout_seconds=600,
arguments={
"kinesisRegion": KINESIS_REGION,
"inputStream": INPUT_STREAM,
"newsgroupDataLocation": NEWSGROUP_DATA_PATH
@sllynn
sllynn / custom-kinesis-writer.py
Last active September 20, 2019 07:42
kinesis writer (includes some other logic relevant to multiclass classification of documents)
import boto3
import json
import numpy as np
import pandas as pd
from math import ceil
# Writer for pushing classifier output to an AWS Kinesis stream (boto3).
# NOTE(review): gist preview is truncated here -- only the first line of
# __init__ is visible; region/stream/classes are presumably stored and the
# client created lazily (it starts as None) -- confirm against the full gist.
class KinesisWriter:
def __init__(self, region, stream, classes):
self.kinesis_client = None
@sllynn
sllynn / mlflow-pyfunc-wrapper.py
Created September 20, 2019 07:36
Custom mlflow pyfunc wrapper for Keras models
import mlflow.pyfunc
import mlflow.keras
# mlflow pyfunc wrapper around a Keras model, identified by artifact name.
# NOTE(review): gist preview is truncated here -- load_context's body (and
# any predict method) is cut off; confirm against the full gist.
class KerasWrapper(mlflow.pyfunc.PythonModel):
def __init__(self, keras_model_name):
self.keras_model_name = keras_model_name
def load_context(self, context):
@sllynn
sllynn / pandas_multi.py
Created October 2, 2019 15:37
Example of selecting groups of columns in a pandas dataframe using numpy slicing tools
# Positionally select columns 5..end plus column 1 in a single iloc call:
# np.r_ concatenates the slice 5:ncols with the index 1 into one integer
# index array. (`data` / `np` are assumed bound elsewhere in the session.)
data.iloc[:, np.r_[5:data.columns.size,1]]
@sllynn
sllynn / from_xltime.py
Created June 15, 2020 15:18
Pandas UDF for converting Excel dates to Spark timestamps
@pandas_udf("timestamp", PandasUDFType.SCALAR)
def from_xltime(x):
    """Scalar pandas UDF: convert Excel serial day numbers to timestamps.

    Excel day 0 is 1899-12-30 (the offset that absorbs Excel's fictitious
    1900-02-29), so each input value is interpreted as a day count added
    to that epoch.
    """
    import pandas as pd
    import datetime as dt
    excel_epoch = dt.datetime(1899, 12, 30)
    day_offsets = pd.TimedeltaIndex(x, unit='d')
    return (day_offsets + excel_epoch).to_series()
@sllynn
sllynn / vectorToArray.scala
Created June 15, 2020 15:48
Convert VectorUDT to ArrayType (Scala UDF)
import org.apache.spark.ml.linalg.Vector

// Unpack an ML Vector column into a plain array column, and register the
// UDF so it is callable from SQL as toArray(...).
val toArray = udf((v: Vector) => v.toArray)
spark.sqlContext.udf.register("toArray", toArray)