Created
July 5, 2019 05:37
-
-
Save dpedu/49353778db399e70f90cfda1407fb49d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class S3WriteProxy(object):
    """File-like bridge that streams written chunks into an S3 upload.

    The producer calls :meth:`write` any number of times and then
    :meth:`close`.  A background thread, started by the constructor, hands
    this object to ``storage.s3.upload_fileobj`` which consumes the data
    through :meth:`read`.  Chunks travel between the two sides through a
    bounded queue; ``None`` on the queue marks end-of-stream.
    """

    def __init__(self, storage, key):
        """
        A file-like class that accepts writes (until the writer closes me)
        and accepts reads (until closed).

        :param storage: object exposing ``s3`` (a client with an
            ``upload_fileobj`` method) and ``bucket``.
        :param key: destination object key passed to ``upload_fileobj``.
        """
        self.storage = storage
        self.key = key
        # Bounded so that write() blocks once 16 chunks are pending, which
        # keeps a fast producer from buffering unbounded data in memory.
        # NOTE(review): the original comment tied "16" to OS-enforced buffer
        # sizes - unverified, confirm before tuning.
        self.q = Queue(maxsize=16)
        self.leftovers = None  # bytes dequeued but not yet returned by read()
        self.eof = False  # True once the None sentinel has been dequeued
        self.closed = False  # True once close() has been called
        self.uploaded = Queue(maxsize=1)  # signalled when the upload thread ends
        self._upload_error = None  # exception raised by upload_fileobj, if any
        Thread(target=self._upload).start()

    def _upload(self):
        """Thread body: run the upload and always signal completion.

        If the upload fails, the exception is kept for :meth:`close` to
        re-raise and the queue is drained up to the end-of-stream sentinel so
        that a producer blocked in :meth:`write` / :meth:`close` is released
        instead of hanging forever.
        """
        try:
            self.storage.s3.upload_fileobj(Fileobj=self,
                                           Bucket=self.storage.bucket,
                                           Key=self.key)
        except Exception as exc:
            self._upload_error = exc
            # Nobody will call read() any more; discard pending chunks until
            # the writer's sentinel arrives so the writer never blocks on a
            # full queue.
            while not self.eof:
                if self.q.get() is None:
                    self.eof = True
        finally:
            self.uploaded.put(None)

    def write(self, data):
        """Queue ``data`` for upload; blocks while the queue is full.

        Empty writes are ignored so they cannot be mistaken for the
        end-of-stream marker.

        :returns: the number of bytes accepted (file-like convention).
        """
        if data is None or len(data) == 0:
            return 0
        self.q.put(data)
        return len(data)

    def read(self, maxsize=0):
        """Return up to ``maxsize`` bytes, blocking until they are available.

        A short (or empty) result is only returned once the writer has closed
        the stream.  ``maxsize`` of ``None``, ``0`` (the default) or a
        negative value means "read everything until end-of-stream".

        :returns: ``bytes``; ``b''`` once the stream is exhausted.
        """
        unlimited = maxsize is None or maxsize <= 0

        # Start from whatever the previous call dequeued but did not return.
        parts = []
        have = 0
        if self.leftovers:
            parts.append(self.leftovers)
            have = len(self.leftovers)
        self.leftovers = None

        # Only touch the queue while more data is actually needed; this also
        # guarantees leftovers are delivered after the sentinel was seen.
        while not self.eof and (unlimited or have < maxsize):
            chunk = self.q.get()
            if chunk is None:
                self.eof = True
            elif chunk:
                parts.append(chunk)
                have += len(chunk)

        buf = b''.join(parts)
        if not unlimited and len(buf) > maxsize:
            # Keep the surplus for the next call, even if EOF was reached.
            self.leftovers = buf[maxsize:]
            buf = buf[:maxsize]
        return buf

    def close(self):
        """Mark end-of-stream and wait for the upload thread to finish.

        Calling ``close()`` more than once is a no-op.

        :raises Exception: re-raises whatever ``upload_fileobj`` raised in
            the background thread.
        """
        if self.closed:
            return
        self.closed = True
        self.q.put(None)
        self.uploaded.get()
        if self._upload_error is not None:
            raise self._upload_error
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment