2022-12-06 12:34:08 +01:00
/** SD-UI Backend control and classes.
 */
( function ( ) { "use strict" ;
// Timing constants. Every value is expressed in milliseconds.
const RETRY_DELAY_IF_BUFFER_IS_EMPTY = 1_000 // Pause before re-opening a stream that was already opened once.
const RETRY_DELAY_IF_SERVER_IS_BUSY = 30_000 // status_code 503, already a task running.
const RETRY_DELAY_ON_ERROR = 4_000
const TASK_STATE_SERVER_UPDATE_DELAY = 1_500 // Polling interval used while waiting on the server task state.
const SERVER_STATE_VALIDITY_DURATION = 90_000 // 90 seconds to allow ping to timeout more than once before killing tasks.
const HEALTH_PING_INTERVAL = 5_000
const IDLE_COOLDOWN = 2_500
const CONCURRENT_TASK_INTERVAL = 100 // Polling interval used while waiting for a concurrent generator to finish.
2022-12-06 12:34:08 +01:00
2022-12-11 02:31:23 +01:00
/** Connects to an endpoint and resumes connection after reaching end of stream until all data is received.
 * Allows closing the connection while the server buffers more data.
 */
class ChunkedStreamReader {
    #bufferedString = '' // Data received waiting to be read.
    #url
    #fetchOptions
    #response

    /**
     * @param {string} url Endpoint the stream is fetched from.
     * @param {string} [initialContent=''] Text placed in the buffer before anything is received.
     * @param {object} [options={}] fetch() options, shallow-merged over the defaults
     *     (a caller supplied `headers` object replaces the default one entirely).
     * @throws {Error} When url or initialContent is not a string.
     */
    constructor(url, initialContent = '', options = {}) {
        if (typeof url !== 'string' && !(url instanceof String)) {
            throw new Error('url is not a string.')
        }
        if (typeof initialContent !== 'undefined' && typeof initialContent !== 'string') {
            throw new Error('initialContent is not a string.')
        }
        this.#bufferedString = initialContent
        this.#url = url
        this.#fetchOptions = Object.assign({
            headers: {
                'Content-Type': 'application/json'
            }
        }, options)
        // onNext is deliberately NOT reset to undefined here: an own property would shadow
        // the default onNext() method below and make open() fail with "not a function".
    }

    /** The final URL after redirects when a response was received, otherwise the requested URL. */
    get url() {
        // No response exists until open() has performed its first fetch.
        if (this.#response?.redirected) {
            return this.#response.url
        }
        return this.#url
    }

    /** Text currently held back, waiting for more data before it can be parsed. */
    get bufferedString() {
        return this.#bufferedString
    }

    /** HTTP status of the last response, or undefined before the first request. */
    get status() {
        return this.#response?.status
    }

    /** HTTP status text of the last response, or undefined before the first request. */
    get statusText() {
        return this.#response?.statusText
    }

    /** Converts one chunk returned by the stream reader into an iterable of values.
     * @param {*} value Chunk as returned by reader.read().
     * @returns {undefined|Iterable} undefined for an undefined chunk, the chunk itself when it is an
     *     empty array, a one element array holding the decoded text for array/typed-array chunks,
     *     and a one element array holding the value unchanged for anything else.
     */
    parse(value) {
        if (typeof value === 'undefined') {
            return
        }
        if (!isArrayOrTypedArray(value)) {
            return [value]
        }
        if (value.length === 0) {
            return value
        }
        // The decoder is created lazily and kept on the instance for the following chunks.
        if (typeof this.textDecoder === 'undefined') {
            this.textDecoder = new TextDecoder()
        }
        return [this.textDecoder.decode(value)]
    }

    /** Hook invoked with the last value once onNext reports done. Its result ends open(). */
    onComplete(value) {
        return value
    }

    /** Hook invoked with the failed response. Throws by default; returning `true` skips the
     * failure handling of the current attempt, any other returned value ends open().
     */
    onError(response) {
        throw new Error(response.statusText)
    }

    /** Hook invoked for every parsed value; may replace the value or flag completion. */
    onNext({ value, done }, response) {
        return { value, done }
    }

    /** Makes the reader usable with `for await`, iterating the values produced by open(). */
    [Symbol.asyncIterator]() {
        // Must hand back the generator itself; returning it from inside another async
        // generator would finish immediately without yielding a single value.
        return this.open()
    }

    /** Opens the stream and yields every value accepted by onNext.
     * The endpoint is requested again after the body ends, until onNext reports done or
     * the server answers with a failure status.
     */
    async *open() {
        let value = undefined
        let done = undefined
        do {
            if (this.#response) {
                // Not the first attempt: give the server time to buffer more data.
                await asyncDelay(RETRY_DELAY_IF_BUFFER_IS_EMPTY)
            }
            this.#response = await fetch(this.#url, this.#fetchOptions)
            if (!this.#response.ok) {
                if (this.#response.status === 425) {
                    // 425 is treated as "nothing to read yet"; the loop condition lets us retry.
                    continue
                }
                // Request status indicate failure
                console.warn('Stream %o stopped unexpectedly.', this.#response)
                value = await Promise.resolve(this.onError(this.#response))
                if (typeof value === 'boolean' && value) {
                    continue
                }
                return value
            }
            const reader = this.#response.body.getReader()
            done = false
            do {
                const readState = await reader.read()
                value = this.parse(readState.value)
                if (value) {
                    for (let sVal of value) {
                        ({ value: sVal, done } = await Promise.resolve(this.onNext({ value: sVal, done: readState.done })))
                        yield sVal
                        if (done) {
                            return this.onComplete(sVal)
                        }
                    }
                }
                if (done) {
                    return
                }
            } while (value && !done)
        } while (!done && (this.#response.ok || this.#response.status === 425))
    }

    /** Splits concatenated JSON objects received from the stream and yields each parsed object.
     * Incomplete trailing data is kept in the internal buffer and prepended to the next call.
     * @param {string} jsonStr Newly received text.
     * @param {boolean} [throwOnError] When truthy, parse failures are logged and rethrown instead
     *     of silently waiting for more data.
     * @throws {Error} When jsonStr is not a string.
     */
    *readStreamAsJSON(jsonStr, throwOnError) {
        if (typeof jsonStr !== 'string') {
            throw new Error('jsonStr is not a string.')
        }
        do {
            if (this.#bufferedString.length > 0) {
                // Append new data when required
                if (jsonStr.length > 0) {
                    jsonStr = this.#bufferedString + jsonStr
                } else {
                    jsonStr = this.#bufferedString
                }
                this.#bufferedString = ''
            }
            if (!jsonStr) {
                return
            }
            // Find next delimiter
            const lastChunkIdx = jsonStr.indexOf('}{')
            if (lastChunkIdx >= 0) {
                this.#bufferedString = jsonStr.substring(0, lastChunkIdx + 1)
                jsonStr = jsonStr.substring(lastChunkIdx + 1)
            } else {
                this.#bufferedString = jsonStr
                jsonStr = ''
            }
            if (this.#bufferedString.length <= 0) {
                return
            }
            // hack for a middleman buffering all the streaming updates, and unleashing them on the poor browser in one shot.
            // this results in having to parse JSON like {"step": 1}{"step": 2}{"step": 3}{"ste...
            // which is obviously invalid and can happen at any point while rendering.
            // So we need to extract only the next {} section
            try { // Try to parse
                const jsonObj = JSON.parse(this.#bufferedString)
                this.#bufferedString = jsonStr
                jsonStr = ''
                yield jsonObj
            } catch (e) {
                if (throwOnError) {
                    console.error(`Parsing: "${this.#bufferedString}", Buffer: "${jsonStr}"`)
                }
                // Keep everything that could not be parsed for the next call.
                this.#bufferedString += jsonStr
                if (e instanceof SyntaxError && !throwOnError) {
                    return
                }
                throw e
            }
        } while (this.#bufferedString.length > 0 && this.#bufferedString.indexOf('}') >= 0)
    }
}
// Names of the events published through eventSource.
// NOTE(review): 'task_error' does not follow the camelCase naming of the other events;
// it is kept as-is because listeners may depend on the exact string.
const EVENT_IDLE = 'idle'
const EVENT_STATUS_CHANGED = 'statusChange'
const EVENT_UNHANDLED_REJECTION = 'unhandledRejection'
const EVENT_TASK_QUEUED = 'taskQueued'
const EVENT_TASK_START = 'taskStart'
const EVENT_TASK_END = 'taskEnd'
const EVENT_TASK_ERROR = 'task_error'
const EVENT_UNEXPECTED_RESPONSE = 'unexpectedResponse'

// Frozen list of every event name, handed to the event source on creation.
const EVENTS_TYPES = Object.freeze([
    EVENT_IDLE,
    EVENT_STATUS_CHANGED,
    EVENT_UNHANDLED_REJECTION,
    EVENT_TASK_QUEUED,
    EVENT_TASK_START,
    EVENT_TASK_END,
    EVENT_TASK_ERROR,
    EVENT_UNEXPECTED_RESPONSE,
])

const eventSource = new GenericEventSource(EVENTS_TYPES)
/** Publishes a status-changed event carrying the given type and message.
 * @param {string} msgType Value stored in the event's `type` field.
 * @param {string} msg Value stored in the event's `message` field.
 * @returns whatever eventSource.fireEvent returns.
 */
function setServerStatus(msgType, msg) {
    const statusEvent = { type: msgType, message: msg }
    return eventSource.fireEvent(EVENT_STATUS_CHANGED, statusEvent)
}
// Status values compared against the `status` field of the ping reply.
const ServerStates = Object.freeze({
    init: 'Init',
    loadingModel: 'LoadingModel',
    online: 'Online',
    rendering: 'Rendering',
    unavailable: 'Unavailable',
})

// Session identifier sent with ping and task requests; initialised from the current time.
let sessionId = Date.now()

// Last known server state and the time it was recorded; unavailable until a ping succeeds.
let serverState = {
    status: ServerStates.unavailable,
    time: Date.now(),
}
/** Pings the server, stores the reply in serverState and publishes the matching status.
 * @returns {Promise<boolean>} true when a usable reply was received (or a recent one is still
 *     considered fresh), false when the request failed or the reply had no status string.
 */
async function healthCheck() {
    const lastPingIsFresh = Date.now() < serverState.time + (HEALTH_PING_INTERVAL / 2)
    if (lastPingIsFresh && isServerAvailable()) {
        // Ping confirmed online less than half of HEALTH_PING_INTERVAL ago.
        return true
    }
    if (Date.now() >= serverState.time + SERVER_STATE_VALIDITY_DURATION) {
        console.warn('WARNING! SERVER_STATE_VALIDITY_DURATION has elapsed since the last Ping completed.')
    }

    // Resets the shared state and tells listeners the server is offline.
    const markOffline = () => {
        serverState = { 'status': ServerStates.unavailable, 'time': Date.now() }
        setServerStatus('error', 'offline')
    }

    try {
        const pingUrl = (typeof sessionId !== 'undefined' ? '/ping?session_id=' + sessionId : '/ping')
        const res = await fetch(pingUrl)
        serverState = await res.json()
        if (typeof serverState !== 'object' || typeof serverState.status !== 'string') {
            console.error(`Server reply didn't contain a state value.`)
            markOffline()
            return false
        }
        // Set status
        const currentStatus = serverState.status
        if (currentStatus === ServerStates.init) {
            // Wait for init to complete before updating status.
        } else if (currentStatus === ServerStates.online) {
            setServerStatus('online', 'ready')
        } else if (currentStatus === ServerStates.loadingModel) {
            setServerStatus('busy', 'loading..')
        } else if (currentStatus === ServerStates.rendering) {
            setServerStatus('busy', 'rendering..')
        } else { // Unavailable
            console.error('Ping received an unexpected server status. Status: %s', serverState.status)
            setServerStatus('error', serverState.status.toLowerCase())
        }
        // The reply is stamped with the local time at which it was processed.
        serverState.time = Date.now()
        return true
    } catch (e) {
        console.error(e)
        markOffline()
    }
    return false
}
/** Tells whether the last recorded server state is recent and describes a usable server.
 * @returns {boolean} true only when serverState is an object, was recorded less than
 *     SERVER_STATE_VALIDITY_DURATION ago and its status is loading, rendering or online.
 */
function isServerAvailable() {
    if (typeof serverState !== 'object') {
        console.error('serverState not set to a value. Connection to server could be lost...')
        return false
    }
    const expiresAt = serverState.time + SERVER_STATE_VALIDITY_DURATION
    if (Date.now() >= expiresAt) {
        console.warn('SERVER_STATE_VALIDITY_DURATION elapsed. Connection to server could be lost...')
        return false
    }
    const usableStates = [ServerStates.loadingModel, ServerStates.rendering, ServerStates.online]
    if (usableStates.includes(serverState.status)) {
        return true
    }
    console.warn('Unexpected server status. Server could be unavailable... Status: %s', serverState.status)
    return false
}
/** Polls isReadyFn until it reports ready, the timeout expires or the server state gets stale.
 * @param {Function} isReadyFn Called before every pause; may return a promise. A truthy result ends the wait.
 * @param {number|Function} delay Pause between polls: milliseconds, or a function returning a promise.
 * @param {number} [timeout] Maximum wait in ms; undefined or negative means no practical limit.
 * @throws {Error} On invalid delay/timeout, or when the server is still unavailable after a forced health check.
 */
async function waitUntil(isReadyFn, delay, timeout) {
    const pause = (typeof delay === 'number' ? () => asyncDelay(delay) : delay)
    if (typeof pause !== 'function') {
        throw new Error('delay is not a number or a function.')
    }
    const timeoutKind = typeof timeout
    if (timeoutKind !== 'undefined' && timeoutKind !== 'number') {
        throw new Error('timeout is not a number.')
    }
    const budget = (timeoutKind === 'undefined' || timeout < 0 ? Number.MAX_SAFE_INTEGER : timeout)
    const deadline = Date.now() + budget

    for (;;) {
        if (!(deadline > Date.now())) {
            break
        }
        if (!(Date.now() < serverState.time + SERVER_STATE_VALIDITY_DURATION)) {
            break
        }
        if (await Promise.resolve(isReadyFn())) {
            break
        }
        await pause()
        if (isServerAvailable()) {
            continue
        }
        // Can fail if ping got frozen/suspended...
        // Force a recheck of server status before failure...
        if (await healthCheck() && isServerAvailable()) {
            continue // Continue waiting if last healthCheck confirmed the server is still alive.
        }
        throw new Error('Connection with server lost.')
    }
    if (Date.now() >= serverState.time + SERVER_STATE_VALIDITY_DURATION) {
        console.warn('SERVER_STATE_VALIDITY_DURATION elapsed. Released waitUntil on stale server state.')
    }
}
// Life-cycle states of a Task.
const TaskStatus = Object.freeze({
    init: 'init',
    pending: 'pending', // Queued locally, not yet posted to server
    waiting: 'waiting', // Waiting to run on server
    processing: 'processing',
    stopped: 'stopped',
    completed: 'completed',
    failed: 'failed',
})

// Non-final states in the order a task moves through them.
// Don't add status that are final: anything missing from this list is treated as final.
const TASK_STATUS_ORDER = [TaskStatus.init, TaskStatus.pending, TaskStatus.waiting, TaskStatus.processing]

const task_queue = new Map() // Task -> promise generator given to Task#enqueue.
const concurrent_generators = new Map() // generator -> promise resolving to its arguments.
const weak_results = new WeakMap() // Read by Task.enqueue, keyed by generator.
/** Base class of the requests sent to the server.
 * Tracks the request body, the id assigned by the server, the life-cycle status and the
 * stream reader used to collect the result. Subclasses are expected to provide checkReqBody().
 */
class Task {
    // Private properties...
    _reqBody = {} // request body of this task.
    #reader = undefined
    #status = TaskStatus.init
    #id = undefined
    #exception = undefined

    /**
     * @param {object} [options={}] Request body; shallow-copied. session_id defaults to the current session.
     * @throws {Error} When a session_id different from SD.sessionId is supplied.
     */
    constructor(options = {}) {
        this._reqBody = Object.assign({}, options)
        if (typeof this._reqBody.session_id === 'undefined') {
            this._reqBody.session_id = sessionId
        } else if (this._reqBody.session_id !== SD.sessionId && String(this._reqBody.session_id) !== String(SD.sessionId)) {
            throw new Error('Use SD.sessionId to set the request session_id.')
        }
        // The session id is always sent as a string.
        this._reqBody.session_id = String(this._reqBody.session_id)
    }

    /** Id assigned to the task, undefined until _setId was called. */
    get id() {
        return this.#id
    }
    /** Assigns the task id. @throws {Error} When an id was already assigned. */
    _setId(id) {
        if (typeof this.#id !== 'undefined') {
            throw new Error('The task ID can only be set once.')
        }
        this.#id = id
    }

    /** Error passed to abort(), if any. */
    get exception() {
        return this.#exception
    }

    /** Asks the server to stop the task and marks it stopped (no exception) or failed (with exception).
     * Does nothing when the task already reached a final status.
     * @param {Error|string} [exception] Reason of the failure; strings are wrapped in an Error.
     * @throws {Error} When exception has an invalid type or the stop request is refused.
     */
    async abort(exception) {
        if (this.isCompleted || this.isStopped || this.hasFailed) {
            return
        }
        if (typeof exception !== 'undefined') {
            if (typeof exception === 'string') {
                exception = new Error(exception)
            }
            if (typeof exception !== 'object') {
                throw new Error('exception is not an object.')
            }
            if (!(exception instanceof Error)) {
                throw new Error('exception is not an Error or a string.')
            }
        }
        const res = await fetch('/image/stop?task=' + this.id)
        if (!res.ok) {
            console.log('Stop response:', res)
            throw new Error(res.statusText)
        }
        task_queue.delete(this)
        this.#exception = exception
        this.#status = (exception ? TaskStatus.failed : TaskStatus.stopped)
    }

    /** Request body. The live object while in the init state, a shallow copy (with a warning) afterwards. */
    get reqBody() {
        if (this.#status === TaskStatus.init) {
            return this._reqBody
        }
        console.warn('Task reqBody cannot be changed after the init state.')
        return Object.assign({}, this._reqBody)
    }

    /** True while the status is one of the non-final states listed in TASK_STATUS_ORDER. */
    get isPending() {
        return TASK_STATUS_ORDER.indexOf(this.#status) >= 0
    }
    get isCompleted() {
        return this.#status === TaskStatus.completed
    }
    get hasFailed() {
        return this.#status === TaskStatus.failed
    }
    get isStopped() {
        return this.#status === TaskStatus.stopped
    }
    get status() {
        return this.#status
    }
    /** Moves the task to a new status.
     * @throws {Error} When the current status is final or the new status would move backwards.
     */
    _setStatus(status) {
        if (status === this.#status) {
            return
        }
        const currentIdx = TASK_STATUS_ORDER.indexOf(this.#status)
        if (currentIdx < 0) {
            throw Error(`The task status ${this.#status} is final and can't be changed.`)
        }
        const newIdx = TASK_STATUS_ORDER.indexOf(status)
        if (newIdx >= 0 && newIdx < currentIdx) {
            throw Error(`The task status ${status} can't replace ${this.#status}.`)
        }
        this.#status = status
    }

    /** Send current task to server.
     * The request is repeated, after a delay, for as long as the server answers 503.
     * @param {string} url Endpoint the request body is posted to.
     * @param {*} [timeout=-1] Optional timeout value in ms
     * @returns the response from the render request.
     * @throws {Error} When the status does not allow posting, the request fails or the reply is not ok.
     * @memberof Task
     */
    async post(url, timeout = -1) {
        if (this.status !== TaskStatus.init && this.status !== TaskStatus.pending) {
            throw new Error(`Task status ${this.status} is not valid for post.`)
        }
        this._setStatus(TaskStatus.pending)
        Object.freeze(this._reqBody)
        const abortSignal = (timeout >= 0 ? AbortSignal.timeout(timeout) : undefined)
        let res = undefined
        try {
            this.checkReqBody()
            do {
                abortSignal?.throwIfAborted()
                res = await fetch(url, {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify(this._reqBody),
                    signal: abortSignal
                })
                // status_code 503, already a task running.
            } while (res.status === 503 && await asyncDelay(RETRY_DELAY_IF_SERVER_IS_BUSY))
        } catch (err) {
            this.abort(err)
            throw err
        }
        if (!res.ok) {
            const err = new Error(`Unexpected response HTTP${res.status}. Details: ${res.statusText}`)
            this.abort(err)
            throw err
        }
        return await res.json()
    }

    /** Creates a ChunkedStreamReader that parses the stream as JSON objects and
     * reports completion as soon as an object carrying a `status` key is received.
     */
    static getReader(url) {
        const reader = new ChunkedStreamReader(url)
        const parseToString = reader.parse
        reader.parse = function(value) {
            value = parseToString.call(this, value)
            if (!value || value.length <= 0) {
                return
            }
            return reader.readStreamAsJSON(value.join(''))
        }
        reader.onNext = function({ done, value }) {
            // By default is completed when the return value has a status defined.
            if (typeof value === 'object' && 'status' in value) {
                done = true
            }
            return { done, value }
        }
        return reader
    }
    /** Assigns the reader. @throws {Error} When a reader was already assigned or created. */
    _setReader(reader) {
        if (typeof this.#reader !== 'undefined') {
            throw new Error('The task reader can only be set once.')
        }
        this.#reader = reader
    }
    /** Reader bound to this task; created on first access from this.streamUrl.
     * The created reader updates the task status, step counters and result while streaming.
     * @throws {Error} When no reader exists and the task has no streamUrl.
     */
    get reader() {
        if (this.#reader) {
            return this.#reader
        }
        if (!this.streamUrl) {
            throw new Error('The task has no stream Url defined.')
        }
        this.#reader = Task.getReader(this.streamUrl)
        const task = this
        const onNext = this.#reader.onNext
        this.#reader.onNext = function({ done, value }) {
            if (value && typeof value === 'object') {
                // Receiving data means the server started working on the task.
                if (task.status === TaskStatus.init
                    || task.status === TaskStatus.pending
                    || task.status === TaskStatus.waiting
                ) {
                    task._setStatus(TaskStatus.processing)
                }
                if ('step' in value && 'total_steps' in value) {
                    task.step = value.step
                    task.total_steps = value.total_steps
                }
            }
            return onNext.call(this, { done, value })
        }
        this.#reader.onComplete = function(value) {
            task.result = value
            if (task.isPending) {
                task._setStatus(TaskStatus.completed)
            }
            return value
        }
        this.#reader.onError = function(response) {
            const err = new Error(response.statusText)
            task.abort(err)
            throw err
        }
        return this.#reader
    }

    /** Waits until the task reaches the requested status.
     * @param {object} options
     * @param {number} [options.timeout=-1] Maximum wait in ms for each polling phase.
     * @param {Function} [options.callback] Called with the task as `this` while polling; a truthy result stops the wait.
     * @param {string} [options.status] Status to wait for; defaults to the one following the current status.
     * @param {AbortSignal} [options.signal] Stops the wait once aborted.
     * @returns {Promise<boolean>} true when the requested status was reached. A task still in the init
     *     state returns false, a task in a final state returns whether that state is the requested
     *     one (completed when no status is given).
     */
    async waitUntil({ timeout = -1, callback, status, signal }) {
        const currentIdx = TASK_STATUS_ORDER.indexOf(this.#status)
        if (currentIdx < 0) {
            // Final status: nothing left to wait for, report whether it is the expected one.
            return this.#status === (status || TaskStatus.completed)
        }
        if (currentIdx === 0) {
            // Still in init: the task was never queued nor posted.
            return false
        }
        const stIdx = (status ? TASK_STATUS_ORDER.indexOf(status) : currentIdx + 1)
        if (stIdx >= 0 && stIdx <= currentIdx) {
            return true
        }
        if (signal?.aborted) {
            return false
        }
        const task = this
        switch (this.#status) {
            case TaskStatus.pending:
            case TaskStatus.waiting:
                // Wait for server status to include this task.
                await waitUntil(
                    async () => {
                        if (task.#id && typeof serverState.tasks === 'object' && Object.keys(serverState.tasks).includes(String(task.#id))) {
                            return true
                        }
                        if (await Promise.resolve(callback?.call(task)) || signal?.aborted) {
                            return true
                        }
                    },
                    TASK_STATE_SERVER_UPDATE_DELAY,
                    timeout,
                )
                if (this.#id && typeof serverState.tasks === 'object' && Object.keys(serverState.tasks).includes(String(task.#id))) {
                    this._setStatus(TaskStatus.waiting)
                }
                if (await Promise.resolve(callback?.call(this)) || signal?.aborted) {
                    return false
                }
                if (stIdx >= 0 && stIdx <= TASK_STATUS_ORDER.indexOf(TaskStatus.waiting)) {
                    return true
                }
                // Wait for task to start on server.
                await waitUntil(
                    async () => {
                        if (typeof serverState.tasks !== 'object' || serverState.tasks[String(task.#id)] !== 'pending') {
                            return true
                        }
                        if (await Promise.resolve(callback?.call(task)) || signal?.aborted) {
                            return true
                        }
                    },
                    TASK_STATE_SERVER_UPDATE_DELAY,
                    timeout,
                )
                const state = (typeof serverState.tasks === 'object' ? serverState.tasks[String(task.#id)] : undefined)
                if (state === 'running' || state === 'buffer' || state === 'completed') {
                    this._setStatus(TaskStatus.processing)
                }
                if (await Promise.resolve(callback?.call(task)) || signal?.aborted) {
                    return false
                }
                if (stIdx >= 0 && stIdx <= TASK_STATUS_ORDER.indexOf(TaskStatus.processing)) {
                    return true
                }
                // Intentional fall through: keep waiting for the processing phase to end.
            case TaskStatus.processing:
                await waitUntil(
                    async () => {
                        if (typeof serverState.tasks !== 'object' || serverState.tasks[String(task.#id)] !== 'running') {
                            return true
                        }
                        if (await Promise.resolve(callback?.call(task)) || signal?.aborted) {
                            return true
                        }
                    },
                    TASK_STATE_SERVER_UPDATE_DELAY,
                    timeout,
                )
                await Promise.resolve(callback?.call(this))
                // Intentional fall through: report the final outcome.
            default:
                return this.#status === (status || TaskStatus.completed)
        }
    }

    /** Queues this task and waits for it to complete.
     * @param promiseGenerator Generator driving the task.
     * @param args Arguments stored with the generator.
     * @returns the task result.
     * @throws The task exception when one was recorded, or an Error when the task is not in the init state.
     */
    async enqueue(promiseGenerator, ...args) {
        if (this.status !== TaskStatus.init) {
            throw new Error(`Task is in an invalid status ${this.status} to add to queue.`)
        }
        this._setStatus(TaskStatus.pending)
        task_queue.set(this, promiseGenerator)
        await eventSource.fireEvent(EVENT_TASK_QUEUED, { task: this })
        await Task.enqueue(promiseGenerator, ...args)
        await this.waitUntil({ status: TaskStatus.completed })
        if (this.exception) {
            throw this.exception
        }
        return this.result
    }
    /** Registers a generator in concurrent_generators and waits until it is removed from it.
     * @returns the entry stored in weak_results for the generator.
     * @throws {Error} When promiseGenerator is undefined.
     */
    static async enqueue(promiseGenerator, ...args) {
        if (typeof promiseGenerator === 'undefined') {
            throw new Error('To enqueue a concurrent task, a *Promise Generator is needed but undefined was found.')
        }
        //if (Symbol.asyncIterator in result || Symbol.iterator in result) {
        //concurrent_generators.set(result, Promise.resolve(args))
        if (typeof promiseGenerator === 'function') {
            // NOTE(review): `asGenerator` is used as a free identifier (not Task.asGenerator) and the wrapper,
            // not promiseGenerator, becomes the map key - confirm this branch behaves as intended.
            concurrent_generators.set(asGenerator({ callback: promiseGenerator }), Promise.resolve(args))
        } else {
            concurrent_generators.set(promiseGenerator, Promise.resolve(args))
        }
        await waitUntil(() => !concurrent_generators.has(promiseGenerator), CONCURRENT_TASK_INTERVAL)
        return weak_results.get(promiseGenerator)
    }
    /** Starts a task through classCtor.start and queues the resulting generator.
     * @throws {Error} When the task is not in init, is not an instance of classCtor or progressCallback is not a function.
     */
    static enqueueNew(task, classCtor, progressCallback) {
        if (task.status !== TaskStatus.init) {
            throw new Error('Task has an invalid status to add to queue.')
        }
        if (!(task instanceof classCtor)) {
            throw new Error('Task is not a instance of classCtor.')
        }
        let promiseGenerator = undefined
        if (typeof progressCallback === 'undefined') {
            promiseGenerator = classCtor.start(task)
        } else if (typeof progressCallback === 'function') {
            promiseGenerator = classCtor.start(task, progressCallback)
        } else {
            throw new Error('progressCallback is not a function.')
        }
        return Task.prototype.enqueue.call(task, promiseGenerator)
    }

    /** Drives a generator to completion, awaiting every yielded promise and sending the
     * resolved value back into the generator.
     * @param promiseGenerator Generator (sync or async) to run.
     * @param {object} [options]
     * @param {Function} [options.callback] Called with { value, done } after each step; must return { value, done }.
     * @param {AbortSignal} [options.signal] Stops the loop once aborted.
     * @param {number} [options.timeout=-1] Maximum run time in ms; negative means no practical limit.
     * @returns the last value produced.
     */
    static async run(promiseGenerator, { callback, signal, timeout = -1 } = {}) {
        let value = undefined
        let done = undefined
        if (timeout < 0) {
            timeout = Number.MAX_SAFE_INTEGER
        }
        timeout = Date.now() + timeout
        do {
            ({ value, done } = await Promise.resolve(promiseGenerator.next(value)))
            if (value instanceof Promise) {
                value = await value
            }
            if (callback) {
                ({ value, done } = await Promise.resolve(callback.call(promiseGenerator, { value, done })))
            }
            if (value instanceof Promise) {
                value = await value
            }
        } while (!done && !signal?.aborted && timeout > Date.now())
        return value
    }
    /** Same stepping logic as run(), exposed as an async generator that yields every resolved
     * value and feeds back whatever the consumer sends in.
     */
    static async *asGenerator({ callback, generator, signal, timeout = -1 } = {}) {
        let value = undefined
        let done = undefined
        if (timeout < 0) {
            timeout = Number.MAX_SAFE_INTEGER
        }
        timeout = Date.now() + timeout
        do {
            ({ value, done } = await Promise.resolve(generator.next(value)))
            if (value instanceof Promise) {
                value = await value
            }
            if (callback) {
                ({ value, done } = await Promise.resolve(callback.call(generator, { value, done })))
                if (value instanceof Promise) {
                    value = await value
                }
            }
            value = yield value
        } while (!done && !signal?.aborted && timeout > Date.now())
        return value
    }
}
// Keys every render request must carry, mapped to the `typeof` each value must have.
const TASK_REQUIRED = {
    session_id: 'string',
    prompt: 'string',
    negative_prompt: 'string',
    width: 'number',
    height: 'number',
    seed: 'number',
    sampler_name: 'string',
    use_stable_diffusion_model: 'string',
    num_inference_steps: 'number',
    guidance_scale: 'number',
    num_outputs: 'number',
    stream_progress_updates: 'boolean',
    stream_image_progress: 'boolean',
    show_only_filtered_image: 'boolean',
    output_format: 'string',
    output_quality: 'number',
}

// Values filled in for keys left undefined in the request body.
const TASK_DEFAULTS = {
    sampler_name: 'plms',
    use_stable_diffusion_model: 'sd-v1-4',
    num_inference_steps: 50,
    guidance_scale: 7.5,
    negative_prompt: '',
    num_outputs: 1,
    stream_progress_updates: true,
    stream_image_progress: true,
    show_only_filtered_image: true,
    output_format: 'png',
    output_quality: 75,
}

// Keys that may be present in a request, mapped to the `typeof` the value must have when given.
const TASK_OPTIONAL = {
    device: 'string',
    init_image: 'string',
    mask: 'string',
    save_to_disk_path: 'string',
    use_face_correction: 'string',
    use_upscale: 'string',
    use_vae_model: 'string',
    use_hypernetwork_model: 'string',
    hypernetwork_strength: 'number',
}

// Higher values will result in...
// pytorch_lightning/utilities/seed.py:60: UserWarning: X is not in bounds, numpy accepts from 0 to 4294967295
const MAX_SEED_VALUE = 4294967295
/** Task requesting an image render from the '/render' endpoint. */
class RenderTask extends Task {
    /**
     * @param {object} [options={}] Request body. A missing seed is replaced by a random one,
     *     `use_cpu` is translated to `device: 'cpu'` and `modifiers` are appended to the prompt.
     * @throws {Error} When the seed is out of range or a field has the wrong type.
     */
    constructor(options = {}) {
        super(options)
        if (typeof this._reqBody.seed === 'undefined') {
            this._reqBody.seed = Math.floor(Math.random() * (MAX_SEED_VALUE + 1))
        }
        // Only numeric seeds can be range checked; other types are rejected by checkReqBody().
        if (typeof this._reqBody.seed === 'number' && (this._reqBody.seed > MAX_SEED_VALUE || this._reqBody.seed < 0)) {
            throw new Error(`seed must be in range 0 to ${MAX_SEED_VALUE}.`)
        }
        if ('use_cpu' in this._reqBody) {
            if (this._reqBody.use_cpu) {
                this._reqBody.device = 'cpu'
            }
            delete this._reqBody.use_cpu
        }
        if (this._reqBody.init_image) {
            if (typeof this._reqBody.prompt_strength === 'undefined') {
                this._reqBody.prompt_strength = 0.8
            } else if (typeof this._reqBody.prompt_strength !== 'number') {
                throw new Error(`prompt_strength need to be of type number but ${typeof this._reqBody.prompt_strength} was found.`)
            }
        }
        if ('modifiers' in this._reqBody) {
            if (Array.isArray(this._reqBody.modifiers) && this._reqBody.modifiers.length > 0) {
                // Blank modifiers are dropped before being appended to the prompt.
                this._reqBody.modifiers = this._reqBody.modifiers.filter((val) => val.trim())
                if (this._reqBody.modifiers.length > 0) {
                    this._reqBody.prompt = `${this._reqBody.prompt}, ${this._reqBody.modifiers.join(', ')}`
                }
            }
            if (typeof this._reqBody.modifiers === 'string' && this._reqBody.modifiers.length > 0) {
                this._reqBody.modifiers = this._reqBody.modifiers.trim()
                if (this._reqBody.modifiers.length > 0) {
                    this._reqBody.prompt = `${this._reqBody.prompt}, ${this._reqBody.modifiers}`
                }
            }
            delete this._reqBody.modifiers
        }
        this.checkReqBody()
    }

    /** Fills in the defaults and validates the type of required and optional fields.
     * Optional keys explicitly set to undefined are removed with a warning.
     * @throws {Error} When a required or optional field has the wrong type.
     */
    checkReqBody() {
        for (const key in TASK_DEFAULTS) {
            if (typeof this._reqBody[key] === 'undefined') {
                this._reqBody[key] = TASK_DEFAULTS[key]
            }
        }
        for (const key in TASK_REQUIRED) {
            if (typeof this._reqBody[key] !== TASK_REQUIRED[key]) {
                throw new Error(`${key} need to be of type ${TASK_REQUIRED[key]} but ${typeof this._reqBody[key]} was found.`)
            }
        }
        for (const key in this._reqBody) {
            if (key in TASK_REQUIRED) {
                continue
            }
            if (key in TASK_OPTIONAL) {
                if (typeof this._reqBody[key] == "undefined") {
                    delete this._reqBody[key]
                    console.warn(`reqBody[${key}] was set to undefined. Removing optional key without value...`)
                    continue
                }
                if (typeof this._reqBody[key] !== TASK_OPTIONAL[key]) {
                    throw new Error(`${key} need to be of type ${TASK_OPTIONAL[key]} but ${typeof this._reqBody[key]} was found.`)
                }
            }
        }
    }

    /** Send current task to server.
     * When the reply carries no numeric task id, an unexpected-response event is fired and a
     * listener may supply a replacement reply through `event.continueWith`.
     * @param {*} [timeout=-1] Optional timeout value in ms
     * @returns the response from the render request.
     * @throws {Error} When no task id could be obtained.
     * @memberof RenderTask
     */
    async post(timeout = -1) {
        if (typeof performance == "object" && performance.mark && performance.measure) {
            performance.mark('make-render-request')
            if (performance.getEntriesByName('click-makeImage', 'mark').length > 0) {
                console.log('delay between clicking and making the server request:', performance.measure('diff', 'click-makeImage', 'make-render-request').duration + ' ms')
            }
        }

        let jsonResponse = await super.post('/render', timeout)
        if (typeof jsonResponse?.task !== 'number') {
            console.warn('Endpoint error response: ', jsonResponse)
            const event = Object.assign({ task: this }, jsonResponse)
            await eventSource.fireEvent(EVENT_UNEXPECTED_RESPONSE, event)
            if ('continueWith' in event) {
                jsonResponse = await Promise.resolve(event.continueWith)
            }
            if (typeof jsonResponse?.task !== 'number') {
                const err = new Error(jsonResponse?.detail || 'Endpoint response does not contains a task ID.')
                this.abort(err)
                throw err
            }
        }
        this._setId(jsonResponse.task)
        if (jsonResponse.stream) {
            this.streamUrl = jsonResponse.stream
        }
        this._setStatus(TaskStatus.waiting)
        return jsonResponse
    }

    /** Queues this task; see Task.enqueueNew. */
    enqueue(progressCallback) {
        return Task.enqueueNew(this, RenderTask, progressCallback)
    }

    /** Generator driving the whole render: posts the request, waits for the server to start it,
     * then streams the updates. Yielded promises are expected to be resolved by the runner
     * (see Task.run) and sent back in.
     * @param {Function} [progressCallback] Called with the task as `this` and an object describing the step.
     * @returns the last streamed value, false when the task was stopped or the callback declined to
     *     continue, undefined when the task was already stopped.
     * @throws {Error} When progressCallback is not a function or the task fails.
     */
    *start(progressCallback) {
        if (typeof progressCallback !== 'undefined' && typeof progressCallback !== 'function') {
            throw new Error('progressCallback is not a function. progressCallback type: ' + typeof progressCallback)
        }
        if (this.isStopped) {
            return
        }

        this._setStatus(TaskStatus.pending)
        progressCallback?.call(this, { reqBody: this._reqBody })
        Object.freeze(this._reqBody)

        // Post task request to backend
        let renderRequest = undefined
        try {
            renderRequest = yield this.post()
            yield progressCallback?.call(this, { renderResponse: renderRequest })
        } catch (e) {
            yield progressCallback?.call(this, { detail: e.message })
            throw e
        }

        try { // Wait for task to start on server.
            yield this.waitUntil({
                callback: function() { return progressCallback?.call(this, {}) },
                status: TaskStatus.processing,
            })
        } catch (e) {
            this.abort(e)
            throw e
        }
        // Update class status and callback.
        const taskState = (typeof serverState.tasks === 'object' ? serverState.tasks[String(this.id)] : undefined)
        switch (taskState) {
            case 'pending': // Session has pending tasks.
                console.error('Server %o render request %o is still waiting.', serverState, renderRequest)
                //Only update status if not already set by waitUntil
                if (this.status === TaskStatus.init
                    || this.status === TaskStatus.pending
                ) {
                    // Set status as Waiting in backend.
                    this._setStatus(TaskStatus.waiting)
                }
                break
            case 'running':
            case 'buffer':
                // Normal expected messages.
                this._setStatus(TaskStatus.processing)
                break
            case 'completed':
                if (this.isPending) {
                    // Set state to processing until we read the reply
                    this._setStatus(TaskStatus.processing)
                }
                console.warn('Server %o render request %o completed unexpectedly', serverState, renderRequest)
                break // Continue anyway to try to read cached result.
            case 'error':
                this._setStatus(TaskStatus.failed)
                console.error('Server %o render request %o has failed', serverState, renderRequest)
                break // Still valid, Update UI with error message
            case 'stopped':
                this._setStatus(TaskStatus.stopped)
                console.log('Server %o render request %o was stopped', serverState, renderRequest)
                return false
            default:
                if (!progressCallback) {
                    // Parentheses are required: `+` binds tighter than `||`.
                    const err = new Error('Unexpected server task state: ' + (taskState || 'Undefined'))
                    this.abort(err)
                    throw err
                }
                // Let the callback decide whether to go on with an unknown state.
                const response = yield progressCallback.call(this, {})
                if (response instanceof Error) {
                    this.abort(response)
                    throw response
                }
                if (!response) {
                    return false
                }
        }

        // Task started!
        // Open the reader.
        const reader = this.reader
        const task = this
        reader.onError = function(response) {
            if (progressCallback) {
                task.abort(new Error(response.statusText))
                return progressCallback.call(task, { response, reader })
            }
            // Without a callback, fall back to the reader's default handler (throws).
            return ChunkedStreamReader.prototype.onError.call(reader, response)
        }
        yield progressCallback?.call(this, { reader })

        //Start streaming the results.
        const streamGenerator = reader.open()
        let value = undefined
        let done = undefined
        yield progressCallback?.call(this, { stream: streamGenerator })
        do {
            ({ value, done } = yield streamGenerator.next())
            if (typeof value !== 'object') {
                continue
            }
            yield progressCallback?.call(this, { update: value })
        } while (!done)
        return value
    }

    /** Starts a render, wrapping plain request objects into a RenderTask first.
     * @returns the generator created by RenderTask#start.
     * @throws {Error} When task is not an object.
     */
    static start(task, progressCallback) {
        if (typeof task !== 'object') {
            throw new Error('task is not an object. task type: ' + typeof task)
        }
        if (!(task instanceof Task)) {
            if (task.reqBody) {
                task = new RenderTask(task.reqBody)
            } else {
                task = new RenderTask(task)
            }
        }
        return task.start(progressCallback)
    }

    /** Starts a render and drives it to completion with Task.run. */
    static run(task, progressCallback) {
        const promiseGenerator = RenderTask.start(task, progressCallback)
        return Task.run(promiseGenerator)
    }
}
/** Task variant whose request is posted to the '/filter' path.
 */
class FilterTask extends Task {
    /**
     * @param {object} [options={}] Forwarded to the Task constructor.
     *   NOTE(review): assumes Task's constructor accepts this object - confirm against Task.
     */
    constructor(options = {}) {
        // A derived-class constructor must call super() before it returns,
        // otherwise every `new FilterTask()` throws a ReferenceError.
        super(options)
    }
    /** Send current task to server.
     * @param {*} [timeout=-1] Optional timeout value in ms
     * @returns the response from the filter request.
     * @memberof FilterTask
     */
    async post(timeout = -1) {
        const jsonResponse = await super.post('/filter', timeout)
        //this._setId(jsonResponse.task)
        this._setStatus(TaskStatus.waiting)
        // Hand the response back to the caller, as the documentation promises.
        return jsonResponse
    }
    /** Queue this task through Task.enqueueNew, using FilterTask as the task class.
     * @param {function} [progressCallback] Forwarded to Task.enqueueNew.
     * @returns Whatever Task.enqueueNew returns.
     */
    enqueue(progressCallback) {
        return Task.enqueueNew(this, FilterTask, progressCallback)
    }
    /** Generator entry point. It currently only validates its argument and
     * finishes immediately; no filter work is performed here.
     * @param {function} [progressCallback] Optional progress callback.
     * @throws {Error} When progressCallback is defined but is not a function.
     */
    *start(progressCallback) {
        if (typeof progressCallback !== 'undefined' && typeof progressCallback !== 'function') {
            throw new Error('progressCallback is not a function. progressCallback type: ' + typeof progressCallback)
        }
        if (this.isStopped) {
            return
        }
    }
    /** Start a filter task, wrapping plain request objects in a FilterTask first.
     * @param {Task|object} task A Task instance, an object exposing `reqBody`, or a plain request body.
     * @param {function} [progressCallback] Forwarded to the task's `start` method.
     * @returns The value returned by `task.start(progressCallback)`.
     * @throws {Error} When `task` is not an object (null included).
     */
    static start(task, progressCallback) {
        // typeof null is 'object'; reject it here instead of failing on `task.reqBody`.
        if (typeof task !== 'object' || task === null) {
            throw new Error('task is not an object. task type: ' + (task === null ? 'null' : typeof task))
        }
        if (!(task instanceof Task)) {
            if (task.reqBody) {
                task = new FilterTask(task.reqBody)
            } else {
                task = new FilterTask(task)
            }
        }
        return task.start(progressCallback)
    }
    /** Start `task` through FilterTask.start and hand the result to Task.run.
     * @param {Task|object} task Same input accepted by FilterTask.start.
     * @param {function} [progressCallback] Forwarded to FilterTask.start.
     * @returns Whatever Task.run returns.
     */
    static run(task, progressCallback) {
        const promiseGenerator = FilterTask.start(task, progressCallback)
        return Task.run(promiseGenerator)
    }
}
/** Fetch '/get/system_info' and return the parsed JSON body.
 * Falls back to an empty structure ({devices: {all, active}, hosts}) when the
 * response is not ok or when fetching/parsing throws; errors are only logged.
 * The function is wrapped with debounce(fn, 250, true).
 */
const getSystemInfo = debounce(async function() {
    const emptyInfo = {
        devices: {
            all: {},
            active: {},
        },
        hosts: []
    }
    try {
        const response = await fetch('/get/system_info')
        if (response.ok) {
            // Awaited inside the try so a parse failure is logged below.
            return await response.json()
        }
        console.error('Invalid response fetching devices', response.statusText)
    } catch (err) {
        console.error('error fetching system info', err)
    }
    return emptyInfo
}, 250, true)
/** Resolve the `devices` entry of the system info.
 * @returns {Promise<object>} The `devices` property of the value produced by getSystemInfo().
 */
async function getDevices() {
    // getSystemInfo wraps an async function, so its result has to be awaited
    // before reading `.devices`; awaiting is harmless if a plain value comes back.
    const systemInfo = await getSystemInfo()
    return systemInfo.devices
}
/** Resolve the `hosts` entry of the system info.
 * @returns {Promise<Array>} The `hosts` property of the value produced by getSystemInfo().
 */
async function getHosts() {
    // getSystemInfo wraps an async function, so its result has to be awaited
    // before reading `.hosts`; awaiting is harmless if a plain value comes back.
    const systemInfo = await getSystemInfo()
    return systemInfo.hosts
}
/** Fetch '/get/models' and return the parsed JSON body.
 * When the response is not ok, or fetching/parsing throws, the problem is
 * logged and an object with empty 'stable-diffusion' and 'vae' lists is returned.
 * @returns {Promise<object>} The parsed model listing or the empty fallback.
 */
async function getModels() {
    let result = {
        'stable-diffusion': [],
        'vae': [],
    }
    try {
        const response = await fetch('/get/models')
        if (response.ok) {
            result = await response.json()
            console.log('get models response', result)
        } else {
            console.error('Invalid response fetching models', response.statusText)
        }
    } catch (err) {
        console.log('get models error', err)
    }
    return result
}
2022-12-08 06:42:46 +01:00
/** Compute how many tasks the server is considered able to take.
 * Counts the keys of `serverState.devices.active` (0 when that path is missing)
 * and adds one when running in a window whose document visibility is 'hidden'.
 * @returns {number}
 */
function getServerCapacity() {
    const activeDevices = serverState?.devices?.active || {}
    const deviceCount = Object.keys(activeDevices).length
    const isHidden = typeof window === "object" && window.document.visibilityState === 'hidden'
    return isHidden ? 1 + deviceCount : deviceCount
}
2022-12-11 06:52:52 +01:00
// Promise of the most recently fired EVENT_IDLE; a new idle event is only
// fired once the previous one is no longer pending.
let idleEventPromise = undefined

/** Run one scheduling pass over the running generators and the task queue.
 *
 * Steps, in order:
 *  1. If the browser reports pending user input, back off for CONCURRENT_TASK_INTERVAL.
 *  2. Fire EVENT_IDLE when nothing is queued/running, or when the queue is below capacity.
 *  3. Advance every generator in `concurrent_generators` whose promise has settled.
 *  4. Walk `task_queue`: report failed/ended tasks and start tasks that have no running generator.
 *
 * @returns {Promise} The asyncDelay promise when backing off or when nothing is running,
 *   otherwise a race over the running promises that calls continueTasks again once it settles.
 */
function continueTasks() {
    if (typeof navigator?.scheduling?.isInputPending === 'function') {
        const inputPendingOptions = {
            // Report mouse/pointer move events when queue is empty.
            // Delay idle after mouse moves stops.
            includeContinuous: Boolean(task_queue.size <= 0 && concurrent_generators.size <= 0)
        }
        if (navigator.scheduling.isInputPending(inputPendingOptions)) {
            // Browser/User still active.
            return asyncDelay(CONCURRENT_TASK_INTERVAL)
        }
    }
    const serverCapacity = getServerCapacity()
    if (task_queue.size <= 0 && concurrent_generators.size <= 0) {
        if (!idleEventPromise?.isPending) {
            idleEventPromise = makeQuerablePromise(eventSource.fireEvent(EVENT_IDLE, {capacity: serverCapacity, idle: true}))
        }
        // Calling idle could result in task being added to queue.
        // if (task_queue.size <= 0 && concurrent_generators.size <= 0) {
        //     return asyncDelay(IDLE_COOLDOWN).then(() => idleEventPromise)
        // }
    }
    if (task_queue.size < serverCapacity) {
        if (!idleEventPromise?.isPending) {
            idleEventPromise = makeQuerablePromise(eventSource.fireEvent(EVENT_IDLE, {capacity: serverCapacity - task_queue.size}))
        }
    }
    const completedTasks = []
    for (let [generator, promise] of concurrent_generators.entries()) {
        if (promise.isPending) {
            continue
        }
        let value = promise.resolvedValue?.value || promise.resolvedValue
        if (promise.isRejected) {
            console.error(promise.rejectReason)
            const event = {generator, reason: promise.rejectReason}
            eventSource.fireEvent(EVENT_UNHANDLED_REJECTION, event)
            // A listener may set `continueWith` on the event to resume the generator.
            if ('continueWith' in event) {
                value = Promise.resolve(event.continueWith)
            } else {
                concurrent_generators.delete(generator)
                completedTasks.push({generator, promise})
                continue
            }
        }
        if (value instanceof Promise) {
            // Capture `done` BEFORE `promise` is reassigned: the callback below runs
            // later, when `promise` already refers to the new (still pending) wrapper,
            // so reading it lazily would always lose the flag.
            const done = promise.resolvedValue?.done
            promise = makeQuerablePromise(value.then((val) => ({done, value: val})))
            concurrent_generators.set(generator, promise)
            continue
        }
        weak_results.set(generator, value)
        if (promise.resolvedValue?.done) {
            concurrent_generators.delete(generator)
            completedTasks.push({generator, promise})
            continue
        }
        // Resume the generator with the settled value and track its next step.
        promise = generator.next(value)
        if (!(promise instanceof Promise)) {
            promise = Promise.resolve(promise)
        }
        promise = makeQuerablePromise(promise)
        concurrent_generators.set(generator, promise)
    }
    for (let [task, generator] of task_queue.entries()) {
        const cTsk = completedTasks.find((item) => item.generator === generator)
        if (cTsk?.promise?.rejectReason || task.hasFailed) {
            eventSource.fireEvent(EVENT_TASK_ERROR, {task, generator, reason: cTsk?.promise?.rejectReason || task.exception})
            task_queue.delete(task)
            continue
        }
        if (task.isCompleted || task.isStopped || cTsk) {
            const eventEndArgs = {task, generator}
            if (task.isStopped) {
                eventEndArgs.stopped = true
            }
            eventSource.fireEvent(EVENT_TASK_END, eventEndArgs)
            task_queue.delete(task)
            continue
        }
        if (concurrent_generators.size > serverCapacity) {
            break
        }
        if (!generator) {
            if (typeof task.start === 'function') {
                generator = task.start()
            }
        } else if (concurrent_generators.has(generator)) {
            continue
        }
        const event = {task, generator}
        // Optional beforeStart promise to wait on before starting task;
        // listeners may also replace `event.generator`.
        const beforeStart = eventSource.fireEvent(EVENT_TASK_START, event)
        const promise = makeQuerablePromise(beforeStart.then(() => Promise.resolve(event.beforeStart)))
        concurrent_generators.set(event.generator, promise)
        task_queue.set(task, event.generator)
    }
    const promises = Array.from(concurrent_generators.values())
    if (promises.length <= 0) {
        return asyncDelay(CONCURRENT_TASK_INTERVAL)
    }
    return Promise.race(promises).finally(continueTasks)
}
// Querable promise of the scheduling pass currently in flight (if any).
let taskPromise = undefined

/** Kick off a scheduling pass unless one is still pending.
 * Does nothing while `taskPromise` is pending or while the browser reports
 * pending user input. Errors rejected by continueTasks() are logged, reported
 * through EVENT_UNHANDLED_REJECTION and followed by a RETRY_DELAY_ON_ERROR wait.
 */
function startCheck() {
    if (taskPromise?.isPending) {
        return
    }
    for (;;) {
        const chained = taskPromise?.resolvedValue
        if (chained instanceof Promise) {
            // The previous pass resolved to another promise: track that one instead.
            taskPromise = makeQuerablePromise(chained)
        } else {
            const canProbeInput = typeof navigator?.scheduling?.isInputPending === 'function'
            if (canProbeInput && navigator.scheduling.isInputPending()) {
                return
            }
            const guardedPass = continueTasks().catch(async (err) => {
                console.error(err)
                await eventSource.fireEvent(EVENT_UNHANDLED_REJECTION, {reason: err})
                await asyncDelay(RETRY_DELAY_ON_ERROR)
            })
            taskPromise = makeQuerablePromise(guardedPass)
        }
        // Only loop again when the tracked promise already reports as resolved.
        if (!taskPromise?.isResolved) {
            return
        }
    }
}
/** Public API object of the engine: classes, constants and helper functions. */
const SD = {
    ChunkedStreamReader,
    ServerStates,
    TaskStatus,
    Task,
    RenderTask,
    FilterTask,
    Events: EVENTS_TYPES,
    /** Register the listeners given in `options.events`, run one health check,
     * then schedule healthCheck and startCheck on their intervals.
     * @param {object} [options={}] May carry an `events` map of event name to listener.
     */
    async init(options = {}) {
        if ('events' in options) {
            for (const eventName in options.events) {
                eventSource.addEventListener(eventName, options.events[eventName])
            }
        }
        await healthCheck()
        setInterval(healthCheck, HEALTH_PING_INTERVAL)
        setInterval(startCheck, CONCURRENT_TASK_INTERVAL)
    },
    /** Add a new event listener
     */
    addEventListener(...args) {
        return eventSource.addEventListener(...args)
    },
    /** Remove the event listener
     */
    removeEventListener(...args) {
        return eventSource.removeEventListener(...args)
    },
    isServerAvailable,
    getServerCapacity,
    getSystemInfo,
    getDevices,
    getHosts,
    getModels,
    /** Shortcut for RenderTask.run. */
    render(...args) {
        return RenderTask.run(...args)
    },
    /** Shortcut for FilterTask.run. */
    filter(...args) {
        return FilterTask.run(...args)
    },
    waitUntil,
};
// Non-configurable accessors on SD that read the module's live state.
// Only sessionId has a setter; it refuses `undefined`.
Object.defineProperties(SD, {
    serverState: {
        get() { return serverState },
        configurable: false,
    },
    isAvailable: {
        get() { return isServerAvailable() },
        configurable: false,
    },
    serverCapacity: {
        get() { return getServerCapacity() },
        configurable: false,
    },
    sessionId: {
        get() { return sessionId },
        set(val) {
            if (val === undefined) {
                throw new Error("Can't set sessionId to undefined.")
            }
            sessionId = val
        },
        configurable: false,
    },
    MAX_SEED_VALUE: {
        get() { return MAX_SEED_VALUE },
        configurable: false,
    },
    activeTasks: {
        get() { return task_queue },
        configurable: false,
    },
})
// Expose SD on the global object, plus a deprecated `sessionId` accessor that
// warns (with a stack trace) and forwards to SD.sessionId.
Object.defineProperties(getGlobal(), {
    SD: {
        get() { return SD },
        configurable: false,
    },
    sessionId: { //TODO Remove in the future in favor of SD.sessionId
        get() {
            console.warn('Deprecated window.sessionId has been replaced with SD.sessionId.')
            console.trace()
            return SD.sessionId
        },
        set(val) {
            console.warn('Deprecated window.sessionId has been replaced with SD.sessionId.')
            console.trace()
            SD.sessionId = val
        },
        configurable: false,
    }
})
} ) ( )