-
-
Save hrishikeshvganu/4e9412cc93edcc14d3c48e1353c88fdc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import time | |
from queue import Empty, Queue | |
import numpy as np | |
from flask import Flask, request as flask_request | |
from build_big_model import build_big_model | |
# Batching parameters: a batch is flushed when it reaches BATCH_SIZE items or
# when the oldest queued request has waited longer than BATCH_TIMEOUT seconds.
BATCH_SIZE = 20
BATCH_TIMEOUT = 0.5
# Polling interval (seconds) used both by the queue reader and the waiting handler.
CHECK_INTERVAL = 0.01

model = build_big_model()  # loaded once at import time; shared by all requests
requests_queue = Queue()   # thread-safe hand-off from Flask handlers to the batching thread
app = Flask(__name__)
def handle_requests_by_batch():
    """Background loop: drain the request queue into batches and run inference.

    Each queue item is a dict ``{'input': array, 'time': float}`` put there by
    the /predict handler. After ``model.predict`` runs, the result is written
    back into the same dict under ``'output'``, which the waiting handler
    polls for.
    """
    while True:
        requests_batch = []
        # Collect until the batch is full, or the OLDEST request in it has
        # waited longer than BATCH_TIMEOUT seconds.
        # Fix: was 'len(requests_batch) > BATCH_SIZE', which only stopped at
        # BATCH_SIZE + 1 items — an off-by-one past the intended batch size.
        while not (
            len(requests_batch) >= BATCH_SIZE or
            (requests_batch and time.time() - requests_batch[0]['time'] > BATCH_TIMEOUT)
        ):
            try:
                requests_batch.append(requests_queue.get(timeout=CHECK_INTERVAL))
            except Empty:
                # Nothing arrived within CHECK_INTERVAL; re-check exit conditions.
                continue

        batch_inputs = np.array([request['input'] for request in requests_batch])
        batch_outputs = model.predict(batch_inputs)
        # Writing 'output' into the shared dict is the completion signal the
        # /predict handler is spinning on.
        for request, output in zip(requests_batch, batch_outputs):
            request['output'] = output
'''
~~Doing threading.Thread here starts a thread as a fork from the main thread.
The main thread for each worker runs under the main process of gunicorn,
so in all 3 threads per gu~~
This is the main process from which threads are started. Not sure whether 20
threads are created which all run this wsgi.py code. Looks like yes.
So: 1 process -> 3 threads per worker -> <worker thread, app fork, threading fork>.
The requests_batch[] list is shared state — anything created in a thread can be
shared, unless a thread-local variable is explicitly created.
'''
# Start the batching loop in a background thread. daemon=True so this
# infinite loop does not keep the interpreter alive at shutdown (a
# non-daemon thread would block process exit indefinitely).
threading.Thread(target=handle_requests_by_batch, daemon=True).start()
@app.route('/predict', methods=['POST'])
def predict():
    """Enqueue a single instance for batched inference and block until its result arrives."""
    # NOTE(review): only the first element of 'instances' is used per request —
    # presumably one instance per HTTP call; confirm against the client.
    received_input = np.array(flask_request.json['instances'][0])
    # The enqueue timestamp lets the batching thread enforce BATCH_TIMEOUT.
    pending = {'input': received_input, 'time': time.time()}
    requests_queue.put(pending)

    # Busy-wait until the batching thread writes 'output' into our dict.
    while 'output' not in pending:
        time.sleep(CHECK_INTERVAL)

    return {'predictions': pending['output'].tolist()}
# Dev-server entry point; in production this module is imported by the WSGI
# server (e.g. gunicorn, per the notes above) and app.run() is not called.
if __name__ == '__main__':
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment