Skip to content

Instantly share code, notes, and snippets.

@deepai-org
Created January 9, 2025 20:08
Show Gist options
  • Save deepai-org/0887f44606d67247a8a164da1506ff2c to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import absolute_import
from __future__ import with_statement
from __future__ import print_function
from __future__ import unicode_literals
import collections
import random
import time
import uuid
from hashlib import sha1
from threading import Lock
import msgpack
from redis.client import Pipeline
from redis.exceptions import NoScriptError
# Queue TODO
# Finish tests, performance tests
# It def requires redis 5 just due to zpopmax.
# Make example of running a ai-integration backend from within django.
# Replace the way that queue sizes are monitored in cron/nagios/etc...
# Integrate unified queue into batch mode. Then deprecate old batch mode workers!
# Ensure batch mode can run at 5000Qps ???
# How often get_next_request will ping redis to mark an idle backend as still alive
DEFAULT_IDLE_KEEPALIVE_INTERVAL_SECONDS = 55
DEFAULT_IDLE_BACKEND_MAX_AGE_SECONDS = DEFAULT_IDLE_KEEPALIVE_INTERVAL_SECONDS + 5 # Tuned to be just a bit longer than the max keepalive interval
# Priority levels: higher number dequeues first; each level gets its own score band in the sorted set (see scale factor below).
QUEUE_PRIORITY_BATCH = 1 # Lowest priority
QUEUE_PRIORITY_STANDARD = 2
QUEUE_PRIORITY_TRYIT = 3
QUEUE_PRIORITY_REALTIME = 4 # Highest priority
QUEUE_PRIORITY_LEVELS = [QUEUE_PRIORITY_BATCH, QUEUE_PRIORITY_STANDARD, QUEUE_PRIORITY_TRYIT, QUEUE_PRIORITY_REALTIME]
# Sorted set scores can store integers up to 9007199254740992 without precision loss.
# Unix timestamp of year 3000 is 32503680000, in milliseconds it is 32503680000000
# 9007199254740992 / 32503680000000 = 277.
# We can store 3000 years worth of millisecond timestamps 277 times, so we can have 277 priority levels
REDIS_SORTED_SET_MAX = 9000000000000000 # 9007199254740992 is the real value but this looks nicer and may be easier to reason about
REDIS_PRIORITY_LEVEL_SCALE_FACTOR = 32503680000000 # The number to multiply your priority level by to get the offset for a given priority level.
# Lazily generated Lua scripts plus their SHA1 digests (used with EVALSHA so redis can reuse
# its compiled-script cache). Each pair is populated on first use of the matching operation,
# guarded by SCRIPT_GENERATOR_LOCK below.
SUBMIT_REQUEST_LUA_SCRIPT = None # This will get set the first time submit is called.
SUBMIT_REQUEST_LUA_SCRIPT_HASH = None # This will get set the first time submit is called.
CANCEL_REQUEST_LUA_SCRIPT = None # This will get set the first time cancel is called.
CANCEL_REQUEST_LUA_SCRIPT_HASH = None # This will get set the first time cancel is called.
GET_REQUEST_LUA_SCRIPT = None # This will get set the first time get request is called.
GET_REQUEST_LUA_SCRIPT_HASH = None # This will get set the first time get request is called.
GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT = None # This will get set the first time get full overview is called.
GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH = None # This will get set the first time get full overview is called.
DELETE_QUEUE_LUA_SCRIPT = None # This will get set the first time delete queue is called.
DELETE_QUEUE_LUA_SCRIPT_HASH = None # This will get set the first time delete queue is called.
PRINT_SCRIPTS = False # Set True to print each Lua script when it is generated (debugging aid).
SCRIPT_GENERATOR_LOCK = Lock() # Serializes lazy script generation across threads.
# Key Names in Redis:
#
# All keys are model-specific
#
# User-Queue lookup:
#
# SORTED SET densecap-user-data-to-userqueue (map "ip-123.123.123.123" or "user-4000" to userqueue id: random number between 0-9007199254740992 )
# (make this set self cleaning, delete all keys with ZREMRANGEBYSCORE the moment a userqueue becomes empty)
# (this allows easy bidirectional lookup, given a userqueue id we can find the ips and users, and given ip/user we can find the userqueue)
# SORTED SET densecap-userqueue-last-dequeued-times (map userqueue id like "99254740992" to the last unix timestamp in milliseconds when a request was dequeued and sent to a backend)
# (make this set self cleaning, delete with ... the moment a userqueue becomes empty)
# (or rather use zpopmin and do zadd to add it back under the new score within the same transaction)
# Use ZPOPMIN to get the userqueue that was handled the longest ago. This implements round robin. When inserting a
# new userqueue - use the current time as last handled. New requests go to the back of the round robin line.
#
# SORTED SET densecap-currently-idle-backends (map backend id -> backend priority)
# SORTED SET densecap-userqueue-99254740992 (id is in the queue name - map request uuid to insert timestamp in milliseconds)
# Only store the uuid of the request in the sorted set for perf. Generate the uuid in python. no dashes in uuid.
# HASH densecap-requests-by-uuid (map uuid of request to full request)
# Make this hash self cleaning, delete as you go.
# LIST densecap-idle-backend-fd98df7df987df (This is the inbox of a single idle backend container ... that the backend will BLPOP on to wait for the message)
def get_currently_idle_backends_name(queue_name):
    """Redis key of the sorted set mapping idle backend ids to their priority."""
    return "-".join((queue_name, "currently-idle-backends"))
def get_idle_backend_id_to_expire_time_name(queue_name):
    """Redis key of the sorted set mapping idle backend ids to their expiry timestamp (ms)."""
    return "-".join((queue_name, "idle-backend-id-to-expire-time"))
def get_user_data_to_userqueue_name(queue_name):
    """Redis key of the sorted set mapping user-id / ip-address members to userqueue ids."""
    return "-".join((queue_name, "user-data-to-userqueue"))
def get_userqueue_last_dequeued_times_name(queue_name):
    """Redis key of the sorted set mapping userqueue ids to their last-dequeued timestamp (ms)."""
    return "-".join((queue_name, "userqueue-last-dequeued-times"))
def get_requests_by_uuid_name(queue_name):
    """Redis key of the hash mapping request uuids to the full request bytes."""
    return "-".join((queue_name, "requests-by-uuid"))
def get_userqueue_key_name(queue_name, userqueue_id):
    """Redis key of a single userqueue sorted set (request uuid -> queue-order score)."""
    return "-".join((queue_name, "userqueue", userqueue_id))
def redis_conn_is_pipeline(redis_conn):
    """Return True when *redis_conn* is a redis-py Pipeline rather than a plain client."""
    # isinstance already yields a bool; no need for the explicit True/False branches.
    return isinstance(redis_conn, Pipeline)
# Returns (request_uuid, userqueue_id)
# userqueue_id might be None if the request got handled immediately with no queueing.
def submit_request(redis_conn=None,  # Required - this can be a pipeline
                   request_bytes=None,  # Required
                   queue_name=None,  # Required
                   user_id=None,  # Optional
                   user_ip_addr=None,  # Optional
                   priority_level=None,  # Required
                   send_to_backend_id=None,  # Optional - send to a specific backend ID instead of any backend...
                   send_full_script_for_pipeline=False
                   # Optional - send the full script instead of just the hash - intended to be used at the start of a pipeline...
                   ):
    """Submit a request to a model queue.

    A server-side Lua script atomically either (a) pushes request_bytes into an
    idle backend's inbox list, or (b) enqueues it into a per-user queue (grouped
    by user id / ip address) to be picked up later by get_next_request.

    Returns a tuple (request_uuid, userqueue_id); userqueue_id is None when the
    request went straight to a backend, and always None in pipeline mode because
    the script result is not available until the pipeline executes.

    Raises Exception when any required argument is missing or priority_level is
    not one of QUEUE_PRIORITY_LEVELS.
    """
    # Use this to send a request to a model
    global SUBMIT_REQUEST_LUA_SCRIPT
    global SUBMIT_REQUEST_LUA_SCRIPT_HASH
    global SCRIPT_GENERATOR_LOCK
    if redis_conn is None:
        raise Exception('redis_conn must be passed')
    if request_bytes is None:
        raise Exception('request_bytes must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if priority_level not in QUEUE_PRIORITY_LEVELS:
        raise Exception('priority_level must passed and one of the QUEUE_PRIORITY_LEVELS')
    is_pipeline = redis_conn_is_pipeline(redis_conn)
    # if is_pipeline:
    #     print('This connection is a pipeline')
    # else:
    #     print('This connection is NOT a pipeline')
    if send_to_backend_id is None:
        send_to_backend_id = ''  # cant send a None value to redis but can send an empty string
    new_userqueue_id = random.randint(0,
                                      REDIS_SORTED_SET_MAX)  # type: int # This is the id of the new userqueue queue if we end up needing to create a new one
    request_uuid = uuid.uuid4().hex  # type: str # hex form: no dashes, used as the member in the userqueue sorted set
    time_in_millis = int(1000 * time.time())
    if user_id is None:
        user_id = "anonymous-" + str(random.randint(0,
                                                    100000000))  # If this request is for an anonymous user - do not try to group by user id at all. Instead only group by IP address.
    else:
        user_id = "userid-" + str(user_id)
    if user_ip_addr is None:
        user_ip_addr = "unknownipaddr-" + str(random.randint(0,
                                                             100000000))  # If for some weird reason we don't have a source ip address, don't try to group by it. Instead group only by user id.
    else:
        user_ip_addr = "ipaddr-" + str(user_ip_addr)
    # Higher priority -> Head of queue
    # More recent -> back of queue
    this_request_queue_order = (
        priority_level * REDIS_PRIORITY_LEVEL_SCALE_FACTOR) - time_in_millis  # This is designed to work until year 3000, see notes above on math.
    currently_idle_backends_name = get_currently_idle_backends_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    # OrderedDict so that the KEYS/ARGV positions match the generated "local x = KEYS[i]" lines.
    lua_keys = collections.OrderedDict()
    lua_keys['currently_idle_backends_name'] = currently_idle_backends_name
    lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['queue_name'] = queue_name
    lua_args['request_bytes'] = request_bytes
    lua_args['user_id'] = user_id
    lua_args['user_ip_addr'] = user_ip_addr
    lua_args['new_userqueue_id'] = new_userqueue_id
    lua_args['time_in_millis'] = time_in_millis
    lua_args['this_request_queue_order'] = this_request_queue_order
    lua_args['request_uuid'] = request_uuid
    lua_args['send_to_backend_id'] = send_to_backend_id
    with SCRIPT_GENERATOR_LOCK:
        if SUBMIT_REQUEST_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Submit Request Lua Script...')
            # You can't change the script source code on every request such with putting a dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            # NOTE! This script will not work on a redis server using cluster mode because it accesses keys not mentioned in the KEYS array.
            # One example of that is the lpush into the backend inbox which needs to do a dynamic lookup...
            # Lua Pseudocode
            # IF there's any idle backends:
            #     put the request in the highest priority backend
            # ELSE:
            #     put the request in the user queue and wait for a model to pick it up. (don't wait in this function, return immediately.)
            #     Set the last dequeued time on the userqueue to the current time ONLY if we are creating a new queue in this call...
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            SUBMIT_REQUEST_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
-- first check if we are sending direct to a single backend, the simplest case.
if send_to_backend_id ~= '' then
local backend_inbox_name = queue_name .. "-idle-backend-" .. send_to_backend_id
redis.call("zrem", currently_idle_backends_name, send_to_backend_id) -- remove the target backend from idle set.
redis.call("zrem", idle_backend_id_to_expire_time_name, send_to_backend_id) -- also remove from expire time set...
redis.call("lpush", backend_inbox_name, request_bytes)
return false -- this is how you return None
end
-- remove idle backends that have expired:
-- remove from idle_backend_id_to_expire_time_name where score < current time...
-- ZREMRANGEBYSCORE key min max
redis.call("zremrangebyscore", idle_backend_id_to_expire_time_name, 0, time_in_millis)
-- Then keep only the non expired backends, careful not to destroy the backend priorities.
-- ZINTERSTORE destination numkeys key key WEIGHTS weight weight AGGREGATE SUM
redis.call("zinterstore", currently_idle_backends_name, 2, currently_idle_backends_name, idle_backend_id_to_expire_time_name, "WEIGHTS", 1, 0, "AGGREGATE", "SUM")
local num_idle_backends = tonumber(redis.call("zcard", currently_idle_backends_name))
if num_idle_backends > 0 then
local backend_id = redis.call("zpopmax", currently_idle_backends_name)[1]
redis.call("zrem", idle_backend_id_to_expire_time_name, backend_id) -- also remove the expire time of the previously idle backend...
local backend_inbox_name = queue_name .. "-idle-backend-" .. backend_id
redis.call("lpush", backend_inbox_name, request_bytes)
return false -- this is how you return None
else
local user_id_queue_lookup = redis.call("zscore", user_data_to_userqueue_name, user_id)
local ip_addr_queue_lookup = redis.call("zscore", user_data_to_userqueue_name, user_ip_addr)
local chosen_userqueue_id
if user_id_queue_lookup ~= false then
chosen_userqueue_id = user_id_queue_lookup
elseif ip_addr_queue_lookup ~= false then
chosen_userqueue_id = ip_addr_queue_lookup
else
chosen_userqueue_id = new_userqueue_id -- this is a fresh queue id
end
-- mark the IP address and user id as pointing to the userqueue we just chose
-- this might overwrite but that is fine
redis.call("zadd", user_data_to_userqueue_name, chosen_userqueue_id, user_id) -- make user id point to chosen queue
redis.call("zadd", user_data_to_userqueue_name, chosen_userqueue_id, user_ip_addr) -- make ip addr point to chosen queue
local last_dequeued_time = redis.call("zscore", userqueue_last_dequeued_times_name, chosen_userqueue_id)
if last_dequeued_time == false then
-- we just created a new queue - set the last dequeued time so that this request goes to the back of the round robin line
redis.call("zadd", userqueue_last_dequeued_times_name, time_in_millis, chosen_userqueue_id)
end
-- add to requests by uuid hash
redis.call("hset", requests_by_uuid_name, request_uuid, request_bytes)
-- add to userqueue by uuid
local userqueue_key_name = queue_name .. "-userqueue-" .. chosen_userqueue_id
redis.call("zadd", userqueue_key_name, this_request_queue_order, request_uuid)
return chosen_userqueue_id
end
"""
            SUBMIT_REQUEST_LUA_SCRIPT = SUBMIT_REQUEST_LUA_SCRIPT.encode('utf-8')
            SUBMIT_REQUEST_LUA_SCRIPT_HASH = sha1(SUBMIT_REQUEST_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Succesfully Generated Submit Request Lua Script...')
                print(SUBMIT_REQUEST_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Submit Request Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    if is_pipeline:
        if send_full_script_for_pipeline:
            # call this the first time inside a pipeline
            redis_conn.script_load(SUBMIT_REQUEST_LUA_SCRIPT)
        # for pipeline mode assume script already loaded.
        redis_conn.evalsha(SUBMIT_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
        return (request_uuid, None,)  # this is all we can return in pipeline mode...
    else:
        try:
            chosen_userqueue_id = redis_conn.evalsha(SUBMIT_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
        except NoScriptError:
            # Script not cached on this redis server yet - send the full source once.
            chosen_userqueue_id = redis_conn.eval(SUBMIT_REQUEST_LUA_SCRIPT, numkeys, *keys_and_args)
        if chosen_userqueue_id is None:
            print('submit_request has sent request to an idle backend.')
        else:
            chosen_userqueue_id = chosen_userqueue_id.decode('utf-8')
            print(
                'submit_request has enqueued the request - no idle backends. userqueue id = {}'.format(
                    chosen_userqueue_id))
        return (request_uuid, chosen_userqueue_id,)
def cancel_request(redis_conn=None,  # Required,
                   request_uuid=None,
                   queue_name=None,
                   userqueue_id=None):
    """Cancel a previously submitted (still queued) request.

    A Lua script atomically removes the request from its userqueue and from the
    requests-by-uuid hash, and deletes the userqueue's bookkeeping entries when
    the queue becomes empty. Safe to call multiple times and safe to call after
    the request was already dequeued (the removals are simply no-ops then).

    Raises Exception when any argument is missing. Returns None.
    """
    # Use this to cancel a request you sent to a model - indicating that you are no longer interested
    # This function should be safe to call multiple times, or if the request was already dequeued.
    global SCRIPT_GENERATOR_LOCK
    global CANCEL_REQUEST_LUA_SCRIPT
    global CANCEL_REQUEST_LUA_SCRIPT_HASH
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not request_uuid:
        raise Exception('request_uuid must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if not userqueue_id:
        raise Exception('userqueue_id must be passed')
    userqueue_key_name = get_userqueue_key_name(queue_name, userqueue_id)
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    # OrderedDict so that the KEYS/ARGV positions match the generated "local x = KEYS[i]" lines.
    lua_keys = collections.OrderedDict()
    lua_keys['userqueue_key_name'] = userqueue_key_name
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['request_uuid'] = request_uuid
    lua_args['userqueue_id'] = userqueue_id
    with SCRIPT_GENERATOR_LOCK:
        if CANCEL_REQUEST_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Cancel Request Lua Script...')
            # You can't change the script source code on every request such with putting a dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            # Lua Pseudocode
            # delete from userqueue
            # delete from densecap-requests-by-uuid
            # if needed, delete userqueue and all its pieces
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            CANCEL_REQUEST_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
redis.call("zrem", userqueue_key_name, request_uuid)
redis.call("hdel", requests_by_uuid_name, request_uuid)
local new_userqueue_size = tonumber(redis.call("zcard", userqueue_key_name))
if new_userqueue_size == 0 then
-- the userqueue is empty, delete all traces of it.
redis.call("zrem", userqueue_last_dequeued_times_name, userqueue_id)
-- This sorted set maps user data to queue ID (integer). So you can remove all entries that map to this queue id by removing those with score between [userqueue_id, userqueue_id] inclusive.
-- ZREMRANGEBYSCORE key min max
redis.call("zremrangebyscore", user_data_to_userqueue_name, userqueue_id, userqueue_id)
end
"""
            CANCEL_REQUEST_LUA_SCRIPT = CANCEL_REQUEST_LUA_SCRIPT.encode('utf-8')
            CANCEL_REQUEST_LUA_SCRIPT_HASH = sha1(CANCEL_REQUEST_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Succesfully Generated Cancel Request Lua Script...')
                print(CANCEL_REQUEST_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Cancel Request Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    try:
        redis_conn.evalsha(
            CANCEL_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
    except NoScriptError:
        # Script not cached on this redis server yet - send the full source once.
        redis_conn.eval(
            CANCEL_REQUEST_LUA_SCRIPT, numkeys, *keys_and_args)
def wait_for_request_response(redis_conn=None,  # Required
                              request_uuid=None,  # Required
                              userqueue_id=None,  # Optional
                              response_queue_name=None,  # Required
                              queue_name=None,  # Required
                              max_wait_seconds=None,  # Optional - if not passed - no timeout, wait indefinitely
                              cancel_request_upon_timeout=True):
    """Block until the response for a submitted request arrives, or the wait times out.

    Does a blocking BRPOP on the per-request response queue. On success returns
    the raw response bytes. On timeout returns None, optionally cancelling the
    still-queued request first (only possible when a userqueue_id is known).
    """
    # Validate the required arguments up front, same message shape for each.
    for arg_name, arg_value in (('redis_conn', redis_conn),
                                ('request_uuid', request_uuid),
                                ('response_queue_name', response_queue_name),
                                ('queue_name', queue_name)):
        if not arg_value:
            raise Exception('{} must be passed'.format(arg_name))
    if not max_wait_seconds:
        max_wait_seconds = 0  # redis treats a 0 timeout as "wait forever"
    popped = redis_conn.brpop(response_queue_name, timeout=max_wait_seconds)
    if popped is not None:
        # BRPOP yields a (key, value) pair; the payload is the value.
        return popped[1]
    # Timed out without a response.
    if cancel_request_upon_timeout:
        if userqueue_id:
            cancel_request(
                redis_conn=redis_conn,
                request_uuid=request_uuid,
                queue_name=queue_name,
                userqueue_id=userqueue_id
            )
        else:
            print('userqueue id is None - cannot cancel this request as it was never queued.')
    return None
def get_backend_inbox_key_name(queue_name, backend_id):
    """Redis key of the list a single idle backend BLPOPs on to receive work."""
    return "-".join((queue_name, "idle-backend", backend_id))
# Returns request_bytes
def get_next_request(redis_conn=None,  # Required
                     queue_name=None,  # Required
                     backend_id=None,
                     # Required - identifier of the container making this call. should be unique and never reused.
                     backend_priority=None,
                     # Required - floating point number. The priority level of this server compared to others when deciding where to route a request. Higher number is higher priority.
                     idle_keepalive_interval_seconds=None,
                     # optional, how many seconds between marking myself active again...
                     idle_backend_max_age_seconds=None
                     # Optional, how many seconds of inactivity before I will be considered not-idle again. Ideally a few seconds longer than idle_keepalive_interval_seconds
                     ):
    """Called by a model backend: block until a request is available, then return its bytes.

    Each loop iteration runs a Lua script that atomically (a) pops a request
    from this backend's inbox if one is waiting, else (b) dequeues from the
    round-robin userqueues, else (c) registers this backend as idle. When the
    script returns nothing, we BRPOP on our inbox; the BRPOP timeout doubles as
    the keepalive interval so the idle registration gets refreshed.

    Raises Exception when any required argument is missing. Never returns None -
    it loops until work arrives.
    """
    # This one is called by the model.
    # Wait for a request to show up in our inbox...
    global SCRIPT_GENERATOR_LOCK
    global GET_REQUEST_LUA_SCRIPT
    global GET_REQUEST_LUA_SCRIPT_HASH
    if idle_keepalive_interval_seconds is None:
        idle_keepalive_interval_seconds = DEFAULT_IDLE_KEEPALIVE_INTERVAL_SECONDS
    if idle_backend_max_age_seconds is None:
        idle_backend_max_age_seconds = DEFAULT_IDLE_BACKEND_MAX_AGE_SECONDS
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if backend_id is None:
        raise Exception('backend_id must be passed')
    if backend_priority is None:
        raise Exception('backend_priority must be passed')
    my_inbox_key_name = get_backend_inbox_key_name(queue_name, backend_id)
    while True:
        # The time in millis will change every iteration so this must be done in the loop... it's lightweight...
        # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
        # OrderedDict so that the KEYS/ARGV positions match the generated "local x = KEYS[i]" lines.
        lua_keys = collections.OrderedDict()
        lua_keys['my_inbox_key_name'] = my_inbox_key_name
        lua_keys['userqueue_last_dequeued_times_name'] = get_userqueue_last_dequeued_times_name(queue_name)
        lua_keys['requests_by_uuid_name'] = get_requests_by_uuid_name(queue_name)
        lua_keys['user_data_to_userqueue_name'] = get_user_data_to_userqueue_name(queue_name)
        lua_keys['currently_idle_backends_name'] = get_currently_idle_backends_name(queue_name)
        lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
        # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
        lua_args = collections.OrderedDict()
        lua_args['queue_name'] = queue_name
        lua_args['time_in_millis'] = int(1000 * time.time())
        lua_args['expire_time_in_millis'] = int(1000 * (time.time() + idle_backend_max_age_seconds))
        lua_args['backend_priority'] = backend_priority
        lua_args['backend_id'] = backend_id
        with SCRIPT_GENERATOR_LOCK:
            if GET_REQUEST_LUA_SCRIPT is None:
                if PRINT_SCRIPTS:
                    print('Generating Get Request Lua Script...')
                # You can't change the script source code on every request such with putting a dynamic variable in the source code.
                # That would not use the redis compiled script cache and be bad for performance.
                # Instead use args or keys.
                # PseudoCode of the process:
                # In Lua:
                # Check if there is anything in our backend inbox:
                # If inbox has item:
                #     pop that item and return it.
                # If inbox has no item:
                #     Check if queue has anything to process
                #     If queue has an item:
                #         pop item and return it.
                #         set the current time as in userqueue_last_dequeued_times_name ...
                #         clean the queue data structures if needed.
                #     If queue has no item:
                #         add myself to the idle backend set
                #         return nothing from lua.
                # after lua returns, do a BLPOP on our inbox waiting until someone sends us some work.
                lua_keys_vars = ""
                for i, (key, value) in enumerate(lua_keys.items()):
                    lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
                lua_args_vars = ""
                for i, (key, value) in enumerate(lua_args.items()):
                    lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
                GET_REQUEST_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
local my_inbox_request = redis.call("rpop", my_inbox_key_name)
if my_inbox_request ~= false then
-- we got an item from our inbox immediately.
return my_inbox_request
else
-- our inbox was empty, now check the queue
local num_userqueues = tonumber(redis.call("zcard", userqueue_last_dequeued_times_name))
if num_userqueues > 0 then
local oldest_userqueue_id = redis.call("zpopmin", userqueue_last_dequeued_times_name)[1] -- this seems to throw an error if its empty???
-- we got the id of a userqueue with something in it
local userqueue_key_name = queue_name .. "-userqueue-" .. oldest_userqueue_id
local request_uuid = redis.call("zpopmax", userqueue_key_name)[1]
local request_bytes = redis.call("hget", requests_by_uuid_name, request_uuid)
redis.call("hdel", requests_by_uuid_name, request_uuid)
local new_userqueue_size = redis.call("zcard", userqueue_key_name)
if new_userqueue_size == 0 then
-- the userqueue is empty, delete all traces of it.
redis.call("zrem", userqueue_last_dequeued_times_name, oldest_userqueue_id)
-- This sorted set maps user data to queue ID (integer). So you can remove all entries that map to this queue id by removing those with score between [oldest_userqueue_id, oldest_userqueue_id] inclusive.
-- ZREMRANGEBYSCORE key min max
redis.call("zremrangebyscore", user_data_to_userqueue_name, oldest_userqueue_id, oldest_userqueue_id)
else
-- the userqueue still has some requests remaining in it.
-- set the last dequeued time so it goes to the back of the round robin line.
redis.call("zadd", userqueue_last_dequeued_times_name, time_in_millis, oldest_userqueue_id)
end
-- return the request we just got from the queue
return request_bytes
else
-- there are no userqueues at the moment. add myself to idle backends.
-- zadd key score member
redis.call("zadd", currently_idle_backends_name, backend_priority, backend_id)
redis.call("zadd", idle_backend_id_to_expire_time_name, expire_time_in_millis, backend_id)
-- return None back to python because we got nothing from the queue
return false
end
end
"""
                GET_REQUEST_LUA_SCRIPT = GET_REQUEST_LUA_SCRIPT.encode('utf-8')
                GET_REQUEST_LUA_SCRIPT_HASH = sha1(GET_REQUEST_LUA_SCRIPT).hexdigest()
                if PRINT_SCRIPTS:
                    print('Succesfully Generated Get Request Lua Script...')
                    print(GET_REQUEST_LUA_SCRIPT)
            else:
                if PRINT_SCRIPTS:
                    print('Using pre-generated Get Request Lua Script...')
        numkeys = len(lua_keys)
        keys_and_args = list(lua_keys.values()) + list(lua_args.values())
        try:
            request_bytes = redis_conn.evalsha(
                GET_REQUEST_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
        except NoScriptError:
            # Script not cached on this redis server yet - send the full source once.
            request_bytes = redis_conn.eval(
                GET_REQUEST_LUA_SCRIPT, numkeys, *keys_and_args)
        if request_bytes is not None:
            print('get_next_request got a request from the inbox or queue without waiting.')
            return request_bytes
        # we didn't get any request from the queue so we must now sit and wait for something to arrive in our inbox.
        print('get_next_request didnt get a request immediately so now it is waiting')
        keys_to_wait_on = [my_inbox_key_name]
        res = redis_conn.brpop(keys_to_wait_on,
                               timeout=idle_keepalive_interval_seconds)  # this can wait on multiple keys - timeout should be 5s shorter than socket timeout.
        if res is not None:
            # Loop until brpop returns an actual value
            request_bytes = res[1]
            print('get_next_request got a result from BRPOP')
            return request_bytes
        print('get_next_request got nothing from BRPOP -- running script again...')
def deregister_idle_backend(redis_conn=None,  # Required
                            queue_name=None,  # Required
                            backend_id=None,
                            # Required - identifier of the container making this call. should be unique and never reused.
                            ):
    """Immediately remove a backend from the idle bookkeeping sets.

    Call this when a backend knows it is shutting down, so that no new
    request is routed to its inbox. Always returns None.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    if backend_id is None:
        raise Exception('backend_id must be passed')
    # Drop this backend from both sorted sets: the idle set and its expiry-time twin.
    for set_name in (get_currently_idle_backends_name(queue_name),
                     get_idle_backend_id_to_expire_time_name(queue_name)):
        redis_conn.zrem(set_name, backend_id)
    return None
def submit_response(redis_conn=None,  # Required
                    response_bytes=None,  # Required
                    response_queue_name=None  # Required
                    ):
    """Called by the model backend: push a finished response onto the caller's result queue.

    Uses a pipeline so the LPUSH and the 1-hour EXPIRE are sent together;
    the expiry stops abandoned responses from accumulating in redis.
    """
    for arg_name, arg_value in (('redis_conn', redis_conn),
                                ('response_bytes', response_bytes),
                                ('response_queue_name', response_queue_name)):
        if not arg_value:
            raise Exception('{} must be passed'.format(arg_name))
    pipeline = redis_conn.pipeline()
    pipeline.lpush(response_queue_name, response_bytes)
    pipeline.expire(response_queue_name, 60 * 60)  # Responses will disappear after 1 hour.
    pipeline.execute()
def make_dict_from_zrange_withscores_output(flat_list):
    """Turn a flat ZRANGE ... WITHSCORES reply [member, score, member, score, ...] into {member: score}."""
    result = {}
    for idx in range(0, len(flat_list) - 1, 2):
        result[flat_list[idx]] = flat_list[idx + 1]
    return result
def decode_dict_key_bytes(the_dict):
    """Recursively rebuild *the_dict* with every key UTF-8 decoded (bytes -> text).

    Values are left untouched unless they are dicts, which are converted
    recursively. Only needed on python3, where msgpack yields bytes keys.
    """
    return {
        key.decode('utf-8'): (decode_dict_key_bytes(value) if isinstance(value, dict) else value)
        for key, value in the_dict.items()
    }
def get_queue_full_overview_data(redis_conn=None, queue_name=None):
    """Return a monitoring snapshot of every queue structure for *queue_name*.

    A Lua script gathers all userqueues, last-dequeued times, user-data
    mappings, idle backends and their expiry times in one atomic pass and
    serializes the result with cmsgpack. The Python side unpacks it, decodes
    the byte keys, and normalizes each flat WITHSCORES list into a dict.

    Raises Exception when redis_conn or queue_name is missing.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    global SCRIPT_GENERATOR_LOCK
    global GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT
    global GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    # OrderedDict so that the KEYS/ARGV positions match the generated "local x = KEYS[i]" lines.
    lua_keys = collections.OrderedDict()
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    lua_keys['currently_idle_backends_name'] = get_currently_idle_backends_name(queue_name)
    lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['queue_name'] = queue_name
    with SCRIPT_GENERATOR_LOCK:
        if GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Get Queue Full Overview Lua Script...')
            # You can't change the script source code on every request such with putting a dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
local ret_data = {
userqueues = {};
};
local userqueue_ids = redis.call("zrange", userqueue_last_dequeued_times_name, 0, -1)
for idx, userqueue_id in pairs(userqueue_ids) do
local userqueue_data = {};
local userqueue_key_name = queue_name .. "-userqueue-" .. userqueue_id
userqueue_data['requests_by_uuid'] = redis.call("zrange", userqueue_key_name, 0, -1, "WITHSCORES")
ret_data['userqueues'][userqueue_id] = userqueue_data
end
ret_data['last_dequeued_times'] = redis.call("zrange", userqueue_last_dequeued_times_name, 0, -1, "WITHSCORES")
ret_data['user_data_to_userqueue'] = redis.call("zrange", user_data_to_userqueue_name, 0, -1, "WITHSCORES")
ret_data['currently_idle_backends'] = redis.call("zrange", currently_idle_backends_name, 0, -1, "WITHSCORES")
ret_data['idle_backend_expire_times'] = redis.call("zrange", idle_backend_id_to_expire_time_name, 0, -1, "WITHSCORES")
ret_data['num_requests_by_uuid'] = redis.call("hlen", requests_by_uuid_name)
return cmsgpack.pack(ret_data)
"""
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT = GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT.encode('utf-8')
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH = sha1(GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Succesfully Generated Get Queue Full Overview Lua Script...')
                print(GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Get Queue Full Overview Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    try:
        ret_data_msgpack = redis_conn.evalsha(
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
    except NoScriptError:
        # Script not cached on this redis server yet - send the full source once.
        ret_data_msgpack = redis_conn.eval(
            GET_QUEUE_FULL_OVERVIEW_LUA_SCRIPT, numkeys, *keys_and_args)
    ret_data = msgpack.unpackb(ret_data_msgpack, use_list=False)
    ret_data = decode_dict_key_bytes(ret_data)
    # Normalize every flat WITHSCORES list into a {member: score} dict.
    ret_data['currently_idle_backends'] = make_dict_from_zrange_withscores_output(ret_data['currently_idle_backends'])
    ret_data['idle_backend_expire_times'] = make_dict_from_zrange_withscores_output(
        ret_data['idle_backend_expire_times'])
    ret_data['last_dequeued_times'] = make_dict_from_zrange_withscores_output(ret_data['last_dequeued_times'])
    ret_data['user_data_to_userqueue'] = make_dict_from_zrange_withscores_output(ret_data['user_data_to_userqueue'])
    ret_data['num_requests_by_uuid'] = int(ret_data['num_requests_by_uuid'])
    for userqueue_id in ret_data['userqueues']:
        ret_data['userqueues'][userqueue_id]['requests_by_uuid'] = make_dict_from_zrange_withscores_output(
            ret_data['userqueues'][userqueue_id]['requests_by_uuid'])
    return ret_data
def delete_queue(redis_conn=None, queue_name=None):
    """Delete every Redis key belonging to *queue_name*, atomically via Lua.

    The script walks the queue's last-dequeued-times sorted set to find every
    per-user sub-queue key, deletes each one, then deletes the queue's
    bookkeeping structures (last-dequeued times, user-data->userqueue map,
    idle backend sets, and the requests-by-uuid hash).

    The Lua source is generated once per process (guarded by
    SCRIPT_GENERATOR_LOCK) and cached in the module-level globals so that
    Redis's compiled-script cache is reused via EVALSHA; key/arg names are
    injected as KEYS[]/ARGV[] locals rather than interpolated into the source,
    which would defeat that cache.

    Args:
        redis_conn: An active redis-py client connection. Required.
        queue_name: Name of the queue whose keys are removed. Required.

    Raises:
        Exception: If either argument is missing/falsy.
    """
    if not redis_conn:
        raise Exception('redis_conn must be passed')
    if not queue_name:
        raise Exception('queue_name must be passed')
    global SCRIPT_GENERATOR_LOCK
    global DELETE_QUEUE_LUA_SCRIPT
    global DELETE_QUEUE_LUA_SCRIPT_HASH
    requests_by_uuid_name = get_requests_by_uuid_name(queue_name)
    userqueue_last_dequeued_times_name = get_userqueue_last_dequeued_times_name(queue_name)
    user_data_to_userqueue_name = get_user_data_to_userqueue_name(queue_name)
    # Only the key variable NAMES are put into the lua script... The values are passed when script is run.
    lua_keys = collections.OrderedDict()
    lua_keys['requests_by_uuid_name'] = requests_by_uuid_name
    lua_keys['userqueue_last_dequeued_times_name'] = userqueue_last_dequeued_times_name
    lua_keys['user_data_to_userqueue_name'] = user_data_to_userqueue_name
    lua_keys['currently_idle_backends_name'] = get_currently_idle_backends_name(queue_name)
    lua_keys['idle_backend_id_to_expire_time_name'] = get_idle_backend_id_to_expire_time_name(queue_name)
    # Only the argument variable NAMES are put into the lua script... The values are passed when script is run.
    lua_args = collections.OrderedDict()
    lua_args['queue_name'] = queue_name
    with SCRIPT_GENERATOR_LOCK:
        if DELETE_QUEUE_LUA_SCRIPT is None:
            if PRINT_SCRIPTS:
                print('Generating Delete Queue Lua Script...')
            # You can't change the script source code on every request such with putting a dynamic variable in the source code.
            # That would not use the redis compiled script cache and be bad for performance.
            # Instead use args or keys.
            lua_keys_vars = ""
            for i, (key, value) in enumerate(lua_keys.items()):
                lua_keys_vars += u"local {} = KEYS[{}]\n".format(key, i + 1)
            lua_args_vars = ""
            for i, (key, value) in enumerate(lua_args.items()):
                lua_args_vars += u"local {} = ARGV[{}]\n".format(key, i + 1)
            DELETE_QUEUE_LUA_SCRIPT = lua_keys_vars + lua_args_vars + u"""
local userqueue_ids = redis.call("zrange", userqueue_last_dequeued_times_name, 0, -1)
for idx, userqueue_id in pairs(userqueue_ids) do
    local userqueue_key_name = queue_name .. "-userqueue-" .. userqueue_id
    redis.call("del", userqueue_key_name)
end
redis.call("del", userqueue_last_dequeued_times_name)
redis.call("del", user_data_to_userqueue_name)
redis.call("del", currently_idle_backends_name)
redis.call("del", idle_backend_id_to_expire_time_name)
redis.call("del", requests_by_uuid_name)
"""
            DELETE_QUEUE_LUA_SCRIPT = DELETE_QUEUE_LUA_SCRIPT.encode('utf-8')
            DELETE_QUEUE_LUA_SCRIPT_HASH = sha1(DELETE_QUEUE_LUA_SCRIPT).hexdigest()
            if PRINT_SCRIPTS:
                print('Successfully Generated Delete Queue Lua Script...')
                print(DELETE_QUEUE_LUA_SCRIPT)
        else:
            if PRINT_SCRIPTS:
                print('Using pre-generated Delete Queue Lua Script...')
    numkeys = len(lua_keys)
    keys_and_args = list(lua_keys.values()) + list(lua_args.values())
    # Fast path: EVALSHA reuses Redis's compiled-script cache; fall back to a
    # full EVAL (which also loads the script into the cache) the first time
    # this Redis server sees the script.
    try:
        redis_conn.evalsha(
            DELETE_QUEUE_LUA_SCRIPT_HASH, numkeys, *keys_and_args)
    except NoScriptError:
        redis_conn.eval(
            DELETE_QUEUE_LUA_SCRIPT, numkeys, *keys_and_args)
def get_queue_size(queue_name=None, priority_level=None):
    # TODO: unimplemented stub — currently validates queue_name and returns None.
    # Per the inline comment, the intent is: report the number of queued
    # requests for queue_name at the given priority_level, or the sum across
    # all priority levels when priority_level is None.
    # NOTE(review): unlike the other queue functions in this module, this
    # signature takes no redis_conn — presumably one will be needed; confirm
    # before implementing.
    # if priority_level is None, get sum of all
    if not queue_name:
        raise Exception('queue_name must be passed')
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment