Streaming pandas DataFrame to/from S3 with on-the-fly processing and GZIP compression
import gzip
import io

import pandas as pd


def s3_to_pandas(client, bucket, key, header=None):
    # get the object using the boto3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    # decompress the streaming body on the fly
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # load the stream directly into a DataFrame
    return pd.read_csv(gz, header=header, dtype=str)


def s3_to_pandas_with_processing(client, bucket, key, header=None):
    # get the object using the boto3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # replace some characters in the incoming stream and load it into a DataFrame
    lines = "\n".join([line.replace('?', ' ') for line in gz.read().decode('utf-8').split('\n')])
    return pd.read_csv(io.StringIO(lines), header=header, dtype=str)


def pandas_to_s3(df, client, bucket, key):
    # write the DataFrame to a string stream
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    # reset the stream position
    csv_buffer.seek(0)
    # create a binary stream
    gz_buffer = io.BytesIO()
    # compress the string stream using gzip
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        gz_file.write(bytes(csv_buffer.getvalue(), 'utf-8'))
    # write the compressed stream to S3
    client.put_object(Bucket=bucket, Key=key, Body=gz_buffer.getvalue())
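For reference, a minimal round-trip sketch using the helpers above; the bucket name and key are placeholders, and it assumes boto3 can find AWS credentials in the environment:

import boto3
import pandas as pd

client = boto3.client('s3')
df = pd.DataFrame({'a': ['1', '2'], 'b': ['x', 'y']})

# upload as a gzipped CSV, then stream it back into a DataFrame
pandas_to_s3(df, client, 'my-bucket', 'data/example.csv.gz')
df_back = s3_to_pandas(client, 'my-bucket', 'data/example.csv.gz', header=0)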
Do you know how to write CSV to S3 with UTF-8 encoding? I keep seeing symbols like √ in my CSV reports. So annoying. I'm using StringIO() and boto3.client.put_object().
Thanks
import boto3
from io import StringIO

BUCKET_NAME = 'YOUR_BUCKET_NAME'
DATA_KEY_CSV = 'here/my/file.csv'

s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)

csv_buffer = StringIO()
# schedule_events is the DataFrame being written out
schedule_events.to_csv(csv_buffer, encoding="utf-8")  # here is the encoding you need
bucket.put_object(Body=csv_buffer.getvalue(), Key=DATA_KEY_CSV)
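If the stray symbols persist, a variant worth trying (my own assumption, not part of the reply above) is to encode the buffer to UTF-8 bytes explicitly before uploading, since put_object accepts bytes:

# same bucket and key as above; encode the CSV text to UTF-8 bytes before uploading
bucket.put_object(Body=csv_buffer.getvalue().encode("utf-8"), Key=DATA_KEY_CSV)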
btw, no need to use gzip when you read a file.

import s3fs
import pandas as pd

df = pd.read_json('s3://bucket/path/to/file.json.gz', orient="columns")

works well.
s3fs comes with a lot of dependencies and it is hard to install and maintain.
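For anyone who wants to avoid the s3fs dependency, the same gzipped file can be read with plain boto3, along the lines of the gist above (a sketch; bucket and key are placeholders):

import gzip

import boto3
import pandas as pd

client = boto3.client('s3')
obj = client.get_object(Bucket='bucket', Key='path/to/file.json.gz')
# decompress the streaming body and load it as JSON
gz = gzip.GzipFile(fileobj=obj['Body'])
df = pd.read_json(gz, orient="columns")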
This gist is awesome! thank you!
Nice piece of code. Any thoughts on whether / how we can make sure the dtypes can be maintained in this process?

Update - this is how we solved this for now (leaving out some Class definitions, just showing the general idea). Seems like we really need the bytearray to maintain the dtypes... All feedback is more than welcome.
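For what it's worth, one simple way to keep dtypes across the CSV round trip (an illustrative sketch built on the gist's helpers, not the bytearray approach mentioned above) is to record df.dtypes before uploading and hand them back to read_csv; datetime columns would still need parse_dates:

import gzip
import pandas as pd

def pandas_to_s3_keeping_dtypes(df, client, bucket, key):
    # remember the column dtypes before everything is serialized as text
    dtypes = df.dtypes.astype(str).to_dict()
    pandas_to_s3(df, client, bucket, key)
    return dtypes

def s3_to_pandas_keeping_dtypes(client, bucket, key, dtypes):
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # re-apply the recorded dtypes instead of forcing every column to str
    return pd.read_csv(gz, header=0, dtype=dtypes)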