Skip to content

Instantly share code, notes, and snippets.

@vitalibaranov
Created January 21, 2025 07:01
Show Gist options
  • Save vitalibaranov/547d4bd2d0ef21c644241c3174aa0bfe to your computer and use it in GitHub Desktop.
Save vitalibaranov/547d4bd2d0ef21c644241c3174aa0bfe to your computer and use it in GitHub Desktop.
import argparse
import csv
import os.path
# Soft cap on the size of one output file, in bytes (~10 MB). Compared
# against the UTF-8 byte-length estimates produced by get_vacancy_rows_sizes.
MAX_AVAILABLE_SIZE = 10_000_000
def main(file_path, output_directory_path):
    """Split the input CSV into bucketed output files and verify coverage.

    Reads the input file, groups rows by vacancy name, packs vacancies
    into size-bounded buckets, writes one file per bucket, then checks
    that every input row was written.
    """
    header_row, vacancy_rows, fields_index = read_file_data(file_path)
    sizes = get_vacancy_rows_sizes(vacancy_rows)
    buckets = get_vacancies_buckets(sizes)
    written = write_files(output_directory_path, header_row, buckets, vacancy_rows, sizes)
    verify_all_processed(vacancy_rows, written)
    initial_file_lines_count = sum(len(group) for group in vacancy_rows.values())
    print(f'Processed {initial_file_lines_count} lines (headers not counted).')
def read_file_data(file_path):
    """Read the semicolon-delimited CSV at *file_path*.

    Returns a tuple of:
      - header_row: list of column names (first line of the file)
      - vacancy_rows: dict mapping vacancy name -> list of data rows,
        preserving file order within each vacancy
      - fields_index: dict mapping column name -> column position

    Raises ValueError on an empty file (the original code hit an opaque
    UnboundLocalError at the return statement in that case) and KeyError
    if the 'new_vacancy_name' column is missing.
    """
    with open(file_path) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=';')
        header_row = next(csv_reader, None)
        if header_row is None:
            raise ValueError(f'{file_path} is empty: no header row found')
        fields_index = {name: idx for idx, name in enumerate(header_row)}
        # Hoist the column lookup out of the per-row loop.
        name_idx = fields_index['new_vacancy_name']
        vacancy_rows = {}
        for row in csv_reader:
            vacancy_rows.setdefault(row[name_idx], []).append(row)
    return header_row, vacancy_rows, fields_index
def write_files(output_path, header_row, vacancies_buckets, vacancy_rows, vacancy_sizes):
    """Write each bucket of vacancies to a ';'-delimited CSV under *output_path*.

    A bucket whose estimated size exceeds MAX_AVAILABLE_SIZE is split into
    several 'data-{idx}-{part}.csv' files. Such oversized buckets contain
    exactly one vacancy by construction: get_vacancies_buckets only appends
    a vacancy to a bucket while the combined size stays under the limit, so
    slicing vacancy_rows[bucket[0]] covers the whole bucket.

    Returns the list of all rows written, for later verification.
    """
    written_rows = []
    for idx, bucket in enumerate(vacancies_buckets):
        bucket_size = get_bucket_size(bucket, vacancy_sizes)
        if bucket_size > MAX_AVAILABLE_SIZE:
            # Ceil division. The original `size // MAX + 1` produced one
            # extra, header-only part file whenever size was an exact
            # multiple of MAX_AVAILABLE_SIZE.
            file_parts = -(-bucket_size // MAX_AVAILABLE_SIZE)
            all_rows = vacancy_rows[bucket[0]]
            part_rows_count = -(-len(all_rows) // file_parts)
            for file_part_id in range(file_parts):
                rows = all_rows[part_rows_count * file_part_id:
                                part_rows_count * (file_part_id + 1)]
                if not rows:
                    break  # nothing left to write; avoid header-only files
                file_path = os.path.join(output_path, f'data-{idx}-{file_part_id}.csv')
                write_csv_file(file_path, header_row, rows)
                written_rows.extend(rows)
        else:
            file_path = os.path.join(output_path, f'data-{idx}.csv')
            rows = get_bucket_rows(bucket, vacancy_rows)
            write_csv_file(file_path, header_row, rows)
            written_rows.extend(rows)
    return written_rows
def get_bucket_size(bucket, vacancy_sizes):
    """Return the total estimated size of all vacancies in *bucket*."""
    total = 0
    for vacancy_name in bucket:
        total += vacancy_sizes[vacancy_name]
    return total
def get_bucket_rows(bucket, vacancy_rows):
    """Concatenate the rows of every vacancy in *bucket*, in bucket order."""
    return [row for vacancy_name in bucket for row in vacancy_rows[vacancy_name]]
def write_csv_file(file_path, header_row, rows):
    """Write *header_row* followed by *rows* to *file_path* as ';'-delimited CSV.

    newline='' is required by the csv module documentation when passing a
    file to csv.writer; without it the writer emits '\r\r\n' line endings
    on Windows (the original omitted it).
    """
    with open(file_path, 'w', newline='') as out:
        csv_writer = csv.writer(out, delimiter=';')
        csv_writer.writerow(header_row)
        csv_writer.writerows(rows)
def get_vacancies_buckets(vacancy_sizes):
    """First-fit bin packing of vacancies into size-bounded buckets.

    Each vacancy goes into the first existing bucket whose size would stay
    under MAX_AVAILABLE_SIZE after adding it; otherwise a new bucket is
    opened (which also covers the very first vacancy, and a vacancy larger
    than the limit, which gets a bucket of its own).
    """
    vacancies_buckets = []
    for vacancy_name, vacancy_size in vacancy_sizes.items():
        target = None
        for bucket in vacancies_buckets:
            if get_bucket_size(bucket, vacancy_sizes) + vacancy_size < MAX_AVAILABLE_SIZE:
                target = bucket
                break
        if target is None:
            vacancies_buckets.append([vacancy_name])
        else:
            target.append(vacancy_name)
    return vacancies_buckets
def get_vacancy_rows_sizes(vacancy_rows):
    """Estimate, per vacancy, the UTF-8 serialized size of its rows in bytes.

    The estimate assumes every field is wrapped in double quotes, fields
    are joined with ';' and rows with '\n'. The original used f'"{i}' with
    an unbalanced quote, undercounting one byte per field.

    Returns a dict mapping vacancy name -> estimated size in bytes.
    """
    vacancy_sizes = {}
    for vacancy_name, rows in vacancy_rows.items():
        serialized = "\n".join(
            ";".join(f'"{field}"' for field in row) for row in rows)
        vacancy_sizes[vacancy_name] = len(serialized.encode('utf-8'))
    return vacancy_sizes
def verify_all_processed(vacancy_rows, written_rows):
    """Check that every row read from the input was written exactly once.

    Compares the two collections as sorted multisets, keyed on the row
    content itself. The original sorted by hash(','.join(row)), where two
    distinct rows sharing a hash could sort into different orders and cause
    a false mismatch; it also used `assert`, which is stripped under
    `python -O`. Raises ValueError on a mismatch.
    """
    initial_rows = [row for rows in vacancy_rows.values() for row in rows]

    def row_key(row):
        return tuple(str(field) for field in row)

    if sorted(initial_rows, key=row_key) != sorted(written_rows, key=row_key):
        raise ValueError('written rows do not match the rows read from the input file')
def get_row_hash(row):
    """Hash a row by joining its stringified fields with commas.

    Only stable within a single process (str hashing is randomized per run),
    which is sufficient for its use as an in-run sort key.
    """
    joined = ','.join(map(str, row))
    return hash(joined)
def parse_arguments():
    """Parse CLI options and return (file_path, output_directory_path)."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-f", "--file_path", help="initial file path")
    arg_parser.add_argument("-d", "--output_directory_path", help="path of output directory")
    parsed = arg_parser.parse_args()
    return parsed.file_path, parsed.output_directory_path
if __name__ == '__main__':
    # Script entry point: read CLI paths and run the split.
    # (Locals renamed: the original `file` shadowed the builtin.)
    input_path, output_dir = parse_arguments()
    main(input_path, output_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment