Skip to content

Instantly share code, notes, and snippets.

@iTrauco
Created August 14, 2024 07:18
Show Gist options
  • Save iTrauco/b5eddf4656b42c47beafda1864eb86db to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import random
import subprocess

import pandas as pd
from google.cloud import storage, bigquery
def authenticate_user():
    """Run interactive gcloud logins for both user and application-default credentials.

    Uses ``subprocess.run`` with argument lists (``shell=False``) and reports a
    non-zero exit status instead of silently ignoring it as the previous
    ``os.system`` calls did.
    """
    print("Authenticating with Google Cloud...")
    for cmd in (["gcloud", "auth", "login"],
                ["gcloud", "auth", "application-default", "login"]):
        result = subprocess.run(cmd)
        if result.returncode != 0:
            print(f"Warning: '{' '.join(cmd)}' exited with status {result.returncode}")
def set_default_project():
    """Prompt for a GCP project id, set it as the gcloud default, and return it.

    The project id is passed as a discrete argv element rather than being
    interpolated into a shell string — the original
    ``os.system(f"gcloud config set project {project_id}")`` was vulnerable to
    shell injection via the interactive input.
    """
    project_id = input("Enter your GCP Project ID: ")
    subprocess.run(["gcloud", "config", "set", "project", project_id])
    return project_id
def create_bucket(bucket_name, project_id):
    """Ensure a Cloud Storage bucket named *bucket_name* exists in *project_id*.

    Creates the bucket only when it is missing; always returns the bucket name.
    """
    client = storage.Client(project=project_id)
    candidate = client.bucket(bucket_name)
    if candidate.exists():
        print(f"Bucket {bucket_name} already exists.")
    else:
        client.create_bucket(bucket_name)
        print(f"Bucket {bucket_name} created.")
    return bucket_name
def upload_to_bucket(file_path, bucket_name):
    """Upload the local file at *file_path* into the bucket *bucket_name*.

    The destination blob is named after the file's basename.
    """
    client = storage.Client()
    destination = client.bucket(bucket_name).blob(os.path.basename(file_path))
    destination.upload_from_filename(file_path)
    print(f"File {file_path} uploaded to {bucket_name}.")
def setup_bigquery_dataset(project_id, dataset_id):
    """Create BigQuery dataset *dataset_id* in *project_id* if it does not exist.

    Returns *dataset_id* whether the dataset was created or already present.
    """
    # Local import: google.api_core ships with google-cloud-bigquery, which the
    # file already depends on via `from google.cloud import bigquery`.
    from google.api_core.exceptions import NotFound

    client = bigquery.Client(project=project_id)
    # NOTE(review): Client.dataset() is deprecated in newer client versions;
    # kept for compatibility with whatever version this script targets.
    dataset_ref = client.dataset(dataset_id)
    try:
        client.get_dataset(dataset_ref)  # Check if dataset exists
        print(f"Dataset {dataset_id} already exists.")
    except NotFound:
        # Only a missing dataset should trigger creation. The original bare
        # `except:` also swallowed auth/network errors, KeyboardInterrupt and
        # SystemExit, then blindly attempted creation.
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "US"
        client.create_dataset(dataset)
        print(f"Dataset {dataset_id} created.")
    return dataset_id
def dirty_data(df, choices, dirtiness_level):
    """Apply the selected corruption types to a copy of *df* and return it.

    Parameters
    ----------
    df : pandas.DataFrame
        Clean input data. The caller's frame is left untouched (the original
        implementation mutated it in place for several choices).
    choices : iterable of str
        Menu codes '1'-'14'. Surrounding whitespace is tolerated, so input
        like "1, 3" works.
    dirtiness_level : int
        1-10 scale; higher values run more corruption passes.

    Returns
    -------
    pandas.DataFrame
        The corrupted frame (possibly longer than the input when duplicates
        are added).
    """
    # Work on a copy so in-place corruptions never leak back to the caller.
    df = df.copy()
    # Adjust the level of dirtiness based on the user's input (1-10 scale);
    # more iterations for higher dirtiness.
    iterations = dirtiness_level * 2
    for choice in (c.strip() for c in choices):
        if choice == '1':
            # Introduce missing values
            for _ in range(random.randint(1, iterations)):
                df.iloc[random.randint(0, len(df) - 1), random.randint(0, len(df.columns) - 1)] = None
        if choice == '2':
            # Add duplicates. Cap the sample size at len(df): the original
            # df.sample(randint(1, 5)) raised ValueError on frames with <5 rows.
            for _ in range(random.randint(1, iterations)):
                df = pd.concat([df, df.sample(min(random.randint(1, 5), len(df)))])
        if choice == '3':
            # Insert incorrect data
            for _ in range(random.randint(1, iterations)):
                df.iloc[random.randint(0, len(df) - 1), random.randint(0, len(df.columns) - 1)] = random.choice(['???', -999, 'N/A', 'forty-two'])
        if choice == '4':
            # Mix up data types
            for _ in range(random.randint(1, iterations)):
                col = random.choice(df.columns)
                df[col] = df[col].apply(lambda x: str(x) if random.random() > 0.5 else x)
        if choice == '5':
            # Introduce random NaNs
            for _ in range(random.randint(1, iterations)):
                df.iloc[random.randint(0, len(df) - 1), random.randint(0, len(df.columns) - 1)] = None
        if choice == '6':
            # Random column swaps; needs at least two columns to sample from
            # (the original crashed on single-column frames).
            if len(df.columns) >= 2:
                for _ in range(random.randint(1, iterations)):
                    col1, col2 = random.sample(list(df.columns), 2)
                    df[col1], df[col2] = df[col2].copy(), df[col1].copy()
        if choice == '7':
            # Date/Time corruptions
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object, 'datetime']).columns:
                    df[col] = df[col].apply(lambda x: str(x).replace("-", "/") if random.random() > 0.5 else x)
        if choice == '8':
            # Case inversions
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: str(x).swapcase() if isinstance(x, str) and random.random() > 0.5 else x)
        if choice == '9':
            # Add extra spaces
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: f" {x} " if isinstance(x, str) and random.random() > 0.5 else x)
        if choice == '10':
            # Scramble data within columns
            for _ in range(random.randint(1, iterations)):
                for col in df.columns:
                    df[col] = df[col].sample(frac=1).reset_index(drop=True)
        if choice == '11':
            # Inject special characters
            special_chars = ['@', '#', '$', '%', '&']
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: ''.join([c + random.choice(special_chars) if random.random() > 0.5 else c for c in str(x)]) if isinstance(x, str) else x)
        if choice == '12':
            # Logical inconsistencies
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[int, float]).columns:
                    df[col] = df[col].apply(lambda x: -x if random.random() > 0.5 else x)
        if choice == '13':
            # Language switching
            languages = ['fr', 'de', 'es']
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: f"{x} ({random.choice(languages)})" if isinstance(x, str) and random.random() > 0.5 else x)
        if choice == '14':
            # Duplicate IDs
            for _ in range(random.randint(1, iterations)):
                if 'id' in df.columns:
                    df = pd.concat([df, df[df['id'].duplicated()]])
    return df
def load_dataset(choice):
    """Download one of four sample CSV datasets and return it as a DataFrame.

    Parameters
    ----------
    choice : str
        Menu code '1'-'4' (Pokemon, Game of Thrones deaths, Star Wars,
        Marvel characters). Raises KeyError for anything else.
    """
    # The original URLs were mangled by a link shortener during copy/paste
    # ("gist.githubusercontent.com" / "raw.githubusercontent.com"); restored to the real
    # GitHub raw-content hosts so pd.read_csv can actually fetch them.
    urls = {
        '1': "https://gist.githubusercontent.com/armgilles/194bcff35001e7eb53a2a8b441e8b2c6/raw/92200bc0a673d5ce2110aaad4544ed6c4010f687/pokemon.csv",
        '2': "https://raw.githubusercontent.com/TheMLGuy/Game-of-Thrones-Dataset/master/character-deaths.csv",
        '3': "https://raw.githubusercontent.com/tidyverse/dplyr/main/data-raw/starwars.csv",
        '4': "https://raw.githubusercontent.com/fivethirtyeight/data/master/comic-characters/marvel-wikia-data.csv"
    }
    return pd.read_csv(urls[choice])
def bigquery_datasets():
    """Prompt the user to pick one of five BigQuery public datasets.

    Returns the fully-qualified table id for the selection, or None when the
    entered choice is not recognized.
    """
    # One table drives both the printed menu and the lookup, so the two can
    # never drift apart.
    options = {
        '1': ("New York City Taxi Trips",
              "bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2018"),
        '2': ("Chicago Crime Data",
              "bigquery-public-data.chicago_crime.crime"),
        '3': ("San Francisco 311 Service Requests",
              "bigquery-public-data.san_francisco_311.311_service_requests"),
        '4': ("Wikipedia Clickstream Data",
              "bigquery-public-data.wikipedia.pageviews_2018"),
        '5': ("Global Surface Summary of the Day Weather Data",
              "bigquery-public-data.noaa_gsod.gsod2020"),
    }
    print("\nChoose a BigQuery public dataset:")
    for key, (label, _) in options.items():
        print(f"{key}. {label}")
    choice = input("Enter the number of your choice: ")
    selected = options.get(choice)
    return selected[1] if selected else None
def main():
    """Interactive menu tying together GCP auth, bucket creation,
    dataset dirtying, upload, and BigQuery dataset setup.

    Each run performs exactly one menu action.
    """
    print("Choose an option:")
    print("1. Authenticate with Google Cloud")
    print("2. Set Default Project and Application Credentials")
    print("3. Create a Cloud Storage Bucket")
    print("4. Choose a Dataset and Dirty It")
    print("5. Upload Dirty Dataset to Cloud Storage")
    print("6. Set Up BigQuery Dataset")
    choice = input("Enter the number of your choice: ")
    if choice == '1':
        authenticate_user()
    elif choice == '2':
        project_id = set_default_project()
        print(f"Default project set to {project_id}")
    elif choice == '3':
        project_id = input("Enter your GCP Project ID: ")
        bucket_name = input("Enter a name for your new Cloud Storage bucket: ")
        create_bucket(bucket_name, project_id)
    elif choice == '4':
        print("Choose a dataset to dirty:")
        print("1. Pokémon Data")
        print("2. Game of Thrones Character Deaths")
        print("3. Star Wars Characters")
        print("4. Marvel Characters")
        print("5. BigQuery Public Datasets")
        dataset_choice = input("Enter the number of your choice: ")
        if dataset_choice in ['1', '2', '3', '4']:
            df = load_dataset(dataset_choice)
            print("\nChoose the types of dirtiness to apply (e.g., 1,3,5 for multiple types):")
            print("1. Introduce missing values")
            print("2. Add duplicates")
            print("3. Insert incorrect data")
            print("4. Mix up data types")
            print("5. Introduce random NaNs")
            print("6. Random column swaps")
            print("7. Date/Time corruptions")
            print("8. Case inversions")
            print("9. Add extra spaces")
            print("10. Scramble data within columns")
            print("11. Inject special characters")
            print("12. Logical inconsistencies")
            print("13. Language switching")
            print("14. Duplicate IDs")
            # Strip whitespace so "1, 3" selects both types instead of
            # silently skipping " 3".
            dirtiness_choices = [c.strip() for c in input("Enter your choices (comma-separated): ").split(',')]
            # Non-numeric input used to crash with ValueError, and the 1-10
            # range promised by the prompt was never enforced.
            try:
                dirtiness_level = int(input("Enter the level of dirtiness (1-10): "))
            except ValueError:
                print("Invalid level; defaulting to 5.")
                dirtiness_level = 5
            dirtiness_level = max(1, min(10, dirtiness_level))
            df = dirty_data(df, dirtiness_choices, dirtiness_level)
            output_file = f"dirty_dataset_{dataset_choice}.csv"
            df.to_csv(output_file, index=False)
            print(f"\nDirty dataset saved as {output_file}")
        elif dataset_choice == '5':
            dataset_id = bigquery_datasets()
            if dataset_id:
                print(f"\nYou chose the BigQuery dataset: {dataset_id}")
                # Load and dirty the BigQuery dataset here
            else:
                print("Invalid choice for BigQuery dataset.")
        else:
            print("Invalid choice. Please try again.")
    elif choice == '5':
        file_path = input("Enter the path to the file you want to upload: ")
        bucket_name = input("Enter your Cloud Storage bucket name: ")
        upload_to_bucket(file_path, bucket_name)
    elif choice == '6':
        project_id = input("Enter your GCP Project ID: ")
        dataset_id = input("Enter a name for your BigQuery dataset: ")
        setup_bigquery_dataset(project_id, dataset_id)
    else:
        print("Invalid choice. Please try again.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment