Streaming pandas DataFrame to/from S3 with on-the-fly processing and GZIP compression
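import gzip
import io

import pandas as pd

def s3_to_pandas(client, bucket, key, header=None):
    # get the object using a boto3 S3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    # wrap the streaming body so it is decompressed on the fly
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # load the decompressed stream directly into a DataFrame
    return pd.read_csv(gz, header=header, dtype=str)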
def s3_to_pandas_with_processing(client, bucket, key, header=None):
    # get the object using a boto3 S3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # replace some characters in the incoming stream, then load it into a DataFrame
    text = gz.read().decode('utf-8').replace('?', ' ')
    return pd.read_csv(io.StringIO(text), header=header, dtype=str)
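s3_to_pandas_with_processing reads the whole decompressed object into memory before parsing. For larger objects, a possible variant (a sketch, assuming pandas' chunksize option fits your workload) lets pandas pull from the gzip stream a chunk at a time and applies the cleanup to each chunk. An in-memory BytesIO stands in for obj['Body'] so it runs without S3, and the function name is hypothetical. Note the replacement here touches parsed cell values only, so unlike the text-level replace above it leaves the header row alone.

```python
import gzip
import io

import pandas as pd


def read_gz_csv_in_chunks(fileobj, chunksize=10_000):
    """Parse a gzip-compressed CSV stream chunk by chunk, cleaning each chunk."""
    chunks = []
    # GzipFile decompresses lazily as pandas pulls data from it
    with gzip.GzipFile(fileobj=fileobj) as gz:
        # with chunksize, read_csv returns an iterator of DataFrames
        with pd.read_csv(gz, dtype=str, chunksize=chunksize) as reader:
            for chunk in reader:
                # same cleanup as above, applied to every cell of every column
                for col in chunk.columns:
                    chunk[col] = chunk[col].str.replace('?', ' ', regex=False)
                chunks.append(chunk)
    return pd.concat(chunks, ignore_index=True)


# usage with an in-memory stand-in for obj['Body']
buf = io.BytesIO(gzip.compress(b"a,b\nx?y,1\nz,2?\n"))
df = read_gz_csv_in_chunks(buf, chunksize=1)
print(df)
```

With a real object you would pass obj['Body'] instead of buf; chunksize trades memory for the number of parser calls.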
def pandas_to_s3(df, client, bucket, key):
    # write the DataFrame to a text stream
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    # create a binary stream for the compressed output
    gz_buffer = io.BytesIO()
    # compress the CSV text using gzip (getvalue() returns the whole buffer, so no seek is needed)
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        gz_file.write(csv_buffer.getvalue().encode('utf-8'))
    # write the compressed bytes to S3
    return client.put_object(Bucket=bucket, Key=key, Body=gz_buffer.getvalue())
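The compression steps can be checked locally without S3. A minimal sketch with the S3 calls stripped out, so only the CSV and gzip steps from pandas_to_s3 and s3_to_pandas remain (both helper names are hypothetical). One thing it makes visible: pandas_to_s3 writes a header row, so reading the data back needs header=0 rather than s3_to_pandas' default header=None, which would return the header as a data row.

```python
import gzip
import io

import pandas as pd


def df_to_gzip_csv_bytes(df):
    """Same steps as pandas_to_s3, minus the put_object call."""
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    gz_buffer = io.BytesIO()
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        gz_file.write(csv_buffer.getvalue().encode('utf-8'))
    return gz_buffer.getvalue()


def gzip_csv_bytes_to_df(data, header=0):
    """Same steps as s3_to_pandas, with a BytesIO standing in for obj['Body']."""
    gz = gzip.GzipFile(fileobj=io.BytesIO(data))
    return pd.read_csv(gz, header=header, dtype=str)


df = pd.DataFrame({'id': [1, 2, 3], 'country': ['BE', 'FR', 'NL']})
payload = df_to_gzip_csv_bytes(df)
restored = gzip_csv_bytes_to_df(payload)
# dtype=str means every column comes back as text
print(restored.equals(df.astype(str)))
```

Because of dtype=str the round trip only matches the string version of the original frame; the dtype question raised further down is about exactly this.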
What is key here?

@devm2024 It is the usual S3 object key, i.e. the path to the object in non-S3 terms (e.g. folder/file.csv.gz).
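If you start from a full s3:// URL, the bucket is the part before the first slash and the key is everything after it. A small sketch (split_s3_url is a hypothetical helper, not part of boto3) that produces the bucket and key arguments the functions above expect:

```python
def split_s3_url(url):
    """Split 's3://bucket/path/to/file.csv.gz' into (bucket, key)."""
    prefix = 's3://'
    if not url.startswith(prefix):
        raise ValueError(f'not an s3:// URL: {url}')
    # the bucket is everything up to the first '/', the key is the rest
    bucket, _, key = url[len(prefix):].partition('/')
    return bucket, key


print(split_s3_url('s3://my-bucket/reports/2018/data.csv.gz'))
# ('my-bucket', 'reports/2018/data.csv.gz')
```

Plain string splitting is used instead of urllib.parse so that keys containing ? or # are kept intact.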

ghost commented Nov 14, 2018

This is pretty sweet. Any chance of turning it into a module and maybe wrapping some unit tests around it? 🤓

I would be happy to assist.

katrienbaert commented Dec 5, 2018

Nice piece of code.
Any thoughts on whether / how we can make sure the dtypes can be maintained in this process?

Update - this is how we solved this for now (leaving out some Class definitions, just showing the general idea):

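import os

import pandas as pd

# session (a boto3.Session), bucket, path, local_store and remove_local come from the omitted class definitions
obj = pd.DataFrame({'id': [1, 2, 3], 'country': ['BE', 'FR', 'NL']})
contents = bytearray(source=obj.to_csv(index=False), encoding='utf-8')
# Persist on S3, using a different approach depending on object size (len() gives the size in bytes)
if len(contents) < 1e9:
    session.resource('s3').Bucket(bucket).put_object(Key=path, Body=contents)
else:
    # large objects: write locally first, then let upload_file handle the (multipart) upload
    local_store.persist(path=path, contents=contents)
    # LOCAL_DIR is a placeholder: the original omits the local directory prefix
    session.client('s3').upload_file(LOCAL_DIR + path, Bucket=bucket, Key=path)
    if remove_local:
        os.remove(LOCAL_DIR + path)
# Load from S3 bucket
response = session.client('s3').get_object(Bucket=bucket, Key=path)
obj_returned = response['Body'].read()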
# Validate

Seems like we really need the bytearray to maintain the dtypes...
All feedback is more than welcome.
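CSV itself only stores text, so whichever buffer type is used, pandas re-infers the dtypes when the file is read back. A sketch of one way to keep them, assuming the schema can be recorded before writing: store obj.dtypes alongside the data and pass it to read_csv's dtype argument. The bytes below stand in for response['Body'].read() so the example runs offline. (A typed format such as Parquet, via DataFrame.to_parquet, stores dtypes in the file itself but needs pyarrow or fastparquet installed.)

```python
import io

import pandas as pd

obj = pd.DataFrame({'id': [1, 2, 3], 'country': ['BE', 'FR', 'NL']})
contents = bytearray(source=obj.to_csv(index=False), encoding='utf-8')

# stand-in for obj_returned = response['Body'].read()
obj_returned = bytes(contents)

# record the schema before writing, then reapply it when reading back
schema = obj.dtypes.astype(str).to_dict()  # column name -> dtype name, e.g. 'id' -> 'int64'
restored = pd.read_csv(io.BytesIO(obj_returned), dtype=schema)

print(restored.dtypes.equals(obj.dtypes))
print(restored.equals(obj))
```

The schema dict is small enough to keep in S3 object metadata or a sidecar JSON file; which of those to use is a design choice outside this snippet.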

Do you know how to write a CSV to S3 with UTF-8 encoding? I keep seeing symbols like √ in my CSV reports. So annoying. I'm using StringIO() and boto3.client.put_object().


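import boto3
from io import StringIO

DATA_KEY_CSV = 'here/my/file.csv'

s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)  # BUCKET_NAME and schedule_events are defined elsewhere

csv_buffer = StringIO()
# to_csv's encoding= is not supported for a text buffer like StringIO, so encode explicitly
schedule_events.to_csv(csv_buffer)
bucket.put_object(Body=csv_buffer.getvalue().encode("utf-8"), Key=DATA_KEY_CSV)  # here is the encoding you need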
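If the √ characters appear when the report is opened in Excel, the file may well be valid UTF-8 that Excel is decoding with a legacy code page: without a byte-order mark it does not assume UTF-8 (for instance, é read as Mac Roman shows up as √©). A sketch, assuming Excel is the consumer: encode with Python's 'utf-8-sig' codec, which prepends the BOM. The helper name is hypothetical and the put_object call is commented out so the example runs offline.

```python
import codecs
import io

import pandas as pd


def df_to_csv_bytes_for_excel(df):
    """Encode a DataFrame as UTF-8 CSV bytes with a BOM so Excel detects UTF-8."""
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    # 'utf-8-sig' writes the EF BB BF byte-order mark before the data
    return csv_buffer.getvalue().encode('utf-8-sig')


body = df_to_csv_bytes_for_excel(pd.DataFrame({'city': ['Liège', 'Zürich']}))
print(body[:3] == codecs.BOM_UTF8)  # True
# bucket.put_object(Body=body, Key=DATA_KEY_CSV)
```

Tools that do not expect a BOM may treat it as part of the first column name, so this is worth using only when Excel is the reader.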

By the way, there's no need to decompress with gzip yourself when reading a file: pandas infers the compression from the .gz extension.

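import pandas as pd
import s3fs  # pandas needs s3fs installed to read s3:// URLs; importing it just fails fast if it is missing

df = pd.read_json('s3://bucket/path/to/file.json.gz', orient="columns")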

works well.
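pandas' readers default to compression='infer', so the .gz suffix alone triggers decompression, and the same holds for local paths. That makes the behaviour easy to check without S3 or s3fs; a sketch using a temporary file (file name and contents are made up for the demo):

```python
import gzip
import os
import tempfile

import pandas as pd

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, 'file.json.gz')
    # write a small gzip-compressed JSON file in the "columns" orientation
    with gzip.open(path, 'wt', encoding='utf-8') as f:
        f.write('{"id": {"0": 1, "1": 2}, "country": {"0": "BE", "1": "FR"}}')

    # no explicit gzip handling: the .gz extension triggers decompression
    df = pd.read_json(path, orient='columns')

print(df)
```

Passing compression='gzip' explicitly does the same thing when the file name lacks the .gz suffix.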

gudata commented Oct 26, 2021

s3fs comes with a lot of dependencies, and it is hard to install and maintain.

gudata commented Oct 26, 2021

This gist is awesome! Thank you!

