Skip to content

Instantly share code, notes, and snippets.

View skrawcz's full-sized avatar

Stefan Krawczyk skrawcz

View GitHub Profile
@skrawcz
skrawcz / async_funcs.py
Created June 13, 2023 18:42
Shows how to write async-based functions in Hamilton -- and then suggests another way to parallelize
from hamilton.function_modifiers import extract_columns
import pandas as pd
async def _run_query(col_name: str, query_string: str) -> pd.DataFrame:
    """Return a one-row DataFrame standing in for an async database query.

    This is a stub: a real implementation would go to the database, ideally
    with the client passed in as a parameter. The assumption is that the
    database driver is asyncio based, else there is no value in making this
    a coroutine.

    Args:
        col_name: Name of the single column in the returned frame.
        query_string: Query text; the stub stores it as the only cell value.

    Returns:
        A DataFrame with one column named ``col_name`` whose single row
        holds ``query_string``.
    """
    return pd.DataFrame({col_name: [query_string]})
# async Hamilton func for query #1
@skrawcz
skrawcz / embed_this_code.py
Created February 7, 2023 21:50
Hamilton streaming pseudo code example
from hamilton import driver
import transforms
# load the client
# NOTE(review): KafkaClient is neither imported nor defined in this snippet --
# it is pseudo code; substitute the real client class.
kafka_client = KafkaClient() # or whatever
# Placeholder configuration: `{...}` stands in for the real config values.
config = {...}
# Build the driver from the config and the `transforms` module; the adapter
# argument is left as a placeholder (`...`) in this example.
dr = driver.Driver(config, transforms, adapter=...)
@skrawcz
skrawcz / indicators.py
Last active January 19, 2023 18:22
Constant passing in Hamilton
import pandas as pd
# Placeholder: the parameter list is elided (`...`) and the body is not
# implemented; only the pd.Series return annotation is specified.
def total_distance_travelled_by_train( ... ) -> pd.Series:
return # placeholder -- fill with actual logic, etc.
# Placeholder: the parameter list is elided (`...`) and the body is not
# implemented; only the pd.Series return annotation is specified.
def distance_travelled_till_intermediate_station( ... ) -> pd.Series:
return # placeholder -- fill with actual logic, etc.
def journey_distance(total_distance_travelled_by_train: pd.Series, distance_travelled_till_intermediate_station: pd.Series) -> pd.Series:
@skrawcz
skrawcz / my_builder.py
Last active October 26, 2022 22:44
Custom Result Builder Example
from typing import Tuple
from hamilton import base
class PandasDFWithDebugResultBuilder(base.ResultMixin):
"""This class is an example to show how you can extend the result building functionality of Hamilton.
This result builder returns a dataframe, and dictionary of outputs that we don't want to make into a dataframe,
but are useful for debugging (for example). Caveat: this won't work for ray, dask, or spark usage without some tweaks.
Example Usage::
@skrawcz
skrawcz / my_functions.py
Last active August 6, 2022 22:22
Code to get Hamilton to run asynchronously -- no parallelization
from typing import Any
import pandas as pd
"""
Notes:
1. This file is used for all the [ray|dask|spark]/hello_world examples.
2. It therefore showcases how you can write something once and not only scale it, but port it
to different frameworks with ease!
"""
@skrawcz
skrawcz / example_function.py
Created July 24, 2022 01:04
Function for Tidy Pandas in Production post
@tag(owner='Data-Science', pii='False')
@check_output(data_type=np.float64, range=(-5.0, 5.0), allow_nans=False)
def height_zero_mean_unit_variance(height_zero_mean: pd.Series,
                                   height_std_dev: pd.Series) -> pd.Series:
    """Zero mean unit variance value of height.

    Divides the already mean-centred height by its standard deviation.

    Args:
        height_zero_mean: Height with the mean removed.
        height_std_dev: Standard deviation of height, used as the divisor.

    Returns:
        The element-wise quotient ``height_zero_mean / height_std_dev``.

    NOTE(review): the decorators attach owner/pii tags and declare an output
    check (float64, range -5.0 to 5.0, no NaNs); their exact runtime
    behaviour comes from the library defining ``tag`` and ``check_output``,
    which is not visible in this snippet -- confirm there.
    """
    return height_zero_mean / height_std_dev
@skrawcz
skrawcz / my_functions.py
Last active November 13, 2021 06:45
Hamilton with a single value node
# --- my_functions.py
import pandas as pd
def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend.

    Args:
        spend: Spend values; presumably one observation per week -- the
            window is 3 rows, not a calendar span.

    Returns:
        The mean over a rolling window of 3 observations. The first two
        entries are NaN because the window is not yet full.
    """
    return spend.rolling(3).mean()
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
@skrawcz
skrawcz / PyBayFormatter.py
Last active October 8, 2021 22:46
JSON Structured Logger
import json
import logging
import sys
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PyBayFormatter(logging.Formatter):
"""Implementation of JSON structured logging that works for most handlers."""