2022-10-19 05:58:55 +02:00
""" task_manager.py: manage tasks dispatching and render threads.
Notes :
render_threads should be the only hard reference held by the manager to the threads .
Use weak_thread_data to store all other data using weak keys .
This will allow for garbage collection after the thread dies .
"""
2022-10-15 09:28:20 +02:00
import json
import traceback
2022-10-20 13:52:01 +02:00
TASK_TTL = 15 * 60 # seconds, Discard last session's task timeout
2022-10-15 09:28:20 +02:00
2022-10-18 19:21:15 +02:00
import queue , threading , time , weakref
2022-10-15 09:28:20 +02:00
from typing import Any , Generator , Hashable , Optional , Union
from pydantic import BaseModel
from sd_internal import Request , Response
2022-10-17 05:06:41 +02:00
THREAD_NAME_PREFIX = ' Runtime-Render/ '
2022-10-17 03:41:39 +02:00
ERR_LOCK_FAILED = ' failed to acquire lock within timeout. '
LOCK_TIMEOUT = 15 # Maximum locking time in seconds before failing a task.
# It's better to get an exception than a deadlock... ALWAYS use timeout in critical paths.
2022-10-28 02:27:21 +02:00
DEVICE_START_TIMEOUT = 60 # seconds - Maximum time to wait for a render device to init.
2022-10-30 00:57:10 +02:00
CPU_UNLOAD_TIMEOUT = 4 * 60 # seconds - Idle time before CPU unload resource when GPUs are present.
2022-10-28 02:27:21 +02:00
2022-10-15 09:28:20 +02:00
class SymbolClass ( type ) : # Print nicely formatted Symbol names.
def __repr__ ( self ) : return self . __qualname__
def __str__ ( self ) : return self . __name__
class Symbol ( metaclass = SymbolClass ) : pass
class ServerStates :
class Init ( Symbol ) : pass
class LoadingModel ( Symbol ) : pass
class Online ( Symbol ) : pass
class Rendering ( Symbol ) : pass
class Unavailable ( Symbol ) : pass
class RenderTask ( ) : # Task with output queue and completion lock.
def __init__ ( self , req : Request ) :
self . request : Request = req # Initial Request
self . response : Any = None # Copy of the last reponse
2022-10-29 23:33:44 +02:00
self . render_device = None
2022-10-21 07:30:49 +02:00
self . temp_images : list = [ None ] * req . num_outputs * ( 1 if req . show_only_filtered_image else 2 )
2022-10-15 09:28:20 +02:00
self . error : Exception = None
self . lock : threading . Lock = threading . Lock ( ) # Locks at task start and unlocks when task is completed
self . buffer_queue : queue . Queue = queue . Queue ( ) # Queue of JSON string segments
async def read_buffer_generator ( self ) :
try :
while not self . buffer_queue . empty ( ) :
res = self . buffer_queue . get ( block = False )
self . buffer_queue . task_done ( )
yield res
except queue . Empty as e : yield
# defaults from https://huggingface.co/blog/stable_diffusion
class ImageRequest ( BaseModel ) :
session_id : str = " session "
prompt : str = " "
negative_prompt : str = " "
init_image : str = None # base64
mask : str = None # base64
num_outputs : int = 1
num_inference_steps : int = 50
guidance_scale : float = 7.5
width : int = 512
height : int = 512
seed : int = 42
prompt_strength : float = 0.8
sampler : str = None # "ddim", "plms", "heun", "euler", "euler_a", "dpm2", "dpm2_a", "lms"
# allow_nsfw: bool = False
save_to_disk_path : str = None
turbo : bool = True
2022-10-29 23:33:44 +02:00
use_cpu : bool = False ##TODO Remove after UI and plugins transition.
render_device : str = None
2022-10-15 09:28:20 +02:00
use_full_precision : bool = False
use_face_correction : str = None # or "GFPGANv1.3"
use_upscale : str = None # or "RealESRGAN_x4plus" or "RealESRGAN_x4plus_anime_6B"
use_stable_diffusion_model : str = " sd-v1-4 "
2022-10-28 16:36:44 +02:00
use_vae_model : str = None
2022-10-15 09:28:20 +02:00
show_only_filtered_image : bool = False
output_format : str = " jpeg " # or "png"
stream_progress_updates : bool = False
stream_image_progress : bool = False
2022-10-17 03:41:39 +02:00
class FilterRequest ( BaseModel ) :
session_id : str = " session "
model : str = None
name : str = " "
init_image : str = None # base64
width : int = 512
height : int = 512
save_to_disk_path : str = None
turbo : bool = True
2022-10-29 23:33:44 +02:00
render_device : str = None
2022-10-17 03:41:39 +02:00
use_full_precision : bool = False
output_format : str = " jpeg " # or "png"
2022-10-15 09:28:20 +02:00
# Temporary cache to allow to query tasks results for a short time after they are completed.
class TaskCache ( ) :
def __init__ ( self ) :
self . _base = dict ( )
2022-10-17 03:41:39 +02:00
self . _lock : threading . Lock = threading . Lock ( )
2022-10-15 09:28:20 +02:00
def _get_ttl_time ( self , ttl : int ) - > int :
return int ( time . time ( ) ) + ttl
def _is_expired ( self , timestamp : int ) - > bool :
return int ( time . time ( ) ) > = timestamp
def clean ( self ) - > None :
2022-10-17 03:41:39 +02:00
if not self . _lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' TaskCache.clean ' + ERR_LOCK_FAILED )
2022-10-15 09:28:20 +02:00
try :
2022-10-15 10:39:45 +02:00
# Create a list of expired keys to delete
to_delete = [ ]
2022-10-15 09:28:20 +02:00
for key in self . _base :
ttl , _ = self . _base [ key ]
if self . _is_expired ( ttl ) :
2022-10-15 10:39:45 +02:00
to_delete . append ( key )
# Remove Items
for key in to_delete :
del self . _base [ key ]
print ( f ' Session { key } expired. Data removed. ' )
2022-10-15 09:28:20 +02:00
finally :
self . _lock . release ( )
def clear ( self ) - > None :
2022-10-17 03:41:39 +02:00
if not self . _lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' TaskCache.clear ' + ERR_LOCK_FAILED )
2022-10-15 09:28:20 +02:00
try : self . _base . clear ( )
finally : self . _lock . release ( )
def delete ( self , key : Hashable ) - > bool :
2022-10-17 03:41:39 +02:00
if not self . _lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' TaskCache.delete ' + ERR_LOCK_FAILED )
2022-10-15 09:28:20 +02:00
try :
if key not in self . _base :
return False
del self . _base [ key ]
return True
finally :
self . _lock . release ( )
def keep ( self , key : Hashable , ttl : int ) - > bool :
2022-10-17 03:41:39 +02:00
if not self . _lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' TaskCache.keep ' + ERR_LOCK_FAILED )
2022-10-15 09:28:20 +02:00
try :
if key in self . _base :
_ , value = self . _base . get ( key )
self . _base [ key ] = ( self . _get_ttl_time ( ttl ) , value )
return True
return False
finally :
self . _lock . release ( )
def put ( self , key : Hashable , value : Any , ttl : int ) - > bool :
2022-10-17 03:41:39 +02:00
if not self . _lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' TaskCache.put ' + ERR_LOCK_FAILED )
2022-10-15 09:28:20 +02:00
try :
self . _base [ key ] = (
self . _get_ttl_time ( ttl ) , value
)
2022-10-15 10:08:17 +02:00
except Exception as e :
print ( str ( e ) )
print ( traceback . format_exc ( ) )
2022-10-15 09:28:20 +02:00
return False
else :
return True
finally :
self . _lock . release ( )
def tryGet ( self , key : Hashable ) - > Any :
2022-10-17 03:41:39 +02:00
if not self . _lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' TaskCache.tryGet ' + ERR_LOCK_FAILED )
2022-10-15 09:28:20 +02:00
try :
ttl , value = self . _base . get ( key , ( None , None ) )
if ttl is not None and self . _is_expired ( ttl ) :
2022-10-15 10:39:45 +02:00
print ( f ' Session { key } expired. Discarding data. ' )
2022-10-17 03:41:39 +02:00
del self . _base [ key ]
2022-10-15 09:28:20 +02:00
return None
return value
finally :
self . _lock . release ( )
2022-10-17 07:05:27 +02:00
manager_lock = threading . RLock ( )
2022-10-17 03:41:39 +02:00
render_threads = [ ]
2022-10-15 09:28:20 +02:00
current_state = ServerStates . Init
current_state_error : Exception = None
current_model_path = None
2022-10-28 16:36:44 +02:00
current_vae_path = None
2022-10-17 03:41:39 +02:00
tasks_queue = [ ]
2022-10-15 09:28:20 +02:00
task_cache = TaskCache ( )
default_model_to_load = None
2022-10-28 16:36:44 +02:00
default_vae_to_load = None
2022-10-18 19:21:15 +02:00
weak_thread_data = weakref . WeakKeyDictionary ( )
2022-10-15 09:28:20 +02:00
2022-10-28 16:36:44 +02:00
def preload_model ( ckpt_file_path = None , vae_file_path = None ) :
2022-10-15 09:28:20 +02:00
global current_state , current_state_error , current_model_path
2022-10-28 16:36:44 +02:00
if ckpt_file_path == None :
ckpt_file_path = default_model_to_load
if vae_file_path == None :
vae_file_path = default_vae_to_load
if ckpt_file_path == current_model_path and vae_file_path == current_vae_path :
2022-10-15 09:28:20 +02:00
return
current_state = ServerStates . LoadingModel
try :
from . import runtime
2022-10-28 16:36:44 +02:00
runtime . thread_data . ckpt_file = ckpt_file_path
runtime . thread_data . vae_file = vae_file_path
2022-10-17 03:41:39 +02:00
runtime . load_model_ckpt ( )
2022-10-28 16:36:44 +02:00
current_model_path = ckpt_file_path
current_vae_path = vae_file_path
2022-10-15 09:28:20 +02:00
current_state_error = None
current_state = ServerStates . Online
except Exception as e :
current_model_path = None
2022-10-28 16:36:44 +02:00
current_vae_path = None
2022-10-15 09:28:20 +02:00
current_state_error = e
current_state = ServerStates . Unavailable
print ( traceback . format_exc ( ) )
2022-10-22 04:45:19 +02:00
def thread_get_next_task ( ) :
2022-10-29 04:51:04 +02:00
from . import runtime
2022-10-22 04:45:19 +02:00
if not manager_lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) :
print ( ' Render thread on device ' , runtime . thread_data . device , ' failed to acquire manager lock. ' )
return None
if len ( tasks_queue ) < = 0 :
manager_lock . release ( )
return None
task = None
try : # Select a render task.
for queued_task in tasks_queue :
if queued_task . request . use_face_correction : # TODO Remove when fixed - A bug with GFPGANer and facexlib needs to be fixed before use on other devices.
if is_alive ( 0 ) < = 0 : # Allows GFPGANer only on cuda:0.
queued_task . error = Exception ( ' cuda:0 is not available with the current config. Remove GFPGANer filter to run task. ' )
task = queued_task
break
2022-10-29 23:33:44 +02:00
if queued_task . render_device == ' cpu ' :
2022-10-22 04:45:19 +02:00
queued_task . error = Exception ( ' Cpu cannot be used to run this task. Remove GFPGANer filter to run task. ' )
task = queued_task
break
if not runtime . is_first_cuda_device ( runtime . thread_data . device ) :
continue # Wait for cuda:0
2022-10-29 23:33:44 +02:00
if queued_task . render_device and runtime . thread_data . device != queued_task . render_device :
# Is asking for a specific render device.
if is_alive ( queued_task . render_device ) > 0 :
continue # requested device alive, skip current one.
2022-10-22 04:45:19 +02:00
else :
2022-10-29 23:33:44 +02:00
# Requested device is not active, return error to UI.
queued_task . error = Exception ( str ( queued_task . render_device ) + ' is not currently active. ' )
2022-10-22 04:45:19 +02:00
task = queued_task
break
2022-10-29 23:33:44 +02:00
if not queued_task . render_device and runtime . thread_data . device == ' cpu ' and is_alive ( ) > 1 :
2022-10-30 06:33:17 +01:00
# not asking for any specific devices, cpu want to grab task but other render devices are alive.
continue # Skip Tasks, don't run on CPU unless there is nothing else or user asked for it.
2022-10-22 04:45:19 +02:00
task = queued_task
break
if task is not None :
del tasks_queue [ tasks_queue . index ( task ) ]
return task
finally :
manager_lock . release ( )
2022-10-17 03:41:39 +02:00
def thread_render ( device ) :
2022-10-28 16:36:44 +02:00
global current_state , current_state_error , current_model_path , current_vae_path
2022-10-15 09:28:20 +02:00
from . import runtime
2022-10-17 03:41:39 +02:00
try :
runtime . device_init ( device )
2022-10-30 11:04:06 +01:00
except Exception as e :
2022-10-17 03:41:39 +02:00
print ( traceback . format_exc ( ) )
2022-10-30 11:04:06 +01:00
weak_thread_data [ threading . current_thread ( ) ] = {
' error ' : e
}
2022-10-17 03:41:39 +02:00
return
2022-10-18 19:21:15 +02:00
weak_thread_data [ threading . current_thread ( ) ] = {
2022-10-29 23:33:44 +02:00
' device ' : runtime . thread_data . device ,
' device_name ' : runtime . thread_data . device_name
2022-10-18 19:21:15 +02:00
}
2022-10-28 03:57:50 +02:00
if runtime . thread_data . device != ' cpu ' or is_alive ( ) == 1 :
2022-10-27 20:34:33 +02:00
preload_model ( )
2022-10-28 10:09:34 +02:00
current_state = ServerStates . Online
2022-10-15 09:28:20 +02:00
while True :
task_cache . clean ( )
if isinstance ( current_state_error , SystemExit ) :
current_state = ServerStates . Unavailable
return
2022-10-22 04:45:19 +02:00
task = thread_get_next_task ( )
2022-10-17 03:41:39 +02:00
if task is None :
2022-11-08 14:32:21 +01:00
# if runtime.thread_data.device == 'cpu' and is_alive() > 1 and hasattr(runtime.thread_data, 'lastActive') and time.time() - runtime.thread_data.lastActive > CPU_UNLOAD_TIMEOUT:
# # GPUs present and CPU is idle. Unload resources.
# runtime.unload_models()
# runtime.unload_filters()
# del runtime.thread_data.lastActive
2022-10-17 03:41:39 +02:00
time . sleep ( 1 )
continue
2022-10-19 09:02:26 +02:00
if task . error is not None :
print ( task . error )
2022-10-22 04:45:19 +02:00
task . response = { " status " : ' failed ' , " detail " : str ( task . error ) }
2022-10-19 09:02:26 +02:00
task . buffer_queue . put ( json . dumps ( task . response ) )
continue
2022-10-15 09:28:20 +02:00
if current_state_error :
task . error = current_state_error
2022-10-22 04:45:19 +02:00
task . response = { " status " : ' failed ' , " detail " : str ( task . error ) }
task . buffer_queue . put ( json . dumps ( task . response ) )
2022-10-15 09:28:20 +02:00
continue
2022-11-02 03:28:10 +01:00
print ( f ' Session { task . request . session_id } starting task { id ( task ) } on { runtime . thread_data . device_name } ' )
2022-10-17 03:41:39 +02:00
if not task . lock . acquire ( blocking = False ) : raise Exception ( ' Got locked task from queue. ' )
2022-10-15 09:28:20 +02:00
try :
2022-10-30 00:57:10 +02:00
if runtime . thread_data . device == ' cpu ' and is_alive ( ) > 1 :
# CPU is not the only device. Keep track of active time to unload resources later.
runtime . thread_data . lastActive = time . time ( )
2022-10-17 03:41:39 +02:00
# Open data generator.
2022-10-15 09:28:20 +02:00
res = runtime . mk_img ( task . request )
if current_model_path == task . request . use_stable_diffusion_model :
current_state = ServerStates . Rendering
else :
current_state = ServerStates . LoadingModel
2022-10-17 03:41:39 +02:00
# Start reading from generator.
dataQueue = None
if task . request . stream_progress_updates :
dataQueue = task . buffer_queue
for result in res :
if current_state == ServerStates . LoadingModel :
current_state = ServerStates . Rendering
current_model_path = task . request . use_stable_diffusion_model
2022-10-28 16:36:44 +02:00
current_vae_path = task . request . use_vae_model
2022-10-17 03:41:39 +02:00
if isinstance ( current_state_error , SystemExit ) or isinstance ( current_state_error , StopAsyncIteration ) or isinstance ( task . error , StopAsyncIteration ) :
2022-10-22 04:45:19 +02:00
runtime . thread_data . stop_processing = True
2022-10-17 03:41:39 +02:00
if isinstance ( current_state_error , StopAsyncIteration ) :
task . error = current_state_error
current_state_error = None
print ( f ' Session { task . request . session_id } sent cancel signal for task { id ( task ) } ' )
if dataQueue :
dataQueue . put ( result )
if isinstance ( result , str ) :
result = json . loads ( result )
task . response = result
if ' output ' in result :
for out_obj in result [ ' output ' ] :
if ' path ' in out_obj :
img_id = out_obj [ ' path ' ] [ out_obj [ ' path ' ] . rindex ( ' / ' ) + 1 : ]
task . temp_images [ int ( img_id ) ] = runtime . thread_data . temp_images [ out_obj [ ' path ' ] [ 11 : ] ]
elif ' data ' in out_obj :
task . temp_images [ result [ ' output ' ] . index ( out_obj ) ] = out_obj [ ' data ' ]
# Before looping back to the generator, mark cache as still alive.
task_cache . keep ( task . request . session_id , TASK_TTL )
2022-10-15 09:28:20 +02:00
except Exception as e :
task . error = e
print ( traceback . format_exc ( ) )
continue
2022-10-17 03:41:39 +02:00
finally :
# Task completed
task . lock . release ( )
2022-10-15 09:28:20 +02:00
task_cache . keep ( task . request . session_id , TASK_TTL )
if isinstance ( task . error , StopAsyncIteration ) :
print ( f ' Session { task . request . session_id } task { id ( task ) } cancelled! ' )
elif task . error is not None :
print ( f ' Session { task . request . session_id } task { id ( task ) } failed! ' )
else :
2022-11-02 03:28:10 +01:00
print ( f ' Session { task . request . session_id } task { id ( task ) } completed by { runtime . thread_data . device_name } . ' )
2022-10-15 09:28:20 +02:00
current_state = ServerStates . Online
2022-10-22 19:52:13 +02:00
def get_cached_task ( session_id : str , update_ttl : bool = False ) :
# By calling keep before tryGet, wont discard if was expired.
if update_ttl and not task_cache . keep ( session_id , TASK_TTL ) :
# Failed to keep task, already gone.
return None
return task_cache . tryGet ( session_id )
2022-10-29 23:33:44 +02:00
def get_devices ( ) :
if not manager_lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' get_devices ' + ERR_LOCK_FAILED )
try :
device_dict = { }
for rthread in render_threads :
2022-10-30 06:38:32 +01:00
if not rthread . is_alive ( ) :
continue
2022-10-29 23:33:44 +02:00
weak_data = weak_thread_data . get ( rthread )
2022-10-30 06:38:32 +01:00
if not weak_data or not ' device ' in weak_data or not ' device_name ' in weak_data :
continue
2022-10-29 23:33:44 +02:00
device_dict . update ( { weak_data [ ' device ' ] : weak_data [ ' device_name ' ] } )
return device_dict
finally :
manager_lock . release ( )
2022-10-17 08:27:30 +02:00
def is_first_cuda_device ( device ) :
from . import runtime # When calling runtime from outside thread_render DO NOT USE thread specific attributes or functions.
return runtime . is_first_cuda_device ( device )
2022-10-17 03:41:39 +02:00
def is_alive ( name = None ) :
if not manager_lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' is_alive ' + ERR_LOCK_FAILED )
nbr_alive = 0
try :
for rthread in render_threads :
2022-10-17 07:05:51 +02:00
if name is not None :
2022-10-18 19:21:15 +02:00
weak_data = weak_thread_data . get ( rthread )
2022-10-30 11:04:06 +01:00
if weak_data is None or not ' device ' in weak_data or weak_data [ ' device ' ] is None :
2022-10-18 19:21:15 +02:00
continue
thread_name = str ( weak_data [ ' device ' ] ) . lower ( )
2022-10-17 08:27:30 +02:00
if is_first_cuda_device ( name ) :
if not is_first_cuda_device ( thread_name ) :
2022-10-17 07:05:51 +02:00
continue
elif thread_name != name :
2022-10-17 05:06:41 +02:00
continue
2022-10-17 03:41:39 +02:00
if rthread . is_alive ( ) :
nbr_alive + = 1
return nbr_alive
finally :
manager_lock . release ( )
2022-10-15 09:28:20 +02:00
2022-10-17 03:41:39 +02:00
def start_render_thread ( device = ' auto ' ) :
if not manager_lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT ) : raise Exception ( ' start_render_threads ' + ERR_LOCK_FAILED )
print ( ' Start new Rendering Thread on device ' , device )
try :
rthread = threading . Thread ( target = thread_render , kwargs = { ' device ' : device } )
rthread . daemon = True
2022-10-17 05:06:41 +02:00
rthread . name = THREAD_NAME_PREFIX + device
2022-10-17 03:41:39 +02:00
rthread . start ( )
render_threads . append ( rthread )
finally :
manager_lock . release ( )
2022-10-29 04:52:00 +02:00
timeout = DEVICE_START_TIMEOUT
while not rthread . is_alive ( ) or not rthread in weak_thread_data or not ' device ' in weak_thread_data [ rthread ] :
2022-10-30 11:04:06 +01:00
if rthread in weak_thread_data and ' error ' in weak_thread_data [ rthread ] :
return False
2022-10-29 04:52:00 +02:00
if timeout < = 0 :
return False
timeout - = 1
time . sleep ( 1 )
return True
2022-10-15 09:28:20 +02:00
def shutdown_event ( ) : # Signal render thread to close on shutdown
global current_state_error
current_state_error = SystemExit ( ' Application shutting down. ' )
def render ( req : ImageRequest ) :
2022-10-25 09:00:50 +02:00
if is_alive ( ) < = 0 : # Render thread is dead
2022-10-15 09:28:20 +02:00
raise ChildProcessError ( ' Rendering thread has died. ' )
# Alive, check if task in cache
task = task_cache . tryGet ( req . session_id )
if task and not task . response and not task . error and not task . lock . locked ( ) :
# Unstarted task pending, deny queueing more than one.
raise ConnectionRefusedError ( f ' Session { req . session_id } has an already pending task. ' )
#
from . import runtime
r = Request ( )
r . session_id = req . session_id
r . prompt = req . prompt
r . negative_prompt = req . negative_prompt
r . init_image = req . init_image
r . mask = req . mask
r . num_outputs = req . num_outputs
r . num_inference_steps = req . num_inference_steps
r . guidance_scale = req . guidance_scale
r . width = req . width
r . height = req . height
r . seed = req . seed
r . prompt_strength = req . prompt_strength
r . sampler = req . sampler
# r.allow_nsfw = req.allow_nsfw
r . turbo = req . turbo
r . use_full_precision = req . use_full_precision
r . save_to_disk_path = req . save_to_disk_path
r . use_upscale : str = req . use_upscale
r . use_face_correction = req . use_face_correction
2022-10-17 17:52:05 +02:00
r . use_stable_diffusion_model = req . use_stable_diffusion_model
2022-10-28 16:36:44 +02:00
r . use_vae_model = req . use_vae_model
2022-10-15 09:28:20 +02:00
r . show_only_filtered_image = req . show_only_filtered_image
r . output_format = req . output_format
r . stream_progress_updates = True # the underlying implementation only supports streaming
r . stream_image_progress = req . stream_image_progress
if not req . stream_progress_updates :
r . stream_image_progress = False
new_task = RenderTask ( r )
2022-10-29 23:33:44 +02:00
new_task . render_device = req . render_device
2022-10-15 10:08:17 +02:00
if task_cache . put ( r . session_id , new_task , TASK_TTL ) :
2022-10-17 03:41:39 +02:00
# Use twice the normal timeout for adding user requests.
# Tries to force task_cache.put to fail before tasks_queue.put would.
if manager_lock . acquire ( blocking = True , timeout = LOCK_TIMEOUT * 2 ) :
try :
tasks_queue . append ( new_task )
return new_task
finally :
manager_lock . release ( )
2022-10-15 10:08:17 +02:00
raise RuntimeError ( ' Failed to add task to cache. ' )