@erjan
Last active February 24, 2025 11:15
Split a ~470 MB JSON file into smaller chunks by guid_station - task from mandy
import json
import os
import time
def split_json_by_guid_station(input_file, output_dir, chunk_size_mb):
    """
    Splits a JSON file into smaller files, ensuring that each output file
    contains only complete 'guid_station' entries. A 'guid_station' is
    never split across multiple files: each entry either fits into the
    current chunk in full or starts a new chunk.

    Includes timing and progress output. Average completion time:
    ~70 seconds for a ~430 MB input file.

    Args:
        input_file (str): Path to the input JSON file.
        output_dir (str): Directory to store the split JSON files.
        chunk_size_mb (int): Approximate size of each output file in MB.
            This is a target, not a hard limit; files may be slightly
            larger to avoid splitting 'guid_station' entries.
    """
    start_time = time.time()  # Start the overall timer

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    chunk_size_bytes = chunk_size_mb * 1024 * 1024
    file_count = 0
    current_chunk = {}
    current_chunk_size = 0
    output_file = None

    print(f"Starting to process {input_file}...")

    with open(input_file, 'r') as f:
        try:
            data = json.load(f)  # Load the entire file into a Python dictionary
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            return

    if not isinstance(data, dict):
        print("Error: Input file does not contain a top-level JSON object (dictionary).")
        return

    total_guid_stations = len(data)
    guid_station_count = 0

    # Store the first guid_station of each chunk for validation
    first_guid_stations = {}

    for guid_station, contents in data.items():
        loop_start_time = time.time()  # Start timer for each guid_station

        # Convert the entry to a JSON string to measure its size
        entry_string = json.dumps({guid_station: contents})
        entry_size = len(entry_string.encode('utf-8'))

        if current_chunk_size + entry_size > chunk_size_bytes and current_chunk:
            # Save the current chunk to a file
            if output_file:
                json.dump(current_chunk, output_file, indent=4)
                output_file.close()
                print(f"Chunk saved to: {output_filename}")

                # Validation: record the first guid_station in the saved chunk
                current_chunk_keys = list(current_chunk.keys())
                if len(current_chunk_keys) > 0:
                    first_guid_stations[file_count] = current_chunk_keys[0]

                output_file = None  # Reset output_file to None after closing
            else:
                print("Chunk was empty and not saved")

            # Start a new chunk
            current_chunk = {}
            current_chunk_size = 0

        # Add the guid_station entry to the current chunk
        current_chunk[guid_station] = contents
        current_chunk_size += entry_size

        # Open a new output file if one is not already open
        if output_file is None:
            file_count += 1
            output_filename = os.path.join(output_dir, f"chunk_{file_count:03d}.json")
            output_file = open(output_filename, 'w')
            # Record the first guid_station of the new chunk to validate later
            # that no guid_station was split between files
            first_guid_stations[file_count] = guid_station

        guid_station_count += 1
        loop_end_time = time.time()  # End timer for each guid_station
        loop_duration = loop_end_time - loop_start_time
        print(f"Processed guid_station {guid_station_count}/{total_guid_stations} "
              f"({guid_station[:20]}...): chunk size: {current_chunk_size / (1024 * 1024):.2f} MB. "
              f"Time taken: {loop_duration:.2f} seconds. Current file: {output_filename}")

    # Save the last chunk if it's not empty
    if current_chunk:
        if output_file:
            json.dump(current_chunk, output_file, indent=4)
            output_file.close()
            print(f"Final chunk saved to: {output_filename}")

            # Validation: record the first guid_station in the final chunk
            current_chunk_keys = list(current_chunk.keys())
            if len(current_chunk_keys) > 0:
                first_guid_stations[file_count] = current_chunk_keys[0]
        else:
            print("Final chunk was empty and not saved")

    end_time = time.time()  # End the overall timer
    total_time = end_time - start_time
    print(f"JSON file split into {file_count} files in: {output_dir}")
    print(f"Total time taken: {total_time:.2f} seconds")
    '''
    # This validation was more of a helper utility for debugging.
    # Final validation check - after splitting
    print("\nValidating that guid_stations were not split between files...")
    files = sorted(os.listdir(output_dir))
    last_guid_station = None
    for i, filename in enumerate(files):
        if filename.endswith(".json"):  # Only process JSON files
            filepath = os.path.join(output_dir, filename)
            with open(filepath, 'r') as f:
                try:
                    chunk_data = json.load(f)
                    # chunk_data is a dictionary
                    for guid_station in chunk_data.keys():
                        # Do the validation here
                        break
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON in file {filename}: {e}")
                    continue
            if i > 0:  # Check every file except the first
                # Load the previous chunk
                prev_filename = os.path.join(output_dir, files[i - 1])
                with open(prev_filename) as f:
                    try:
                        prev_chunk_data = json.load(f)
                        # Check the last key of the previous file against
                        # the first key of the current file
                        last_key = list(prev_chunk_data.keys())[-1]
                    except (json.JSONDecodeError, KeyError) as e:
                        print(f"Error processing previous chunk {files[i-1]}: {e}")
                        continue  # Skip to the next file
                if i in first_guid_stations:  # Check that file_count exists in first_guid_stations
                    if first_guid_stations[i] == last_key:
                        # The first guid_station of this file matches the last
                        # key of the previous file, so the entry was split
                        print(f"ERROR: guid_station {first_guid_stations[i]} spans files {files[i - 1]} and {filename}!")
                else:
                    print(f"Warning: No first guid_station found for file {filename} (file_count {i}). "
                          f"This might indicate an issue during chunking.")
            last_guid_station = guid_station
    print("Validation complete.")
    '''
input_file = "nearest_51_100_20250101.json" # Replace with the name of your actual JSON file
output_dir = "output_chunks"
chunk_size_mb = 40
split_json_by_guid_station(input_file, output_dir, chunk_size_mb)
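

# A minimal standalone check along the lines of the disabled validation
# block above: it assumes the chunk files written by the call above and
# the original input file are both still on disk, and verifies that the
# chunks contain every guid_station exactly once (which also rules out an
# entry being split across files). The helper name is illustrative, not
# part of the original gist.
def validate_chunks(input_file, output_dir):
    with open(input_file, 'r') as f:
        original_keys = set(json.load(f).keys())

    seen_keys = set()
    duplicates = set()
    for filename in sorted(os.listdir(output_dir)):
        if not filename.endswith(".json"):
            continue
        with open(os.path.join(output_dir, filename), 'r') as f:
            chunk_keys = set(json.load(f).keys())
        duplicates |= seen_keys & chunk_keys  # a key appearing in two chunks
        seen_keys |= chunk_keys

    missing = original_keys - seen_keys
    print(f"Duplicated guid_stations across chunks: {len(duplicates)}")
    print(f"guid_stations missing from chunks: {len(missing)}")


# Example (uncomment to run after the split above has finished):
# validate_chunks(input_file, output_dir)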