Script for generating anonymized parquet files with random data based on parquet metadata
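The script relies on the duckdb, pyarrow, pandas, numpy, and faker Python packages; assuming a standard pip environment, they can be installed with:

pip install duckdb pyarrow pandas numpy faker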
import os

# generate count random integers of the given numpy dtype in [min_val, max_val)
def generate_integers(count, np_dtype, min_val=-1000000, max_val=1000000):
    import numpy as np
    return np.random.randint(low=min_val, high=max_val, size=(count,), dtype=np_dtype)

# generate count random floats uniformly distributed in [min_val, max_val)
def generate_floats(count, min_val=0, max_val=1.0):
    import numpy as np
    return np.random.uniform(low=min_val, high=max_val, size=(count,))
# generate count random timestamps within the past year using DuckDB
# (faker-based alternatives: faker.date_of_birth(), faker.date_time_this_year(),
#  faker.date_time_this_month(), faker.date_time_between(start_date='-15y', end_date='now'))
def generate_timestamps(count):
    import duckdb
    con = duckdb.connect()
    result_df = con.execute(f"SELECT NOW() - INTERVAL (1) YEAR + INTERVAL (RANDOM() * 365*24*60*60*1000) MILLISECONDS AS generated_ts FROM range({count})").df()
    return result_df['generated_ts']
# generate a single fake string; string_idx selects one of 19 faker generators
def generate_string(faker, string_idx):
    if string_idx == 0:
        return faker.name()
    elif string_idx == 1:
        return faker.address()
    elif string_idx == 2:
        return faker.job()
    elif string_idx == 3:
        return faker.phone_number()
    elif string_idx == 4:
        return faker.currency()
    elif string_idx == 5:
        return faker.currency_name()
    elif string_idx == 6:
        return faker.currency_code()
    elif string_idx == 7:
        return faker.word()
    elif string_idx == 8:
        return faker.words(5)
    elif string_idx == 9:
        return faker.md5()
    elif string_idx == 10:
        return faker.email()
    elif string_idx == 11:
        return faker.hostname()
    elif string_idx == 12:
        return faker.domain_name()
    elif string_idx == 13:
        return faker.ipv4()
    elif string_idx == 14:
        return faker.ipv6()
    elif string_idx == 15:
        return faker.mac_address()
    elif string_idx == 16:
        return faker.image_url()
    elif string_idx == 17:
        return faker.slug()
    elif string_idx == 18:
        return faker.uuid4()
    else:
        raise Exception("string idx out of bounds")
# cache of generated string dictionaries, keyed by string generator index
cached_string_dictionaries = {}

# build (and cache) a dictionary of one million fake strings for the given generator index;
# row groups then sample from this dictionary instead of invoking faker for every row
def get_dictionary(string_idx):
    global cached_string_dictionaries
    if string_idx in cached_string_dictionaries:
        return cached_string_dictionaries[string_idx]
    dictionary_size = 1000000
    from faker import Faker
    faker = Faker()
    result = []
    print(f"Generating string dictionary for type {string_idx} (this only needs to happen once per column)")
    for i in range(dictionary_size):
        result.append(str(generate_string(faker, string_idx)))
    cached_string_dictionaries[string_idx] = result
    return result
# generate count fake strings for a string column by sampling from the cached dictionary;
# the modulo wraps column indexes onto the 19 available generators
def generate_strings(count, string_idx):
    modulo_string_idx = string_idx % 19
    import random
    string_dict = get_dictionary(modulo_string_idx)
    result = []
    for i in range(count):
        result.append(random.choice(string_dict))
    return result
# generate a pandas DataFrame with row_group_count rows of random data matching the given
# list of (column name, arrow type) pairs
def generate_row_group(row_group_count, schema):
    import numpy as np
    import pyarrow as pa
    import pandas as pd
    result_data = {}
    string_idx = 0
    print(f"Generating row group of size {row_group_count}")
    for col in schema:
        col_name = col[0]
        col_type = col[1]
        print(f"Generating data for {col_name} - {str(col_type)}")
        if col_type == pa.int32():
            result_data[col_name] = generate_integers(row_group_count, np.int32)
        elif col_type == pa.int64():
            result_data[col_name] = generate_integers(row_group_count, np.int64)
        elif col_type == pa.timestamp('ms'):
            result_data[col_name] = generate_timestamps(row_group_count)
        elif col_type == pa.timestamp('us'):
            result_data[col_name] = generate_timestamps(row_group_count)
        elif col_type == pa.string():
            result_data[col_name] = generate_strings(row_group_count, string_idx)
            string_idx += 1
        elif col_type == pa.float64():
            result_data[col_name] = generate_floats(row_group_count)
        else:
            raise Exception(f"unrecognized col type {str(col_type)}")
    return pd.DataFrame.from_dict(result_data)
# write an anonymized parquet file to output_file based on the "schema" and "metadata"
# views that must already exist on the given DuckDB connection
def anonymize_connection(con, output_file):
    import pyarrow as pa
    import pyarrow.parquet as pq
    # fetch a list of all of the leaf column types
    result_schema = con.execute('SELECT name, type, converted_type, logical_type FROM schema WHERE num_children=0').fetchall()
    row_groups = con.execute('SELECT DISTINCT row_group_id, row_group_num_rows FROM metadata ORDER BY row_group_id').fetchall()
    compression = con.execute('SELECT compression FROM metadata LIMIT 1').fetchall()[0][0]
    # construct an arrow schema by mapping the parquet physical/logical types to arrow types
    schema_objects = []
    for i in range(len(result_schema)):
        column_name = result_schema[i][0]
        column_type = result_schema[i][1]
        converted_type = result_schema[i][2]
        if converted_type is None:
            converted_type = ''
        column_logical_type = result_schema[i][3]
        if column_logical_type is None:
            column_logical_type = ''
        arrow_type = None
        if column_type == 'INT32':
            arrow_type = pa.int32()
        elif column_type == 'INT64':
            if 'MILLIS=MilliSeconds()' in column_logical_type or converted_type == 'TIMESTAMP_MILLIS':
                arrow_type = pa.timestamp('ms')
            elif 'MICROS=MicroSeconds()' in column_logical_type:
                arrow_type = pa.timestamp('us')
            else:
                arrow_type = pa.int64()
        elif column_type == 'BYTE_ARRAY':
            if 'StringType()' in column_logical_type or 'UTF8' in converted_type:
                arrow_type = pa.string()
            else:
                arrow_type = pa.binary()
        elif column_type == 'DOUBLE':
            arrow_type = pa.float64()
        else:
            raise Exception(f"Unrecognized parquet type {column_type} for {column_name}")
        schema_objects.append((column_name, arrow_type))
    # generate random data per row group and write it out with the original compression
    arrow_schema = pa.schema(schema_objects)
    with pq.ParquetWriter(output_file, arrow_schema, compression=compression) as writer:
        for i in range(len(row_groups)):
            count = row_groups[i][1]
            df = generate_row_group(count, schema_objects)
            writer.write_table(pa.Table.from_pandas(df, schema=arrow_schema))
# anonymize a single parquet file: read its schema and metadata directly and generate
# a structurally equivalent file filled with random data
def anonymize_parquet(input_file, output_file):
    # read the schema and metadata
    import duckdb
    con = duckdb.connect()
    con.execute(f"CREATE VIEW schema AS SELECT * FROM parquet_schema('{input_file}')")
    con.execute(f"CREATE VIEW metadata AS SELECT * FROM parquet_metadata('{input_file}')")
    anonymize_connection(con, output_file)
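# a single file can be anonymized directly; example call with hypothetical file names:
#   anonymize_parquet('original.parquet', 'anonymized.parquet')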
# anonymize a set of parquet files described by exported parquet_schema/parquet_metadata
# dumps: one anonymized file is written to output_dir per file_name found in the schema dump
def anonymize_parquet_metadata(schema_file, metadata_file, output_dir):
    if os.path.exists(output_dir):
        print(f"Path {output_dir} already exists!")
        exit(1)
    import duckdb
    con = duckdb.connect()
    con.execute(f"create or replace view all_schemas as select * from '{schema_file}';")
    con.execute(f"create or replace view all_metadata as select * from '{metadata_file}';")
    file_names = con.execute('select distinct file_name from all_schemas').fetchall()
    os.mkdir(output_dir)
    for entry in file_names:
        file_name = entry[0]
        base_name = os.path.basename(file_name)
        target_file = os.path.join(output_dir, base_name)
        print(f'Processing file name {base_name} -> {target_file} (full original path {file_name})')
        # restrict the schema/metadata views to the current file and anonymize it
        con.execute(f"create or replace view schema as select * from all_schemas where file_name='{file_name}';")
        con.execute(f"create or replace view metadata as select * from all_metadata where file_name='{file_name}';")
        anonymize_connection(con, target_file)
anonymize_parquet_metadata('~/Downloads/test_data_parquet_schema.parquet', '~/Downloads/test_data_parquet_metadata.parquet', 'generated_parquet_dir')
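# The schema_file and metadata_file inputs above can be produced from an existing set of
# parquet files with DuckDB's COPY; a minimal sketch, with illustrative paths:
#   COPY (SELECT * FROM parquet_schema('data/*.parquet'))
#     TO 'test_data_parquet_schema.parquet' (FORMAT PARQUET);
#   COPY (SELECT * FROM parquet_metadata('data/*.parquet'))
#     TO 'test_data_parquet_metadata.parquet' (FORMAT PARQUET);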