-
-
Save datavudeja/5c0369c1ee488aa1e21c48b528569392 to your computer and use it in GitHub Desktop.
Pandas utility functions: DataFrame schema comparison, column filtering, pandas-to-Spark conversion, outlier removal, and date-period generation.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pandas as pd | |
| from IPython.display import display | |
def compare_dfs_cols_and_types(df_x, df_y, df_x_name="x", df_y_name="y"):
    """Compare two DataFrames by column names and dtypes.

    Prints a summary of whether the frames match; in the merged result,
    NaN entries mark columns present in only one of the frames.

    :param df_x: First DataFrame
    :type df_x: pd.DataFrame
    :param df_y: Second DataFrame
    :type df_y: pd.DataFrame
    :param df_x_name: Name of the first table to display on result,
        defaults to "x"
    :type df_x_name: str, optional
    :param df_y_name: Name of the second table to display on result,
        defaults to "y"
    :type df_y_name: str, optional
    :return: Columns and types of the DataFrames merged.
    :rtype: pd.DataFrame
    """

    def _dtypes_table(frame, col_names):
        """Build a (column-name, dtype-as-string) table sorted by name.

        :param frame: DataFrame to extract the dtypes from
        :type frame: pd.DataFrame
        :param col_names: Two column names for the resulting table.
        :type col_names: list
        :return: dtypes DataFrame
        :rtype: pd.DataFrame
        """
        table = frame.dtypes.apply(str).reset_index()
        table.columns = col_names
        return table.sort_values(by=col_names[0]).reset_index(drop=True)

    names_x = [f"{df_x_name}_cols", f"{df_x_name}_types"]
    names_y = [f"{df_y_name}_cols", f"{df_y_name}_types"]
    left = _dtypes_table(df_x, names_x)
    right = _dtypes_table(df_y, names_y)
    # Outer merge on column names so columns unique to either side
    # survive as NaN rows instead of being dropped.
    compare = left.merge(
        right=right,
        left_on=names_x[0],
        right_on=names_y[0],
        how="outer",
    )
    compare = compare[[names_x[0], names_y[0], names_x[1], names_y[1]]]
    # A row matches only when both the name and the dtype agree.
    same_name = compare[names_x[0]] == compare[names_y[0]]
    same_type = compare[names_x[1]] == compare[names_y[1]]
    compare["is_equal"] = same_name & same_type
    if compare["is_equal"].all():
        print("The DataFrames are identical.")
    elif not compare["is_equal"].any():
        print("The Dataframes are completely different.")
    else:
        print("The DataFrames are different because of the columns:")
        display(compare[~compare["is_equal"]])
    return compare
def filter_columns(df, col_list):
    """Select ``col_list`` columns from ``df``, silently dropping missing ones.

    Columns not present in ``df`` are also removed from ``col_list`` in place
    (matching the original pruning side effect).

    Fix: the original parsed the text of the ``KeyError`` message to find the
    missing column and recursed; with more than one missing column pandas
    reports them all at once (``"['x', 'y'] not in index"``), so
    ``col_list.remove("x, y")`` raised ``ValueError``. A direct membership
    check avoids the fragile message parsing entirely.

    :param df: DataFrame to select from
    :type df: pd.DataFrame
    :param col_list: column names to keep; pruned in place
    :type col_list: list
    :return: DataFrame restricted to the existing columns
    :rtype: pd.DataFrame
    """
    missing = [col for col in col_list if col not in df.columns]
    for col in missing:
        col_list.remove(col)
    return df[col_list]
| from pyspark.sql.types import * | |
def pandas_to_spark(pandas_df):
    """Create a spark dataframe from pandas dataframe

    Args:
        pandas_df (pd.DataFrame): pandas dataframe to use
    Returns:
        df: Returns a spark dataframe.

    NOTE(review): relies on a `spark` SparkSession and the pyspark.sql.types
    star-import being in scope at module level — confirm against the caller.
    """
    # Bugfix: `re` is used below but was never imported anywhere in this
    # file, so the original raised NameError on any int/float dtype.
    import re

    # Monkey patch to avoid the issue with iteritems
    # AttributeError: 'DataFrame' object has no attribute 'iteritems'
    pd.DataFrame.iteritems = pd.DataFrame.items

    def equivalent_type(_type):
        """Map a pandas dtype (stringified) to the closest Spark type."""
        _type = str(_type).lower()
        if "int" in _type:
            # e.g. int64 -> LongType, int32/int16/int8 -> IntegerType
            if int(re.sub(r"[^\d]+", "", _type)) > 32:
                return LongType()
            return IntegerType()
        elif "float" in _type:
            # e.g. float64 -> DoubleType, float32 -> FloatType
            if int(re.sub(r"[^\d]+", "", _type)) > 32:
                return DoubleType()
            return FloatType()
        elif "datetime" in _type:
            return TimestampType()
        elif "bool" in _type:
            return BooleanType()
        elif "byte" in _type:
            return ByteType()
        else:
            return StringType()

    def define_structure(column, format_type):
        """Build a StructField; fall back to StringType on mapping errors.

        Dtypes without a bit width (plain "int"/"float") make ``int("")``
        raise, which lands here and yields StringType — same as original.
        """
        try:
            _type = equivalent_type(format_type)
        except Exception as e:
            print(e)
            _type = StringType()
        return StructField(column, _type)

    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = [
        define_structure(column, _type) for column, _type in zip(columns, types)
    ]
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)
# Script fragment: concatenate every "*23.csv" file in the current directory
# into a single DataFrame and sort it chronologically.
# NOTE(review): depends on `glob`, `os`, and `read_files` being in scope —
# none of them are imported or defined in this file; confirm against the
# original notebook/module this gist was taken from.
path = "./"
all_files = glob.glob(os.path.join(path, "*23.csv"))
# `read_files` is presumably a per-file CSV loader returning a DataFrame —
# TODO confirm its signature.
df = pd.concat((read_files(f) for f in all_files), ignore_index=True)
# assumes every loaded file has a "trunc_day" column — TODO confirm
df = df.sort_values(by=["trunc_day"])
def remove_outliers(df, columns, n_std=3):
    """Drop rows whose value in any of ``columns`` lies more than ``n_std``
    standard deviations away from that column's mean.

    The filter is applied column by column, so each column's mean/std is
    computed on the rows that survived the previous columns.

    Fix: the original kept only the upper-bound test (``<= mean + n_std*sd``),
    so extreme LOW values were never removed despite the function's purpose.

    :param df: input DataFrame
    :type df: pd.DataFrame
    :param columns: column names to filter on
    :type columns: list
    :param n_std: number of standard deviations, defaults to 3
    :type n_std: int, optional
    :return: filtered DataFrame
    :rtype: pd.DataFrame
    """
    for col in columns:
        print(f"Working on column: {col}")
        mean = df[col].mean()
        sd = df[col].std()
        # Keep values inside [mean - n_std*sd, mean + n_std*sd].
        df = df[(df[col] >= mean - (n_std * sd)) & (df[col] <= mean + (n_std * sd))]
    return df
def get_period(start, end, freq="MS"):
    """Get a list of dates between start and end parameters,
    with first and last day of months. Date format: "%Y-%m-%d",
    time is optional. Datetime format: "%Y-%m-%d %H:%M:%S"

    final_period list structure = [
        first day of first month 00:00:00,
        last day of first month 23:59:59,
        first day of second month 00:00:00,
        last day of second month 23:59:59,
    ]
    If the period is less than one month, returns the start
    and end dates.

    Fixes vs. original:
    - uses ``datetime.timedelta`` instead of ``relativedelta`` (which was
      never imported in this file and is third-party); identical for seconds.
    - when start is mid-month AND includes a time, the original parsed the
      full ``start`` string with a date-only format and crashed; the date
      part alone is parsed now (the time is re-applied at the end anyway).

    Args:
        start (str): start date
        end (str): end date
        freq (str): frequency of dates. Defaults to "MS"
            https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases
    Returns:
        list: list of datetime

    Ex: get_period("2021-01-01", "2021-03-31")
    return [
        "2021-01-01 00:00:00",
        "2021-01-31 23:59:59",
        "2021-02-01 00:00:00",
        "2021-02-28 23:59:59",
        "2021-03-01 00:00:00",
        "2021-03-31 23:59:59"
    ]
    Ex: get_period("2022-12-01", "2022-12-07")
    return [
        "2022-12-01 00:00:00",
        "2022-12-07 23:59:59",
    ]
    """
    # Local import: this module does not import datetime at top level.
    from datetime import datetime, timedelta

    def split_datetime_str(datetime_str):
        """Split "YYYY-mm-dd[ HH:MM:SS]" into (date_part, time_part_or_empty)."""
        split_str = datetime_str.split(" ")
        return (
            (split_str[0], split_str[1]) if len(split_str) > 1 else (split_str[0], "")
        )

    def parse_datetime(datestring, format="%Y-%m-%d %H:%M:%S"):
        """Parse datestring; returns None for an empty/None input."""
        if datestring:
            return datetime.strptime(datestring, format)
        else:
            return None

    if start > end:
        raise Exception(f"{start=} date is greater than {end=}")
    start_date, start_time = split_datetime_str(start)
    end_date, end_time = split_datetime_str(end)
    period = list(pd.date_range(start_date, end_date, freq=freq))
    final_period = []
    if len(period) > 1:
        # If the range does not start on the 1st, anchor the first boundary
        # on the actual start date (date part only — see docstring fix note).
        if start_date[8:10] != "01":
            period[0] = parse_datetime(start_date, format="%Y-%m-%d")
        for i in range(len(period)):
            try:
                final_period.append(period[i])
                # one second before the next period start == end of this one
                final_period.append(period[i + 1] - timedelta(seconds=1))
            # end of period list
            except IndexError:
                final_period.append(parse_datetime(f"{end_date} 23:59:59"))
    # period is smaller than the frequency
    else:
        final_period = [
            parse_datetime(f"{start_date} 00:00:00"),
            parse_datetime(f"{end_date} 23:59:59"),
        ]
    # time was passed on start param
    if start_time:
        final_period[0] = parse_datetime(f"{start_date} {start_time}")
    # time was passed on end param
    if end_time:
        final_period[-1] = parse_datetime(f"{end_date} {end_time}")
    return final_period
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment