Skip to content

Instantly share code, notes, and snippets.

@ojii
Created April 12, 2017 08:21
Show Gist options
  • Save ojii/a3443ac793036d75a44e47bd0a68ac8d to your computer and use it in GitHub Desktop.
Save ojii/a3443ac793036d75a44e47bd0a68ac8d to your computer and use it in GitHub Desktop.
Run Moto in a separate process so that it works on Python 3
from contextlib import contextmanager
from multiprocessing import Process, Queue
from moto.server import DomainDispatcherApplication, create_backend_app
from werkzeug.serving import make_server
def server_runner(port_queue: Queue, service: str):
    """
    Serve a Moto backend for ``service`` over HTTP until the process dies.

    Intended to be used as the target of a child process: the port the
    server ends up listening on is put on ``port_queue`` before the
    blocking serve loop starts, so the parent can read it back.

    :param port_queue: queue that receives the server's port number.
    :param service: service name forwarded to the Moto dispatcher app.
    """
    dispatcher = DomainDispatcherApplication(
        create_backend_app,
        service=service,
    )
    # port=0 is requested here; the port actually in use is read back
    # from the server object and reported to the parent.
    httpd = make_server(
        host='localhost',
        port=0,
        app=dispatcher,
        threaded=True,
    )
    port_queue.put(httpd.port)
    # Blocks for the lifetime of the process.
    httpd.serve_forever()
@contextmanager
def moto_server(service: str):
    """
    Start a Moto (fake AWS) service and yield the port it runs on.

    The server is run by ``server_runner`` in a separate process, which
    reports its port back through a queue. When the ``with`` block exits
    (normally or via an exception) the process is terminated and reaped.

    :param service: service name passed through to ``server_runner``.
    :raises queue.Empty: if the child process does not report a port
        within 5 seconds.
    """
    port_queue = Queue()
    process = Process(target=server_runner, args=(port_queue, service))
    process.start()
    try:
        # Wait for the child to report which port it bound to.
        yield port_queue.get(timeout=5)
    finally:
        process.terminate()
        # Reap the child so it does not linger as a zombie process; the
        # timeout keeps teardown from hanging if it is slow to exit.
        process.join(timeout=5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment