Skip to content

Instantly share code, notes, and snippets.

@KyleJamesWalker
Last active October 20, 2016 21:58
Show Gist options
  • Save KyleJamesWalker/a4370a2975b1ee615a657a67832f48c1 to your computer and use it in GitHub Desktop.
Save KyleJamesWalker/a4370a2975b1ee615a657a67832f48c1 to your computer and use it in GitHub Desktop.
Simple Multipart CSV Writer / Multipart S3 Uploader Examples
import csv
import gzip
import sys
import humanfriendly
class MultipartCSVWriter:
    """Write CSV rows across a numbered series of size-limited files.

    Rows are written with ``csv.DictWriter``. Before each row is written,
    the current file position plus an estimate of the row's size is compared
    against ``max_size``; when the limit would be exceeded the current file
    is closed and the next numbered file is opened.

    The instance can be used as a context manager, which guarantees the
    current file is closed even if writing fails part-way through.

    Args:
        dest_format: ``str.format`` template for output file names. It is
            formatted with a single keyword, ``part``, the 1-based chunk
            number (e.g. ``"out-{part:05}.csv.gz"``).
        fieldnames: Column names handed to ``csv.DictWriter``.
        max_size: Size limit per file, either an ``int`` number of bytes or
            a human-readable string understood by
            ``humanfriendly.parse_size`` (e.g. ``"4MiB"``). The limit is
            checked against ``fp.tell()``, which for gzip output is
            presumably the uncompressed offset, so compressed files on disk
            will normally be smaller than this.
        compress: When true, files are written through ``gzip.open``.
        writeheader: When true, every file starts with a header row.

    Attributes:
        total_size: Running total of ``fp.tell()`` deltas for data rows
            (header rows are not counted).
        chunk: Number of files opened so far.
    """

    def __init__(self, dest_format, fieldnames, max_size,
                 compress=True, writeheader=True):
        self.dest_format = dest_format
        self.fieldnames = fieldnames
        # Accept a plain byte count as well as a human-readable size string.
        if isinstance(max_size, int):
            self.max_size = max_size
        else:
            self.max_size = humanfriendly.parse_size(max_size)
        self.compress = compress
        self.writeheader = writeheader
        self.total_size = 0
        self.chunk = 0
        self.fp = None
        self.csv = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the open chunk; exceptions are not suppressed.
        self.close()

    def _next_fp(self):
        """Close the current file, if any, and open the next numbered one."""
        if self.fp is not None:
            print("New file {}".format(self.fp.tell()))
            self.fp.close()
        self.chunk += 1
        filename = self.dest_format.format(part=self.chunk)
        opener = gzip.open if self.compress else open
        # newline='' is what the csv module asks for: the writer emits its
        # own line terminator, and newline translation would corrupt it.
        self.fp = opener(filename, 'wt', newline='')
        self.csv = csv.DictWriter(self.fp, self.fieldnames)
        if self.writeheader:
            self.csv.writeheader()

    def writerow(self, row):
        """Write one row dict, rolling over to a new file when needed."""
        # The length of the row's repr is only a rough estimate of its
        # encoded CSV size, so files may end slightly under or over max_size.
        if self.fp is None or self.fp.tell() + len(repr(row)) > self.max_size:
            self._next_fp()
        size_start = self.fp.tell()
        self.csv.writerow(row)
        self.total_size += self.fp.tell() - size_start

    def close(self):
        """Close the current file; safe to call more than once."""
        if self.fp is not None:
            self.fp.close()
            self.fp = None
            # Drop the writer too so it cannot be used on a closed file.
            self.csv = None
def main():
    """Split the CSV file named by ``sys.argv[1]`` into gzipped parts.

    Every row of the input is copied, with the input's own header as the
    field names, into files named ``test-00001.csv.gz``,
    ``test-00002.csv.gz``, ... each limited to the size given as "4Mib".
    """
    # newline='' lets the csv module handle line endings itself, so quoted
    # fields containing embedded newlines are parsed correctly.
    with open(sys.argv[1], newline='') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        multi_csv = MultipartCSVWriter("test-{part:05}.csv.gz",
                                       csv_reader.fieldnames,
                                       "4Mib")
        try:
            for row in csv_reader:
                multi_csv.writerow(row)
        finally:
            # Close the current output part even if a read or write fails.
            multi_csv.close()
# Run the splitter only when this file is executed as a script.
if __name__ == "__main__":
    main()
from io import BytesIO
import boto3
import humanfriendly
def main():
    """Upload four zero-filled 5 MiB parts to S3 as one multipart object.

    Starts a multipart upload for ``prefix/sample/testing.txt`` in
    ``my_bucket``, uploads parts 1-4 from a reused in-memory buffer, then
    completes the upload using the part list reported by S3. If any step
    fails, the multipart upload is aborted before the error is re-raised so
    that no unfinished upload is left behind in the bucket.
    """
    s3 = boto3.resource('s3')
    key = s3.Object('my_bucket', 'prefix/sample/testing.txt')
    multipart_upload = key.initiate_multipart_upload()
    # Parsed once; 5 MiB is, per the S3 documentation, the minimum size for
    # every part except the last.
    part_size = humanfriendly.parse_size("5MiB")
    buf = BytesIO()
    try:
        for part_number in range(1, 5):
            # bytearray(n) yields n zero bytes: placeholder payload.
            buf.write(bytearray(part_size))
            multipart_upload.Part(part_number).upload(
                Body=buf.getvalue(),
            )
            # Reset the buffer so the next part starts empty.
            buf.seek(0)
            buf.truncate()
        multipart_upload.complete(
            MultipartUpload={'Parts': [
                {'ETag': uploaded.e_tag, 'PartNumber': uploaded.part_number}
                for uploaded in multipart_upload.parts.all()
            ]},
        )
    except BaseException:
        # Also covers Ctrl-C: discard the uploaded parts, then propagate.
        multipart_upload.abort()
        raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment