mirror of
https://github.com/easydiffusion/easydiffusion.git
synced 2025-04-19 00:48:38 +02:00
* Support custom modifiers with images * Add dash support * Avoid needing to upgrade fastapi * Revert gitignore
329 lines
12 KiB
Python
329 lines
12 KiB
Python
import os
|
|
import socket
|
|
import sys
|
|
import json
|
|
import traceback
|
|
import logging
|
|
import shlex
|
|
import urllib
|
|
from rich.logging import RichHandler
|
|
|
|
from sdkit.utils import log as sdkit_log # hack, so we can overwrite the log config
|
|
|
|
from easydiffusion import task_manager
|
|
from easydiffusion.utils import log
|
|
|
|
# Remove all handlers associated with the root logger object.
|
|
for handler in logging.root.handlers[:]:
|
|
logging.root.removeHandler(handler)
|
|
|
|
LOG_FORMAT = "%(asctime)s.%(msecs)03d %(levelname)s %(threadName)s %(message)s"
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format=LOG_FORMAT,
|
|
datefmt="%X",
|
|
handlers=[RichHandler(markup=True, rich_tracebacks=False, show_time=False, show_level=False)],
|
|
)
|
|
|
|
SD_DIR = os.getcwd()
|
|
|
|
SD_UI_DIR = os.getenv("SD_UI_PATH", None)
|
|
|
|
CONFIG_DIR = os.path.abspath(os.path.join(SD_UI_DIR, "..", "scripts"))
|
|
MODELS_DIR = os.path.abspath(os.path.join(SD_DIR, "..", "models"))
|
|
|
|
USER_PLUGINS_DIR = os.path.abspath(os.path.join(SD_DIR, "..", "plugins"))
|
|
CORE_PLUGINS_DIR = os.path.abspath(os.path.join(SD_UI_DIR, "plugins"))
|
|
|
|
USER_UI_PLUGINS_DIR = os.path.join(USER_PLUGINS_DIR, "ui")
|
|
CORE_UI_PLUGINS_DIR = os.path.join(CORE_PLUGINS_DIR, "ui")
|
|
USER_SERVER_PLUGINS_DIR = os.path.join(USER_PLUGINS_DIR, "server")
|
|
UI_PLUGINS_SOURCES = ((CORE_UI_PLUGINS_DIR, "core"), (USER_UI_PLUGINS_DIR, "user"))
|
|
|
|
sys.path.append(os.path.dirname(SD_UI_DIR))
|
|
sys.path.append(USER_SERVER_PLUGINS_DIR)
|
|
|
|
OUTPUT_DIRNAME = "Stable Diffusion UI" # in the user's home folder
|
|
PRESERVE_CONFIG_VARS = ["FORCE_FULL_PRECISION"]
|
|
TASK_TTL = 15 * 60 # Discard last session's task timeout
|
|
APP_CONFIG_DEFAULTS = {
|
|
# auto: selects the cuda device with the most free memory, cuda: use the currently active cuda device.
|
|
"render_devices": "auto", # valid entries: 'auto', 'cpu' or 'cuda:N' (where N is a GPU index)
|
|
"update_branch": "main",
|
|
"ui": {
|
|
"open_browser_on_start": True,
|
|
},
|
|
}
|
|
|
|
IMAGE_EXTENSIONS = [".png", ".apng", ".jpg", ".jpeg", ".jfif", ".pjpeg", ".pjp", ".jxl", ".gif", ".webp", ".avif", ".svg"]
|
|
CUSTOM_MODIFIERS_DIR = os.path.abspath(os.path.join(SD_DIR, "..", "modifiers"))
|
|
CUSTOM_MODIFIERS_PORTRAIT_EXTENSIONS=[".portrait", "_portrait", " portrait", "-portrait"]
|
|
CUSTOM_MODIFIERS_LANDSCAPE_EXTENSIONS=[".landscape", "_landscape", " landscape", "-landscape"]
|
|
|
|
def init():
|
|
os.makedirs(USER_UI_PLUGINS_DIR, exist_ok=True)
|
|
os.makedirs(USER_SERVER_PLUGINS_DIR, exist_ok=True)
|
|
|
|
load_server_plugins()
|
|
|
|
update_render_threads()
|
|
|
|
|
|
def getConfig(default_val=APP_CONFIG_DEFAULTS):
|
|
try:
|
|
config_json_path = os.path.join(CONFIG_DIR, "config.json")
|
|
if not os.path.exists(config_json_path):
|
|
config = default_val
|
|
else:
|
|
with open(config_json_path, "r", encoding="utf-8") as f:
|
|
config = json.load(f)
|
|
if "net" not in config:
|
|
config["net"] = {}
|
|
if os.getenv("SD_UI_BIND_PORT") is not None:
|
|
config["net"]["listen_port"] = int(os.getenv("SD_UI_BIND_PORT"))
|
|
else:
|
|
config["net"]["listen_port"] = 9000
|
|
if os.getenv("SD_UI_BIND_IP") is not None:
|
|
config["net"]["listen_to_network"] = os.getenv("SD_UI_BIND_IP") == "0.0.0.0"
|
|
else:
|
|
config["net"]["listen_to_network"] = True
|
|
return config
|
|
except Exception as e:
|
|
log.warn(traceback.format_exc())
|
|
return default_val
|
|
|
|
|
|
def setConfig(config):
|
|
try: # config.json
|
|
config_json_path = os.path.join(CONFIG_DIR, "config.json")
|
|
with open(config_json_path, "w", encoding="utf-8") as f:
|
|
json.dump(config, f)
|
|
except:
|
|
log.error(traceback.format_exc())
|
|
|
|
try: # config.bat
|
|
config_bat_path = os.path.join(CONFIG_DIR, "config.bat")
|
|
config_bat = []
|
|
|
|
if "update_branch" in config:
|
|
config_bat.append(f"@set update_branch={config['update_branch']}")
|
|
|
|
config_bat.append(f"@set SD_UI_BIND_PORT={config['net']['listen_port']}")
|
|
bind_ip = "0.0.0.0" if config["net"]["listen_to_network"] else "127.0.0.1"
|
|
config_bat.append(f"@set SD_UI_BIND_IP={bind_ip}")
|
|
|
|
# Preserve these variables if they are set
|
|
for var in PRESERVE_CONFIG_VARS:
|
|
if os.getenv(var) is not None:
|
|
config_bat.append(f"@set {var}={os.getenv(var)}")
|
|
|
|
if len(config_bat) > 0:
|
|
with open(config_bat_path, "w", encoding="utf-8") as f:
|
|
f.write("\n".join(config_bat))
|
|
except:
|
|
log.error(traceback.format_exc())
|
|
|
|
try: # config.sh
|
|
config_sh_path = os.path.join(CONFIG_DIR, "config.sh")
|
|
config_sh = ["#!/bin/bash"]
|
|
|
|
if "update_branch" in config:
|
|
config_sh.append(f"export update_branch={config['update_branch']}")
|
|
|
|
config_sh.append(f"export SD_UI_BIND_PORT={config['net']['listen_port']}")
|
|
bind_ip = "0.0.0.0" if config["net"]["listen_to_network"] else "127.0.0.1"
|
|
config_sh.append(f"export SD_UI_BIND_IP={bind_ip}")
|
|
|
|
# Preserve these variables if they are set
|
|
for var in PRESERVE_CONFIG_VARS:
|
|
if os.getenv(var) is not None:
|
|
config_bat.append(f'export {var}="{shlex.quote(os.getenv(var))}"')
|
|
|
|
if len(config_sh) > 1:
|
|
with open(config_sh_path, "w", encoding="utf-8") as f:
|
|
f.write("\n".join(config_sh))
|
|
except:
|
|
log.error(traceback.format_exc())
|
|
|
|
|
|
def save_to_config(ckpt_model_name, vae_model_name, hypernetwork_model_name, vram_usage_level):
|
|
config = getConfig()
|
|
if "model" not in config:
|
|
config["model"] = {}
|
|
|
|
config["model"]["stable-diffusion"] = ckpt_model_name
|
|
config["model"]["vae"] = vae_model_name
|
|
config["model"]["hypernetwork"] = hypernetwork_model_name
|
|
|
|
if vae_model_name is None or vae_model_name == "":
|
|
del config["model"]["vae"]
|
|
if hypernetwork_model_name is None or hypernetwork_model_name == "":
|
|
del config["model"]["hypernetwork"]
|
|
|
|
config["vram_usage_level"] = vram_usage_level
|
|
|
|
setConfig(config)
|
|
|
|
|
|
def update_render_threads():
|
|
config = getConfig()
|
|
render_devices = config.get("render_devices", "auto")
|
|
active_devices = task_manager.get_devices()["active"].keys()
|
|
|
|
log.debug(f"requesting for render_devices: {render_devices}")
|
|
task_manager.update_render_threads(render_devices, active_devices)
|
|
|
|
|
|
def getUIPlugins():
|
|
plugins = []
|
|
|
|
for plugins_dir, dir_prefix in UI_PLUGINS_SOURCES:
|
|
for file in os.listdir(plugins_dir):
|
|
if file.endswith(".plugin.js"):
|
|
plugins.append(f"/plugins/{dir_prefix}/{file}")
|
|
|
|
return plugins
|
|
|
|
|
|
def load_server_plugins():
|
|
if not os.path.exists(USER_SERVER_PLUGINS_DIR):
|
|
return
|
|
|
|
import importlib
|
|
|
|
def load_plugin(file):
|
|
mod_path = file.replace(".py", "")
|
|
return importlib.import_module(mod_path)
|
|
|
|
def apply_plugin(file, plugin):
|
|
if hasattr(plugin, "get_cond_and_uncond"):
|
|
import sdkit.generate.image_generator
|
|
|
|
sdkit.generate.image_generator.get_cond_and_uncond = plugin.get_cond_and_uncond
|
|
log.info(f"Overridden get_cond_and_uncond with the one in the server plugin: {file}")
|
|
|
|
for file in os.listdir(USER_SERVER_PLUGINS_DIR):
|
|
file_path = os.path.join(USER_SERVER_PLUGINS_DIR, file)
|
|
if (not os.path.isdir(file_path) and not file_path.endswith("_plugin.py")) or (
|
|
os.path.isdir(file_path) and not file_path.endswith("_plugin")
|
|
):
|
|
continue
|
|
|
|
try:
|
|
log.info(f"Loading server plugin: {file}")
|
|
mod = load_plugin(file)
|
|
|
|
log.info(f"Applying server plugin: {file}")
|
|
apply_plugin(file, mod)
|
|
except:
|
|
log.warn(f"Error while loading a server plugin")
|
|
log.warn(traceback.format_exc())
|
|
|
|
|
|
def getIPConfig():
|
|
try:
|
|
ips = socket.gethostbyname_ex(socket.gethostname())
|
|
ips[2].append(ips[0])
|
|
return ips[2]
|
|
except Exception as e:
|
|
log.exception(e)
|
|
return []
|
|
|
|
|
|
def open_browser():
|
|
config = getConfig()
|
|
ui = config.get("ui", {})
|
|
net = config.get("net", {"listen_port": 9000})
|
|
port = net.get("listen_port", 9000)
|
|
if ui.get("open_browser_on_start", True):
|
|
import webbrowser
|
|
|
|
webbrowser.open(f"http://localhost:{port}")
|
|
|
|
def get_image_modifiers():
|
|
modifiers_json_path = os.path.join(SD_UI_DIR, "modifiers.json")
|
|
|
|
modifier_categories = {}
|
|
original_category_order=[]
|
|
with open(modifiers_json_path, "r", encoding="utf-8") as f:
|
|
modifiers_file = json.load(f)
|
|
|
|
# The trailing slash is needed to support symlinks
|
|
if not os.path.isdir(f"{CUSTOM_MODIFIERS_DIR}/"):
|
|
return modifiers_file
|
|
|
|
# convert modifiers from a list of objects to a dict of dicts
|
|
for category_item in modifiers_file:
|
|
category_name = category_item['category']
|
|
original_category_order.append(category_name)
|
|
category = {}
|
|
for modifier_item in category_item['modifiers']:
|
|
modifier = {}
|
|
for preview_item in modifier_item['previews']:
|
|
modifier[preview_item['name']] = preview_item['path']
|
|
category[modifier_item['modifier']] = modifier
|
|
modifier_categories[category_name] = category
|
|
|
|
def scan_directory(directory_path: str, category_name="Modifiers"):
|
|
for entry in os.scandir(directory_path):
|
|
if entry.is_file():
|
|
file_extension = list(filter(lambda e: entry.name.endswith(e), IMAGE_EXTENSIONS))
|
|
if len(file_extension) == 0:
|
|
continue
|
|
|
|
modifier_name = entry.name[: -len(file_extension[0])]
|
|
modifier_path = f"custom/{entry.path[len(CUSTOM_MODIFIERS_DIR) + 1:]}"
|
|
# URL encode path segments
|
|
modifier_path = "/".join(map(lambda segment: urllib.parse.quote(segment), modifier_path.split("/")))
|
|
is_portrait = True
|
|
is_landscape = True
|
|
|
|
portrait_extension = list(filter(lambda e: modifier_name.lower().endswith(e), CUSTOM_MODIFIERS_PORTRAIT_EXTENSIONS))
|
|
landscape_extension = list(filter(lambda e: modifier_name.lower().endswith(e), CUSTOM_MODIFIERS_LANDSCAPE_EXTENSIONS))
|
|
|
|
if len(portrait_extension) > 0:
|
|
is_landscape = False
|
|
modifier_name = modifier_name[: -len(portrait_extension[0])]
|
|
elif len(landscape_extension) > 0:
|
|
is_portrait = False
|
|
modifier_name = modifier_name[: -len(landscape_extension[0])]
|
|
|
|
if (category_name not in modifier_categories):
|
|
modifier_categories[category_name] = {}
|
|
|
|
category = modifier_categories[category_name]
|
|
|
|
if (modifier_name not in category):
|
|
category[modifier_name] = {}
|
|
|
|
if (is_portrait or "portrait" not in category[modifier_name]):
|
|
category[modifier_name]["portrait"] = modifier_path
|
|
|
|
if (is_landscape or "landscape" not in category[modifier_name]):
|
|
category[modifier_name]["landscape"] = modifier_path
|
|
elif entry.is_dir():
|
|
scan_directory(
|
|
entry.path,
|
|
entry.name if directory_path==CUSTOM_MODIFIERS_DIR else f"{category_name}/{entry.name}",
|
|
)
|
|
|
|
scan_directory(CUSTOM_MODIFIERS_DIR)
|
|
|
|
custom_categories = sorted(
|
|
[cn for cn in modifier_categories.keys() if cn not in original_category_order],
|
|
key=str.casefold,
|
|
)
|
|
|
|
# convert the modifiers back into a list of objects
|
|
modifier_categories_list = []
|
|
for category_name in [*original_category_order, *custom_categories]:
|
|
category = { 'category': category_name, 'modifiers': [] }
|
|
for modifier_name in sorted(modifier_categories[category_name].keys(), key=str.casefold):
|
|
modifier = { 'modifier': modifier_name, 'previews': [] }
|
|
for preview_name, preview_path in modifier_categories[category_name][modifier_name].items():
|
|
modifier['previews'].append({ 'name': preview_name, 'path': preview_path })
|
|
category['modifiers'].append(modifier)
|
|
modifier_categories_list.append(category)
|
|
|
|
return modifier_categories_list
|