Created
January 9, 2025 20:08
-
-
Save deepai-org/0887f44606d67247a8a164da1506ff2c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import division | |
from __future__ import absolute_import | |
from __future__ import with_statement | |
from __future__ import print_function | |
from __future__ import unicode_literals | |
import collections | |
import random | |
import time | |
import uuid | |
from hashlib import sha1 | |
from threading import Lock | |
import msgpack | |
from redis.client import Pipeline | |
from redis.exceptions import NoScriptError | |
# Queue TODO
# Finish tests, performance tests
# It def requires redis 5 just due to zpopmax.
# Make example of running a ai-integration backend from within django.
# Replace the way that queue sizes are monitored in cron/nagios/etc...
# Integrate unified queue into batch mode. Then deprecate old batch mode workers!
# Ensure batch mode can run at 5000Qps ???

# How often get_next_request will ping redis to mark an idle backend as still alive
DEFAULT_IDLE_KEEPALIVE_INTERVAL_SECONDS = 55
DEFAULT_IDLE_BACKEND_MAX_AGE_SECONDS = DEFAULT_IDLE_KEEPALIVE_INTERVAL_SECONDS + 5  # Tuned to be just a bit longer than the max keepalive interval

# Request priority levels. Higher number = dequeued first (see the queue-order math below).
QUEUE_PRIORITY_BATCH = 1  # Lowest priority
QUEUE_PRIORITY_STANDARD = 2
QUEUE_PRIORITY_TRYIT = 3
QUEUE_PRIORITY_REALTIME = 4  # Highest priority
QUEUE_PRIORITY_LEVELS = [QUEUE_PRIORITY_BATCH, QUEUE_PRIORITY_STANDARD, QUEUE_PRIORITY_TRYIT, QUEUE_PRIORITY_REALTIME]

# Sorted set scores can store integers up to 9007199254740992 without precision loss.
# Unix timestamp of year 3000 is 32503680000, in milliseconds it is 32503680000000
# 9007199254740992 / 32503680000000 = 277.
# We can have store 3000 years 277 times so we can have 277 priority levels
REDIS_SORTED_SET_MAX = 9000000000000000  # 9007199254740992 is the real value but this looks nicer and may be easier to reason about
REDIS_PRIORITY_LEVEL_SCALE_FACTOR = 32503680000000  # The number to multiply your priority level by to get the offset for a given priority level.

# Lazily generated Lua scripts and their SHA1 hashes (for EVALSHA).
# Generation is guarded by SCRIPT_GENERATOR_LOCK and happens at most once per process.
SUBMIT_REQUEST_LUA_SCRIPT = None  # This will get set the first time submit is called.
SUBMIT_REQUEST_LUA_SCRIPT_HASH = None  # This will get set the first time submit is called.
CANCEL_REQUEST_LUA_SCRIPT = None  # This will get set the first time cancel is called.
CANCEL_REQUEST_LUA_SCRIPT_HASH = None  # This will get set the first time cancel is called.
GET_REQUEST_LUA_SCRIPT = None  # This will get set the first get request is called.
GET_REQUEST_LUA_SCRIPT_HASH = None  # This will get set the first get request is called.
GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT = None  # This will get set the first time get full overview is called.
GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH = None  # This will get set the first time get full overview is called.
DELETE_QUEUE_LUA_SCRIPT = None  # This will get set the first time delete queue is called.
DELETE_QUEUE_LUA_SCRIPT_HASH = None  # This will get set the first delete queue is called.
PRINT_SCRIPTS = False  # Set True to print each Lua script when it is generated (debugging aid).
SCRIPT_GENERATOR_LOCK = Lock()  # Protects the lazy script-generation blocks above from racing threads.

# Key Names in Redis:
#
# All keys are model-specific
#
# User-Queue lookup:
#
# SORTED SET densecap-user-data-to-userqueue (map "ip-123.123.123.123" or "user-4000" to userqueue id: random number between 0-9007199254740992 )
#   (make this set self cleaning, delete all keys with ZREMRANGEBYSCORE the moment a userqueue becomes empty)
#   (this allows easy bidirectional lookup, given a userqueue id we can find the ips and users, and given ip/user we can find the userqueue)
# SORTED SET densecap-userqueue-last-dequeued-times (map userqueue id like "99254740992" to the last unix timestamp in milliseconds when a request was dequeued and sent to a backend)
#   (make this set self cleaning, delete with ... the moment a userqueue becomes empty)
#   (or rather use zpopmin and do zadd to add it back under the new score within the same transaction)
#   Use ZPOPMIN to get the userqueue that was handled the longest ago. This implements round robin. When inserting a
#   new userqueue - use the current time as last handled. New requests go to the back of the round robin line.
#
# SORTED SET densecap-currently-idle-backends (map backend id -> backend priority)
# SORTED SET densecap-userqueue-99254740992 (id is in the queue name - map request uuid to insert timestamp in milliseconds)
#   Only store the uuid of the request in the sorted set for perf. Generate the uuid in python. no dashes in uuid.
# HASH densecap-requests-by-uuid (map uuid of request to full request)
#   Make this hash self cleaning, delete as you go.
# LIST densecap-idle-backend-fd98df7df987df (This is the inbox of a single idle backend container ... that the backend will BLPOP on to wait for the message)
def get_currently_idle_backends_name(queue_name):
    """Redis key (sorted set) mapping idle backend ids to their priorities for this queue."""
    return "".join((queue_name, "-currently-idle-backends"))
def get_idle_backend_id_to_expire_time_name(queue_name):
    """Redis key (sorted set) mapping idle backend ids to their keepalive expiry timestamps."""
    return "".join((queue_name, "-idle-backend-id-to-expire-time"))
def get_user_data_to_userqueue_name(queue_name):
    """Redis key (sorted set) mapping user-id / ip-address members to their userqueue id (the score)."""
    return "".join((queue_name, "-user-data-to-userqueue"))
def get_userqueue_last_dequeued_times_name(queue_name):
    """Redis key (sorted set) mapping userqueue ids to the ms timestamp they were last dequeued from."""
    return "".join((queue_name, "-userqueue-last-dequeued-times"))
def get_requests_by_uuid_name(queue_name):
    """Redis key (hash) mapping request uuids to the full serialized request bytes."""
    return "".join((queue_name, "-requests-by-uuid"))
def get_userqueue_key_name(queue_name, userqueue_id):
    """Redis key (sorted set) holding the pending request uuids of a single userqueue."""
    return "".join((queue_name, "-userqueue-", userqueue_id))
def redis_conn_is_pipeline(redis_conn):
    """Return True if the given connection object is actually a redis Pipeline.

    submit_request uses this to decide whether the Lua script's return value
    can be read back immediately (plain connection) or not (buffered pipeline).
    """
    # isinstance already yields a bool - no need for an explicit if/return-True/return-False.
    return isinstance(redis_conn, Pipeline)
# Returns (request_uuid, userqueue_id)
# userqueue_id might be None if the request got handled immediately with no queueing.
def submit_request(redis_conn=None,  # Required - this can be a pipeline
                   request_bytes=None,  # Required
                   queue_name=None,  # Required
                   user_id=None,  # Optional
                   user_ip_addr=None,  # Optional
                   priority_level=None,  # Required
                   send_to_backend_id=None,  # Optional - send to a specific backend ID instead of any backend...
                   send_full_script_for_pipeline=False
                   # Optional - send the full script instead of just the hash - intended to be used at the start of a pipeline...
                   ):
    """Send a request to a model queue.

    If an idle backend is registered, the request bytes are pushed straight
    into that backend's inbox list and no queueing happens.  Otherwise the
    request is added to the userqueue chosen via user_id / user_ip_addr
    (creating a fresh userqueue when neither maps to one yet).  All of that
    decision making runs atomically inside a single Lua script.

    Returns:
        (request_uuid, userqueue_id) tuple.  userqueue_id is None when the
        request went directly to an idle backend, and always None in
        pipeline mode because the script result cannot be read back yet.

    Raises:
        Exception: when a required argument is missing or priority_level is
            not one of QUEUE_PRIORITY_LEVELS.
    """
    global SUBMIT_REQUEST_LUA_SCRIPT
    global SUBMIT_REQUEST_LUA_SCRIPT_HASH
    global SCRIPT_GENERATOR_LOCK
    if redis_conn is None:
        raise Exception('redis_conn must be passed')
    if request_bytes is None:
        raise Exception('request_bytes must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if priority_level not in QUEUE_PRIORITY_LEVELS:
        # Fixed wording - used to read "must passed and one of the ...".
        raise Exception('priority_level must be passed and be one of the QUEUE_PRIORITY_LEVELS')
    is_pipeline = redis_conn_is_pipeline(redis_conn)
    if send_to_backend_id is None:
        send_to_backend_id = ''  # cant send a None value to redis but can send an empty string
    # This is the id of the new userqueue if we end up needing to create a new one.
    new_userqueue_id = random.randint(0, REDIS_SORTED_SET_MAX)  # type: int
    request_uuid = uuid.uuid4().hex  # type: str
    time_in_millis = int(1000 * time.time())
    if user_id is None:
        # Anonymous user - do not try to group by user id at all. Instead only group by IP address.
        user_id = "anonymous-" + str(random.randint(0, 100000000))
    else:
        user_id = "userid-" + str(user_id)
    if user_ip_addr is None:
        # No source ip address - don't try to group by it. Instead group only by user id.
        user_ip_addr = "unknownipaddr-" + str(random.randint(0, 100000000))
    else:
        user_ip_addr = "ipaddr-" + str(user_ip_addr)
    # Higher priority -> Head of queue
    # More recent -> back of queue
    # This is designed to work until year 3000, see notes near REDIS_PRIORITY_LEVEL_SCALE_FACTOR on the math.
    this_request_queue_order = (priority_level * REDIS_PRIORITY_LEVEL_SCALE_FACTOR) - time_in_millis
    currently_idle_backends_name = get_currently_idle_backends_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    lua_keys = collections.OrderedDict()
    lua_keys['currently_idle_backends_name'] = currently_idle_backends_name
    lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['queue_name'] = queue_name
    lua_args['request_bytes'] = request_bytes
    lua_args['user_id'] = user_id
    lua_args['user_ip_addr'] = user_ip_addr
    lua_args['new_userqueue_id'] = new_userqueue_id
    lua_args['time_in_millis'] = time_in_millis
    lua_args['this_request_queue_order'] = this_request_queue_order
    lua_args['request_uuid'] = request_uuid
    lua_args['send_to_backend_id'] = send_to_backend_id
    with SCRIPT_GENERATOR_LOCK:
        if SUBMIT_REQUEST_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Submit Request Lua Script...')
            # You can't change the script source code on every request such with putting a dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            # NOTE! This script will not work on a redis server using cluster mode because it accesses keys not mentioned in the KEYS array.
            # One example of that is the lpush into the backend inbox which needs to do a dynamic lookup...
            # Lua Pseudocode
            # IF there's any idle backends:
            #   put the request in the highest priority backend
            # ELSE:
            #   put the request in the user queue and wait for a model to pick it up. (don't wait in this function, return immediately.)
            #   Set the last dequeued time on the userqueue to the current time ONLY if we are creating a new queue in this call...
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            SUBMIT_REQUEST_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
            -- first check if we are sending direct to a single backend, the simplest case.
            if send_to_backend_id ~= '' then
                local backend_inbox_name = queue_name .. "-idle-backend-" .. send_to_backend_id
                redis.call("zrem", currently_idle_backends_name, send_to_backend_id) -- remove the target backend from idle set.
                redis.call("zrem", idle_backend_id_to_expire_time_name, send_to_backend_id) -- also remove from expire time set...
                redis.call("lpush", backend_inbox_name, request_bytes)
                return false -- this is how you return None
            end
            -- remove idle backends that have expired:
            -- remove from idle_backend_id_to_expire_time_name where score < current time...
            -- ZREMRANGEBYSCORE key min max
            redis.call("zremrangebyscore", idle_backend_id_to_expire_time_name, 0, time_in_millis)
            -- Then keep only the non expired backends, careful not to destroy the backend priorities.
            -- ZINTERSTORE destination numkeys key key WEIGHTS weight weight AGGREGATE SUM
            redis.call("zinterstore", currently_idle_backends_name, 2, currently_idle_backends_name, idle_backend_id_to_expire_time_name, "WEIGHTS", 1, 0, "AGGREGATE", "SUM")
            local num_idle_backends = tonumber(redis.call("zcard", currently_idle_backends_name))
            if num_idle_backends > 0 then
                local backend_id = redis.call("zpopmax", currently_idle_backends_name)[1]
                redis.call("zrem", idle_backend_id_to_expire_time_name, backend_id) -- also remove the expire time of the previously idle backend...
                local backend_inbox_name = queue_name .. "-idle-backend-" .. backend_id
                redis.call("lpush", backend_inbox_name, request_bytes)
                return false -- this is how you return None
            else
                local user_id_queue_lookup = redis.call("zscore", user_data_to_userqueue_name, user_id)
                local ip_addr_queue_lookup = redis.call("zscore", user_data_to_userqueue_name, user_ip_addr)
                local chosen_userqueue_id
                if user_id_queue_lookup ~= false then
                    chosen_userqueue_id = user_id_queue_lookup
                elseif ip_addr_queue_lookup ~= false then
                    chosen_userqueue_id = ip_addr_queue_lookup
                else
                    chosen_userqueue_id = new_userqueue_id -- this is a fresh queue id
                end
                -- mark the IP address and user id as pointing to the userqueue we just chose
                -- this might overwrite but that is fine
                redis.call("zadd", user_data_to_userqueue_name, chosen_userqueue_id, user_id) -- make user id point to chosen queue
                redis.call("zadd", user_data_to_userqueue_name, chosen_userqueue_id, user_ip_addr) -- make ip addr point to chosen queue
                local last_dequeued_time = redis.call("zscore", userqueue_last_dequeued_times_name, chosen_userqueue_id)
                if last_dequeued_time == false then
                    -- we just created a new queue - set the last dequeued time so that this request goes to the back of the round robin line
                    redis.call("zadd", userqueue_last_dequeued_times_name, time_in_millis, chosen_userqueue_id)
                end
                -- add to requests by uuid hash
                redis.call("hset", requests_by_uuid_name, request_uuid, request_bytes)
                -- add to userqueue by uuid
                local userqueue_key_name = queue_name .. "-userqueue-" .. chosen_userqueue_id
                redis.call("zadd", userqueue_key_name, this_request_queue_order, request_uuid)
                return chosen_userqueue_id
            end
            """
            SUBMIT_REQUEST_LUA_SCRIPT = SUBMIT_REQUEST_LUA_SCRIPT.encode('utf-8')
            SUBMIT_REQUEST_LUA_SCRIPT_HASH = sha1(SUBMIT_REQUEST_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Succesfully Generated Submit Request Lua Script...')
                print(SUBMIT_REQUEST_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Submit Request Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    if is_pipeline:
        if send_full_script_for_pipeline:
            # call this the first time inside a pipeline
            redis_conn.script_load(SUBMIT_REQUEST_LUA_SCRIPT)
        # for pipeline mode assume script already loaded.
        redis_conn.evalsha(SUBMIT_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
        return (request_uuid, None,)  # this is all we can return in pipeline mode...
    else:
        try:
            chosen_userqueue_id = redis_conn.evalsha(SUBMIT_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
        except NoScriptError:
            # Script not in the server cache yet - EVAL loads and runs it in one round trip.
            chosen_userqueue_id = redis_conn.eval(SUBMIT_REQUEST_LUA_SCRIPT, numkeys, *keys_and_args)
        if chosen_userqueue_id is None:
            print('submit_request has sent request to an idle backend.')
        else:
            chosen_userqueue_id = chosen_userqueue_id.decode('utf-8')
            print(
                'submit_request has enqueued the request - no idle backends. userqueue id = {}'.format(
                    chosen_userqueue_id))
        return (request_uuid, chosen_userqueue_id,)
def cancel_request(redis_conn=None,  # Required,
                   request_uuid=None,
                   queue_name=None,
                   userqueue_id=None):
    """Cancel a previously submitted request - the caller is no longer interested.

    Removes the request uuid from its userqueue and from the requests-by-uuid
    hash, then deletes the whole userqueue's bookkeeping if it became empty -
    all atomically in one Lua script.  Safe to call multiple times, or after
    the request was already dequeued (the removals are simply no-ops then).
    """
    # Use this to cancel a request you sent to a model - indicating that you are no longer interested
    # This function should be safe to call multiple times, or if the request was already dequeued.
    global SCRIPT_GENERATOR_LOCK
    global CANCEL_REQUEST_LUA_SCRIPT
    global CANCEL_REQUEST_LUA_SCRIPT_HASH
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not request_uuid:
        raise Exception('request_uuid must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if not userqueue_id:
        raise Exception('userqueue_id must be passed')
    userqueue_key_name = get_userqueue_key_name(queue_name, userqueue_id)
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    lua_keys = collections.OrderedDict()
    lua_keys['userqueue_key_name'] = userqueue_key_name
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['request_uuid'] = request_uuid
    lua_args['userqueue_id'] = userqueue_id
    with SCRIPT_GENERATOR_LOCK:
        if CANCEL_REQUEST_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Cancel Request Lua Script...')
            # You can't change the script source code on every request such with putting a dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            # Lua Pseudocode
            # delete from userqueue
            # delete from densecap-requests-by-uuid
            # if needed, delete userqueue and all its pieces
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            CANCEL_REQUEST_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
            redis.call("zrem", userqueue_key_name, request_uuid)
            redis.call("hdel", requests_by_uuid_name, request_uuid)
            local new_userqueue_size = tonumber(redis.call("zcard", userqueue_key_name))
            if new_userqueue_size == 0 then
                -- the userqueue is empty, delete all traces of it.
                redis.call("zrem", userqueue_last_dequeued_times_name, userqueue_id)
                -- This sorted set maps user data to queue ID (integer). So you can remove all entries that map to this queue id by removing those with score between [userqueue_id, userqueue_id] inclusive.
                -- ZREMRANGEBYSCORE key min max
                redis.call("zremrangebyscore", user_data_to_userqueue_name, userqueue_id, userqueue_id)
            end
            """
            CANCEL_REQUEST_LUA_SCRIPT = CANCEL_REQUEST_LUA_SCRIPT.encode('utf-8')
            CANCEL_REQUEST_LUA_SCRIPT_HASH = sha1(CANCEL_REQUEST_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Succesfully Generated Cancel Request Lua Script...')
                print(CANCEL_REQUEST_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Cancel Request Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    try:
        redis_conn.evalsha(
            CANCEL_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
    except NoScriptError:
        # Script not cached on the server yet - fall back to EVAL which loads it.
        redis_conn.eval(
            CANCEL_REQUEST_LUA_SCRIPT, numkeys, *keys_and_args)
def wait_for_request_response(redis_conn=None,  # Required
                              request_uuid=None,  # Required
                              userqueue_id=None,  # Optional
                              response_queue_name=None,  # Required
                              queue_name=None,  # Required
                              max_wait_seconds=None,  # Optional - if not passed - no timeout, wait indefinitely
                              cancel_request_upon_timeout=True):
    """Block until the response for a previously submitted request arrives.

    Does a blocking BRPOP on the response queue and returns the response
    bytes.  If the wait times out, the still-pending request is optionally
    cancelled (only possible when userqueue_id is known, i.e. the request
    was actually queued rather than sent straight to a backend), and None
    is returned.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not request_uuid:
        raise Exception('request_uuid must be passed')
    if not response_queue_name:
        raise Exception('response_queue_name must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    # A BRPOP timeout of 0 means "wait forever".
    timeout_seconds = max_wait_seconds if max_wait_seconds else 0
    popped = redis_conn.brpop(response_queue_name, timeout=timeout_seconds)
    if popped is not None:
        # brpop returns a (key, value) pair - the value is the response bytes.
        return popped[1]
    # Timed out without a response.
    if cancel_request_upon_timeout:
        if userqueue_id:
            cancel_request(
                redis_conn=redis_conn,
                request_uuid=request_uuid,
                queue_name=queue_name,
                userqueue_id=userqueue_id
            )
        else:
            print('userqueue id is None - cannot cancel this request as it was never queued.')
    return None
def get_backend_inbox_key_name(queue_name, backend_id):
    """Redis key (list) used as the private inbox of one backend container (BRPOP target)."""
    return "".join((queue_name, "-idle-backend-", backend_id))
# Returns request_bytes
def get_next_request(redis_conn=None,  # Required
                     queue_name=None,  # Required
                     backend_id=None,
                     # Required - identifier of the container making this call. should be unique and never reused.
                     backend_priority=None,
                     # Required - floating point number. The priority level of this server compared to others when deciding where to route a request. Higher number is higher priority.
                     idle_keepalive_interval_seconds=None,
                     # optional, how many seconds between marking myself active again...
                     idle_backend_max_age_seconds=None
                     # Optional, how many seconds of inactivity before I will be considered not-idle again. Ideally a few seconds longer than idle_keepalive_interval_seconds
                     ):
    """Called by the model/backend: block until there is a request to process.

    Each loop iteration atomically (via one Lua script): drains this
    backend's inbox if it has an entry; otherwise dequeues from the
    least-recently-served userqueue (round robin via ZPOPMIN on the
    last-dequeued-times set); otherwise registers this backend as idle with
    a fresh expiry time.  When the script returns nothing, we BRPOP on our
    inbox for up to idle_keepalive_interval_seconds and loop again - each
    loop re-runs the script, which refreshes our idle keepalive entry.

    Returns:
        request_bytes of the next request (loops forever until one arrives).
    """
    # This one is called by the model.
    # Wait for a request to show up in our inbox...
    global SCRIPT_GENERATOR_LOCK
    global GET_REQUEST_LUA_SCRIPT
    global GET_REQUEST_LUA_SCRIPT_HASH
    if idle_keepalive_interval_seconds is None:
        idle_keepalive_interval_seconds = DEFAULT_IDLE_KEEPALIVE_INTERVAL_SECONDS
    if idle_backend_max_age_seconds is None:
        idle_backend_max_age_seconds = DEFAULT_IDLE_BACKEND_MAX_AGE_SECONDS
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if backend_id is None:
        raise Exception('backend_id must be passed')
    if backend_priority is None:
        raise Exception('backend_priority must be passed')
    my_inbox_key_name = get_backend_inbox_key_name(queue_name, backend_id)
    while True:
        # The time in millis will change every iteration so this must be done in the loop... it's lightweight...
        # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
        lua_keys = collections.OrderedDict()
        lua_keys['my_inbox_key_name'] = my_inbox_key_name
        lua_keys['userqueue_last_dequeued_times_name'] = get_userqueue_last_dequeued_times_name(queue_name)
        lua_keys['requests_by_uuid_name'] = get_requests_by_uuid_name(queue_name)
        lua_keys['user_data_to_userqueue_name'] = get_user_data_to_userqueue_name(queue_name)
        lua_keys['currently_idle_backends_name'] = get_currently_idle_backends_name(queue_name)
        lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
        # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
        lua_args = collections.OrderedDict()
        lua_args['queue_name'] = queue_name
        lua_args['time_in_millis'] = int(1000 * time.time())
        lua_args['expire_time_in_millis'] = int(1000 * (time.time() + idle_backend_max_age_seconds))
        lua_args['backend_priority'] = backend_priority
        lua_args['backend_id'] = backend_id
        with SCRIPT_GENERATOR_LOCK:
            if GET_REQUEST_LUA_SCRIPT is None:
                if PRINT_SCRIPTS:
                    print('Generating Get Request Lua Script...')
                # You can't change the script source code on every request such with putting a dynamic variable in the source code.
                # That would not use the redis compiled script cache and be bad for performance.
                # Instead use args or keys.
                # PseudoCode of the process:
                # In Lua:
                #   Check if there is anything in our backend inbox:
                #   If inbox has item:
                #     pop that item and return it.
                #   If inbox has no item:
                #     Check if queue has anything to process
                #     If queue has an item:
                #       pop item and return it.
                #       set the current time as in userqueue_last_dequeued_times_name ...
                #       clean the queue data structures if needed.
                #     If queue has no item:
                #       add myself to the idle backend set
                #       return nothing from lua.
                # after lua returns, do a BLPOP on our inbox waiting until someone sends us some work.
                lua_keys_vars = ""
                for i, (key, value) in enumerate(lua_keys.items()):
                    lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
                lua_args_vars = ""
                for i, (key, value) in enumerate(lua_args.items()):
                    lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
                GET_REQUEST_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
                local my_inbox_request = redis.call("rpop", my_inbox_key_name)
                if my_inbox_request ~= false then
                    -- we got an item from our inbox immediately.
                    return my_inbox_request
                else
                    -- our inbox was empty, now check the queue
                    local num_userqueues = tonumber(redis.call("zcard", userqueue_last_dequeued_times_name))
                    if num_userqueues > 0 then
                        local oldest_userqueue_id = redis.call("zpopmin", userqueue_last_dequeued_times_name)[1] -- this seems to throw an error if its empty???
                        -- we got the id of a userqueue with something in it
                        local userqueue_key_name = queue_name .. "-userqueue-" .. oldest_userqueue_id
                        local request_uuid = redis.call("zpopmax", userqueue_key_name)[1]
                        local request_bytes = redis.call("hget", requests_by_uuid_name, request_uuid)
                        redis.call("hdel", requests_by_uuid_name, request_uuid)
                        local new_userqueue_size = redis.call("zcard", userqueue_key_name)
                        if new_userqueue_size == 0 then
                            -- the userqueue is empty, delete all traces of it.
                            redis.call("zrem", userqueue_last_dequeued_times_name, oldest_userqueue_id)
                            -- This sorted set maps user data to queue ID (integer). So you can remove all entries that map to this queue id by removing those with score between [oldest_userqueue_id, oldest_userqueue_id] inclusive.
                            -- ZREMRANGEBYSCORE key min max
                            redis.call("zremrangebyscore", user_data_to_userqueue_name, oldest_userqueue_id, oldest_userqueue_id)
                        else
                            -- the userqueue still has some requests remaining in it.
                            -- set the last dequeued time so it goes to the back of the round robin line.
                            redis.call("zadd", userqueue_last_dequeued_times_name, time_in_millis, oldest_userqueue_id)
                        end
                        -- return the request we just got from the queue
                        return request_bytes
                    else
                        -- there are no userqueues at the moment. add myself to idle backends.
                        -- zadd key score member
                        redis.call("zadd", currently_idle_backends_name, backend_priority, backend_id)
                        redis.call("zadd", idle_backend_id_to_expire_time_name, expire_time_in_millis, backend_id)
                        -- return None back to python because we got nothing from the queue
                        return false
                    end
                end
                """
                GET_REQUEST_LUA_SCRIPT = GET_REQUEST_LUA_SCRIPT.encode('utf-8')
                GET_REQUEST_LUA_SCRIPT_HASH = sha1(GET_REQUEST_LUA_SCRIPT).hexdigest()
                if PRINT_SCRIPTS:
                    print('Succesfully Generated Get Request Lua Script...')
                    print(GET_REQUEST_LUA_SCRIPT)
            else:
                if PRINT_SCRIPTS:
                    print('Using pre-generated Get Request Lua Script...')
        numkeys = len(lua_keys)
        keys_and_args = list(lua_keys.values()) + list(lua_args.values())
        try:
            request_bytes = redis_conn.evalsha(
                GET_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
        except NoScriptError:
            # Script not cached on the server yet - fall back to EVAL which loads it.
            request_bytes = redis_conn.eval(
                GET_REQUEST_LUA_SCRIPT, numkeys, *keys_and_args)
        if request_bytes is not None:
            print('get_next_request got a request from the inbox or queue without waiting.')
            return request_bytes
        # we didn't get any request from the queue so we must now sit and wait for something to arrive in our inbox.
        print('get_next_request didnt get a request immediately so now it is waiting')
        keys_to_wait_on = [my_inbox_key_name]
        res = redis_conn.brpop(keys_to_wait_on,
                               timeout=idle_keepalive_interval_seconds)  # this can wait on multiple keys - timeout should be 5s shorter than socket timeout.
        if res is not None:
            # Loop until brpop returns an actual value
            request_bytes = res[1]
            print('get_next_request got a result from BRPOP')
            return request_bytes
        print('get_next_request got nothing from BRPOP -- running script again...')
def deregister_idle_backend(redis_conn=None,  # Required
                            queue_name=None,  # Required
                            backend_id=None,
                            # Required - identifier of the container making this call. should be unique and never reused.
                            ):
    """Immediately drop a backend from the idle-backend bookkeeping sets.

    Use when a backend knows it is going away right now, instead of waiting
    for its keepalive entry to expire.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if backend_id is None:
        raise Exception('backend_id must be passed')
    # The backend lives in two sorted sets: the priority set and the expiry set.
    for set_key_name in (get_currently_idle_backends_name(queue_name),
                         get_idle_backend_id_to_expire_time_name(queue_name)):
        redis_conn.zrem(set_key_name, backend_id)
    return None
def submit_response(redis_conn=None,  # Required
                    response_bytes=None,  # Required
                    response_queue_name=None  # Required
                    ):
    """Called by the model: push a finished response onto the response queue.

    The push and the TTL refresh are batched into one pipeline round trip.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not response_bytes:
        raise Exception('response_bytes must be passed')
    if not response_queue_name:
        raise Exception('response_queue_name must be passed')
    one_hour = 60 * 60
    pipe = redis_conn.pipeline()
    pipe.lpush(response_queue_name, response_bytes)
    pipe.expire(response_queue_name, one_hour)  # Responses will disappear after 1 hour.
    pipe.execute()
def make_dict_from_zrange_withscores_output(flat_list):
    """Turn a flat [member, score, member, score, ...] redis reply into a dict."""
    # zip-ing one shared iterator with itself pairs up consecutive elements.
    pairwise = iter(flat_list)
    return dict(zip(pairwise, pairwise))
def decode_dict_key_bytes(the_dict):
    """Return a copy of the_dict with every key utf-8 decoded, recursing into nested dicts.

    Redis replies arrive with byte keys under python3; values are passed
    through untouched except that nested dicts get the same key treatment.
    """
    decoded = {}
    for key, val in the_dict.items():
        decoded[key.decode('utf-8')] = decode_dict_key_bytes(val) if isinstance(val, dict) else val
    return decoded
def get_queue_full_overview_data(redis_conn=None, queue_name=None):
    """Return a full snapshot of a queue's state for monitoring/debugging.

    Runs a cached server-side Lua script that reads every tracking structure
    for the queue in one atomic call, then unpacks the cmsgpack-encoded
    reply into plain python dicts.

    Args:
        redis_conn: Redis connection (required).
        queue_name: Name of the queue to inspect (required).

    Returns:
        dict with keys:
            'userqueues': {userqueue_id: {'requests_by_uuid': {uuid: score}}}
            'last_dequeued_times': {userqueue_id: score}
            'user_data_to_userqueue': {user_data: score}
            'currently_idle_backends': {backend_id: score}
            'idle_backend_expire_times': {backend_id: expire_score}
            'num_requests_by_uuid': int count of pending requests

    Raises:
        Exception: If redis_conn or queue_name is missing.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    # Script source + sha1 are generated once per process and cached in
    # module globals, guarded by SCRIPT_GENERATOR_LOCK.
    global SCRIPT_GENERATOR_LOCK
    global GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT
    global GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    lua_keys = collections.OrderedDict()
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    lua_keys['currently_idle_backends_name'] = get_currently_idle_backends_name(queue_name)
    lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['queue_name'] = queue_name
    with SCRIPT_GENERATOR_LOCK:
        if GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Get Queue Full Overview Lua Script...')
            # You can't change the script source code on every request, such as by putting a
            # dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
                local ret_data = {
                    userqueues = {};
                };
                local userqueue_ids = redis.call("zrange", userqueue_last_dequeued_times_name, 0, -1)
                for idx, userqueue_id in pairs(userqueue_ids) do
                    local userqueue_data = {};
                    local userqueue_key_name = queue_name .. "-userqueue-" .. userqueue_id
                    userqueue_data['requests_by_uuid'] = redis.call("zrange", userqueue_key_name, 0, -1, "WITHSCORES")
                    ret_data['userqueues'][userqueue_id] = userqueue_data
                end
                ret_data['last_dequeued_times'] = redis.call("zrange", userqueue_last_dequeued_times_name, 0, -1, "WITHSCORES")
                ret_data['user_data_to_userqueue'] = redis.call("zrange", user_data_to_userqueue_name, 0, -1, "WITHSCORES")
                ret_data['currently_idle_backends'] = redis.call("zrange", currently_idle_backends_name, 0, -1, "WITHSCORES")
                ret_data['idle_backend_expire_times'] = redis.call("zrange", idle_backend_id_to_expire_time_name, 0, -1, "WITHSCORES")
                ret_data['num_requests_by_uuid'] = redis.call("hlen", requests_by_uuid_name)
                return cmsgpack.pack(ret_data)
            """
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT = GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT.encode('utf-8')
            # sha1 lets us try EVALSHA (cached script) before falling back to EVAL.
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH = sha1(GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Succesfully Generated Get Queue Full Overview Lua Script...')
                print(GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Get Queue Full Overview Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    # Try the server's script cache first; EVAL fallback also loads the
    # script into the cache, so the NoScriptError path runs at most once
    # per redis server restart / SCRIPT FLUSH.
    try:
        ret_data_msgpack = redis_conn.evalsha(
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
    except NoScriptError:
        ret_data_msgpack = redis_conn.eval(
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT, numkeys, *keys_and_args)
    # Unpack the cmsgpack blob and normalize: decode bytes keys, then turn
    # each flat WITHSCORES list into a {member: score} dict.
    ret_data = msgpack.unpackb(ret_data_msgpack, use_list=False)
    ret_data = decode_dict_key_bytes(ret_data)
    ret_data['currently_idle_backends'] = make_dict_from_zrange_withscores_output(ret_data['currently_idle_backends'])
    ret_data['idle_backend_expire_times'] = make_dict_from_zrange_withscores_output(
        ret_data['idle_backend_expire_times'])
    ret_data['last_dequeued_times'] = make_dict_from_zrange_withscores_output(ret_data['last_dequeued_times'])
    ret_data['user_data_to_userqueue'] = make_dict_from_zrange_withscores_output(ret_data['user_data_to_userqueue'])
    ret_data['num_requests_by_uuid'] = int(ret_data['num_requests_by_uuid'])
    for userqueue_id in ret_data['userqueues']:
        ret_data['userqueues'][userqueue_id]['requests_by_uuid'] = make_dict_from_zrange_withscores_output(
            ret_data['userqueues'][userqueue_id]['requests_by_uuid'])
    return ret_data
def delete_queue(redis_conn=None, queue_name=None):
    """Delete every redis key belonging to a queue.

    Runs a cached server-side Lua script that enumerates all per-user
    sub-queues and deletes them along with the queue's tracking structures,
    so the whole teardown happens in one atomic server-side call.

    Args:
        redis_conn: Redis connection (required).
        queue_name: Name of the queue to delete (required).

    Raises:
        Exception: If redis_conn or queue_name is missing.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    # Script source + sha1 are generated once per process and cached in
    # module globals, guarded by SCRIPT_GENERATOR_LOCK.
    global SCRIPT_GENERATOR_LOCK
    global DELETE_QUEUE_LUA_SCRIPT
    global DELETE_QUEUE_LUA_SCRIPT_HASH
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    lua_keys = collections.OrderedDict()
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    lua_keys['currently_idle_backends_name'] = get_currently_idle_backends_name(queue_name)
    lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['queue_name'] = queue_name
    with SCRIPT_GENERATOR_LOCK:
        if DELETE_QUEUE_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Delete Queue Lua Script...')
            # You can't change the script source code on every request, such as by putting a
            # dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            DELETE_QUEUE_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
                local userqueue_ids = redis.call("zrange", userqueue_last_dequeued_times_name, 0, -1)
                for idx, userqueue_id in pairs(userqueue_ids) do
                    local userqueue_key_name = queue_name .. "-userqueue-" .. userqueue_id
                    redis.call("del", userqueue_key_name)
                end
                redis.call("del", userqueue_last_dequeued_times_name)
                redis.call("del", user_data_to_userqueue_name)
                redis.call("del", currently_idle_backends_name)
                redis.call("del", idle_backend_id_to_expire_time_name)
                redis.call("del", requests_by_uuid_name)
            """
            DELETE_QUEUE_LUA_SCRIPT = DELETE_QUEUE_LUA_SCRIPT.encode('utf-8')
            # sha1 lets us try EVALSHA (cached script) before falling back to EVAL.
            DELETE_QUEUE_LUA_SCRIPT_HASH = sha1(DELETE_QUEUE_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Succesfully Generated Delete Queue Lua Script...')
                print(DELETE_QUEUE_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Delete Queue Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    # Try the server's script cache first; the EVAL fallback also loads the
    # script into the cache for subsequent calls.
    try:
        redis_conn.evalsha(
            DELETE_QUEUE_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
    except NoScriptError:
        redis_conn.eval(
            DELETE_QUEUE_LUA_SCRIPT, numkeys, *keys_and_args)
def get_queue_size(queue_name=None, priority_level=None):
    """Return the number of pending requests in a queue.

    NOTE(review): unimplemented stub -- it currently only validates
    queue_name and falls through, returning None. Per the original inline
    comment, the intent is: when priority_level is None, return the sum
    across all priority levels; otherwise the size at that level.
    """
    # if priority_level is None, get sum of all
    if not queue_name:
        raise Exception('queue_name must be passed')
    pass  # TODO: implement (see docstring)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment