@Mytherin
Created November 20, 2022 12:01
Script for generating anonymized parquet files with random data based on parquet metadata
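# Overall flow: read the schema and row-group layout of an existing parquet
# file through DuckDB's parquet_schema()/parquet_metadata() functions, rebuild
# a matching Arrow schema, then write a new file with the same shape but
# randomly generated values, one row group at a time.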
import os
def generate_integers(count, np_dtype, min_val=-1000000, max_val=1000000):
    import numpy as np
    return np.random.randint(low=min_val, high=max_val, size=(count,), dtype=np_dtype)

def generate_floats(count, min_val=0, max_val=1.0):
    import numpy as np
    return np.random.uniform(low=min_val, high=max_val, size=(count,))

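# Generate `count` timestamps spread uniformly over the past year; DuckDB does
# the date arithmetic and returns the result as a pandas datetime column.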
def generate_timestamps(count):
    # faker.date_of_birth()
    # faker.date_time_this_year()
    # faker.date_time_this_month()
    # faker.date_time_between(start_date='-15y', end_date='now')
    import duckdb
    con = duckdb.connect()
    result_df = con.execute(f"SELECT NOW() - INTERVAL (1) YEAR + INTERVAL (RANDOM() * 365*24*60*60*1000) MILLISECONDS AS generated_ts FROM range({count})").df()
    return result_df['generated_ts']

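# Map a string column index to one Faker provider, so every string column in
# the output gets its own consistent kind of fake data (names, addresses, ...).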
def generate_string(faker, string_idx):
    if string_idx == 0:
        return faker.name()
    elif string_idx == 1:
        return faker.address()
    elif string_idx == 2:
        return faker.job()
    elif string_idx == 3:
        return faker.phone_number()
    elif string_idx == 4:
        return faker.currency()
    elif string_idx == 5:
        return faker.currency_name()
    elif string_idx == 6:
        return faker.currency_code()
    elif string_idx == 7:
        return faker.word()
    elif string_idx == 8:
        return faker.words(5)
    elif string_idx == 9:
        return faker.md5()
    elif string_idx == 10:
        return faker.email()
    elif string_idx == 11:
        return faker.hostname()
    elif string_idx == 12:
        return faker.domain_name()
    elif string_idx == 13:
        return faker.ipv4()
    elif string_idx == 14:
        return faker.ipv6()
    elif string_idx == 15:
        return faker.mac_address()
    elif string_idx == 16:
        return faker.image_url()
    elif string_idx == 17:
        return faker.slug()
    elif string_idx == 18:
        return faker.uuid4()
    else:
        raise Exception("string idx out of bounds")

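# Calling Faker once per row is slow: build a one-million-entry dictionary per
# string type once, cache it, and sample from it afterwards.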
cached_string_dictionaries = {}
def get_dictionary(string_idx):
    global cached_string_dictionaries
    if string_idx in cached_string_dictionaries:
        return cached_string_dictionaries[string_idx]
    dictionary_size = 1000000
    from faker import Faker
    faker = Faker()
    result = []
    print(f"Generating string dictionary for type {string_idx} (this only needs to happen once per column)")
    for i in range(dictionary_size):
        result.append(str(generate_string(faker, string_idx)))
    cached_string_dictionaries[string_idx] = result
    return result

def generate_strings(count, string_idx):
    modulo_string_idx = string_idx % 19
    import random
    string_dict = get_dictionary(modulo_string_idx)
    result = []
    for i in range(count):
        result.append(random.choice(string_dict))
    return result

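# Build one row group's worth of data as a pandas DataFrame; `schema` is a
# list of (column_name, arrow_type) tuples. string_idx advances per string
# column, so each string column draws from a different fake-data dictionary.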
def generate_row_group(row_group_count, schema):
    import numpy as np
    import pyarrow as pa
    import pandas as pd
    result_data = {}
    string_idx = 0
    print(f"Generating row group of size {row_group_count}")
    for col in schema:
        col_name = col[0]
        col_type = col[1]
        print(f"Generating data for {col_name} - {str(col_type)}")
        if col_type == pa.int32():
            result_data[col_name] = generate_integers(row_group_count, np.int32)
        elif col_type == pa.int64():
            result_data[col_name] = generate_integers(row_group_count, np.int64)
        elif col_type == pa.timestamp('ms'):
            result_data[col_name] = generate_timestamps(row_group_count)
        elif col_type == pa.timestamp('us'):
            result_data[col_name] = generate_timestamps(row_group_count)
        elif col_type == pa.string():
            result_data[col_name] = generate_strings(row_group_count, string_idx)
            string_idx += 1
        elif col_type == pa.float64():
            result_data[col_name] = generate_floats(row_group_count)
        else:
            raise Exception(f"unrecognized col type {str(col_type)}")
    return pd.DataFrame.from_dict(result_data)

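# Expects `con` to expose two views: `schema` (rows from parquet_schema) and
# `metadata` (rows from parquet_metadata). Rebuilds the Arrow schema from the
# parquet physical/logical types and mirrors the original file's row-group
# sizes and compression codec.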
def anonymize_connection(con, output_file):
    import pyarrow as pa
    import pyarrow.parquet as pq
    # fetch a list of all of the types
    result_schema = con.execute('SELECT name, type, converted_type, logical_type FROM schema WHERE num_children=0').fetchall()
    row_groups = con.execute('SELECT DISTINCT row_group_id, row_group_num_rows FROM metadata ORDER BY row_group_id').fetchall()
    compression = con.execute('SELECT compression FROM metadata LIMIT 1').fetchall()[0][0]
    # construct an arrow schema
    schema_objects = []
    for i in range(len(result_schema)):
        column_name = result_schema[i][0]
        column_type = result_schema[i][1]
        converted_type = result_schema[i][2]
        if converted_type is None:
            converted_type = ''
        column_logical_type = result_schema[i][3]
        if column_logical_type is None:
            column_logical_type = ''
        arrow_type = None
        if column_type == 'INT32':
            arrow_type = pa.int32()
        elif column_type == 'INT64':
            if 'MILLIS=MilliSeconds()' in column_logical_type or converted_type == 'TIMESTAMP_MILLIS':
                arrow_type = pa.timestamp('ms')
            elif 'MICROS=MicroSeconds()' in column_logical_type:
                arrow_type = pa.timestamp('us')
            else:
                arrow_type = pa.int64()
        elif column_type == 'BYTE_ARRAY':
            if 'StringType()' in column_logical_type or 'UTF8' in converted_type:
                arrow_type = pa.string()
            else:
                arrow_type = pa.binary()
        elif column_type == 'DOUBLE':
            arrow_type = pa.float64()
        else:
            raise Exception(f"Unrecognized parquet type {column_type} for {column_name}")
        schema_objects.append((column_name, arrow_type))
    # generate data
    arrow_schema = pa.schema(schema_objects)
    with pq.ParquetWriter(output_file, arrow_schema, compression=compression) as writer:
        for i in range(len(row_groups)):
            count = row_groups[i][1]
            df = generate_row_group(count, schema_objects)
            writer.write_table(pa.Table.from_pandas(df, schema=arrow_schema))

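# Anonymize a single parquet file by reading its schema and metadata directly.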
def anonymize_parquet(input_file, output_file):
    # read the schema and metadata
    import duckdb
    con = duckdb.connect()
    con.execute(f"CREATE VIEW schema AS SELECT * FROM parquet_schema('{input_file}')")
    con.execute(f"CREATE VIEW metadata AS SELECT * FROM parquet_metadata('{input_file}')")
    anonymize_connection(con, output_file)

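# Variant that works from schema/metadata that were previously exported to
# parquet files, rather than from the original data files; for example
# (hypothetical paths), dumps produced with something like:
#   COPY (SELECT * FROM parquet_schema('data/*.parquet')) TO 'schema.parquet' (FORMAT PARQUET)
# One anonymized file is written per original file into output_dir.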
def anonymize_parquet_metadata(schema_file, metadata_file, output_dir):
    if os.path.exists(output_dir):
        print(f"Path {output_dir} already exists!")
        exit(1)
    import duckdb
    con = duckdb.connect()
    con.execute(f"create or replace view all_schemas as select * from '{schema_file}';")
    con.execute(f"create or replace view all_metadata as select * from '{metadata_file}';")
    file_names = con.execute('select distinct file_name from all_schemas').fetchall()
    os.mkdir(output_dir)
    for entry in file_names:
        file_name = entry[0]
        base_name = os.path.basename(file_name)
        target_file = os.path.join(output_dir, base_name)
        print(f'Processing file name {base_name} -> {target_file} (full original path {file_name})')
        con.execute(f"create or replace view schema as select * from all_schemas where file_name='{file_name}';")
        con.execute(f"create or replace view metadata as select * from all_metadata where file_name='{file_name}';")
        anonymize_connection(con, target_file)

anonymize_parquet_metadata('~/Downloads/test_data_parquet_schema.parquet', '~/Downloads/test_data_parquet_metadata.parquet', 'generated_parquet_dir')
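# Alternatively, anonymize a single file directly (hypothetical paths):
# anonymize_parquet('input.parquet', 'anonymized.parquet')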