Last active
November 4, 2024 05:49
-
-
Save koenvo/81e795ff2e0861e75e6dac1630171ce6 to your computer and use it in GitHub Desktop.
"The One Billion Row Challenge" python submission
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
from multiprocessing import Pool
# Notes:
# a) Let every process handle a single chunk.
# b) Use as many processes as cores.

# Number of slices the input file is divided into before processing.
CHUNK_COUNT = 10
# Number of worker processes in the pool. Kept equal to CHUNK_COUNT so that
# each worker handles exactly one chunk (note a); adjust both to the number
# of cores of the machine (note b).
CONCURRENCY = 10
def read_chunk(filename, chunk_start, chunk_size):
    """Aggregate per-station statistics for one byte range of a file.

    The file is read in binary mode so that ``chunk_start`` and
    ``chunk_size`` are exact byte counts, even when station names contain
    multi-byte UTF-8 characters.

    Args:
        filename: Path to a file of ``station;measurement`` lines.
        chunk_start: Byte offset at which reading begins; expected to be the
            first byte of a line.
        chunk_size: Number of bytes that belong to this chunk. A line that
            would extend past the end of the chunk is not processed.

    Returns:
        A dict mapping each station name (``str``) to a list
        ``[min, max, sum, count]`` of the measurements seen in the chunk.

    Raises:
        IndexError: If a processed line contains no ``;`` separator.
        ValueError: If a measurement cannot be parsed as a float.
        UnicodeDecodeError: If a station name is not valid UTF-8.
    """
    station_measurements = {}
    with open(filename, "rb") as fp:
        fp.seek(chunk_start)
        bytes_read = 0
        # A single pass suffices: iteration stops at the chunk limit or at
        # end of file, whichever comes first.
        for line in fp:
            # Each line still carries its newline, so its length is exactly
            # the number of bytes consumed from the file.
            bytes_read += len(line)
            if bytes_read > chunk_size:
                break
            fields = line.split(b";")
            station = fields[0]
            # float() accepts bytes and ignores the trailing newline.
            measurement = float(fields[1])
            try:
                item = station_measurements[station]
            except KeyError:
                station_measurements[station] = [
                    measurement,
                    measurement,
                    measurement,
                    1,
                ]
            else:
                item[0] = min(item[0], measurement)
                item[1] = max(item[1], measurement)
                item[2] += measurement
                item[3] += 1
    # Decode once per distinct station rather than once per line.
    return {
        station.decode("utf-8"): item
        for station, item in station_measurements.items()
    }
def create_df(filename):
    """Compute min, max and mean per station using a pool of processes.

    The file is cut into ``CHUNK_COUNT`` byte ranges whose starts are moved
    forward to the beginning of the next line. Each range is handed to
    ``read_chunk`` in a worker process and the partial results are merged.

    Args:
        filename: Path to a file of ``station;measurement`` lines.

    Returns:
        A list of ``(station, min, max, mean)`` tuples sorted by station
        name. An empty file yields an empty list.
    """
    size = os.path.getsize(filename)
    if size == 0:
        return []

    chunk_size = size // CHUNK_COUNT
    start_positions = [i * chunk_size for i in range(CHUNK_COUNT)]

    # Step 1: adjust chunks to snap to newlines. Binary mode keeps every
    # offset an exact byte position, and readline() skips the remainder of
    # the line the raw offset landed in regardless of its length.
    with open(filename, "rb") as fp:
        # The first chunk always starts at the beginning of the file.
        for i in range(1, CHUNK_COUNT):
            fp.seek(start_positions[i])
            fp.readline()
            # Start just after the newline (or at EOF if none was found).
            start_positions[i] = fp.tell()

    # Step 2: define chunk start and size; a chunk ends where the next one
    # starts. Empty chunks carry no lines and are not dispatched.
    chunks = [
        (filename, start, end - start)
        for start, end in zip(start_positions, start_positions[1:] + [size])
        if end > start
    ]

    with Pool(CONCURRENCY) as pool:
        res = pool.starmap(read_chunk, chunks)

    # Step 3: merge the per-chunk [min, max, sum, count] records.
    station_measurements = {}
    for chunk in res:
        for station, (min_, max_, sum_, count) in chunk.items():
            try:
                item = station_measurements[station]
            except KeyError:
                station_measurements[station] = [min_, max_, sum_, count]
            else:
                item[0] = min(item[0], min_)
                item[1] = max(item[1], max_)
                item[2] += sum_
                item[3] += count

    return [
        (station, min_, max_, sum_ / count)
        for station, (min_, max_, sum_, count) in sorted(
            station_measurements.items()
        )
    ]
# The guard keeps worker processes from re-running the script body when the
# module is imported by multiprocessing.
if __name__ == "__main__":
    import time

    # perf_counter() is a monotonic clock intended for measuring intervals,
    # so the result is unaffected by system clock adjustments.
    start_time = time.perf_counter()
    df = create_df("measurements.txt")
    took = time.perf_counter() - start_time
    print(f"Took: {took:.2f} sec")
    print(df)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment