import gzip
import io

import pandas as pd


def s3_to_pandas(client, bucket, key, header=None):
    # get key using boto3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])

    # load stream directly to DF
    return pd.read_csv(gz, header=header, dtype=str)


def s3_to_pandas_with_processing(client, bucket, key, header=None):
    # get key using boto3 client
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])

    # replace some characters in the incoming stream and load it to DF
    lines = "\n".join([line.replace('?', ' ') for line in gz.read().decode('utf-8').split('\n')])
    return pd.read_csv(io.StringIO(lines), header=header, dtype=str)


def pandas_to_s3(df, client, bucket, key):
    # write DF to string stream
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)

    # reset stream position
    csv_buffer.seek(0)
    # create binary stream
    gz_buffer = io.BytesIO()

    # compress string stream using gzip
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        gz_file.write(bytes(csv_buffer.getvalue(), 'utf-8'))

    # write stream to S3
    obj = client.put_object(Bucket=bucket, Key=key, Body=gz_buffer.getvalue())
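For reference, a minimal round-trip sketch using these helpers; the bucket and key names below are placeholders, not part of the gist:

import boto3
import pandas as pd

client = boto3.client('s3')
df = pd.DataFrame({'id': ['1', '2'], 'country': ['BE', 'FR']})

# write the frame as gzipped CSV, then read it back
# header=0 tells the reader that the first row is the column names written by to_csv
pandas_to_s3(df, client, 'my-bucket', 'data/example.csv.gz')
df_back = s3_to_pandas(client, 'my-bucket', 'data/example.csv.gz', header=0)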
Nice piece of code.
Any thoughts on whether / how we can make sure the dtypes can be maintained in this process?
Update - this is how we solved this for now (leaving out some class definitions, just showing the general idea):

obj = pd.DataFrame({'id': [1, 2, 3], 'country': ['BE', 'FR', 'NL']})
contents = bytearray(source=obj.to_csv(index=False), encoding='utf-8')

# Persist on S3, using a different approach depending on object size
if sys.getsizeof(contents) < 1e9:
    session.resource('s3').Bucket(bucket).put_object(Key=path, Body=contents)
else:
    local_store.persist(path=path, contents=contents)
    session.client('s3').upload_file(
        Filename=local_store.directory + path, Bucket=bucket, Key=path
    )
    if remove_local:
        os.remove(local_store.directory + path)

# Load from S3 bucket and parse back into a DataFrame
response = session.client('s3').get_object(Bucket=bucket, Key=path)
obj_returned = pd.read_csv(io.BytesIO(response['Body'].read()))

# Validate
obj_returned.equals(obj)
Seems like we really need the bytearray to maintain the dtypes.
...
All feedback is more than welcome.
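One alternative worth noting, since CSV itself carries no type information: capture df.dtypes before writing and reapply it after reading. A minimal sketch (not part of the solution above, just the idea):

import io

import pandas as pd

df = pd.DataFrame({'id': [1, 2, 3], 'country': ['BE', 'FR', 'NL']})

# remember the dtypes alongside the CSV payload (could be stored as a JSON sidecar)
dtypes = {col: str(dtype) for col, dtype in df.dtypes.items()}

buffer = io.StringIO()
df.to_csv(buffer, index=False)
buffer.seek(0)

# after reading back, restore the original dtypes explicitly
df_back = pd.read_csv(buffer).astype(dtypes)
assert df_back.equals(df)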
Do you know how to write a CSV to S3 with UTF-8 encoding? I keep seeing symbols like √ in my CSV reports. So annoying. I'm using StringIO() and boto3.client.put_object().
Thanks
import boto3
from io import StringIO

BUCKET_NAME = 'YOUR_BUCKET_NAME'
DATA_KEY_CSV = 'here/my/file.csv'

s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)

csv_buffer = StringIO()
# schedule_events is your DataFrame; here is the encoding you need
schedule_events.to_csv(csv_buffer, encoding="utf-8")
bucket.put_object(Body=csv_buffer.getvalue(), Key=DATA_KEY_CSV)
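The same issue can appear on the read side. A short sketch of reading the object back and decoding it explicitly as UTF-8 (bucket and key names are placeholders):

import io

import boto3
import pandas as pd

client = boto3.client('s3')
obj = client.get_object(Bucket='YOUR_BUCKET_NAME', Key='here/my/file.csv')

# decode the raw bytes explicitly instead of relying on a default codec
df = pd.read_csv(io.BytesIO(obj['Body'].read()), encoding='utf-8')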
btw, no need to use gzip when you read a file:

import s3fs
import pandas as pd

df = pd.read_json('s3://bucket/path/to/file.json.gz', orient="columns")

works well.
s3fs comes with a lot of dependencies and it is hard to install and maintain.
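If you want to avoid the s3fs dependency, the same read can be done with boto3 alone, mirroring the gist's gzip approach (a sketch; the bucket and key below are placeholders):

import gzip

import boto3
import pandas as pd

client = boto3.client('s3')
obj = client.get_object(Bucket='bucket', Key='path/to/file.json.gz')

# wrap the streaming body so pandas sees decompressed JSON
gz = gzip.GzipFile(fileobj=obj['Body'])
df = pd.read_json(gz, orient="columns")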
This gist is awesome! thank you!
This is pretty sweet, any chance of turning it into a module and maybe wrapping some unit tests around it? 🤓
I would be happy to assist.
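For anyone picking that up, a minimal round-trip test sketch that avoids real S3 calls by using a tiny stub client (the stub and test names are illustrative, not part of the gist):

import io

import pandas as pd

# assumes pandas_to_s3 / s3_to_pandas from the gist are in scope or importable


class StubS3Client:
    """In-memory stand-in for the two boto3 client methods the gist uses."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket, Key, Body):
        self.objects[(Bucket, Key)] = Body
        return {}

    def get_object(self, Bucket, Key):
        return {'Body': io.BytesIO(self.objects[(Bucket, Key)])}


def test_pandas_s3_roundtrip():
    client = StubS3Client()
    df = pd.DataFrame({'id': ['1', '2'], 'country': ['BE', 'FR']})

    pandas_to_s3(df, client, 'bucket', 'key.csv.gz')
    df_back = s3_to_pandas(client, 'bucket', 'key.csv.gz', header=0)

    # values come back as str because s3_to_pandas reads with dtype=str
    assert df_back.equals(df)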