@hrishikeshvganu
Forked from ywolff/wsgi.py
Last active October 11, 2020 19:09
import threading
import time
from queue import Empty, Queue
import numpy as np
from flask import Flask, request as flask_request
from build_big_model import build_big_model
BATCH_SIZE = 20        # flush a batch once it holds more than this many requests
BATCH_TIMEOUT = 0.5    # ...or once its oldest request has waited this long, in seconds
CHECK_INTERVAL = 0.01  # polling interval (seconds) for the queue and for waiting handlers

model = build_big_model()  # load the expensive model once per worker process
requests_queue = Queue()   # thread-safe queue shared by the Flask handlers and the batching thread
app = Flask(__name__)
def handle_requests_by_batch():
    # Runs forever in a background thread: collect queued requests into a batch,
    # then run a single model.predict() call for the whole batch.
    while True:
        requests_batch = []
        # Keep pulling from the queue until the batch is big enough or the oldest
        # request in it has been waiting longer than BATCH_TIMEOUT.
        while not (
            len(requests_batch) > BATCH_SIZE or
            (len(requests_batch) > 0 and time.time() - requests_batch[0]['time'] > BATCH_TIMEOUT)
        ):
            try:
                requests_batch.append(requests_queue.get(timeout=CHECK_INTERVAL))
            except Empty:
                continue

        batch_inputs = np.array([request['input'] for request in requests_batch])
        batch_outputs = model.predict(batch_inputs)
        # Attach each output to its request dict; the waiting Flask handler polls
        # for the 'output' key to appear.
        for request, output in zip(requests_batch, batch_outputs):
            request['output'] = output
'''
Notes on the process/thread model:

~~Calling threading.Thread here starts a thread as a fork from the main thread.
The main thread for each worker runs under the main gunicorn process,
so in all 3 threads per gu~~ (struck out; revised below)

This is the main process from which threads are started. Not sure whether 20 threads
are created that each run this wsgi.py code, but it looks like yes.
So: 1 process -> 3 threads for each worker -> <worker thread, app fork, threading fork>.

requests_batch is a shared list: anything created in a thread can be shared across
threads in the same process, unless a thread-local variable is explicitly created
(see the threading.local() sketch below).
'''
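# --- Illustrative sketch (not part of the original gist) of the shared-state point
# in the note above: module-level objects such as requests_queue are visible to every
# thread in this worker process, while threading.local() gives each thread its own
# private attributes. The names below are hypothetical and unused by the server.
_shared_state = {'batches_served': 0}   # one dict, seen (and mutated) by all threads
_per_thread_state = threading.local()   # each thread sees only its own attributes

def _illustrate_thread_state():
    # Any thread may update the shared dict (a lock would be needed for safe
    # concurrent increments), while _per_thread_state.thread_id is private to the
    # thread that sets it.
    _shared_state['batches_served'] += 1
    _per_thread_state.thread_id = threading.get_ident()
    return _shared_state['batches_served'], _per_thread_state.thread_id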
# Start the background batching thread (one per worker process).
threading.Thread(target=handle_requests_by_batch).start()
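# --- Assumption, not from the gist: the note above suggests gunicorn with several
# workers and threads. A typical invocation for that kind of setup would be
#
#     gunicorn --workers 3 --threads 20 --worker-class gthread wsgi:app
#
# where each worker process imports this module, so each worker gets its own model,
# queue, and batching thread.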
@app.route('/predict', methods=['POST'])
def predict():
    received_input = np.array(flask_request.json['instances'][0])
    # time.time() is a global wall-clock timestamp; the batching thread compares it
    # against BATCH_TIMEOUT to decide when to flush a partially filled batch.
    request = {'input': received_input, 'time': time.time()}
    requests_queue.put(request)

    # Poll until the batching thread has attached an 'output' to this request dict.
    while 'output' not in request:
        time.sleep(CHECK_INTERVAL)

    return {'predictions': request['output'].tolist()}
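# --- Hedged usage sketch (not in the original gist): one way to call the /predict
# endpoint, assuming the server runs on localhost:5000 and that the model accepts a
# single flat feature vector per instance. The 'requests' package is an extra
# dependency used only by this example.
def example_client_call():
    import requests  # local import so the server itself does not need this package
    response = requests.post(
        'http://localhost:5000/predict',
        json={'instances': [[0.1, 0.2, 0.3]]},  # shape must match the model's input
    )
    return response.json()['predictions']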
if __name__ == '__main__':
    app.run()