@zacharysyoung
Last active January 16, 2022 23:24

Merge hundreds of CSVs, each with millions of rows

500 CSVs, each with over 1 million rows, need to be merged into one CSV.

  • each CSV represents a sensor that recorded a value and the timestamp of the recording, with millions of timestamp/value rows
  • all CSVs have the same number of rows

How can we "merge" the CSVs such that each sensor's value-column is added to the merged CSV (500 value columns), and the timestamps for each row for each sensor are averaged into a single column?

sensor1.csv
  timestamp  value
  10000        1.9
  10010        2.2
  ... (millions of rows)

sensor2.csv
  timestamp  value
  10004        3.5
  10012        4.3

... (498 more files)

The result should look like this (the timestamp in this file is the average of all the timestamps from all 500+ input files; the sensor names, e.g. sensor1, sensor2, etc., come from the filenames):

merged_file.csv
  timestamp  sensor1  sensor2  ...  sensor500
  10002          1.9      3.5            2.1
  10011          2.2      4.3            3.5

How can we merge without reading/holding everything in RAM?

Solution

I see your post as two distinct questions: 1) how to merge the 500 CSVs, and 2) whatever comes next (some DB?).

This is a solution for the first question. I'm using Python; there are languages/runtimes that could do this faster, but I think Python will give you a good first start at the problem, and I expect it will be more accessible and easier for you to use.

Also, my solution is predicated on the fact that all 500 CSVs have identical row counts.

My solution opens all 500 CSVs at once, creates an outer loop over a set number of rows, and an inner loop over each CSV:

  • The inner loop reads the timestamp and value from the current row of each CSV, averages the 500 timestamps into a single value, and keeps the 500 sensor values in their own columns; all of that goes into one merged row with 501 columns.

  • The outer loop repeats that process for as many rows as there are across all 500 CSVs.

I generated some sample data, 500 CSVs each with 1_000_000 rows, for 6.5G of CSVs. I ran the following script on my M1 MacBook Air. It completed in 8.3 minutes, peaked at 34.6M of RAM, and produced a final CSV that is about 2G on disk.

import csv
import glob

# Fill this in based on your own knowledge, or, based on the output of 'analyze_stats.py'
NUM_ROWS = 1_000_000

# Get all sensor CSVs
sensor_filenames = glob.glob('sensor*.csv')

# Sort: trim leading 6 chars, 'sensor', and trailing 4 chars, '.csv', leaving just the number in the middle
sensor_filenames = sorted(sensor_filenames, key=lambda x: int(x[6:-4]))

# Get handles to all files, and create input CSV readers
sensor_readers = []
for sensor_fname in sensor_filenames:
    f = open(sensor_fname, newline='')
    sensor_readers.append(csv.reader(f))

# Create output CSV writer
f = open('merged_sensors.csv', 'w', newline='')
writer = csv.writer(f)

# Discard all sensor headers
for reader in sensor_readers:
    next(reader)

# Build up output header and write
output_header = ['timestamp']
for sensor_fname in sensor_filenames:
    sensor_name = sensor_fname[:-4]  # trim off '.csv'
    output_header.append(sensor_name)
writer.writerow(output_header)

row_count = 0
while row_count < NUM_ROWS:
    row_count += 1
    values = []
    timestamps = []
    for reader in sensor_readers:
        row = next(reader)

        ts, val = row
        timestamps.append(int(ts))
        values.append(val)
    
    if row_count % 1000 == 0:
        print(f'Merged {row_count} rows')

    avg_ts = int(sum(timestamps) / len(timestamps))
    writer.writerow([avg_ts] + values)

# Close the output file so any buffered rows are flushed to disk
f.close()

I haven't profiled this, but I believe the only real allocations of memory that add up are going to be:

  • the 500 file handles and CSV readers (which are small), held for the entirety of the process
  • each row from the input CSVs in the inner loop
  • the final merged row in the outer loop
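
If you want to check the peak-memory number on your own run, one option is to print the process's peak resident set size at the end of the merge script, using the standard-library resource module (a minimal sketch, not something the script above does); on macOS, running the script under /usr/bin/time -l reports the same figure without touching the code:

import resource

# Peak resident set size of this process so far.
# Note: ru_maxrss is reported in bytes on macOS but in kilobytes on Linux.
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f'Peak RSS: {peak}')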

At the top of the script I mention analyze_stats.py. Even if this were my data, I'd be very patient and break down the entire process into multiple steps, each of which I could verify, and I would ultimately arrive at the correct, final CSV. This is especially true for me trying to help you, because I don't control the data or "know" it like you do, so I'm going to offer up this bigger process:

  1. Read all the CSVs and record some stats: headers, column counts, and especially row counts.

  2. Analyze those stats for "conformance", making sure no CSV deviates from your idea of what it should be, and especially get confirmation that all 500 CSVs have the same number of columns and rows.

  3. Use the proven row count as input into the merge process (one way to read that count straight from Sensor_stats.csv is sketched after this list).

    There are ways to write the merge script so it doesn't have to know "the row count" up front, but it's more code, it's slightly more confusing, and it won't help you if there ever is a problem... you probably don't want to find out on row 2 million that there was a problem "somewhere"; I know I hate it when that happens.
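
To make step 3 concrete: instead of hand-editing NUM_ROWS, the merge script could read the proven count straight out of Sensor_stats.csv. A minimal sketch, assuming get_sensor_stats.py and analyze_stats.py have already run cleanly:

import csv

def get_proven_row_count(stats_fname='Sensor_stats.csv'):
    # analyze_stats.py should have already confirmed these are all identical
    with open(stats_fname, newline='') as f:
        counts = {int(row['Num Rows']) for row in csv.DictReader(f)}
    assert len(counts) == 1, f'expected one row count, got {sorted(counts)}'
    return counts.pop()

NUM_ROWS = get_proven_row_count()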

If you're new to Python or its CSV readers/writers, I recommend you read these scripts first.

  • [get_sensor_stats.py]1: reads all your sensor data and records the header, the minimum and maximum number of columns seen, and the row count for each CSV; it writes all those stats out to a CSV

  • [analyze_stats.py]2: reads in the stats CSV and checks to make sure the header and column counts meet pre-defined values; it also keeps a tally of the row counts for each file and will let you know if any files have different row counts (there's an example stats CSV just below)
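
For reference, here's the shape of the stats CSV that ties those two scripts together (the columns come from ROW_TMPL in get_sensor_stats.py; the values are just illustrative):

Sensor_stats.csv
  Sensor,Header,Min Cols,Max Cols,Num Rows
  1,"timestamp,value",2,2,1000000
  2,"timestamp,value",2,2,1000000
  ... (one row per sensor)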

Also, here's the script I used to generate my 6G of sample data:

  • [gen_sensor_data.py]3: is my attempt to meaningfully represent your problem space, both in size and complexity (which is very easy, thankfully 🙂); there's a quick size check just below
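
As a rough sanity check on those sizes: gen_sensor_data.py writes 1,000,000 rows per file, and a typical row like '9999994,19.9' plus a newline is about 13 bytes, so the total input should come out to roughly what I measured:

# Back-of-envelope estimate of the total input size (13 bytes/row is an approximation)
approx_bytes = 500 * 1_000_000 * 13
print(f'{approx_bytes / 1e9:.1f} GB')  # ~6.5 GB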

Footnotes

  1. https://gist.github.com/zacharysyoung/d9f3c64886f0d24737e8d96880c2b9d8#file-get_sensor_stats-py

  2. https://gist.github.com/zacharysyoung/d9f3c64886f0d24737e8d96880c2b9d8#file-analyze_stats-py

  3. https://gist.github.com/zacharysyoung/d9f3c64886f0d24737e8d96880c2b9d8#file-gen_sensor_data-py

analyze_stats.py

#!/usr/bin/env python3
import csv
import sys
from collections import defaultdict

COLS_NUM = 2
HEADER = 'timestamp,value'

# Ideally, one row count with a list of *all* sensors, e.g., {1_000_000: ['1', '2', ... '500']}
row_count_sensors_map = defaultdict(list)

row_count_discrepancy = False
header_error = False
column_error = False

with open('Sensor_stats.csv', newline='') as f:
    reader = csv.DictReader(f)
    for stat_row in reader:
        sensor = stat_row['Sensor']

        header = stat_row['Header']
        if header != HEADER:
            header_error = True
            print(f'Error with sensor {sensor}, header problem: expected {HEADER}, got {header}')

        min_cols = int(stat_row['Min Cols'])
        max_cols = int(stat_row['Max Cols'])
        if min_cols != COLS_NUM or max_cols != COLS_NUM:
            column_error = True
            print(f'Error with sensor {sensor}, column-count problem: some rows have as few as {min_cols}, some have as many as {max_cols}')

        row_count = int(stat_row['Num Rows'])
        row_count_sensors_map[row_count].append(sensor)

# More than one row count was found
if len(row_count_sensors_map.keys()) > 1:
    row_count_discrepancy = True
    print('Error, found different row counts across all sensor CSVs:')
    for row_count, sensors in row_count_sensors_map.items():
        sensors = sensors[:20] + ['...'] if len(sensors) > 20 else sensors
        print(f'  Row count {row_count}: {",".join(sensors)}')

if header_error or column_error or row_count_discrepancy:
    sys.exit(1)

# Else, all is good!
row_count = list(row_count_sensors_map.keys())[0]
sensors = row_count_sensors_map[row_count]
print(f'All {len(sensors)} sensors are conformant:')
print(f'  Header:  "{HEADER}"')
print(f'  Columns: {COLS_NUM}')
print(f'  Rows:    {row_count}')

gen_sensor_data.py

#!/usr/bin/env python3
import csv
import random

for i in range(1, 501):
    fname = f'sensor{i}.csv'
    with open(fname, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['timestamp', 'value'])

        # Generate a series of timestamps, all sequential, but with some random variance on the order of 1-in-10 seconds
        for k in range(0, 10_000_000, 10):
            var = random.randrange(1, 10)
            ts = k + var

            whole = random.randrange(0, 20)
            part = random.randrange(0, 10)
            val = f'{whole}.{part}'

            writer.writerow([ts, val])

    print(f'Wrote {fname}')

get_sensor_stats.py

#!/usr/bin/env python3
import csv
import glob

# Get all sensor CSVs
sensor_filenames = glob.glob('sensor*.csv')

# Sort: trim leading 6 chars, 'sensor', and trailing 4 chars, '.csv', leaving just the number in the middle
sensor_filenames = sorted(sensor_filenames, key=lambda x: int(x[6:-4]))

# After iterating all rows of a sensor CSV, record some stats for that CSV
ROW_TMPL = {'Sensor': None, 'Header': None,
            'Min Cols': None, 'Max Cols': None, 'Num Rows': None}
all_rows = []

for sensor_fname in sensor_filenames:
    f = open(sensor_fname, newline='')
    reader = csv.reader(f)

    header = next(reader)
    min_cols = len(header)
    max_cols = len(header)

    # Don't count header as a row
    row_count = 0
    for row in reader:
        row_count += 1
        col_count = len(row)
        if col_count < min_cols:
            min_cols = col_count
        if col_count > max_cols:
            max_cols = col_count

    row = dict(ROW_TMPL)  # copy dict/hash-map
    row['Sensor'] = sensor_fname[6:-4]
    row['Header'] = ','.join(header)
    row['Min Cols'] = min_cols
    row['Max Cols'] = max_cols
    row['Num Rows'] = row_count
    all_rows.append(row)

    f.close()

with open('Sensor_stats.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=ROW_TMPL.keys())
    writer.writeheader()
    writer.writerows(all_rows)