A basic wrapper around the BigQuery Python client for performing group-by and aggregation operations similar to those in Pandas.
from google.cloud import bigquery
from pandas import DataFrame


class BigQueryDataFrame:
    """
    This class is a wrapper around the BigQuery Python client for performing groupby and aggregation operations
    similar to those in pandas, but on BigQuery data.

    Basic usage:

    ```python
    bigquery_df = BigQueryDataFrame("your-project.your-dataset.your-table")
    bigquery_df = bigquery_df.filter("column5 = 'fashion'")
    bigquery_df = bigquery_df.groupby("column1", "column2")
    result_df = bigquery_df.agg(count_alias="count", column3="SUM", column4="AVG").to_dataframe()
    ```

    This will execute a query similar to:

    ```sql
    SELECT
        column1, column2,
        COUNT(*) as count, SUM(column3) as sum_column3, AVG(column4) as avg_column4
    FROM `your-project.your-dataset.your-table`
    WHERE column5 = 'fashion'
    GROUP BY column1, column2
    ```

    and return the results as a pandas DataFrame.

    The class also supports nesting of queries. For example:

    ```python
    bigquery_df = bigquery_df.groupby("item_id").agg(count_alias="item_freq")
    bigquery_df = bigquery_df.groupby("item_freq").agg(count_alias="item_count")
    result_df = bigquery_df.to_dataframe()
    ```

    In this example, the class builds nested queries to first count occurrences of each item, and then count the resulting counts.
    """
    def __init__(self, table_id: str):
        """
        Initializes the BigQuery client and sets the table ID.
        """
        self.client = bigquery.Client()
        self.table_id = table_id
        self.filters = None
        self.groupby_columns = None
        self.agg_columns = None
        self.query = None

    def filter(self, *args: str) -> 'BigQueryDataFrame':
        """
        Sets the filters to apply. Does not execute the query.
        """
        self.filters = " AND ".join(args)
        return self

    def groupby(self, *args: str) -> 'BigQueryDataFrame':
        """
        Sets the columns to group by. Does not execute the query.
        """
        self.groupby_columns = ", ".join(args)
        return self

    def agg(self, count_alias=None, **kwargs: str) -> 'BigQueryDataFrame':
        """
        Sets the aggregations to perform and builds the query. Does not execute it.
        """
        # Rebuild the aggregation list on every call so it never concatenates onto None.
        agg_parts = []
        if count_alias:
            agg_parts.append(f"COUNT(*) as {count_alias}")
        agg_parts.extend(
            f"{value}({key}) as {value.lower()}_{key}"
            for key, value in kwargs.items()
        )
        self.agg_columns = ", ".join(agg_parts)
        self._build_query()
        return self

    def to_dataframe(self) -> DataFrame:
        """
        Executes the query, returns the result as a pandas DataFrame, and resets the query.
        """
        df = self._execute_query()
        self.query = None  # Reset the query
        return df

    def _build_query(self) -> None:
        """
        Builds the SQL query.
        """
        # Step 1: Begin the query with "SELECT"
        query = f"SELECT\n\t{self.groupby_columns},\n\t{self.agg_columns}\n"

        # Step 2: Add the "FROM" clause
        # If a query already exists, nest it as a subquery
        if self.query:
            indented_query = '\n\t'.join(self.query.split('\n'))  # Indent each line of the existing query
            query += f"FROM (\n\t{indented_query}\n)"
        else:
            query += f"FROM `{self.table_id}`"

        # Step 3: Add the "WHERE" clause if filters have been set
        if self.filters:
            query += f"\nWHERE {self.filters}"
            self.filters = None  # Reset the filters

        # Step 4: Add the "GROUP BY" clause
        query += f"\nGROUP BY {self.groupby_columns}"

        self.query = query

    def _execute_query(self) -> DataFrame:
        """
        Executes the query and returns the result as a pandas DataFrame.
        """
        return self.client.query(self.query).to_dataframe()
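To make the nesting behaviour of `_build_query` concrete, here is a minimal sketch that chains two `groupby`/`agg` calls and inspects the generated SQL before running it. The table `your-project.your-dataset.events` and the `item_id` column are hypothetical placeholders, and the snippet assumes Google Cloud credentials are already configured for `bigquery.Client()`.

```python
# Hypothetical table and column names, for illustration only.
bigquery_df = BigQueryDataFrame("your-project.your-dataset.events")

# First level: count occurrences of each item.
bigquery_df = bigquery_df.groupby("item_id").agg(count_alias="item_freq")

# Second level: count how many items share each frequency.
bigquery_df = bigquery_df.groupby("item_freq").agg(count_alias="item_count")

# Inspect the SQL that will be sent to BigQuery. Whitespace aside, it looks like:
#   SELECT
#       item_freq,
#       COUNT(*) as item_count
#   FROM (
#       SELECT
#           item_id,
#           COUNT(*) as item_freq
#       FROM `your-project.your-dataset.events`
#       GROUP BY item_id
#   )
#   GROUP BY item_freq
print(bigquery_df.query)

# Running the nested query and fetching the result as a pandas DataFrame.
result_df = bigquery_df.to_dataframe()
```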
import matplotlib.pyplot as plt


def plot_data(df, x_col, y_col, alpha=0.1, log_scale=True, log_log_scale=False):
    # Sort the data by x_col so the line plot makes sense
    df_sorted = df.sort_values(x_col)

    # Calculate the exponential moving average
    if alpha is not None:
        df_sorted[f'{y_col}_smooth'] = df_sorted[y_col].ewm(alpha=alpha, adjust=True).mean()

    plt.plot(df_sorted[x_col], df_sorted[y_col], label='Original')

    # Plot the smoothed line only when alpha is not None
    if alpha is not None:
        plt.plot(df_sorted[x_col], df_sorted[f'{y_col}_smooth'], label='Smoothed', color='red')

    plt.xlabel(x_col)
    plt.ylabel(y_col)

    if log_scale or log_log_scale:
        plt.yscale('log')  # Change the y-axis to a logarithmic scale
    if log_log_scale:
        plt.xscale('log')  # Change the x-axis to a logarithmic scale

    plt.title(f'{y_col} versus {x_col}')
    plt.grid(True)  # Add gridlines for better visualization
    plt.legend()
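A quick usage sketch for `plot_data` on synthetic data; the column names and the power-law-like shape are made up for illustration, and in practice the DataFrame could come from `BigQueryDataFrame.to_dataframe()`.

```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Synthetic, strictly positive data resembling a noisy power law.
rng = np.random.default_rng(seed=0)
x = np.arange(1, 501)
y = 1000 / x * rng.lognormal(mean=0.0, sigma=0.3, size=x.size)
df = pd.DataFrame({"item_freq": x, "item_count": y})

# Smooth with an exponential moving average (alpha=0.1) and plot on a log-log scale.
plot_data(df, x_col="item_freq", y_col="item_count", alpha=0.1, log_log_scale=True)
plt.show()
```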