Last active
October 20, 2016 21:58
-
-
Save KyleJamesWalker/a4370a2975b1ee615a657a67832f48c1 to your computer and use it in GitHub Desktop.
Simple Multipart CSV Writer / Multipart S3 Uploader Examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import gzip | |
import sys | |
import humanfriendly | |
class MultipartCSVWriter:
    """Write CSV rows across a numbered series of size-limited files.

    Rows are appended to the current part until the part's write position
    plus an estimate of the next row would exceed ``max_size``; a new part
    is then started. Part file names come from
    ``dest_format.format(part=N)`` with ``N`` counting up from 1.

    The instance can be used as a context manager; leaving the ``with``
    block closes the part that is currently open.

    Args:
        dest_format: ``str.format`` template containing a ``{part}`` field.
        fieldnames: Column names handed to ``csv.DictWriter``.
        max_size: Size limit per part, parsed by
            ``humanfriendly.parse_size``.
        compress: When true, parts are written through ``gzip.open``.
        writeheader: When true, every part starts with a header row.

    Attributes:
        total_size: Sum of the write-position advance caused by each row
            written so far (header rows are not counted).
        chunk: Number of the current part; 0 before the first row.
        fp: Open text file of the current part, or ``None``.
        csv: ``csv.DictWriter`` bound to ``fp``, or ``None``.
    """

    def __init__(self, dest_format, fieldnames, max_size,
                 compress=True, writeheader=True):
        self.dest_format = dest_format
        self.fieldnames = fieldnames
        self.max_size = humanfriendly.parse_size(max_size)
        self.compress = compress
        self.writeheader = writeheader

        self.total_size = 0
        self.chunk = 0
        self.fp = None
        self.csv = None

    def __enter__(self):
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the current part; exceptions are never suppressed."""
        self.close()
        return False

    def _next_fp(self):
        """Close the current part (if any) and open the next one."""
        if self.fp:
            print("New file {}".format(self.fp.tell()))
            self.fp.close()

        self.chunk += 1
        filename = self.dest_format.format(part=self.chunk)

        # newline='' lets the csv module control line endings itself;
        # without it, platforms that translate "\n" emit "\r\r\n".
        opener = gzip.open if self.compress else open
        self.fp = opener(filename, 'wt', newline='')

        self.csv = csv.DictWriter(self.fp, self.fieldnames)
        if self.writeheader:
            self.csv.writeheader()

    def writerow(self, row):
        """Write one row dict, rolling over to a new part when needed.

        Args:
            row: Mapping of field name to value, as accepted by
                ``csv.DictWriter.writerow``.
        """
        # The length of the row's repr is only an estimate of its encoded
        # size, so a part may end slightly under or over ``max_size``.
        # NOTE(review): with compress=True, tell() presumably reports the
        # uncompressed offset, so the limit applies before compression.
        if (self.fp is None
                or self.fp.tell() + len(repr(row)) > self.max_size):
            self._next_fp()

        size_start = self.fp.tell()
        self.csv.writerow(row)
        self.total_size += self.fp.tell() - size_start

    def close(self):
        """Close the current part, if one is open.

        A later ``writerow`` call starts a fresh part with the next number.
        """
        if self.fp:
            self.fp.close()
        self.fp = None
        # Drop the DictWriter too so it cannot be used on a closed file.
        self.csv = None
def main():
    """Split the CSV file named by ``sys.argv[1]`` into gzip-compressed parts.

    Every row of the input is copied, with the input's own field names, into
    files named ``test-00001.csv.gz``, ``test-00002.csv.gz``, ... using a
    ``"4Mib"`` size limit per part.
    """
    # newline='' is what the csv module asks for so that newlines embedded
    # in quoted fields are read back unchanged.
    with open(sys.argv[1], newline='') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        multi_csv = MultipartCSVWriter("test-{part:05}.csv.gz",
                                       csv_reader.fieldnames,
                                       "4Mib")
        try:
            for row in csv_reader:
                multi_csv.writerow(row)
        finally:
            # Close the last part even when a row fails, so it is not left
            # unflushed on disk.
            multi_csv.close()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import BytesIO | |
import boto3 | |
import humanfriendly | |
def main():
    """Upload four zero-filled parts to S3 as a single multipart object.

    Starts a multipart upload for ``prefix/sample/testing.txt`` in
    ``my_bucket``, sends parts 1 through 4 (each a ``"5MiB"`` block of zero
    bytes staged in an in-memory buffer), then completes the upload with the
    ETag and part number of every part the service lists.

    If any step fails the multipart upload is aborted before the exception
    is re-raised, so already-uploaded parts are not left behind on S3.
    """
    s3 = boto3.resource('s3')
    key = s3.Object('my_bucket', 'prefix/sample/testing.txt')
    multipart_upload = key.initiate_multipart_upload()

    # NOTE(review): 5 MiB appears to match S3's documented minimum size for
    # non-final parts -- confirm before lowering it.
    part_size = humanfriendly.parse_size("5MiB")
    buf = BytesIO()

    try:
        for part_number in range(1, 5):
            cur_part = multipart_upload.Part(part_number)

            buf.write(bytearray(part_size))
            cur_part.upload(
                Body=buf.getvalue(),
            )

            # Empty the buffer so the next part starts from scratch.
            buf.seek(0)
            buf.truncate()

        multipart_upload.complete(
            MultipartUpload={'Parts': [
                {'ETag': uploaded.e_tag, 'PartNumber': uploaded.part_number}
                for uploaded in multipart_upload.parts.all()
            ]},
        )
    except BaseException:
        # Covers Ctrl-C as well: an unfinished multipart upload keeps its
        # parts stored until it is explicitly aborted.
        multipart_upload.abort()
        raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment