@uhho
Last active December 2, 2022 18:57
Streaming pandas DataFrame to/from S3 with on-the-fly processing and GZIP compression
import gzip
import io

import pandas as pd


def s3_to_pandas(client, bucket, key, header=None):
    # get key using boto3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # load stream directly to DF
    return pd.read_csv(gz, header=header, dtype=str)


def s3_to_pandas_with_processing(client, bucket, key, header=None):
    # get key using boto3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # replace some characters in the incoming stream and load it to DF
    # (note: this reads the whole decompressed object into memory)
    text = gz.read().decode('utf-8').replace('?', ' ')
    return pd.read_csv(io.StringIO(text), header=header, dtype=str)


def pandas_to_s3(df, client, bucket, key):
    # write DF to string stream
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    # create binary stream
    gz_buffer = io.BytesIO()
    # compress string stream using gzip
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        gz_file.write(csv_buffer.getvalue().encode('utf-8'))
    # write stream to S3
    client.put_object(Bucket=bucket, Key=key, Body=gz_buffer.getvalue())
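
The functions above can be tried out without AWS access. What follows is only a minimal sketch: `FakeS3Client` is a hypothetical in-memory stand-in (not part of boto3) that mimics just the `put_object` and `get_object` calls used in this gist, and the bucket and key names are made up. The two functions are restated in compact form so the block runs on its own.

```python
import gzip
import io

import pandas as pd


class FakeS3Client:
    """Hypothetical in-memory stand-in for a boto3 S3 client (illustration only)."""

    def __init__(self):
        self.store = {}

    def put_object(self, Bucket, Key, Body):
        self.store[(Bucket, Key)] = Body
        return {}

    def get_object(self, Bucket, Key):
        # boto3 returns a streaming body; BytesIO is a simple file-like substitute
        return {"Body": io.BytesIO(self.store[(Bucket, Key)])}


def pandas_to_s3(df, client, bucket, key):
    # serialise to CSV text, gzip it in memory, then upload the bytes
    gz_buffer = io.BytesIO()
    with gzip.GzipFile(mode="w", fileobj=gz_buffer) as gz_file:
        gz_file.write(df.to_csv(index=False).encode("utf-8"))
    client.put_object(Bucket=bucket, Key=key, Body=gz_buffer.getvalue())


def s3_to_pandas(client, bucket, key, header=None):
    # decompress the body on the fly and let pandas parse it
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj["Body"])
    return pd.read_csv(gz, header=header, dtype=str)


client = FakeS3Client()
original = pd.DataFrame({"id": ["1", "2"], "country": ["BE", "FR"]})
pandas_to_s3(original, client, "demo-bucket", "demo/key.csv.gz")
# header=0 because pandas_to_s3 writes the column names as the first row
restored = s3_to_pandas(client, "demo-bucket", "demo/key.csv.gz", header=0)
```

Note that `s3_to_pandas` defaults to `header=None`, so reading back a file written by `pandas_to_s3` needs `header=0` to avoid treating the column names as data.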
@hansoksendahl

hansoksendahl commented Nov 14, 2018

This is pretty sweet, any chance of turning it into a module and maybe wrapping some unit tests around it? 🤓

I would be happy to assist.

@katrienbaert

katrienbaert commented Dec 5, 2018

Nice piece of code.
Any thoughts on whether / how we can make sure the dtypes can be maintained in this process?

Update - this is how we solved this for now (leaving out some Class definitions, just showing the general idea):

import io
import os
import sys

import pandas as pd

# `session` (presumably a boto3 Session), `local_store`, `bucket`, `path` and
# `remove_local` come from the class definitions that are left out here
obj = pd.DataFrame({'id': [1, 2, 3], 'country': ['BE', 'FR', 'NL']})
contents = bytearray(source=obj.to_csv(index=False), encoding='utf-8')
# Persist on S3, using a different approach depending on object size
if sys.getsizeof(contents) < 1e9:
    session.resource('s3').Bucket(bucket).put_object(Key=path, Body=contents)
else:
    local_store.persist(path=path, contents=contents)
    session.client('s3').upload_file(
        Filename=local_store.directory + path, Bucket=bucket, Key=path
    )
    if remove_local:
        os.remove(local_store.directory + path)
# Load from S3 bucket
response = session.client('s3').get_object(Bucket=bucket, Key=path)
# Body.read() returns raw bytes, so parse them back into a DataFrame before comparing
obj_returned = pd.read_csv(io.BytesIO(response['Body'].read()))
# Validate
obj_returned.equals(obj)

Seems like we really need the bytearray to maintain the dtypes...
All feedback is more than welcome.
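
One possible way to keep dtypes across a CSV round trip, offered as a sketch rather than a definitive answer: CSV carries no type information, so the dtypes can be recorded before writing and passed back to `pd.read_csv` through its `dtype` parameter. The `code` column and its values are made up for illustration, and how the mapping is stored next to the S3 object (for example as a small JSON file) is left open.

```python
import io

import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "country": ["BE", "FR", "NL"],
        "code": ["007", "010", "120"],  # hypothetical column with leading zeros
    }
)

# Record the dtypes before serialising, as plain strings
dtypes = {col: str(dtype) for col, dtype in df.dtypes.items()}

csv_bytes = df.to_csv(index=False).encode("utf-8")

# Without a dtype mapping pandas re-infers types, so "007" is parsed as the integer 7
inferred = pd.read_csv(io.BytesIO(csv_bytes))

# With the recorded mapping the string column keeps its original values
restored = pd.read_csv(io.BytesIO(csv_bytes), dtype=dtypes)
```

This only covers dtypes that `read_csv` accepts by name; datetime columns, for instance, would need `parse_dates` instead.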

@RMCollins175

Do you know how to write a CSV to S3 with UTF-8 encoding? I keep seeing symbols like √ in my CSV reports. So annoying. I'm using StringIO() and boto3.client.put_object().

thanks

@dobeerman7

> Do you know how to write a CSV to S3 with UTF-8 encoding? I keep seeing symbols like √ in my CSV reports. So annoying. I'm using StringIO() and boto3.client.put_object().
>
> thanks

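import boto3
from io import StringIO

BUCKET_NAME = 'YOUR_BUCKET_NAME'
DATA_KEY_CSV = 'here/my/file.csv'

s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)

csv_buffer = StringIO()
# schedule_events is the DataFrame to upload; to_csv's encoding argument
# is not applied when writing to a text buffer, so it is left out here
schedule_events.to_csv(csv_buffer)
# encode explicitly so that UTF-8 bytes are what gets uploaded
bucket.put_object(Body=csv_buffer.getvalue().encode('utf-8'), Key=DATA_KEY_CSV)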
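
If the stray symbols appear when the report is opened in a spreadsheet application, one possible cause (an assumption, since the thread does not say how the reports are viewed) is that the file is valid UTF-8 but has no byte-order mark, so the application guesses a different encoding. A minimal sketch of encoding with `utf-8-sig`, using made-up report data and stopping short of the actual upload:

```python
import pandas as pd

# Hypothetical report data containing a non-ASCII character
report = pd.DataFrame({"task": ["deploy"], "status": ["✓"]})

# to_csv returns a str when no path or buffer is given
csv_text = report.to_csv(index=False)

# "utf-8-sig" prepends a byte-order mark that some applications use to detect UTF-8
body = csv_text.encode("utf-8-sig")

# `body` is what would be passed as Body=... to put_object (not called here)
```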

@dobeerman7

btw, no need to use gzip when you read a file.

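import pandas as pd
import s3fs  # must be installed; pandas uses it to open s3:// paths

# compression is inferred from the .gz extension
df = pd.read_json('s3://bucket/path/to/file.json.gz', orient="columns")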

works well.

@gudata

gudata commented Oct 26, 2021

> btw, no need to use gzip when you read a file.
>
> import s3fs
>
> df = pd.read_json('s3://bucket/path/to/file.json.gz', orient="columns")
>
> works well.

s3fs comes with a lot of dependencies, and it is hard to install and maintain.
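
For setups where s3fs is not wanted, pandas can still do the decompression itself when given the bytes fetched with boto3. The sketch below uses CSV, as in the gist, and assumes that `body` behaves like the `Body` of a `get_object` response; `read_gzipped_csv` is a hypothetical helper name, and a local `BytesIO` stands in for S3 so the block runs on its own.

```python
import gzip
import io

import pandas as pd


def read_gzipped_csv(body, **read_csv_kwargs):
    """Read a gzip-compressed CSV from a binary file-like object.

    `body` is assumed to behave like the Body of a boto3 get_object response.
    """
    # Buffer the bytes so pandas gets a seekable binary handle
    buffer = io.BytesIO(body.read())
    # compression must be given explicitly: there is no file name to infer it from
    return pd.read_csv(buffer, compression="gzip", **read_csv_kwargs)


# Local stand-in for an S3 body, so the sketch runs without AWS access
fake_body = io.BytesIO(gzip.compress(b"id,country\n1,BE\n2,FR\n"))
df = read_gzipped_csv(fake_body)
```

Unlike the `GzipFile` approach in the gist, this buffers the whole compressed object in memory before parsing, which is simpler but less suitable for very large files.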

@gudata

gudata commented Oct 26, 2021

This gist is awesome! thank you!
