Created
January 22, 2025 13:13
-
-
Save vitalibaranov/843621c1926758b60cefcb2b57ff9e57 to your computer and use it in GitHub Desktop.
Split talantix csv file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import csv
import os.path
from collections import Counter
# Upper bound, in estimated UTF-8 bytes, on the size of a single output CSV
# file (~10 MB); buckets/parts are cut so each file stays under this limit.
MAX_AVAILABLE_SIZE = 10_000_000
def read_vacancies(vacancies_file_path):
    """Load the mapping vacancy name -> talantix vacancy id from a CSV file.

    The file is expected to hold ';'-separated (name, id) rows. Rows with
    fewer than two columns are skipped.

    Returns an empty dict when no path is given.
    """
    if not vacancies_file_path:
        return {}
    # newline='' is the documented way to open files for the csv module.
    with open(vacancies_file_path, 'r', newline='') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=';')
        # Fix: require at least two columns. The original `len(row) > 0`
        # admitted one-column rows and then crashed on row[1] (IndexError).
        return {row[0]: row[1] for row in csv_reader if len(row) >= 2}
def main(file_path, output_directory_path, vacancies_file_path):
    """Split the talantix CSV export into size-bounded files in the output dir.

    Pipeline: load known vacancies, group input rows by vacancy, estimate
    per-vacancy sizes, pack vacancies into buckets, write one file per
    bucket, then verify nothing was lost or duplicated.
    """
    known_vacancies = read_vacancies(vacancies_file_path)
    header, rows_by_vacancy = read_file_data(file_path, known_vacancies)
    sizes = get_vacancy_rows_sizes(rows_by_vacancy)
    buckets = get_vacancies_buckets(sizes)
    flushed = write_files(output_directory_path, header, buckets, rows_by_vacancy, sizes)
    verify_all_processed(rows_by_vacancy, flushed)
    processed = sum(len(rows) for rows in rows_by_vacancy.values())
    print(f'Processed {processed} lines (headers not counted).')
def read_file_data(file_path, existing_vacancies):
    """Read the export CSV and group its data rows by vacancy name.

    The first row is the header; a column index is built from it. For rows
    whose vacancy is already known (present in existing_vacancies), the
    'new_vacancy_name' field is cleared and 'talantix_vacancy_id' is set to
    the known id.

    Returns (header_row, {vacancy_name: [rows...]}).
    Raises ValueError when the file has no header row (the original code
    raised UnboundLocalError on an empty file).
    """
    # newline='' is the documented way to open files for the csv module.
    with open(file_path, newline='') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=';')
        try:
            header_row = next(csv_reader)
        except StopIteration:
            raise ValueError(f'{file_path} is empty: no header row') from None
        fields_index = {name: idx for idx, name in enumerate(header_row)}
        vacancy_rows = {}
        for row in csv_reader:
            vacancy_name = row[fields_index['new_vacancy_name']]
            existing_vacancy_id = existing_vacancies.get(vacancy_name)
            if existing_vacancy_id:
                # Vacancy already exists in talantix: clear the name and
                # point the row at the known id (reuse the looked-up value
                # instead of the original's second dict lookup).
                row[fields_index['new_vacancy_name']] = None
                row[fields_index['talantix_vacancy_id']] = existing_vacancy_id
            vacancy_rows.setdefault(vacancy_name, []).append(row)
        return header_row, vacancy_rows
def write_files(output_path, header_row, vacancies_buckets, vacancy_rows, vacancy_sizes):
    """Write each bucket of vacancies to its own CSV file under output_path.

    A bucket whose estimated size exceeds MAX_AVAILABLE_SIZE always holds a
    single vacancy; its rows are split evenly across numbered part files.
    Returns every data row written, in write order.
    """
    written_rows = []
    for bucket_id, bucket in enumerate(vacancies_buckets):
        estimated = get_bucket_size(bucket, vacancy_sizes)
        if estimated <= MAX_AVAILABLE_SIZE:
            # Whole bucket fits into one file.
            target = os.path.join(output_path, f'data-{bucket_id}.csv')
            bucket_rows = get_bucket_rows(bucket, vacancy_rows)
            write_csv_file(target, header_row, bucket_rows)
            written_rows.extend(bucket_rows)
            continue
        # Oversized buckets are singletons by construction.
        assert len(bucket) == 1
        all_rows = vacancy_rows[bucket[0]]
        parts = estimated // MAX_AVAILABLE_SIZE + 1
        per_part = len(all_rows) // parts + 1
        for part_id in range(parts):
            target = os.path.join(output_path, f'data-{bucket_id}-{part_id}.csv')
            chunk = all_rows[per_part * part_id: per_part * (part_id + 1)]
            write_csv_file(target, header_row, chunk)
            written_rows.extend(chunk)
    return written_rows
def get_bucket_size(bucket, vacancy_sizes):
    """Return the total estimated byte size of all vacancies in the bucket."""
    total = 0
    for vacancy in bucket:
        total += vacancy_sizes[vacancy]
    return total
def get_bucket_rows(bucket, vacancy_rows):
    """Return the rows of every vacancy in the bucket, concatenated in bucket order."""
    # Flatten with one comprehension instead of nested append loops.
    return [row for vacancy_name in bucket for row in vacancy_rows[vacancy_name]]
def write_csv_file(file_path, header_row, rows):
    """Write header_row followed by rows to file_path as ';'-separated CSV.

    Fix: the csv module requires files opened with newline='' — without it
    the writer emits '\r\r\n' line endings on Windows and mishandles
    embedded newlines inside quoted fields.
    """
    with open(file_path, 'w', newline='') as out:
        csv_writer = csv.writer(out, delimiter=';')
        csv_writer.writerow(header_row)
        csv_writer.writerows(rows)
def get_vacancies_buckets(vacancy_sizes):
    """Pack vacancies into buckets first-fit so each stays under MAX_AVAILABLE_SIZE.

    A vacancy that fits in no existing bucket opens a new one; a vacancy
    larger than the limit therefore always ends up alone in its bucket
    (write_files relies on that).
    """
    vacancies_buckets = []
    for vacancy_name, vacancy_size in vacancy_sizes.items():
        # First-fit: drop the vacancy into the first bucket with room.
        for bucket in vacancies_buckets:
            if get_bucket_size(bucket, vacancy_sizes) + vacancy_size < MAX_AVAILABLE_SIZE:
                bucket.append(vacancy_name)
                break
        else:
            # No bucket had room (or none exist yet): start a new one.
            # The original special-cased an empty bucket list, but the
            # for/else already covers it — that branch was redundant.
            vacancies_buckets.append([vacancy_name])
    return vacancies_buckets
def get_vacancy_rows_sizes(vacancy_rows):
    """Estimate, per vacancy, the UTF-8 byte size its rows would occupy on disk.

    Every field is counted as if fully quoted ("field"), fields joined with
    ';' and rows with '\\n' — a deliberate overestimate of what csv.writer
    actually emits (it quotes only when needed).
    """
    vacancy_sizes = {}
    for vacancy_name, rows in vacancy_rows.items():
        # Fix: the original f-string f'"{i}' was missing the closing quote
        # and undercounted every field by one byte.
        rendered = '\n'.join(';'.join(f'"{i}"' for i in row) for row in rows)
        vacancy_sizes[vacancy_name] = len(rendered.encode('utf-8'))
    return vacancy_sizes
def verify_all_processed(vacancy_rows, written_rows):
    """Check that exactly the rows read from the input were written out.

    Compares the two collections as multisets of tuples. The original sorted
    both lists by a hash of each row: hash collisions between distinct rows
    could order them differently and make equal multisets compare unequal,
    and `assert` is stripped under python -O.

    Raises ValueError when rows were lost or duplicated.
    """
    initial = Counter(tuple(row) for rows in vacancy_rows.values() for row in rows)
    written = Counter(tuple(row) for row in written_rows)
    if initial != written:
        raise ValueError('written rows do not match the rows read from the input file')
def get_row_hash(row):
    """Hash a row by joining the string forms of its fields with commas."""
    joined = ','.join(map(str, row))
    return hash(joined)
def parse_arguments():
    """Parse CLI options; return (input path, output directory, vacancies path)."""
    parser = argparse.ArgumentParser()
    # (flags, help text) for each option, registered in one pass.
    for flags, help_text in (
        (("-f", "--file_path"), "initial file path"),
        (("-d", "--output_directory_path"), "path of output directory"),
        (("-v", "--existing_vacancies_file_path"), "existing vacancies file path"),
    ):
        parser.add_argument(*flags, help=help_text)
    args = parser.parse_args()
    return args.file_path, args.output_directory_path, args.existing_vacancies_file_path
if __name__ == '__main__':
    # Entry point: feed the parsed CLI options straight into main().
    main(*parse_arguments())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment