Read/write a pandas DataFrame to/from a Parquet file split into fixed-size chunk files.
import os
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq

kilobytes = 1024
megabytes = kilobytes * 1000
chunksize = int(10 * megabytes)


def write_split_parquet(df, todir, chunksize=chunksize, compression='GZIP'):
    # initialize output directory
    if not os.path.exists(todir):
        os.mkdir(todir)
    else:
        for file in os.listdir(todir):
            os.remove(os.path.join(todir, file))
    # create parquet in-memory stream from dataframe
    table = pa.Table.from_pandas(df)  # pyarrow table
    stream = BytesIO()
    pq.write_table(table, stream, compression=compression)
    stream.seek(0)  # reset stream
    # write chunks to files
    i = 0
    while True:
        chunk = stream.read(chunksize)
        if not chunk:
            break
        i += 1
        filename = os.path.join(todir, ('part%04d' % i))
        with open(filename, 'wb') as f:
            f.write(chunk)
    stream.close()
    assert i <= 9999  # part names only sort correctly up to 4 digits
    return i


def read_split_parquet(fromdir):
    with BytesIO() as s:
        # reassemble the parts in name order; os.listdir gives no ordering guarantee
        for file in sorted(os.listdir(fromdir)):
            with open(os.path.join(fromdir, file), 'rb') as f:
                s.write(f.read())
        table = pq.read_table(s)
        df = table.to_pandas()
    return df
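
A minimal round-trip sketch of how the two helpers might be called; the example DataFrame and the ./parquet_parts directory name are illustrative assumptions, not part of the original gist.

import pandas as pd

# hypothetical example data; any pandas DataFrame should work
df = pd.DataFrame({'id': range(100000), 'value': [i * 0.5 for i in range(100000)]})

# write the frame as gzip-compressed parquet, split into ~10 MB chunk files
n_parts = write_split_parquet(df, './parquet_parts')
print('wrote %d part file(s)' % n_parts)

# reassemble the chunks and confirm the round trip
df2 = read_split_parquet('./parquet_parts')
assert df.equals(df2)

Note that the chunk files are slices of a single in-memory Parquet stream, so each part stays under the size cap but is not an independently readable Parquet file; the parts must be concatenated (as read_split_parquet does) before reading.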