Created
August 14, 2024 07:18
-
-
Save iTrauco/b5eddf4656b42c47beafda1864eb86db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import os
import random
import subprocess

import pandas as pd
from google.cloud import storage, bigquery
def authenticate_user():
    """Run the interactive gcloud login flows (user and application-default).

    Requires the ``gcloud`` CLI on PATH; each command opens a browser prompt.

    Raises:
        subprocess.CalledProcessError: if either login command exits non-zero.
        FileNotFoundError: if the gcloud binary is not installed.
    """
    print("Authenticating with Google Cloud...")
    # subprocess.run with an argument list avoids spawning a shell, and
    # check=True stops the script instead of silently continuing when a
    # login fails (os.system ignored the exit status).
    subprocess.run(["gcloud", "auth", "login"], check=True)
    subprocess.run(["gcloud", "auth", "application-default", "login"], check=True)
def set_default_project():
    """Prompt for a GCP project id, set it as the gcloud default, return it.

    Returns:
        The project id string entered by the user.

    Raises:
        subprocess.CalledProcessError: if ``gcloud config set`` fails.
    """
    project_id = input("Enter your GCP Project ID: ")
    # Argument-list form passes the id verbatim, so shell metacharacters in
    # the user's input cannot be interpreted -- the original
    # os.system(f"gcloud config set project {project_id}") was open to
    # shell injection.
    subprocess.run(["gcloud", "config", "set", "project", project_id], check=True)
    return project_id
def create_bucket(bucket_name, project_id):
    """Ensure a Cloud Storage bucket named *bucket_name* exists.

    Args:
        bucket_name: name of the bucket to create or reuse.
        project_id: GCP project that owns (or will own) the bucket.

    Returns:
        The bucket name, whether it was created or already present.
    """
    client = storage.Client(project=project_id)
    handle = client.bucket(bucket_name)
    if handle.exists():
        print(f"Bucket {bucket_name} already exists.")
    else:
        client.create_bucket(bucket_name)
        print(f"Bucket {bucket_name} created.")
    return bucket_name
def upload_to_bucket(file_path, bucket_name):
    """Upload the local file at *file_path* into bucket *bucket_name*.

    The object name is the file's basename; uses default client credentials.
    """
    client = storage.Client()
    target = client.bucket(bucket_name).blob(os.path.basename(file_path))
    target.upload_from_filename(file_path)
    print(f"File {file_path} uploaded to {bucket_name}.")
def setup_bigquery_dataset(project_id, dataset_id):
    """Create BigQuery dataset *dataset_id* in *project_id* if it is missing.

    Args:
        project_id: GCP project that holds the dataset.
        dataset_id: dataset name to look up or create (location "US").

    Returns:
        The dataset id, whether it existed or was just created.
    """
    client = bigquery.Client(project=project_id)
    dataset_ref = client.dataset(dataset_id)
    try:
        client.get_dataset(dataset_ref)  # Check if dataset exists
        print(f"Dataset {dataset_id} already exists.")
    except Exception:
        # get_dataset raises when the dataset is absent; the original bare
        # `except:` also swallowed KeyboardInterrupt/SystemExit. NOTE(review):
        # ideally this would catch google.api_core.exceptions.NotFound only --
        # confirm against the installed client library.
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "US"
        client.create_dataset(dataset)
        print(f"Dataset {dataset_id} created.")
    return dataset_id
def dirty_data(df, choices, dirtiness_level):
    """Corrupt *df* with the selected mess types and return the result.

    Args:
        df: input DataFrame; may be mutated in place as well as reassigned.
        choices: iterable of menu codes '1'-'14'. Surrounding whitespace on
            each code is ignored, so comma-separated input like "1, 3" works.
        dirtiness_level: 1-10 scale; higher runs more corruption passes.

    Returns:
        The dirtied DataFrame (row count may grow for duplicate options).
    """
    # Adjust the level of dirtiness based on the user's input (1-10 scale)
    iterations = dirtiness_level * 2  # More iterations for higher dirtiness
    for choice in choices:
        choice = choice.strip()  # tolerate "1, 3, 5" style input
        if choice == '1':
            # Introduce missing values
            for _ in range(random.randint(1, iterations)):
                df.iloc[random.randint(0, len(df)-1), random.randint(0, len(df.columns)-1)] = None
        if choice == '2':
            # Add duplicates
            for _ in range(random.randint(1, iterations)):
                df = pd.concat([df, df.sample(random.randint(1, 5))])
        if choice == '3':
            # Insert incorrect data
            for _ in range(random.randint(1, iterations)):
                df.iloc[random.randint(0, len(df)-1), random.randint(0, len(df.columns)-1)] = random.choice(['???', -999, 'N/A', 'forty-two'])
        if choice == '4':
            # Mix up data types (randomly stringify about half of one column)
            for _ in range(random.randint(1, iterations)):
                col = random.choice(df.columns)
                df[col] = df[col].apply(lambda x: str(x) if random.random() > 0.5 else x)
        if choice == '5':
            # Introduce random NaNs
            for _ in range(random.randint(1, iterations)):
                df.iloc[random.randint(0, len(df)-1), random.randint(0, len(df.columns)-1)] = None
        if choice == '6':
            # Random column swaps
            for _ in range(random.randint(1, iterations)):
                col1, col2 = random.sample(list(df.columns), 2)
                # BUG FIX: `df[c1], df[c2] = df[c2], df[c1]` does not reliably
                # swap pandas columns -- the second assignment can observe data
                # already overwritten by the first. Swap through a detached
                # numpy copy instead.
                df[[col1, col2]] = df[[col2, col1]].to_numpy()
        if choice == '7':
            # Date/Time corruptions (turn "-" separators into "/")
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object, 'datetime']).columns:
                    df[col] = df[col].apply(lambda x: str(x).replace("-", "/") if random.random() > 0.5 else x)
        if choice == '8':
            # Case inversions
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: str(x).swapcase() if isinstance(x, str) and random.random() > 0.5 else x)
        if choice == '9':
            # Add extra spaces
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: f" {x} " if isinstance(x, str) and random.random() > 0.5 else x)
        if choice == '10':
            # Scramble data within columns
            for _ in range(random.randint(1, iterations)):
                for col in df.columns:
                    df[col] = df[col].sample(frac=1).reset_index(drop=True)
        if choice == '11':
            # Inject special characters between characters of string cells
            special_chars = ['@', '#', '$', '%', '&']
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: ''.join([c + random.choice(special_chars) if random.random() > 0.5 else c for c in str(x)]) if isinstance(x, str) else x)
        if choice == '12':
            # Logical inconsistencies (random sign flips on numeric columns)
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[int, float]).columns:
                    df[col] = df[col].apply(lambda x: -x if random.random() > 0.5 else x)
        if choice == '13':
            # Language switching (append a language tag to string cells)
            languages = ['fr', 'de', 'es']
            for _ in range(random.randint(1, iterations)):
                for col in df.select_dtypes(include=[object]).columns:
                    df[col] = df[col].apply(lambda x: f"{x} ({random.choice(languages)})" if isinstance(x, str) and random.random() > 0.5 else x)
        if choice == '14':
            # Duplicate IDs -- note: only re-appends rows whose 'id' is already
            # duplicated, so a frame with unique ids is left unchanged.
            for _ in range(random.randint(1, iterations)):
                if 'id' in df.columns:
                    df = pd.concat([df, df[df['id'].duplicated()]])
    return df
def load_dataset(choice):
    """Fetch one of the four sample CSV datasets keyed by menu *choice*.

    Args:
        choice: '1' Pokémon, '2' GoT deaths, '3' Star Wars, '4' Marvel.

    Returns:
        A DataFrame read straight from the remote CSV.
    """
    source_urls = {
        '1': "https://gist.githubusercontent.com/armgilles/194bcff35001e7eb53a2a8b441e8b2c6/raw/92200bc0a673d5ce2110aaad4544ed6c4010f687/pokemon.csv",
        '2': "https://raw.githubusercontent.com/TheMLGuy/Game-of-Thrones-Dataset/master/character-deaths.csv",
        '3': "https://raw.githubusercontent.com/tidyverse/dplyr/main/data-raw/starwars.csv",
        '4': "https://raw.githubusercontent.com/fivethirtyeight/data/master/comic-characters/marvel-wikia-data.csv",
    }
    return pd.read_csv(source_urls[choice])
def bigquery_datasets():
    """Prompt the user to pick one of the listed BigQuery public datasets.

    Returns:
        The fully-qualified table id, or None for an unrecognized choice.
    """
    menu = (
        "\nChoose a BigQuery public dataset:",
        "1. New York City Taxi Trips",
        "2. Chicago Crime Data",
        "3. San Francisco 311 Service Requests",
        "4. Wikipedia Clickstream Data",
        "5. Global Surface Summary of the Day Weather Data",
    )
    for line in menu:
        print(line)
    selection = input("Enter the number of your choice: ")
    table_ids = {
        '1': "bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2018",
        '2': "bigquery-public-data.chicago_crime.crime",
        '3': "bigquery-public-data.san_francisco_311.311_service_requests",
        '4': "bigquery-public-data.wikipedia.pageviews_2018",
        '5': "bigquery-public-data.noaa_gsod.gsod2020",
    }
    return table_ids.get(selection, None)
def main():
    """Interactive entry point: show the top-level menu and run one action."""
    print("Choose an option:")
    print("1. Authenticate with Google Cloud")
    print("2. Set Default Project and Application Credentials")
    print("3. Create a Cloud Storage Bucket")
    print("4. Choose a Dataset and Dirty It")
    print("5. Upload Dirty Dataset to Cloud Storage")
    print("6. Set Up BigQuery Dataset")
    choice = input("Enter the number of your choice: ")
    if choice == '1':
        authenticate_user()
    elif choice == '2':
        project_id = set_default_project()
        print(f"Default project set to {project_id}")
    elif choice == '3':
        project_id = input("Enter your GCP Project ID: ")
        bucket_name = input("Enter a name for your new Cloud Storage bucket: ")
        create_bucket(bucket_name, project_id)
    elif choice == '4':
        print("Choose a dataset to dirty:")
        print("1. Pokémon Data")
        print("2. Game of Thrones Character Deaths")
        print("3. Star Wars Characters")
        print("4. Marvel Characters")
        print("5. BigQuery Public Datasets")
        dataset_choice = input("Enter the number of your choice: ")
        if dataset_choice in ['1', '2', '3', '4']:
            df = load_dataset(dataset_choice)
            print("\nChoose the types of dirtiness to apply (e.g., 1,3,5 for multiple types):")
            print("1. Introduce missing values")
            print("2. Add duplicates")
            print("3. Insert incorrect data")
            print("4. Mix up data types")
            print("5. Introduce random NaNs")
            print("6. Random column swaps")
            print("7. Date/Time corruptions")
            print("8. Case inversions")
            print("9. Add extra spaces")
            print("10. Scramble data within columns")
            print("11. Inject special characters")
            print("12. Logical inconsistencies")
            print("13. Language switching")
            print("14. Duplicate IDs")
            # BUG FIX: the prompt invites "1,3,5"-style input, but a plain
            # split(',') leaves leading spaces (" 3") that never match the
            # codes checked in dirty_data. Strip each entry and drop empties.
            raw_choices = input("Enter your choices (comma-separated): ")
            dirtiness_choices = [c.strip() for c in raw_choices.split(',') if c.strip()]
            dirtiness_level = int(input("Enter the level of dirtiness (1-10): "))
            df = dirty_data(df, dirtiness_choices, dirtiness_level)
            output_file = f"dirty_dataset_{dataset_choice}.csv"
            df.to_csv(output_file, index=False)
            print(f"\nDirty dataset saved as {output_file}")
        elif dataset_choice == '5':
            dataset_id = bigquery_datasets()
            if dataset_id:
                print(f"\nYou chose the BigQuery dataset: {dataset_id}")
                # Load and dirty the BigQuery dataset here
            else:
                print("Invalid choice for BigQuery dataset.")
        else:
            print("Invalid choice. Please try again.")
    elif choice == '5':
        file_path = input("Enter the path to the file you want to upload: ")
        bucket_name = input("Enter your Cloud Storage bucket name: ")
        upload_to_bucket(file_path, bucket_name)
    elif choice == '6':
        project_id = input("Enter your GCP Project ID: ")
        dataset_id = input("Enter a name for your BigQuery dataset: ")
        setup_bigquery_dataset(project_id, dataset_id)
    else:
        print("Invalid choice. Please try again.")
# Script entry point: run the interactive menu once per invocation.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment