Created
January 18, 2018 21:03
-
-
Save erikjohnston/a12016bcc6ac88dd4047cb19cff8e79d to your computer and use it in GitHub Desktop.
A storage provider that fetches media from an S3 bucket
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import defer, threads, reactor | |
from twisted.python.failure import Failure | |
from synapse.rest.media.v1.storage_provider import StorageProvider | |
from synapse.rest.media.v1._base import Responder | |
import boto3 | |
import botocore | |
import logging | |
import threading | |
logger = logging.getLogger("synapse.s3") | |
class S3StorageProviderBackend(StorageProvider):
    """Storage provider that fetches media from an S3 bucket.

    Args:
        hs (HomeServer)
        config: The config returned by `parse_config`
    """

    def __init__(self, hs, config):
        self.cache_directory = hs.config.media_store_path
        # `parse_config` returns only the bucket name, so `config` is that
        # name rather than a dict.
        self.bucket = config
        self.s3 = boto3.client('s3')

    def store_file(self, path, file_info):
        """See StorageProvider.store_file

        This is a no-op: nothing is uploaded to the bucket by this provider.
        """
        pass

    def fetch(self, path, file_info):
        """See StorageProvider.fetch

        Starts an `S3DownloadThread` for `path` and returns the Deferred that
        the thread resolves once the S3 request has been answered.
        """
        d = defer.Deferred()
        S3DownloadThread(self.bucket, path, d).start()
        return d

    @staticmethod
    def parse_config(config):
        """Called on startup to parse config supplied. This should parse
        the config and raise if there is a problem.

        The returned value is passed into the constructor.

        In this case we only care about a single param, the bucket, so let's
        just pull that out.

        Raises:
            KeyError: if `config` has no "bucket" entry.
            AssertionError: if the bucket is not a string.
        """
        bucket = config["bucket"]

        # `basestring` only exists on Python 2; fall back to `str` so the
        # check does not die with a NameError on Python 3.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        # Raise explicitly instead of using `assert`, which is stripped when
        # Python runs with -O. The exception type is kept for compatibility.
        if not isinstance(bucket, string_types):
            raise AssertionError(
                "S3 storage provider 'bucket' must be a string, got %r"
                % (type(bucket),)
            )

        return bucket
class S3Responder(Responder):
    """Producer that relays chunks from a download thread to a consumer.

    The download thread blocks on `wakeup_event` before reading each chunk
    and gives up once `stop_event` is set; this object sets those events in
    response to the consumer's requests. The `_write`, `_error` and `_finish`
    methods are scheduled onto the reactor by the download thread.

    Args:
        wakeup_event (threading.Event): set to let the thread read one more
            chunk.
        stop_event (threading.Event): set to tell the thread to stop.
    """

    def __init__(self, wakeup_event, stop_event):
        self.wakeup_event = wakeup_event
        self.stop_event = stop_event

        self.consumer = None
        # Resolved when the transfer finishes, fails or is stopped.
        self.deferred = defer.Deferred()

    def write_to_consumer(self, consumer):
        """Register as a non-streaming producer on `consumer`.

        Returns:
            Deferred: fires with None when the transfer completes, or fails
            if the download errors or the consumer asks to stop.
        """
        self.consumer = consumer
        consumer.registerProducer(self, False)
        return self.deferred

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Set the stop flag first, then wake the thread so it notices it.
        self.stop_event.set()
        self.wakeup_event.set()

    def resumeProducing(self):
        """Let the download thread read and deliver the next chunk."""
        self.wakeup_event.set()

    def stopProducing(self):
        """Stop the download thread and fail the transfer deferred."""
        self.stop_event.set()
        self.wakeup_event.set()
        # The deferred may already have fired (the transfer finished or
        # failed, or this was called twice); firing it again would raise
        # AlreadyCalledError.
        if not self.deferred.called:
            self.deferred.errback(Exception("Consumer ask to stop producing"))

    def _write(self, chunk):
        """Pass `chunk` to the consumer unless we have been stopped."""
        if self.consumer and not self.stop_event.is_set():
            self.consumer.write(chunk)

    def _error(self, failure):
        """Detach from the consumer and fail the transfer deferred."""
        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer = None

        if not self.deferred.called:
            self.deferred.errback(failure)

    def _finish(self):
        """Detach from the consumer and resolve the transfer deferred."""
        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer = None

        if not self.deferred.called:
            self.deferred.callback(None)
class S3DownloadThread(threading.Thread):
    """Thread that downloads one S3 object and feeds it to an `S3Responder`.

    Args:
        bucket (str): name of the S3 bucket.
        key (str): key of the object to download.
        deferred (Deferred): fired with an `S3Responder` once the object has
            been found, with None if it does not exist, or with a Failure if
            the request fails.
    """

    def __init__(self, bucket, key, deferred):
        super(S3DownloadThread, self).__init__(name="s3-download")
        self.bucket = bucket
        self.key = key
        self.deferred = deferred

    def run(self):
        """Fetch the object and stream it in 4096 byte chunks.

        Everything that touches the Deferred or the responder is handed to
        the reactor with `reactor.callFromThread`, since this method runs
        outside the reactor thread.
        """
        try:
            # A separate session (and client) is created for this thread.
            session = boto3.session.Session()
            s3 = session.client('s3')
            resp = s3.get_object(Bucket=self.bucket, Key=self.key)
        except botocore.exceptions.ClientError as e:
            # `get_object` reports a missing key as "NoSuchKey"; "404" is
            # kept for compatibility with the original check.
            if e.response['Error']['Code'] in ("404", "NoSuchKey"):
                reactor.callFromThread(self.deferred.callback, None)
                return

            reactor.callFromThread(self.deferred.errback, Failure())
            return
        except Exception:
            # Anything else (credentials, connectivity, ...) must still fire
            # the deferred, otherwise whoever is waiting on it hangs forever.
            reactor.callFromThread(self.deferred.errback, Failure())
            return

        # Bound before the `try` below so the `finally` clause can always
        # close it.
        body = resp["Body"]

        wakeup_event = threading.Event()
        stop_event = threading.Event()

        producer = S3Responder(wakeup_event, stop_event)
        reactor.callFromThread(self.deferred.callback, producer)

        try:
            while not stop_event.is_set():
                # Block until the responder asks for more data or is stopped.
                wakeup_event.wait()
                if stop_event.is_set():
                    return

                chunk = body.read(4096)
                if not chunk:
                    # End of the object.
                    return

                # Clear before handing over the chunk, so a resume request
                # triggered by the write is not lost.
                wakeup_event.clear()
                reactor.callFromThread(producer._write, chunk)
        except Exception:
            reactor.callFromThread(producer._error, Failure())
            return
        finally:
            # `_finish` does not re-fire the responder's deferred if `_error`
            # or `stopProducing` already resolved it.
            reactor.callFromThread(producer._finish)
            body.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment