Created
April 9, 2025 18:07
-
-
Save NickHeiner/152f58297abfa69272e78b0432be2bb7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import json | |
import re # Using regex to help clean keys | |
# Leading "<digits>." set marker, optional whitespace, then an optional
# response/comparison tag ("a_", "b_" or "sxs_"). Anchored at the start so
# only the first occurrence is ever removed.
_KEY_PREFIX_RE = re.compile(r'^\d+\.\s*(?:[ab]_|sxs_)?')


def clean_key(key):
    """Strip the set/response prefix from a column name and tidy its suffix.

    Args:
        key: A column name such as ``'1. a_tone (rating)'`` or
            ``'3. sxs_overall_quality (text)'``.

    Returns:
        The name with the leading ``'<n>. '`` marker and optional
        ``a_`` / ``b_`` / ``sxs_`` tag removed, and with ``' (rating)'`` /
        ``' (text)'`` rewritten to ``'_rating'`` / ``'_text'``. Names that
        carry no such prefix or suffix are returned unchanged.
    """
    cleaned = _KEY_PREFIX_RE.sub('', key)
    # Turn the parenthesised qualifiers into identifier-friendly suffixes.
    cleaned = cleaned.replace(' (rating)', '_rating')
    cleaned = cleaned.replace(' (text)', '_text')
    return cleaned
def _collect_prefixed(row_dict, prefix):
    """Return ``{cleaned_name: value}`` for columns whose name starts with *prefix*."""
    return {
        clean_key(key): value
        for key, value in row_dict.items()
        # csv.DictReader files overflow cells under the key None (its default
        # restkey); skip any non-string key instead of crashing on .startswith.
        if isinstance(key, str) and key.startswith(prefix)
    }


def transform_row_to_nested_json(row_dict, *, num_sets=5):
    """Transform one flat CSV row into a nested, JSON-serialisable dict.

    Args:
        row_dict: Mapping of column name to cell value for a single row.
            Columns belonging to evaluation set ``n`` are named ``'<n>. ...'``.
        num_sets: Highest evaluation-set number to look for; sets
            ``1..num_sets`` are examined. Defaults to 5.

    Returns:
        A dict holding whichever of ``worker_id``, ``task_id``,
        ``task_response_id`` and ``chat`` are present in the row, plus an
        ``'evaluations'`` dict keyed by the set number as a string. Each
        evaluation holds ``prompt``, ``response_a``, ``response_b``,
        ``sxs_comparison`` and ``rationale``. A set is skipped when its
        ``'<n>. prompt_category'`` column is missing or empty.
    """
    base_keys = ['worker_id', 'task_id', 'task_response_id', 'chat']
    # Suffix of each side-by-side column -> name used in the output. Spelled
    # out explicitly because the column names contain spaces and parentheses.
    sxs_columns = {
        'tone (rating)': 'tone_rating',
        'tone (text)': 'tone_text',
        'overall_quality (rating)': 'overall_quality_rating',
        'overall_quality (text)': 'overall_quality_text',
    }

    # 1. Copy the base identifiers that are actually present.
    output = {key: row_dict[key] for key in base_keys if key in row_dict}
    output['evaluations'] = {}

    # 2. Build one nested record per evaluation set.
    for i in range(1, num_sets + 1):
        set_prefix = f"{i}. "

        # The prompt category doubles as the "does this set exist?" marker.
        category = row_dict.get(f"{set_prefix}prompt_category")
        if not category:
            continue

        sxs_prefix = f"{set_prefix}sxs_"
        # String keys ('1', '2', ...) because JSON object keys are strings.
        output['evaluations'][str(i)] = {
            'prompt': {
                'category': category,
                'opinions_l2': row_dict.get(
                    f"{set_prefix}opinions_and_recommendations_l2"),
                'expert_l2': row_dict.get(f"{set_prefix}expert_advice_l2"),
            },
            'response_a': _collect_prefixed(row_dict, f"{set_prefix}a_"),
            'response_b': _collect_prefixed(row_dict, f"{set_prefix}b_"),
            'sxs_comparison': {
                json_key: row_dict.get(f"{sxs_prefix}{csv_suffix}")
                for csv_suffix, json_key in sxs_columns.items()
            },
            'rationale': row_dict.get(f"{set_prefix}rationale"),
        }

    return output
# --- Main Script Execution ---
# Reads every row of the input CSV, converts it with
# transform_row_to_nested_json, and writes the resulting list as one JSON array.
input_csv_filename = 'input.csv'
output_json_filename = 'output.json'
transformed_data = []
try:
    # newline='' lets the csv module handle line endings itself, so quoted
    # fields containing embedded newlines are parsed correctly.
    # utf-8-sig reads plain UTF-8 too, but also drops a leading BOM that would
    # otherwise become part of the first header name.
    with open(input_csv_filename, mode='r', encoding='utf-8-sig',
              newline='') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            transformed_row = transform_row_to_nested_json(row)
            transformed_data.append(transformed_row)
    # Write the list of transformed dictionaries to a JSON file.
    with open(output_json_filename, mode='w', encoding='utf-8') as outfile:
        json.dump(transformed_data, outfile, indent=4)  # indent for readability
    print(f"Successfully transformed '{input_csv_filename}' to '{output_json_filename}'")
except FileNotFoundError:
    print(f"Error: Input file '{input_csv_filename}' not found.")
except Exception as e:
    # Top-level boundary: report any other failure instead of a traceback.
    print(f"An error occurred: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment