Created
January 21, 2025 07:06
-
-
Save vitalibaranov/1851c595afa000b1bba954619d870d92 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import csv
import os.path
from collections import Counter
# Soft cap, in bytes, on the estimated size of one output file; buckets whose
# estimate exceeds it are split into numbered parts by write_files.
MAX_AVAILABLE_SIZE = 10_000_000
def main(file_path, output_directory_path):
    """Split the input CSV into size-bounded per-bucket files and verify.

    Reads the ';'-delimited file at *file_path*, groups its rows by
    vacancy name, packs vacancies into buckets bounded by
    MAX_AVAILABLE_SIZE, writes the buckets under
    *output_directory_path*, and checks that every row was written.
    """
    header_row, data_rows = read_file_data(file_path)
    sizes = get_vacancy_rows_sizes(data_rows)
    buckets = get_vacancies_buckets(sizes)
    written = write_files(output_directory_path, header_row, buckets, data_rows, sizes)
    verify_all_processed(data_rows, written)
    initial_file_lines_count = sum(len(rows) for rows in data_rows.values())
    print(f'Processed {initial_file_lines_count} lines (headers not counted).')
def read_file_data(file_path):
    """Read the input CSV and group its data rows by vacancy name.

    Parameters:
        file_path: path to a ';'-delimited CSV whose header row contains
            a 'new_vacancy_name' column.

    Returns:
        A tuple (header_row, vacancy_rows) where header_row is the list
        of column names and vacancy_rows maps each vacancy name to the
        list of its data rows (header excluded), in file order.

    Raises:
        ValueError: if the file has no header row.
        KeyError: if the header lacks the 'new_vacancy_name' column.
    """
    # newline='' is required by the csv module so that quoted embedded
    # newlines are parsed correctly (csv.reader documentation).
    with open(file_path, newline='') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=';')
        header_row = next(csv_reader, None)
        if header_row is None:
            # Previously an empty file crashed with UnboundLocalError.
            raise ValueError(f'Empty input file: {file_path}')
        # Duplicate column names keep the last index, as before.
        fields_index = {name: idx for idx, name in enumerate(header_row)}
        vacancy_column = fields_index['new_vacancy_name']
        vacancy_rows = {}
        for row in csv_reader:
            vacancy_rows.setdefault(row[vacancy_column], []).append(row)
    return header_row, vacancy_rows
def write_files(output_path, header_row, vacancies_buckets, vacancy_rows, vacancy_sizes):
    """Write each bucket of vacancies to one or more CSV files.

    Buckets whose estimated size exceeds MAX_AVAILABLE_SIZE are split
    into numbered part files ('data-<idx>-<part>.csv'); all others go
    to a single 'data-<idx>.csv'.

    Parameters:
        output_path: directory for the output files.
        header_row: column names, written first in every file.
        vacancies_buckets: list of buckets (lists of vacancy names).
        vacancy_rows: mapping of vacancy name -> list of data rows.
        vacancy_sizes: mapping of vacancy name -> estimated byte size.

    Returns:
        The list of all rows written, for later verification.
    """
    written_rows = []
    for idx, bucket in enumerate(vacancies_buckets):
        bucket_size = get_bucket_size(bucket, vacancy_sizes)
        # Fix: collect rows of the WHOLE bucket; the old code sliced only
        # vacancy_rows[bucket[0]], dropping any other vacancies in an
        # oversized bucket.
        bucket_rows = get_bucket_rows(bucket, vacancy_rows)
        if bucket_size > MAX_AVAILABLE_SIZE:
            # Ceiling division: exact multiples no longer allocate an
            # extra (empty) part.
            file_parts = -(-bucket_size // MAX_AVAILABLE_SIZE)
            part_rows_count = -(-len(bucket_rows) // file_parts)
            for file_part_id in range(file_parts):
                file_path = os.path.join(output_path, f'data-{idx}-{file_part_id}.csv')
                rows = bucket_rows[part_rows_count * file_part_id:part_rows_count * (file_part_id + 1)]
                write_csv_file(file_path, header_row, rows)
                written_rows.extend(rows)
        else:
            file_path = os.path.join(output_path, f'data-{idx}.csv')
            write_csv_file(file_path, header_row, bucket_rows)
            written_rows.extend(bucket_rows)
    return written_rows
def get_bucket_size(bucket, vacancy_sizes):
    """Return the combined estimated byte size of the vacancies in *bucket*."""
    total = 0
    for name in bucket:
        total += vacancy_sizes[name]
    return total
def get_bucket_rows(bucket, vacancy_rows):
    """Return the data rows of every vacancy in *bucket*, concatenated in order."""
    # Comprehension replaces the manual nested append loop (same order).
    return [row for vacancy_name in bucket for row in vacancy_rows[vacancy_name]]
def write_csv_file(file_path, header_row, rows):
    """Write *header_row* followed by *rows* to *file_path* as ';'-delimited CSV."""
    # newline='' is required when handing a file to csv.writer; without it
    # the text layer re-translates the writer's '\r\n', producing blank
    # lines on Windows (csv module documentation).
    with open(file_path, 'w', newline='') as out:
        csv_writer = csv.writer(out, delimiter=';')
        csv_writer.writerow(header_row)
        csv_writer.writerows(rows)
def get_vacancies_buckets(vacancy_sizes, max_size=None):
    """Pack vacancies into buckets with first-fit placement.

    Each vacancy goes into the first bucket it fits (combined size
    strictly below the limit); otherwise it opens a new bucket. A single
    vacancy larger than the limit still gets its own bucket — write_files
    splits such buckets into parts later.

    Parameters:
        vacancy_sizes: mapping of vacancy name -> estimated byte size.
        max_size: optional override of MAX_AVAILABLE_SIZE (useful for tests).

    Returns:
        A list of buckets, each a list of vacancy names.
    """
    limit = MAX_AVAILABLE_SIZE if max_size is None else max_size
    vacancies_buckets = []
    # Running size per bucket, kept in lockstep with vacancies_buckets,
    # so placement no longer re-sums every bucket each time (was O(n^2)).
    bucket_sizes = []
    for vacancy_name, vacancy_size in vacancy_sizes.items():
        for i, current_size in enumerate(bucket_sizes):
            if current_size + vacancy_size < limit:
                vacancies_buckets[i].append(vacancy_name)
                bucket_sizes[i] = current_size + vacancy_size
                break
        else:
            # No bucket fits (also covers the empty-list case that the
            # old code special-cased).
            vacancies_buckets.append([vacancy_name])
            bucket_sizes.append(vacancy_size)
    return vacancies_buckets
def get_vacancy_rows_sizes(vacancy_rows):
    """Estimate, per vacancy, the serialized byte size of its rows.

    The estimate quotes every field and joins with ';' / '\\n' — a
    conservative upper bound relative to csv.writer, which quotes only
    when necessary.

    Returns:
        A mapping of vacancy name -> estimated size in UTF-8 bytes.
    """
    vacancy_sizes = {}
    for vacancy_name, rows in vacancy_rows.items():
        # Fix: the closing quote was missing (f'"{i}'), undercounting the
        # estimate by one byte per field.
        serialized = "\n".join(";".join(f'"{i}"' for i in row) for row in rows)
        vacancy_sizes[vacancy_name] = len(serialized.encode('utf-8'))
    return vacancy_sizes
def verify_all_processed(vacancy_rows, written_rows):
    """Check that every input row was written exactly once.

    Compares the two row collections as multisets (hash-collision safe,
    unlike sorting by get_row_hash, where two different rows with equal
    hashes could be ordered inconsistently between the lists).

    Raises:
        AssertionError: if any row is missing, duplicated, or extra.
            Raised explicitly so the check survives `python -O`.
    """
    initial = Counter(tuple(row) for rows in vacancy_rows.values() for row in rows)
    written = Counter(tuple(row) for row in written_rows)
    if initial != written:
        raise AssertionError('Written rows do not match the input rows.')
def get_row_hash(row):
    """Return a hash of the row's cells, stringified and comma-joined."""
    joined = ','.join(map(str, row))
    return hash(joined)
def parse_arguments():
    """Parse command-line arguments.

    Returns:
        A (file_path, output_directory_path) tuple.
    """
    parser = argparse.ArgumentParser()
    # Both paths are mandatory: without required=True a missing argument
    # came back as None and crashed later with an opaque TypeError.
    parser.add_argument("-f", "--file_path", required=True, help="initial file path")
    parser.add_argument("-d", "--output_directory_path", required=True, help="path of output directory")
    args = parser.parse_args()
    return args.file_path, args.output_directory_path
if __name__ == '__main__':
    # Script entry point: read CLI paths and run the splitter.
    input_file_path, output_directory_path = parse_arguments()
    main(input_file_path, output_directory_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment