mirror of
https://github.com/easydiffusion/easydiffusion.git
synced 2025-01-10 08:18:29 +01:00
1458 lines
56 KiB
JavaScript
1458 lines
56 KiB
JavaScript
/** SD-UI Backend control and classes.
|
|
*/
|
|
;(function() {
|
|
"use strict"
|
|
// Timing configuration. All values are in milliseconds.

// Pause before re-opening a stream that was already fetched once.
const RETRY_DELAY_IF_BUFFER_IS_EMPTY = 1000 // ms
// Pause before re-posting a task after a 503 reply.
const RETRY_DELAY_IF_SERVER_IS_BUSY = 30 * 1000 // ms, status_code 503, already a task running
const RETRY_DELAY_ON_ERROR = 4000 // ms
// Polling period used while waiting for the server state to reflect a task.
const TASK_STATE_SERVER_UPDATE_DELAY = 1500 // ms
// Age after which the last known server state is considered stale.
const SERVER_STATE_VALIDITY_DURATION = 90 * 1000 // ms - 90 seconds to allow ping to timeout more than once before killing tasks.
const HEALTH_PING_INTERVAL = 5000 // ms
const IDLE_COOLDOWN = 2500 // ms
// Polling period used while waiting for a concurrent generator to finish.
const CONCURRENT_TASK_INTERVAL = 100 // ms
|
|
|
|
/** Connects to an endpoint and resumes connection after reaching end of stream until all data is received.
 * Allows closing the connection while the server buffers more data.
 *
 * Behaviour is customised by replacing the hook methods on an instance:
 * `parse` (raw chunk -> iterable of values), `onNext` (per value, decides `done`),
 * `onComplete` (final value) and `onError` (non-OK HTTP response).
 */
class ChunkedStreamReader {
    #bufferedString = "" // Data received waiting to be read.
    #url
    #fetchOptions
    #response

    /**
     * @param {string|String} url Endpoint to fetch.
     * @param {string} [initialContent=""] Text placed in the JSON buffer before any data is read.
     * @param {object} [options={}] Extra `fetch` options, shallow-merged over the default JSON headers.
     * @throws {Error} When `url` or `initialContent` is not a string.
     */
    constructor(url, initialContent = "", options = {}) {
        if (typeof url !== "string" && !(url instanceof String)) {
            throw new Error("url is not a string.")
        }
        if (typeof initialContent !== "undefined" && typeof initialContent !== "string") {
            throw new Error("initialContent is not a string.")
        }
        this.#bufferedString = initialContent
        this.#url = url
        this.#fetchOptions = Object.assign(
            {
                headers: {
                    "Content-Type": "application/json",
                },
            },
            options
        )
        // The prototype `onNext` is intentionally left in place: assigning `undefined`
        // to the instance here would shadow it and make open() throw on the first value.
    }

    /** Final URL of the last response when it was redirected, otherwise the URL given to the constructor. */
    get url() {
        // No response exists before the first fetch; fall back to the configured URL.
        if (this.#response?.redirected) {
            return this.#response.url
        }
        return this.#url
    }
    /** Text received but not yet emitted as a parsed JSON object. */
    get bufferedString() {
        return this.#bufferedString
    }
    /** HTTP status of the last response, or undefined before the first fetch. */
    get status() {
        return this.#response?.status
    }
    /** HTTP status text of the last response, or undefined before the first fetch. */
    get statusText() {
        return this.#response?.statusText
    }

    /** Converts a raw chunk into an array of values.
     * @param {*} value Chunk as returned by the stream reader.
     * @returns {Array|undefined} undefined for an undefined chunk, `[value]` for non-array input,
     *   the input itself when it is empty, otherwise a one-element array holding the decoded text.
     */
    parse(value) {
        if (typeof value === "undefined") {
            return
        }
        if (!isArrayOrTypedArray(value)) {
            return [value]
        }
        if (value.length === 0) {
            return value
        }
        if (typeof this.textDecoder === "undefined") {
            this.textDecoder = new TextDecoder()
        }
        return [this.textDecoder.decode(value)]
    }
    /** Hook called with the last value once `onNext` reported completion; its result is the generator's return value. */
    onComplete(value) {
        return value
    }
    /** Hook called with a non-OK response (other than 425). The default throws the status text. */
    onError(response) {
        throw new Error(response.statusText)
    }
    /** Hook called for every parsed value; returns the (possibly replaced) value and completion flag. */
    onNext({ value, done }, response) {
        return { value, done }
    }

    /** Makes the reader usable with `for await`; iteration is the same as calling open(). */
    [Symbol.asyncIterator]() {
        // open() already returns an async generator, which is its own async iterator.
        return this.open()
    }
    /** Fetches the URL and yields every value produced by `parse`/`onNext`.
     * The URL is fetched again (after RETRY_DELAY_IF_BUFFER_IS_EMPTY) whenever a body ends
     * before `onNext` reported `done`, and on HTTP 425.
     * @returns {AsyncGenerator} Yields values; returns the result of `onComplete` or `onError`.
     */
    async *open() {
        let value = undefined
        let done = undefined
        do {
            if (this.#response) {
                // Not the first attempt: wait before asking the server again.
                await asyncDelay(RETRY_DELAY_IF_BUFFER_IS_EMPTY)
            }
            this.#response = await fetch(this.#url, this.#fetchOptions)
            if (!this.#response.ok) {
                if (this.#response.status === 425) {
                    continue
                }
                // Request status indicate failure
                console.warn("Stream %o stopped unexpectedly.", this.#response)
                value = await Promise.resolve(this.onError(this.#response))
                if (typeof value === "boolean" && value) {
                    // `continue` re-evaluates the outer loop condition, which must still hold for another attempt.
                    continue
                }
                return value
            }
            const reader = this.#response.body.getReader()
            done = false
            do {
                const readState = await reader.read()
                value = this.parse(readState.value)
                if (value) {
                    for (let sVal of value) {
                        ;({ value: sVal, done } = await Promise.resolve(
                            this.onNext({ value: sVal, done: readState.done })
                        ))
                        yield sVal
                        if (done) {
                            return this.onComplete(sVal)
                        }
                    }
                }
                if (done) {
                    return
                }
            } while (value && !done)
        } while (!done && (this.#response.ok || this.#response.status === 425))
    }
    /** Extracts complete JSON objects from concatenated text such as `{"a":1}{"b":2}{"c`.
     * Incomplete trailing text is kept in the internal buffer and prepended to the next call.
     * @param {string} jsonStr Newly received text.
     * @param {boolean} [throwOnError] When truthy, JSON syntax errors are logged and rethrown
     *   instead of being treated as "wait for more data".
     * @returns {Generator<object>} Yields each parsed object.
     * @throws {Error} When `jsonStr` is not a string, or on parse errors as described above.
     */
    *readStreamAsJSON(jsonStr, throwOnError) {
        if (typeof jsonStr !== "string") {
            throw new Error("jsonStr is not a string.")
        }
        do {
            if (this.#bufferedString.length > 0) {
                // Append new data when required
                if (jsonStr.length > 0) {
                    jsonStr = this.#bufferedString + jsonStr
                } else {
                    jsonStr = this.#bufferedString
                }
                this.#bufferedString = ""
            }
            if (!jsonStr) {
                return
            }
            // Find next delimiter
            let lastChunkIdx = jsonStr.indexOf("}{")
            if (lastChunkIdx >= 0) {
                this.#bufferedString = jsonStr.substring(0, lastChunkIdx + 1)
                jsonStr = jsonStr.substring(lastChunkIdx + 1)
            } else {
                this.#bufferedString = jsonStr
                jsonStr = ""
            }
            if (this.#bufferedString.length <= 0) {
                return
            }
            // hack for a middleman buffering all the streaming updates, and unleashing them on the poor browser in one shot.
            // this results in having to parse JSON like {"step": 1}{"step": 2}{"step": 3}{"ste...
            // which is obviously invalid and can happen at any point while rendering.
            // So we need to extract only the next {} section
            try {
                // Try to parse
                const jsonObj = JSON.parse(this.#bufferedString)
                this.#bufferedString = jsonStr
                jsonStr = ""
                yield jsonObj
            } catch (e) {
                if (throwOnError) {
                    console.error(`Parsing: "${this.#bufferedString}", Buffer: "${jsonStr}"`)
                }
                // Put the unparsed remainder back so the next call can complete it.
                this.#bufferedString += jsonStr
                if (e instanceof SyntaxError && !throwOnError) {
                    return
                }
                throw e
            }
        } while (this.#bufferedString.length > 0 && this.#bufferedString.indexOf("}") >= 0)
    }
}
|
|
|
|
// Names of the events that can be fired through `eventSource`.
const EVENT_IDLE = "idle"
const EVENT_STATUS_CHANGED = "statusChange"
const EVENT_UNHANDLED_REJECTION = "unhandledRejection"
const EVENT_TASK_QUEUED = "taskQueued"
const EVENT_TASK_START = "taskStart"
const EVENT_TASK_END = "taskEnd"
const EVENT_TASK_ERROR = "task_error"
const EVENT_PING = "ping"
const EVENT_UNEXPECTED_RESPONSE = "unexpectedResponse"

// Immutable list of every event name handed to the event source.
const EVENTS_TYPES = Object.freeze([
    // General state
    EVENT_IDLE,
    EVENT_STATUS_CHANGED,
    EVENT_UNHANDLED_REJECTION,
    // Task life cycle and ping
    EVENT_TASK_QUEUED,
    EVENT_TASK_START,
    EVENT_TASK_END,
    EVENT_TASK_ERROR,
    EVENT_PING,
    // Server replies
    EVENT_UNEXPECTED_RESPONSE,
])
const eventSource = new GenericEventSource(EVENTS_TYPES)
|
|
|
|
/** Fires EVENT_STATUS_CHANGED with the given status type and message.
 * @param {string} msgType Status category placed in the event's `type` field.
 * @param {string} msg Text placed in the event's `message` field.
 * @returns Whatever `eventSource.fireEvent` returns.
 */
function setServerStatus(msgType, msg) {
    const payload = { type: msgType, message: msg }
    return eventSource.fireEvent(EVENT_STATUS_CHANGED, payload)
}
|
|
|
|
// Status strings compared against the `status` field of the ping reply.
const ServerStates = Object.freeze({
    init: "Init",
    loadingModel: "LoadingModel",
    online: "Online",
    rendering: "Rendering",
    unavailable: "Unavailable",
})

// Session identifier sent with pings and used as the default `session_id` of tasks.
let sessionId = Date.now()
// Last known server state; `time` is the local timestamp at which it was recorded.
let serverState = { status: ServerStates.unavailable, time: Date.now() }
|
|
|
|
/** Pings the server, stores its reply in `serverState` and publishes the resulting status.
 * Skips the request when the stored state is younger than half of HEALTH_PING_INTERVAL
 * and the server is considered available.
 * @returns {Promise<boolean>} true when the stored state is recent enough or the ping returned
 *   a usable state, false when the reply was invalid or the request failed.
 */
async function healthCheck() {
    // Marks the server as unreachable, both in the stored state and for listeners.
    const markOffline = () => {
        serverState = { status: ServerStates.unavailable, time: Date.now() }
        setServerStatus("error", "offline")
    }

    // Ping confirmed online less than half of HEALTH_PING_INTERVAL ago.
    if (Date.now() < serverState.time + HEALTH_PING_INTERVAL / 2 && isServerAvailable()) {
        return true
    }
    if (Date.now() >= serverState.time + SERVER_STATE_VALIDITY_DURATION) {
        console.warn("WARNING! SERVER_STATE_VALIDITY_DURATION has elapsed since the last Ping completed.")
    }

    try {
        const pingUrl = typeof sessionId !== "undefined" ? "/ping?session_id=" + sessionId : "/ping"
        const res = await fetch(pingUrl)
        serverState = await res.json()

        const hasState = typeof serverState === "object" && typeof serverState.status === "string"
        if (!hasState) {
            console.error(`Server reply didn't contain a state value.`)
            markOffline()
            return false
        }

        // Set status
        const status = serverState.status
        if (status === ServerStates.online) {
            setServerStatus("online", "ready")
        } else if (status === ServerStates.loadingModel) {
            setServerStatus("busy", "loading..")
        } else if (status === ServerStates.rendering) {
            setServerStatus("busy", "rendering..")
        } else if (status !== ServerStates.init) {
            // Unavailable
            console.error("Ping received an unexpected server status. Status: %s", serverState.status)
            setServerStatus("error", serverState.status.toLowerCase())
        }
        // ServerStates.init: wait for init to complete before updating status.

        serverState.time = Date.now()
        await eventSource.fireEvent(EVENT_PING, serverState)
        return true
    } catch (e) {
        console.error(e)
        markOffline()
        return false
    }
}
|
|
|
|
/** Tells whether the last stored server state is fresh and describes a reachable server.
 * @returns {boolean} true when the state is younger than SERVER_STATE_VALIDITY_DURATION and its
 *   status is loadingModel, rendering or online; false otherwise (a message is logged).
 */
function isServerAvailable() {
    if (typeof serverState !== "object") {
        console.error("serverState not set to a value. Connection to server could be lost...")
        return false
    }

    const expiresAt = serverState.time + SERVER_STATE_VALIDITY_DURATION
    if (Date.now() >= expiresAt) {
        console.warn("SERVER_STATE_VALIDITY_DURATION elapsed. Connection to server could be lost...")
        return false
    }

    const reachableStates = [ServerStates.loadingModel, ServerStates.rendering, ServerStates.online]
    if (reachableStates.includes(serverState.status)) {
        return true
    }
    console.warn("Unexpected server status. Server could be unavailable... Status: %s", serverState.status)
    return false
}
|
|
|
|
/** Polls `isReadyFn` until it reports ready, the timeout expires or the server state becomes stale.
 * @param {Function} isReadyFn Called before every wait; may return a promise. A truthy result ends the wait.
 * @param {number|Function} delay Milliseconds between polls, or a function returning a promise to await.
 * @param {number} [timeout] Maximum wait in ms; undefined or negative means no practical limit.
 * @throws {Error} On invalid arguments, or when the server is unavailable and a fresh health check
 *   does not confirm it is alive.
 */
async function waitUntil(isReadyFn, delay, timeout) {
    if (typeof delay === "number") {
        const pauseMs = delay
        delay = () => asyncDelay(pauseMs)
    }
    if (typeof delay !== "function") {
        throw new Error("delay is not a number or a function.")
    }
    if (typeof timeout !== "undefined" && typeof timeout !== "number") {
        throw new Error("timeout is not a number.")
    }
    if (typeof timeout === "undefined" || timeout < 0) {
        timeout = Number.MAX_SAFE_INTEGER
    }
    timeout = Date.now() + timeout

    for (;;) {
        // Same checks, in the same order, as a single `&&` loop condition would perform.
        if (!(timeout > Date.now())) {
            break
        }
        if (!(Date.now() < serverState.time + SERVER_STATE_VALIDITY_DURATION)) {
            break
        }
        if (Boolean(await Promise.resolve(isReadyFn()))) {
            break
        }

        await delay()
        if (isServerAvailable()) {
            continue
        }
        // Can fail if ping got frozen/suspended...
        // Force a recheck of server status before failure...
        const stillAlive = (await healthCheck()) && isServerAvailable()
        if (stillAlive) {
            continue // Continue waiting if last healthCheck confirmed the server is still alive.
        }
        throw new Error("Connection with server lost.")
    }

    if (Date.now() >= serverState.time + SERVER_STATE_VALIDITY_DURATION) {
        console.warn("SERVER_STATE_VALIDITY_DURATION elapsed. Released waitUntil on stale server state.")
    }
}
|
|
|
|
// Life-cycle states of a task.
const TaskStatus = Object.freeze({
    init: "init",
    pending: "pending", // Queued locally, not yet posted to server
    waiting: "waiting", // Waiting to run on server
    processing: "processing",
    stopped: "stopped",
    completed: "completed",
    failed: "failed",
})

// Non-final states in the order a task moves through them. A status that is absent
// from this list (indexOf < 0) is treated as final by the Task class.
// Frozen like the other enumerations in this file.
const TASK_STATUS_ORDER = Object.freeze([
    TaskStatus.init,
    TaskStatus.pending,
    TaskStatus.waiting,
    TaskStatus.processing,
    //Don't add status that are final.
])

// Task -> generator registered by Task.prototype.enqueue.
const task_queue = new Map()
// Generator -> promise of its arguments, registered by Task.enqueue.
const concurrent_generators = new Map()
// Results read back by Task.enqueue once a generator left `concurrent_generators`.
const weak_results = new WeakMap()
|
|
|
|
/** Base class for a unit of work sent to the server.
 * Holds the request body, the task status, the server-assigned id and the stream reader.
 * Subclasses supply `checkReqBody()` (called by post) and set `streamUrl` (read by `reader`).
 */
class Task {
    // Private properties...
    _reqBody = {} // request body of this task.
    #reader = undefined
    #status = TaskStatus.init
    #id = undefined
    #exception = undefined

    /**
     * @param {object} [options={}] Request body; copied, never stored by reference.
     * @throws {Error} When `options.session_id` is set to something other than SD.sessionId.
     */
    constructor(options = {}) {
        this._reqBody = Object.assign({}, options)
        if (typeof this._reqBody.session_id === "undefined") {
            this._reqBody.session_id = sessionId
        } else if (
            this._reqBody.session_id !== SD.sessionId &&
            String(this._reqBody.session_id) !== String(SD.sessionId)
        ) {
            throw new Error("Use SD.sessionId to set the request session_id.")
        }
        this._reqBody.session_id = String(this._reqBody.session_id)
    }

    /** Server-assigned task id, undefined until `_setId` was called. */
    get id() {
        return this.#id
    }
    /** Stores the task id. @throws {Error} When an id was already set. */
    _setId(id) {
        if (typeof this.#id !== "undefined") {
            throw new Error("The task ID can only be set once.")
        }
        this.#id = id
    }

    /** Error passed to abort(), if any. */
    get exception() {
        return this.#exception
    }
    /** Stops the task and marks it `failed` (when an exception is given) or `stopped`.
     * Does nothing when the task is already completed, stopped or failed.
     * @param {Error|string} [exception] Reason for the abort; strings are wrapped in an Error.
     * @throws {Error} When `exception` has an invalid type or the stop request is rejected.
     */
    async abort(exception) {
        if (this.isCompleted || this.isStopped || this.hasFailed) {
            return
        }
        if (typeof exception !== "undefined") {
            if (typeof exception === "string") {
                exception = new Error(exception)
            }
            if (typeof exception !== "object") {
                throw new Error("exception is not an object.")
            }
            if (!(exception instanceof Error)) {
                throw new Error("exception is not an Error or a string.")
            }
        }
        // A task without an id has no server-side task to stop. Requesting
        // "/image/stop?task=undefined" could only fail and would leave the status unchanged.
        if (typeof this.id !== "undefined") {
            const res = await fetch("/image/stop?task=" + this.id)
            if (!res.ok) {
                console.log("Stop response:", res)
                throw new Error(res.statusText)
            }
        }
        task_queue.delete(this)
        this.#exception = exception
        this.#status = exception ? TaskStatus.failed : TaskStatus.stopped
    }

    /** The live request body while in the init state; afterwards a shallow copy (with a warning). */
    get reqBody() {
        if (this.#status === TaskStatus.init) {
            return this._reqBody
        }
        console.warn("Task reqBody cannot be changed after the init state.")
        return Object.assign({}, this._reqBody)
    }

    /** true while the status is one of the non-final states in TASK_STATUS_ORDER. */
    get isPending() {
        return TASK_STATUS_ORDER.indexOf(this.#status) >= 0
    }
    get isCompleted() {
        return this.#status === TaskStatus.completed
    }
    get hasFailed() {
        return this.#status === TaskStatus.failed
    }
    get isStopped() {
        return this.#status === TaskStatus.stopped
    }
    get status() {
        return this.#status
    }
    /** Moves the task to `status`.
     * @throws {Error} When the current status is final, or `status` precedes the current one.
     */
    _setStatus(status) {
        if (status === this.#status) {
            return
        }
        const currentIdx = TASK_STATUS_ORDER.indexOf(this.#status)
        if (currentIdx < 0) {
            throw Error(`The task status ${this.#status} is final and can't be changed.`)
        }
        const newIdx = TASK_STATUS_ORDER.indexOf(status)
        if (newIdx >= 0 && newIdx < currentIdx) {
            throw Error(`The task status ${status} can't replace ${this.#status}.`)
        }
        this.#status = status
    }

    /** Send current task to server.
     * The request body is frozen and re-sent while the server answers 503.
     * @param {string} url Endpoint receiving the JSON request body.
     * @param {*} [timeout=-1] Optional timeout value in ms
     * @returns the response from the render request.
     * @throws {Error} When the status is not init/pending, the request fails or the reply is not OK.
     * @memberof Task
     */
    async post(url, timeout = -1) {
        if (this.status !== TaskStatus.init && this.status !== TaskStatus.pending) {
            throw new Error(`Task status ${this.status} is not valid for post.`)
        }
        this._setStatus(TaskStatus.pending)
        Object.freeze(this._reqBody)

        const abortSignal = timeout >= 0 ? AbortSignal.timeout(timeout) : undefined
        let res = undefined
        try {
            this.checkReqBody()
            do {
                abortSignal?.throwIfAborted()
                res = await fetch(url, {
                    method: "POST",
                    headers: {
                        "Content-Type": "application/json",
                    },
                    body: JSON.stringify(this._reqBody),
                    signal: abortSignal,
                })
                // status_code 503, already a task running.
                // The retry only happens when asyncDelay resolves to a truthy value.
            } while (res.status === 503 && (await asyncDelay(RETRY_DELAY_IF_SERVER_IS_BUSY)))
        } catch (err) {
            this.abort(err)
            throw err
        }
        if (!res.ok) {
            const err = new Error(`Unexpected response HTTP${res.status}. Details: ${res.statusText}`)
            this.abort(err)
            throw err
        }
        return await res.json()
    }

    /** Builds a ChunkedStreamReader for `url` whose values are parsed JSON objects and
     * which reports completion on the first object that has a `status` property.
     */
    static getReader(url) {
        const reader = new ChunkedStreamReader(url)
        const parseToString = reader.parse
        reader.parse = function(value) {
            value = parseToString.call(this, value)
            if (!value || value.length <= 0) {
                return
            }
            return reader.readStreamAsJSON(value.join(""))
        }
        reader.onNext = function({ done, value }) {
            // By default is completed when the return value has a status defined.
            if (typeof value === "object" && "status" in value) {
                done = true
            }
            return { done, value }
        }
        return reader
    }
    /** Stores a custom reader. @throws {Error} When a reader already exists. */
    _setReader(reader) {
        if (typeof this.#reader !== "undefined") {
            throw new Error("The task reader can only be set once.")
        }
        this.#reader = reader
    }
    /** Reader for this task's stream, created on first access from `this.streamUrl`.
     * The created reader updates the task status, `step`/`total_steps` and `result`,
     * and aborts the task on a stream error.
     * @throws {Error} When no reader exists and no stream URL is defined.
     */
    get reader() {
        if (this.#reader) {
            return this.#reader
        }
        if (!this.streamUrl) {
            throw new Error("The task has no stream Url defined.")
        }
        this.#reader = Task.getReader(this.streamUrl)
        const task = this
        const onNext = this.#reader.onNext
        this.#reader.onNext = function({ done, value }) {
            if (value && typeof value === "object") {
                if (
                    task.status === TaskStatus.init ||
                    task.status === TaskStatus.pending ||
                    task.status === TaskStatus.waiting
                ) {
                    task._setStatus(TaskStatus.processing)
                }
                if ("step" in value && "total_steps" in value) {
                    task.step = value.step
                    task.total_steps = value.total_steps
                }
            }
            return onNext.call(this, { done, value })
        }
        this.#reader.onComplete = function(value) {
            task.result = value
            if (task.isPending) {
                task._setStatus(TaskStatus.completed)
            }
            return value
        }
        this.#reader.onError = function(response) {
            const err = new Error(response.statusText)
            task.abort(err)
            throw err
        }
        return this.#reader
    }

    /** Waits until the task reaches `status` (default: the state after the current one),
     * following the task's entry in `serverState.tasks`.
     * @param {object} options
     * @param {number} [options.timeout=-1] Timeout in ms for each internal wait.
     * @param {Function} [options.callback] Called with the task as `this`; a truthy result ends the wait.
     * @param {string} [options.status] Target status.
     * @param {AbortSignal} [options.signal] Ends the wait when aborted.
     * @returns {Promise<boolean>} false when the task is in the init state or a final state, or
     *   the wait was interrupted; true when the target was reached; otherwise whether the
     *   current status equals the target (or `completed` when no target was given).
     */
    async waitUntil({ timeout = -1, callback, status, signal }) {
        const currentIdx = TASK_STATUS_ORDER.indexOf(this.#status)
        if (currentIdx <= 0) {
            // init (index 0) and final states (index -1) are never waited on.
            return false
        }
        const stIdx = status ? TASK_STATUS_ORDER.indexOf(status) : currentIdx + 1
        if (stIdx >= 0 && stIdx <= currentIdx) {
            return true
        }
        if (signal?.aborted) {
            return false
        }
        const task = this
        switch (this.#status) {
            case TaskStatus.pending:
            case TaskStatus.waiting:
                // Wait for server status to include this task.
                await waitUntil(
                    async () => {
                        if (
                            task.#id &&
                            typeof serverState.tasks === "object" &&
                            Object.keys(serverState.tasks).includes(String(task.#id))
                        ) {
                            return true
                        }
                        if ((await Promise.resolve(callback?.call(task))) || signal?.aborted) {
                            return true
                        }
                    },
                    TASK_STATE_SERVER_UPDATE_DELAY,
                    timeout
                )
                if (
                    this.#id &&
                    typeof serverState.tasks === "object" &&
                    Object.keys(serverState.tasks).includes(String(task.#id))
                ) {
                    this._setStatus(TaskStatus.waiting)
                }
                if ((await Promise.resolve(callback?.call(this))) || signal?.aborted) {
                    return false
                }
                if (stIdx >= 0 && stIdx <= TASK_STATUS_ORDER.indexOf(TaskStatus.waiting)) {
                    return true
                }
                // Wait for task to start on server.
                await waitUntil(
                    async () => {
                        if (
                            typeof serverState.tasks !== "object" ||
                            serverState.tasks[String(task.#id)] !== "pending"
                        ) {
                            return true
                        }
                        if ((await Promise.resolve(callback?.call(task))) || signal?.aborted) {
                            return true
                        }
                    },
                    TASK_STATE_SERVER_UPDATE_DELAY,
                    timeout
                )
                const state =
                    typeof serverState.tasks === "object" ? serverState.tasks[String(task.#id)] : undefined
                if (state === "running" || state === "buffer" || state === "completed") {
                    this._setStatus(TaskStatus.processing)
                }
                if ((await Promise.resolve(callback?.call(task))) || signal?.aborted) {
                    return false
                }
                if (stIdx >= 0 && stIdx <= TASK_STATUS_ORDER.indexOf(TaskStatus.processing)) {
                    return true
                }
            // Intentional fall-through: keep waiting while the task is processing.
            case TaskStatus.processing:
                await waitUntil(
                    async () => {
                        if (
                            typeof serverState.tasks !== "object" ||
                            serverState.tasks[String(task.#id)] !== "running"
                        ) {
                            return true
                        }
                        if ((await Promise.resolve(callback?.call(task))) || signal?.aborted) {
                            return true
                        }
                    },
                    TASK_STATE_SERVER_UPDATE_DELAY,
                    timeout
                )
                await Promise.resolve(callback?.call(this))
            // Intentional fall-through to the final comparison.
            default:
                return this.#status === (status || TaskStatus.completed)
        }
    }

    /** Queues this task with its generator and waits for it to finish.
     * @param promiseGenerator Generator (or function) driving the task.
     * @returns {Promise<*>} `this.result` once the wait is over.
     * @throws The task's exception when it was aborted with one, or an Error when the status is not init.
     */
    async enqueue(promiseGenerator, ...args) {
        if (this.status !== TaskStatus.init) {
            throw new Error(`Task is in an invalid status ${this.status} to add to queue.`)
        }
        this._setStatus(TaskStatus.pending)
        task_queue.set(this, promiseGenerator)
        await eventSource.fireEvent(EVENT_TASK_QUEUED, { task: this })
        await Task.enqueue(promiseGenerator, ...args)
        await this.waitUntil({ status: TaskStatus.completed })
        if (this.exception) {
            throw this.exception
        }
        return this.result
    }
    /** Registers a generator in `concurrent_generators` and waits until it has been removed.
     * @param promiseGenerator Generator to register, or a function that is wrapped with asGenerator.
     * @returns {Promise<*>} The entry stored in `weak_results` for the registered generator.
     * @throws {Error} When `promiseGenerator` is undefined.
     */
    static async enqueue(promiseGenerator, ...args) {
        if (typeof promiseGenerator === "undefined") {
            throw new Error("To enqueue a concurrent task, a *Promise Generator is needed but undefined was found.")
        }
        //if (Symbol.asyncIterator in result || Symbol.iterator in result) {
        //concurrent_generators.set(result, Promise.resolve(args))
        // Track the object that is actually used as the map key: for a function that is the
        // wrapper generator, not the function itself.
        // NOTE(review): `asGenerator` is referenced as a free name; confirm it is defined in
        // this scope (the static Task.asGenerator is not reachable under that name).
        const generator =
            typeof promiseGenerator === "function" ? asGenerator({ callback: promiseGenerator }) : promiseGenerator
        concurrent_generators.set(generator, Promise.resolve(args))
        await waitUntil(() => !concurrent_generators.has(generator), CONCURRENT_TASK_INTERVAL)
        return weak_results.get(generator)
    }
    /** Starts `task` through `classCtor.start` and queues it.
     * @param {Task} task Task in the init state.
     * @param {Function} classCtor Class providing the static `start(task, progressCallback)`.
     * @param {Function} [progressCallback] Forwarded to `classCtor.start`.
     * @throws {Error} On invalid status, wrong class or a non-function callback.
     */
    static enqueueNew(task, classCtor, progressCallback) {
        if (task.status !== TaskStatus.init) {
            throw new Error("Task has an invalid status to add to queue.")
        }
        if (!(task instanceof classCtor)) {
            throw new Error("Task is not a instance of classCtor.")
        }
        let promiseGenerator = undefined
        if (typeof progressCallback === "undefined") {
            promiseGenerator = classCtor.start(task)
        } else if (typeof progressCallback === "function") {
            promiseGenerator = classCtor.start(task, progressCallback)
        } else {
            throw new Error("progressCallback is not a function.")
        }
        return Task.prototype.enqueue.call(task, promiseGenerator)
    }

    /** Drives a generator to completion, feeding every (awaited) value back into it.
     * @param promiseGenerator Sync or async generator.
     * @param {object} [options]
     * @param {Function} [options.callback] Receives `{ value, done }` and returns the pair to continue with.
     * @param {AbortSignal} [options.signal] Stops the loop when aborted.
     * @param {number} [options.timeout=-1] Time budget in ms; negative means no practical limit.
     * @returns {Promise<*>} The last value obtained.
     */
    static async run(promiseGenerator, { callback, signal, timeout = -1 } = {}) {
        let value = undefined
        let done = undefined
        if (timeout < 0) {
            timeout = Number.MAX_SAFE_INTEGER
        }
        timeout = Date.now() + timeout
        do {
            ;({ value, done } = await Promise.resolve(promiseGenerator.next(value)))
            if (value instanceof Promise) {
                value = await value
            }
            if (callback) {
                ;({ value, done } = await Promise.resolve(callback.call(promiseGenerator, { value, done })))
            }
            if (value instanceof Promise) {
                value = await value
            }
        } while (!done && !signal?.aborted && timeout > Date.now())
        return value
    }
    /** Wraps `generator` in an async generator that awaits promise values, optionally passes
     * each step through `callback`, and forwards whatever the consumer sends back in.
     * Takes the same options as Task.run plus `generator`.
     */
    static async *asGenerator({ callback, generator, signal, timeout = -1 } = {}) {
        let value = undefined
        let done = undefined
        if (timeout < 0) {
            timeout = Number.MAX_SAFE_INTEGER
        }
        timeout = Date.now() + timeout
        do {
            ;({ value, done } = await Promise.resolve(generator.next(value)))
            if (value instanceof Promise) {
                value = await value
            }
            if (callback) {
                ;({ value, done } = await Promise.resolve(callback.call(generator, { value, done })))
                if (value instanceof Promise) {
                    value = await value
                }
            }
            value = yield value
        } while (!done && !signal?.aborted && timeout > Date.now())
        return value
    }
}
|
|
|
|
// Keys every render request must contain, mapped to their expected `typeof` result.
const TASK_REQUIRED = {
    session_id: "string",
    prompt: "string",
    negative_prompt: "string",
    width: "number",
    height: "number",
    seed: "number",

    sampler_name: "string",
    use_stable_diffusion_model: "string",
    clip_skip: "boolean",
    num_inference_steps: "number",
    guidance_scale: "number",

    num_outputs: "number",
    stream_progress_updates: "boolean",
    stream_image_progress: "boolean",
    show_only_filtered_image: "boolean",
    output_format: "string",
    output_quality: "number",
}
// Values filled into a render request for keys that are undefined.
const TASK_DEFAULTS = {
    sampler_name: "plms",
    use_stable_diffusion_model: "sd-v1-4",
    clip_skip: false,
    num_inference_steps: 50,
    guidance_scale: 7.5,
    negative_prompt: "",

    num_outputs: 1,
    stream_progress_updates: true,
    stream_image_progress: true,
    show_only_filtered_image: true,
    block_nsfw: false,
    output_format: "png",
    output_quality: 75,
    output_lossless: false,
}
// Keys a render request may contain, mapped to their expected `typeof` result.
const TASK_OPTIONAL = {
    device: "string",
    init_image: "string",
    mask: "string",
    save_to_disk_path: "string",
    use_face_correction: "string",
    use_upscale: "string",
    use_vae_model: "string",
    use_hypernetwork_model: "string",
    hypernetwork_strength: "number",
    output_lossless: "boolean",
    tiling: "string",
}

// Higher values will result in...
// pytorch_lightning/utilities/seed.py:60: UserWarning: X is not in bounds, numpy accepts from 0 to 4294967295
const MAX_SEED_VALUE = 4294967295
|
|
|
|
/** Task that posts a render request to "/render" and streams its progress. */
class RenderTask extends Task {
    /**
     * Normalises the request body: picks a random seed when none is given, maps `use_cpu`
     * to `device`, defaults `prompt_strength` when an init image is present, merges
     * `modifiers` into the prompt and validates the result.
     * @param {object} [options={}] Render request body.
     * @throws {Error} When the seed is out of range or a field has the wrong type.
     */
    constructor(options = {}) {
        super(options)
        if (typeof this._reqBody.seed === "undefined") {
            this._reqBody.seed = Math.floor(Math.random() * (MAX_SEED_VALUE + 1))
        }
        // A single `typeof`: `typeof typeof x` is always "string", which disabled this check.
        if (
            typeof this._reqBody.seed === "number" &&
            (this._reqBody.seed > MAX_SEED_VALUE || this._reqBody.seed < 0)
        ) {
            throw new Error(`seed must be in range 0 to ${MAX_SEED_VALUE}.`)
        }

        if ("use_cpu" in this._reqBody) {
            if (this._reqBody.use_cpu) {
                this._reqBody.device = "cpu"
            }
            delete this._reqBody.use_cpu
        }
        if (this._reqBody.init_image) {
            if (typeof this._reqBody.prompt_strength === "undefined") {
                this._reqBody.prompt_strength = 0.8
            } else if (typeof this._reqBody.prompt_strength !== "number") {
                throw new Error(
                    `prompt_strength need to be of type number but ${typeof this._reqBody
                        .prompt_strength} was found.`
                )
            }
        }
        if ("modifiers" in this._reqBody) {
            if (Array.isArray(this._reqBody.modifiers) && this._reqBody.modifiers.length > 0) {
                this._reqBody.modifiers = this._reqBody.modifiers.filter((val) => val.trim())
                if (this._reqBody.modifiers.length > 0) {
                    this._reqBody.prompt = `${this._reqBody.prompt}, ${this._reqBody.modifiers.join(", ")}`
                }
            }
            if (typeof this._reqBody.modifiers === "string" && this._reqBody.modifiers.length > 0) {
                this._reqBody.modifiers = this._reqBody.modifiers.trim()
                if (this._reqBody.modifiers.length > 0) {
                    this._reqBody.prompt = `${this._reqBody.prompt}, ${this._reqBody.modifiers}`
                }
            }
            delete this._reqBody.modifiers
        }
        this.checkReqBody()
    }

    /** Fills missing keys from TASK_DEFAULTS and checks types against TASK_REQUIRED and TASK_OPTIONAL.
     * Optional keys whose value is undefined are removed with a warning.
     * @throws {Error} When a required or optional key has the wrong type.
     */
    checkReqBody() {
        for (const key in TASK_DEFAULTS) {
            if (typeof this._reqBody[key] === "undefined") {
                this._reqBody[key] = TASK_DEFAULTS[key]
            }
        }
        for (const key in TASK_REQUIRED) {
            if (typeof this._reqBody[key] !== TASK_REQUIRED[key]) {
                throw new Error(
                    `${key} need to be of type ${TASK_REQUIRED[key]} but ${typeof this._reqBody[key]} was found.`
                )
            }
        }
        for (const key in this._reqBody) {
            if (key in TASK_REQUIRED) {
                continue
            }
            if (key in TASK_OPTIONAL) {
                if (typeof this._reqBody[key] == "undefined") {
                    delete this._reqBody[key]
                    console.warn(`reqBody[${key}] was set to undefined. Removing optional key without value...`)
                    continue
                }
                if (typeof this._reqBody[key] !== TASK_OPTIONAL[key]) {
                    throw new Error(
                        `${key} need to be of type ${TASK_OPTIONAL[key]} but ${typeof this._reqBody[
                            key
                        ]} was found.`
                    )
                }
            }
        }
    }

    /** Send current task to server.
     * On a reply without a numeric `task`, fires EVENT_UNEXPECTED_RESPONSE; a listener may
     * provide a replacement reply through `event.continueWith`.
     * @param {*} [timeout=-1] Optional timeout value in ms
     * @returns the response from the render request.
     * @throws {Error} When no task id could be obtained.
     * @memberof Task
     */
    async post(timeout = -1) {
        performance.mark("make-render-request")
        if (performance.getEntriesByName("click-makeImage", "mark").length > 0) {
            performance.measure("diff", "click-makeImage", "make-render-request")
            console.log(
                "delay between clicking and making the server request:",
                performance.getEntriesByName("diff", "measure")[0].duration + " ms"
            )
        }

        let jsonResponse = await super.post("/render", timeout)
        if (typeof jsonResponse?.task !== "number") {
            console.warn("Endpoint error response: ", jsonResponse)
            const event = Object.assign({ task: this }, jsonResponse)
            await eventSource.fireEvent(EVENT_UNEXPECTED_RESPONSE, event)
            if ("continueWith" in event) {
                jsonResponse = await Promise.resolve(event.continueWith)
            }
            if (typeof jsonResponse?.task !== "number") {
                const err = new Error(jsonResponse?.detail || "Endpoint response does not contains a task ID.")
                this.abort(err)
                throw err
            }
        }
        this._setId(jsonResponse.task)
        if (jsonResponse.stream) {
            this.streamUrl = jsonResponse.stream
        }
        this._setStatus(TaskStatus.waiting)
        return jsonResponse
    }

    /** Queues this task; see Task.enqueueNew. */
    enqueue(progressCallback) {
        return Task.enqueueNew(this, RenderTask, progressCallback)
    }
    /** Generator driving the whole render: post, wait for the task to start, then stream updates.
     * Yields promises and callback results that the runner is expected to resolve and send back.
     * @param {Function} [progressCallback] Called with the task as `this` and one of
     *   `{ reqBody }`, `{ renderResponse }`, `{ detail }`, `{}`, `{ response, reader }`,
     *   `{ reader }`, `{ stream }` or `{ update }`.
     * @returns The last streamed value, `false` when the task stopped, or undefined when already stopped.
     * @throws {Error} On invalid callback, failed post, failed wait or unexpected task state.
     */
    *start(progressCallback) {
        if (typeof progressCallback !== "undefined" && typeof progressCallback !== "function") {
            throw new Error("progressCallback is not a function. progressCallback type: " + typeof progressCallback)
        }
        if (this.isStopped) {
            return
        }

        this._setStatus(TaskStatus.pending)
        progressCallback?.call(this, { reqBody: this._reqBody })
        Object.freeze(this._reqBody)

        // Post task request to backend
        let renderRequest = undefined
        try {
            renderRequest = yield this.post()
            yield progressCallback?.call(this, { renderResponse: renderRequest })
        } catch (e) {
            yield progressCallback?.call(this, { detail: e.message })
            throw e
        }
        try {
            // Wait for task to start on server.
            yield this.waitUntil({
                callback: function() {
                    return progressCallback?.call(this, {})
                },
                status: TaskStatus.processing,
            })
        } catch (e) {
            // Abort with the caught error (the previous code referenced an undefined name here).
            this.abort(e)
            throw e
        }
        // Update class status and callback.
        const taskState = typeof serverState.tasks === "object" ? serverState.tasks[String(this.id)] : undefined
        switch (taskState) {
            case "pending": // Session has pending tasks.
                console.error("Server %o render request %o is still waiting.", serverState, renderRequest)
                //Only update status if not already set by waitUntil
                if (this.status === TaskStatus.init || this.status === TaskStatus.pending) {
                    // Set status as Waiting in backend.
                    this._setStatus(TaskStatus.waiting)
                }
                break
            case "running":
            case "buffer":
                // Normal expected messages.
                this._setStatus(TaskStatus.processing)
                break
            case "completed":
                if (this.isPending) {
                    // Set state to processing until we read the reply
                    this._setStatus(TaskStatus.processing)
                }
                console.warn("Server %o render request %o completed unexpectedly", serverState, renderRequest)
                break // Continue anyway to try to read cached result.
            case "error":
                this._setStatus(TaskStatus.failed)
                console.error("Server %o render request %o has failed", serverState, renderRequest)
                break // Still valid, Update UI with error message
            case "stopped":
                this._setStatus(TaskStatus.stopped)
                console.log("Server %o render request %o was stopped", serverState, renderRequest)
                return false
            default: {
                if (!progressCallback) {
                    // Parenthesised so the fallback applies to the state, not to the whole message.
                    const err = new Error("Unexpected server task state: " + (taskState || "Undefined"))
                    this.abort(err)
                    throw err
                }
                const response = yield progressCallback.call(this, {})
                if (response instanceof Error) {
                    this.abort(response)
                    throw response
                }
                if (!response) {
                    return false
                }
            }
        }

        // Task started!
        // Open the reader.
        const reader = this.reader
        const task = this
        // Keep the reader's existing handler as the fallback when no callback is given.
        const defaultOnError = reader.onError
        reader.onError = function(response) {
            if (progressCallback) {
                task.abort(new Error(response.statusText))
                return progressCallback.call(task, { response, reader })
            }
            return defaultOnError.call(reader, response)
        }
        yield progressCallback?.call(this, { reader })

        //Start streaming the results.
        const streamGenerator = reader.open()
        let value = undefined
        let done = undefined
        yield progressCallback?.call(this, { stream: streamGenerator })
        do {
            ;({ value, done } = yield streamGenerator.next())
            if (typeof value !== "object") {
                continue
            }
            yield progressCallback?.call(this, { update: value })
        } while (!done)
        return value
    }
    /** Starts `task`, first wrapping plain request data in a RenderTask when needed.
     * @param {Task|object} task A task, an object with a `reqBody`, or a plain request body.
     * @param {Function} [progressCallback] Forwarded to the instance `start`.
     * @returns The generator returned by the instance `start`.
     * @throws {Error} When `task` is not an object.
     */
    static start(task, progressCallback) {
        if (typeof task !== "object") {
            throw new Error("task is not an object. task type: " + typeof task)
        }
        if (!(task instanceof Task)) {
            if (task.reqBody) {
                task = new RenderTask(task.reqBody)
            } else {
                task = new RenderTask(task)
            }
        }
        return task.start(progressCallback)
    }
    /** Starts `task` and drives its generator with Task.run. */
    static run(task, progressCallback) {
        const promiseGenerator = RenderTask.start(task, progressCallback)
        return Task.run(promiseGenerator)
    }
}
|
|
/** Task that posts its request to the "/filter" endpoint and streams the
 * server updates until a final status is received.
 */
class FilterTask extends Task {
    constructor(options = {}) {
        super(options)
    }

    /** Send current task to server.
     * @param {*} [timeout=-1] Optional timeout value in ms
     * @returns the response from the filter request.
     * @throws {Error} When no numeric task ID can be obtained from the response.
     * @memberof FilterTask
     */
    async post(timeout = -1) {
        let jsonResponse = await super.post("/filter", timeout)
        if (typeof jsonResponse?.task !== "number") {
            console.warn("Endpoint error response: ", jsonResponse)
            // Give listeners a chance to supply a replacement response through event.continueWith.
            const event = Object.assign({ task: this }, jsonResponse)
            await eventSource.fireEvent(EVENT_UNEXPECTED_RESPONSE, event)
            if ("continueWith" in event) {
                jsonResponse = await Promise.resolve(event.continueWith)
            }
            if (typeof jsonResponse?.task !== "number") {
                const err = new Error(jsonResponse?.detail || "Endpoint response does not contains a task ID.")
                this.abort(err)
                throw err
            }
        }
        this._setId(jsonResponse.task)
        if (jsonResponse.stream) {
            this.streamUrl = jsonResponse.stream
        }
        this._setStatus(TaskStatus.waiting)
        return jsonResponse
    }

    /** Empty override: no request body validation is done for filter tasks. */
    checkReqBody() {}

    /** Enqueue this task through Task.enqueueNew.
     * @param {function} [progressCallback] Forwarded to Task.enqueueNew.
     * @returns the value returned by Task.enqueueNew.
     */
    enqueue(progressCallback) {
        return Task.enqueueNew(this, FilterTask, progressCallback)
    }

    /** Generator driving the task: post the request, wait until the task is
     * processing, then read the stream until a final status arrives.
     * Every `yield` hands a value (often a promise) to the caller driving the
     * generator and resumes with whatever the caller passes to next().
     * @param {function} [progressCallback] Called with `this` set to the task on each step.
     * @returns the last value read from the stream (undefined when already stopped).
     * @throws {Error} When progressCallback is neither undefined nor a function,
     *     or when posting / waiting for the task fails.
     */
    *start(progressCallback) {
        if (typeof progressCallback !== "undefined" && typeof progressCallback !== "function") {
            throw new Error("progressCallback is not a function. progressCallback type: " + typeof progressCallback)
        }
        if (this.isStopped) {
            return
        }

        this._setStatus(TaskStatus.pending)
        progressCallback?.call(this, { reqBody: this._reqBody })
        // The request body must not change once the callback has seen it.
        Object.freeze(this._reqBody)

        // Post task request to backend
        let renderRes = undefined
        try {
            renderRes = yield this.post()
            yield progressCallback?.call(this, { renderResponse: renderRes })
        } catch (e) {
            yield progressCallback?.call(this, { detail: e.message })
            throw e
        }

        try {
            // Wait for task to start on server.
            yield this.waitUntil({
                callback: function() {
                    return progressCallback?.call(this, {})
                },
                status: TaskStatus.processing,
            })
        } catch (e) {
            // Abort with the caught error. The previous code referenced an
            // undeclared `err`, which raised a ReferenceError and hid the real failure.
            this.abort(e)
            throw e
        }

        // Task started!
        // Open the reader.
        const reader = this.reader
        const task = this
        reader.onError = function(response) {
            if (progressCallback) {
                task.abort(new Error(response.statusText))
                return progressCallback.call(task, { response, reader })
            }
            return Task.prototype.onError.call(task, response)
        }
        yield progressCallback?.call(this, { reader })

        //Start streaming the results.
        const streamGenerator = reader.open()
        let value = undefined
        let done = undefined
        yield progressCallback?.call(this, { stream: streamGenerator })
        do {
            ;({ value, done } = yield streamGenerator.next())
            // typeof null is "object": skip it too, `value.status` below would throw.
            if (typeof value !== "object" || value === null) {
                continue
            }
            if (value.status !== undefined) {
                yield progressCallback?.call(this, value)
                // A final status ends the loop even if the stream itself is not done.
                if (value.status === "succeeded" || value.status === "failed") {
                    done = true
                }
            }
        } while (!done)
        return value
    }

    /** Start a filter task, wrapping plain request data in a FilterTask first.
     * @param {Task|object} task An existing Task instance, an object carrying a
     *     truthy `reqBody` property, or a bare request body.
     * @param {function} [progressCallback] Passed unchanged to the task's start().
     * @returns the value returned by the task's start(progressCallback).
     * @throws {Error} When task is null or not an object.
     */
    static start(task, progressCallback) {
        // typeof null is "object": reject it here instead of failing on `task.reqBody`.
        if (typeof task !== "object" || task === null) {
            throw new Error("task is not an object. task type: " + (task === null ? "null" : typeof task))
        }
        if (!(task instanceof Task)) {
            if (task.reqBody) {
                task = new FilterTask(task.reqBody)
            } else {
                task = new FilterTask(task)
            }
        }
        return task.start(progressCallback)
    }

    /** Start the given task through FilterTask.start and hand the result to Task.run.
     * @param {Task|object} task Forwarded to FilterTask.start.
     * @param {function} [progressCallback] Forwarded to FilterTask.start.
     * @returns the value returned by Task.run.
     */
    static run(task, progressCallback) {
        const promiseGenerator = FilterTask.start(task, progressCallback)
        return Task.run(promiseGenerator)
    }
}
|
|
|
|
/** Fetch "/get/system_info" and return the parsed JSON body.
 * Falls back to an empty structure ({ devices: { all, active }, hosts }) when the
 * response is not ok or when fetching/parsing throws; errors are only logged.
 * Wrapped with debounce(fn, 250, true) — presumably a 250 ms window with
 * immediate invocation; confirm against the debounce helper.
 */
const getSystemInfo = debounce(
    async function() {
        const emptyInfo = {
            devices: {
                all: {},
                active: {},
            },
            hosts: [],
        }
        try {
            const response = await fetch("/get/system_info")
            if (response.ok) {
                return await response.json()
            }
            console.error("Invalid response fetching devices", response.statusText)
        } catch (e) {
            console.error("error fetching system info", e)
        }
        return emptyInfo
    },
    250,
    true
)
|
|
/** Resolve the `devices` entry of the system info.
 * @returns {Promise<*>} The `devices` property of the object produced by getSystemInfo().
 */
async function getDevices() {
    // getSystemInfo() wraps an async function: without await, `.devices` would be
    // read from the pending promise and always be undefined.
    const systemInfo = await getSystemInfo()
    return systemInfo.devices
}
|
|
/** Resolve the `hosts` entry of the system info.
 * @returns {Promise<*>} The `hosts` property of the object produced by getSystemInfo().
 */
async function getHosts() {
    // getSystemInfo() wraps an async function: without await, `.hosts` would be
    // read from the pending promise and always be undefined.
    const systemInfo = await getSystemInfo()
    return systemInfo.hosts
}
|
|
|
|
/** Fetch the model list from "/get/models".
 * @param {boolean} [scanForMalicious=true] Appended as the `scan_for_malicious` query value.
 * @returns {Promise<object>} The parsed JSON response, or an object with empty
 *     "stable-diffusion" and `vae` lists when the response is not ok or the request throws.
 */
async function getModels(scanForMalicious = true) {
    let result = {
        "stable-diffusion": [],
        vae: [],
    }
    try {
        const response = await fetch("/get/models?scan_for_malicious=" + scanForMalicious)
        if (response.ok) {
            result = await response.json()
            console.log("get models response", result)
        } else {
            console.error("Invalid response fetching models", response.statusText)
        }
    } catch (e) {
        // Failures are only logged; the caller receives whatever was collected so far.
        console.log("get models error", e)
    }
    return result
}
|
|
|
|
/** Number of tasks the scheduler may run at once.
 * Counts the keys of serverState.devices.active (0 when missing) and adds one
 * when running in a window whose document visibilityState is "hidden".
 * @returns {number}
 */
function getServerCapacity() {
    const activeDevices = serverState?.devices?.active || {}
    const isDocumentHidden = typeof window === "object" && window.document.visibilityState === "hidden"
    const hiddenBonus = isDocumentHidden ? 1 : 0
    return hiddenBonus + Object.keys(activeDevices).length
}
|
|
|
|
let idleEventPromise = undefined
/** Run one scheduling pass.
 * 1. Backs off while the browser reports pending user input.
 * 2. Fires EVENT_IDLE when the queue is empty or below the server capacity.
 * 3. Advances every running generator whose last promise has settled.
 * 4. Fires end/error events for finished tasks and starts queued ones.
 * @returns {Promise} A delay promise when nothing is running, otherwise a promise
 *     that re-enters continueTasks as soon as any running promise settles.
 */
function continueTasks() {
    if (typeof navigator?.scheduling?.isInputPending === "function") {
        const inputPendingOptions = {
            // Report mouse/pointer move events when queue is empty.
            // Delay idle after mouse moves stops.
            includeContinuous: Boolean(task_queue.size <= 0 && concurrent_generators.size <= 0),
        }
        if (navigator.scheduling.isInputPending(inputPendingOptions)) {
            // Browser/User still active.
            return asyncDelay(CONCURRENT_TASK_INTERVAL)
        }
    }
    const serverCapacity = getServerCapacity()
    if (task_queue.size <= 0 && concurrent_generators.size <= 0) {
        if (!idleEventPromise?.isPending) {
            idleEventPromise = makeQuerablePromise(
                eventSource.fireEvent(EVENT_IDLE, { capacity: serverCapacity, idle: true })
            )
        }
        // Calling idle could result in task being added to queue.
    }
    if (task_queue.size < serverCapacity) {
        if (!idleEventPromise?.isPending) {
            idleEventPromise = makeQuerablePromise(
                eventSource.fireEvent(EVENT_IDLE, { capacity: serverCapacity - task_queue.size })
            )
        }
    }

    // Advance the running generators.
    const completedTasks = []
    for (let [generator, promise] of concurrent_generators.entries()) {
        if (promise.isPending) {
            continue
        }
        // Settled value is either an iterator result ({ value, done }) or a plain value.
        let value = promise.resolvedValue?.value || promise.resolvedValue
        if (promise.isRejected) {
            console.error(promise.rejectReason)
            const event = { generator, reason: promise.rejectReason }
            eventSource.fireEvent(EVENT_UNHANDLED_REJECTION, event)
            if ("continueWith" in event) {
                // A listener supplied a replacement value to resume with.
                value = Promise.resolve(event.continueWith)
            } else {
                concurrent_generators.delete(generator)
                completedTasks.push({ generator, promise })
                continue
            }
        }
        if (value instanceof Promise) {
            // Read `done` BEFORE `promise` is reassigned: the callback below runs
            // later, when `promise` already refers to the new pending promise, so
            // reading it there lost the flag and a finished generator was stepped again.
            const wasDone = promise.resolvedValue?.done
            promise = makeQuerablePromise(value.then((val) => ({ done: wasDone, value: val })))
            concurrent_generators.set(generator, promise)
            continue
        }
        weak_results.set(generator, value)
        if (promise.resolvedValue?.done) {
            concurrent_generators.delete(generator)
            completedTasks.push({ generator, promise })
            continue
        }

        // Resume the generator with the settled value.
        promise = generator.next(value)
        if (!(promise instanceof Promise)) {
            promise = Promise.resolve(promise)
        }
        promise = makeQuerablePromise(promise)
        concurrent_generators.set(generator, promise)
    }

    // Report finished tasks and start queued ones.
    for (let [task, generator] of task_queue.entries()) {
        const cTsk = completedTasks.find((item) => item.generator === generator)
        if (cTsk?.promise?.rejectReason || task.hasFailed) {
            eventSource.fireEvent(EVENT_TASK_ERROR, {
                task,
                generator,
                reason: cTsk?.promise?.rejectReason || task.exception,
            })
            task_queue.delete(task)
            continue
        }
        if (task.isCompleted || task.isStopped || cTsk) {
            const eventEndArgs = { task, generator }
            if (task.isStopped) {
                eventEndArgs.stopped = true
            }
            eventSource.fireEvent(EVENT_TASK_END, eventEndArgs)
            task_queue.delete(task)
            continue
        }
        if (concurrent_generators.size > serverCapacity) {
            break
        }
        if (!generator) {
            if (typeof task.start === "function") {
                generator = task.start()
            }
        } else if (concurrent_generators.has(generator)) {
            // Already running.
            continue
        }
        const event = { task, generator }
        const beforeStart = eventSource.fireEvent(EVENT_TASK_START, event) // optional beforeStart promise to wait on before starting task.
        const promise = makeQuerablePromise(beforeStart.then(() => Promise.resolve(event.beforeStart)))
        // Listeners may have replaced event.generator; store whatever the event holds now.
        concurrent_generators.set(event.generator, promise)
        task_queue.set(task, event.generator)
    }

    const promises = Array.from(concurrent_generators.values())
    if (promises.length <= 0) {
        return asyncDelay(CONCURRENT_TASK_INTERVAL)
    }
    return Promise.race(promises).finally(continueTasks)
}
|
|
let taskPromise = undefined
/** Launch a scheduling pass unless the previous one is still pending.
 * Does nothing while the browser reports pending user input. Errors thrown by
 * continueTasks are logged, reported through EVENT_UNHANDLED_REJECTION and
 * followed by a RETRY_DELAY_ON_ERROR pause.
 */
function startCheck() {
    if (taskPromise?.isPending) {
        return
    }
    const handleFailure = async function(err) {
        console.error(err)
        await eventSource.fireEvent(EVENT_UNHANDLED_REJECTION, { reason: err })
        await asyncDelay(RETRY_DELAY_ON_ERROR)
    }
    do {
        const previousResult = taskPromise?.resolvedValue
        if (previousResult instanceof Promise) {
            // Keep tracking a promise handed back by the previous pass.
            taskPromise = makeQuerablePromise(previousResult)
            continue
        }
        const scheduling = navigator?.scheduling
        if (typeof scheduling?.isInputPending === "function" && scheduling.isInputPending()) {
            return
        }
        taskPromise = makeQuerablePromise(continueTasks().catch(handleFailure))
    } while (taskPromise?.isResolved)
}
|
|
|
|
/** Public API object of the SD-UI backend engine. */
const SD = {
    ChunkedStreamReader,
    ServerStates,
    TaskStatus,
    Task,
    RenderTask,
    FilterTask,

    Events: EVENTS_TYPES,

    /** Register the optional event listeners, run a first health check and
     * start the periodic health ping and task scheduling timers.
     * @param {object} [options] May carry an `events` map of event name to listener.
     */
    async init(options = {}) {
        if ("events" in options) {
            for (const eventName in options.events) {
                eventSource.addEventListener(eventName, options.events[eventName])
            }
        }
        await healthCheck()
        setInterval(healthCheck, HEALTH_PING_INTERVAL)
        setInterval(startCheck, CONCURRENT_TASK_INTERVAL)
    },

    /** Add a new event listener
     */
    addEventListener(...args) {
        return eventSource.addEventListener(...args)
    },
    /** Remove the event listener
     */
    removeEventListener(...args) {
        return eventSource.removeEventListener(...args)
    },

    isServerAvailable,
    getServerCapacity,

    getSystemInfo,
    getDevices,
    getHosts,

    getModels,

    /** Shortcut for RenderTask.run. */
    render(...args) {
        return RenderTask.run(...args)
    },
    /** Shortcut for FilterTask.run. */
    filter(...args) {
        return FilterTask.run(...args)
    },
    waitUntil,
}
|
|
|
|
// Non-configurable accessors exposing module state on SD.
// Only sessionId has a setter; the other properties are read-only.
Object.defineProperties(SD, {
    serverState: {
        configurable: false,
        get() {
            return serverState
        },
    },
    isAvailable: {
        configurable: false,
        get() {
            return isServerAvailable()
        },
    },
    serverCapacity: {
        configurable: false,
        get() {
            return getServerCapacity()
        },
    },
    sessionId: {
        configurable: false,
        get() {
            return sessionId
        },
        set(val) {
            // Any value except undefined is accepted.
            if (typeof val === "undefined") {
                throw new Error("Can't set sessionId to undefined.")
            }
            sessionId = val
        },
    },
    MAX_SEED_VALUE: {
        configurable: false,
        get() {
            return MAX_SEED_VALUE
        },
    },
    activeTasks: {
        configurable: false,
        get() {
            return task_queue
        },
    },
})
|
|
// Publish SD on the global object, plus a deprecated `sessionId` alias that
// warns and prints a stack trace on every access before delegating to SD.sessionId.
Object.defineProperties(getGlobal(), {
    SD: {
        configurable: false,
        get() {
            return SD
        },
    },
    sessionId: {
        //TODO Remove in the future in favor of SD.sessionId
        configurable: false,
        get() {
            console.warn("Deprecated window.sessionId has been replaced with SD.sessionId.")
            console.trace()
            return SD.sessionId
        },
        set(val) {
            console.warn("Deprecated window.sessionId has been replaced with SD.sessionId.")
            console.trace()
            SD.sessionId = val
        },
    },
})
|
|
})()
|