mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2025-01-25 15:55:18 +00:00
5cfe38f41c
* Execution Model Inversion This PR inverts the execution model -- from recursively calling nodes to using a topological sort of the nodes. This change allows for modification of the node graph during execution. This allows for two major advantages: 1. The implementation of lazy evaluation in nodes. For example, if a "Mix Images" node has a mix factor of exactly 0.0, the second image input doesn't even need to be evaluated (and visa-versa if the mix factor is 1.0). 2. Dynamic expansion of nodes. This allows for the creation of dynamic "node groups". Specifically, custom nodes can return subgraphs that replace the original node in the graph. This is an incredibly powerful concept. Using this functionality, it was easy to implement: a. Components (a.k.a. node groups) b. Flow control (i.e. while loops) via tail recursion c. All-in-one nodes that replicate the WebUI functionality d. and more All of those were able to be implemented entirely via custom nodes, so those features are *not* a part of this PR. (There are some front-end changes that should occur before that functionality is made widely available, particularly around variant sockets.) The custom nodes associated with this PR can be found at: https://github.com/BadCafeCode/execution-inversion-demo-comfyui Note that some of them require that variant socket types ("*") be enabled. * Allow `input_info` to be of type `None` * Handle errors (like OOM) more gracefully * Add a command-line argument to enable variants This allows the use of nodes that have sockets of type '*' without applying a patch to the code. * Fix an overly aggressive assertion. This could happen when attempting to evaluate `IS_CHANGED` for a node during the creation of the cache (in order to create the cache key). * Fix Pyright warnings * Add execution model unit tests * Fix issue with unused literals Behavior should now match the master branch with regard to undeclared inputs. Undeclared inputs that are socket connections will be used while undeclared inputs that are literals will be ignored. * Make custom VALIDATE_INPUTS skip normal validation Additionally, if `VALIDATE_INPUTS` takes an argument named `input_types`, that variable will be a dictionary of the socket type of all incoming connections. If that argument exists, normal socket type validation will not occur. This removes the last hurdle for enabling variant types entirely from custom nodes, so I've removed that command-line option. I've added appropriate unit tests for these changes. * Fix example in unit test This wouldn't have caused any issues in the unit test, but it would have bugged the UI if someone copy+pasted it into their own node pack. * Use fstrings instead of '%' formatting syntax * Use custom exception types. * Display an error for dependency cycles Previously, dependency cycles that were created during node expansion would cause the application to quit (due to an uncaught exception). Now, we'll throw a proper error to the UI. We also make an attempt to 'blame' the most relevant node in the UI. * Add docs on when ExecutionBlocker should be used * Remove unused functionality * Rename ExecutionResult.SLEEPING to PENDING * Remove superfluous function parameter * Pass None for uneval inputs instead of default This applies to `VALIDATE_INPUTS`, `check_lazy_status`, and lazy values in evaluation functions. * Add a test for mixed node expansion This test ensures that a node that returns a combination of expanded subgraphs and literal values functions correctly. * Raise exception for bad get_node calls. * Minor refactor of IsChangedCache.get * Refactor `map_node_over_list` function * Fix ui output for duplicated nodes * Add documentation on `check_lazy_status` * Add file for execution model unit tests * Clean up Javascript code as per review * Improve documentation Converted some comments to docstrings as per review * Add a new unit test for mixed lazy results This test validates that when an output list is fed to a lazy node, the node will properly evaluate previous nodes that are needed by any inputs to the lazy node. No code in the execution model has been changed. The test already passes. * Allow kwargs in VALIDATE_INPUTS functions When kwargs are used, validation is skipped for all inputs as if they had been mentioned explicitly. * List cached nodes in `execution_cached` message This was previously just bugged in this PR.
739 lines
30 KiB
Python
739 lines
30 KiB
Python
import os
|
|
import sys
|
|
import asyncio
|
|
import traceback
|
|
|
|
import nodes
|
|
import folder_paths
|
|
import execution
|
|
import uuid
|
|
import urllib
|
|
import json
|
|
import glob
|
|
import struct
|
|
import ssl
|
|
from PIL import Image, ImageOps
|
|
from PIL.PngImagePlugin import PngInfo
|
|
from io import BytesIO
|
|
|
|
import aiohttp
|
|
from aiohttp import web
|
|
import logging
|
|
|
|
import mimetypes
|
|
from comfy.cli_args import args
|
|
import comfy.utils
|
|
import comfy.model_management
|
|
import node_helpers
|
|
from app.frontend_management import FrontendManager
|
|
from app.user_manager import UserManager
|
|
from model_filemanager import download_model, DownloadModelStatus
|
|
from typing import Optional
|
|
|
|
class BinaryEventTypes:
|
|
PREVIEW_IMAGE = 1
|
|
UNENCODED_PREVIEW_IMAGE = 2
|
|
|
|
async def send_socket_catch_exception(function, message):
|
|
try:
|
|
await function(message)
|
|
except (aiohttp.ClientError, aiohttp.ClientPayloadError, ConnectionResetError) as err:
|
|
logging.warning("send error: {}".format(err))
|
|
|
|
@web.middleware
|
|
async def cache_control(request: web.Request, handler):
|
|
response: web.Response = await handler(request)
|
|
if request.path.endswith('.js') or request.path.endswith('.css'):
|
|
response.headers.setdefault('Cache-Control', 'no-cache')
|
|
return response
|
|
|
|
def create_cors_middleware(allowed_origin: str):
|
|
@web.middleware
|
|
async def cors_middleware(request: web.Request, handler):
|
|
if request.method == "OPTIONS":
|
|
# Pre-flight request. Reply successfully:
|
|
response = web.Response()
|
|
else:
|
|
response = await handler(request)
|
|
|
|
response.headers['Access-Control-Allow-Origin'] = allowed_origin
|
|
response.headers['Access-Control-Allow-Methods'] = 'POST, GET, DELETE, PUT, OPTIONS'
|
|
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
|
|
response.headers['Access-Control-Allow-Credentials'] = 'true'
|
|
return response
|
|
|
|
return cors_middleware
|
|
|
|
class PromptServer():
|
|
def __init__(self, loop):
|
|
PromptServer.instance = self
|
|
|
|
mimetypes.init()
|
|
mimetypes.types_map['.js'] = 'application/javascript; charset=utf-8'
|
|
|
|
self.user_manager = UserManager()
|
|
self.supports = ["custom_nodes_from_web"]
|
|
self.prompt_queue = None
|
|
self.loop = loop
|
|
self.messages = asyncio.Queue()
|
|
self.client_session:Optional[aiohttp.ClientSession] = None
|
|
self.number = 0
|
|
|
|
middlewares = [cache_control]
|
|
if args.enable_cors_header:
|
|
middlewares.append(create_cors_middleware(args.enable_cors_header))
|
|
|
|
max_upload_size = round(args.max_upload_size * 1024 * 1024)
|
|
self.app = web.Application(client_max_size=max_upload_size, middlewares=middlewares)
|
|
self.sockets = dict()
|
|
self.web_root = (
|
|
FrontendManager.init_frontend(args.front_end_version)
|
|
if args.front_end_root is None
|
|
else args.front_end_root
|
|
)
|
|
logging.info(f"[Prompt Server] web root: {self.web_root}")
|
|
routes = web.RouteTableDef()
|
|
self.routes = routes
|
|
self.last_node_id = None
|
|
self.client_id = None
|
|
|
|
self.on_prompt_handlers = []
|
|
|
|
@routes.get('/ws')
|
|
async def websocket_handler(request):
|
|
ws = web.WebSocketResponse()
|
|
await ws.prepare(request)
|
|
sid = request.rel_url.query.get('clientId', '')
|
|
if sid:
|
|
# Reusing existing session, remove old
|
|
self.sockets.pop(sid, None)
|
|
else:
|
|
sid = uuid.uuid4().hex
|
|
|
|
self.sockets[sid] = ws
|
|
|
|
try:
|
|
# Send initial state to the new client
|
|
await self.send("status", { "status": self.get_queue_info(), 'sid': sid }, sid)
|
|
# On reconnect if we are the currently executing client send the current node
|
|
if self.client_id == sid and self.last_node_id is not None:
|
|
await self.send("executing", { "node": self.last_node_id }, sid)
|
|
|
|
async for msg in ws:
|
|
if msg.type == aiohttp.WSMsgType.ERROR:
|
|
logging.warning('ws connection closed with exception %s' % ws.exception())
|
|
finally:
|
|
self.sockets.pop(sid, None)
|
|
return ws
|
|
|
|
@routes.get("/")
|
|
async def get_root(request):
|
|
response = web.FileResponse(os.path.join(self.web_root, "index.html"))
|
|
response.headers['Cache-Control'] = 'no-cache'
|
|
response.headers["Pragma"] = "no-cache"
|
|
response.headers["Expires"] = "0"
|
|
return response
|
|
|
|
@routes.get("/embeddings")
|
|
def get_embeddings(self):
|
|
embeddings = folder_paths.get_filename_list("embeddings")
|
|
return web.json_response(list(map(lambda a: os.path.splitext(a)[0], embeddings)))
|
|
|
|
@routes.get("/extensions")
|
|
async def get_extensions(request):
|
|
files = glob.glob(os.path.join(
|
|
glob.escape(self.web_root), 'extensions/**/*.js'), recursive=True)
|
|
|
|
extensions = list(map(lambda f: "/" + os.path.relpath(f, self.web_root).replace("\\", "/"), files))
|
|
|
|
for name, dir in nodes.EXTENSION_WEB_DIRS.items():
|
|
files = glob.glob(os.path.join(glob.escape(dir), '**/*.js'), recursive=True)
|
|
extensions.extend(list(map(lambda f: "/extensions/" + urllib.parse.quote(
|
|
name) + "/" + os.path.relpath(f, dir).replace("\\", "/"), files)))
|
|
|
|
return web.json_response(extensions)
|
|
|
|
def get_dir_by_type(dir_type):
|
|
if dir_type is None:
|
|
dir_type = "input"
|
|
|
|
if dir_type == "input":
|
|
type_dir = folder_paths.get_input_directory()
|
|
elif dir_type == "temp":
|
|
type_dir = folder_paths.get_temp_directory()
|
|
elif dir_type == "output":
|
|
type_dir = folder_paths.get_output_directory()
|
|
|
|
return type_dir, dir_type
|
|
|
|
def compare_image_hash(filepath, image):
|
|
hasher = node_helpers.hasher()
|
|
|
|
# function to compare hashes of two images to see if it already exists, fix to #3465
|
|
if os.path.exists(filepath):
|
|
a = hasher()
|
|
b = hasher()
|
|
with open(filepath, "rb") as f:
|
|
a.update(f.read())
|
|
b.update(image.file.read())
|
|
image.file.seek(0)
|
|
f.close()
|
|
return a.hexdigest() == b.hexdigest()
|
|
return False
|
|
|
|
def image_upload(post, image_save_function=None):
|
|
image = post.get("image")
|
|
overwrite = post.get("overwrite")
|
|
image_is_duplicate = False
|
|
|
|
image_upload_type = post.get("type")
|
|
upload_dir, image_upload_type = get_dir_by_type(image_upload_type)
|
|
|
|
if image and image.file:
|
|
filename = image.filename
|
|
if not filename:
|
|
return web.Response(status=400)
|
|
|
|
subfolder = post.get("subfolder", "")
|
|
full_output_folder = os.path.join(upload_dir, os.path.normpath(subfolder))
|
|
filepath = os.path.abspath(os.path.join(full_output_folder, filename))
|
|
|
|
if os.path.commonpath((upload_dir, filepath)) != upload_dir:
|
|
return web.Response(status=400)
|
|
|
|
if not os.path.exists(full_output_folder):
|
|
os.makedirs(full_output_folder)
|
|
|
|
split = os.path.splitext(filename)
|
|
|
|
if overwrite is not None and (overwrite == "true" or overwrite == "1"):
|
|
pass
|
|
else:
|
|
i = 1
|
|
while os.path.exists(filepath):
|
|
if compare_image_hash(filepath, image): #compare hash to prevent saving of duplicates with same name, fix for #3465
|
|
image_is_duplicate = True
|
|
break
|
|
filename = f"{split[0]} ({i}){split[1]}"
|
|
filepath = os.path.join(full_output_folder, filename)
|
|
i += 1
|
|
|
|
if not image_is_duplicate:
|
|
if image_save_function is not None:
|
|
image_save_function(image, post, filepath)
|
|
else:
|
|
with open(filepath, "wb") as f:
|
|
f.write(image.file.read())
|
|
|
|
return web.json_response({"name" : filename, "subfolder": subfolder, "type": image_upload_type})
|
|
else:
|
|
return web.Response(status=400)
|
|
|
|
@routes.post("/upload/image")
|
|
async def upload_image(request):
|
|
post = await request.post()
|
|
return image_upload(post)
|
|
|
|
|
|
@routes.post("/upload/mask")
|
|
async def upload_mask(request):
|
|
post = await request.post()
|
|
|
|
def image_save_function(image, post, filepath):
|
|
original_ref = json.loads(post.get("original_ref"))
|
|
filename, output_dir = folder_paths.annotated_filepath(original_ref['filename'])
|
|
|
|
# validation for security: prevent accessing arbitrary path
|
|
if filename[0] == '/' or '..' in filename:
|
|
return web.Response(status=400)
|
|
|
|
if output_dir is None:
|
|
type = original_ref.get("type", "output")
|
|
output_dir = folder_paths.get_directory_by_type(type)
|
|
|
|
if output_dir is None:
|
|
return web.Response(status=400)
|
|
|
|
if original_ref.get("subfolder", "") != "":
|
|
full_output_dir = os.path.join(output_dir, original_ref["subfolder"])
|
|
if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir:
|
|
return web.Response(status=403)
|
|
output_dir = full_output_dir
|
|
|
|
file = os.path.join(output_dir, filename)
|
|
|
|
if os.path.isfile(file):
|
|
with Image.open(file) as original_pil:
|
|
metadata = PngInfo()
|
|
if hasattr(original_pil,'text'):
|
|
for key in original_pil.text:
|
|
metadata.add_text(key, original_pil.text[key])
|
|
original_pil = original_pil.convert('RGBA')
|
|
mask_pil = Image.open(image.file).convert('RGBA')
|
|
|
|
# alpha copy
|
|
new_alpha = mask_pil.getchannel('A')
|
|
original_pil.putalpha(new_alpha)
|
|
original_pil.save(filepath, compress_level=4, pnginfo=metadata)
|
|
|
|
return image_upload(post, image_save_function)
|
|
|
|
@routes.get("/view")
|
|
async def view_image(request):
|
|
if "filename" in request.rel_url.query:
|
|
filename = request.rel_url.query["filename"]
|
|
filename,output_dir = folder_paths.annotated_filepath(filename)
|
|
|
|
# validation for security: prevent accessing arbitrary path
|
|
if filename[0] == '/' or '..' in filename:
|
|
return web.Response(status=400)
|
|
|
|
if output_dir is None:
|
|
type = request.rel_url.query.get("type", "output")
|
|
output_dir = folder_paths.get_directory_by_type(type)
|
|
|
|
if output_dir is None:
|
|
return web.Response(status=400)
|
|
|
|
if "subfolder" in request.rel_url.query:
|
|
full_output_dir = os.path.join(output_dir, request.rel_url.query["subfolder"])
|
|
if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir:
|
|
return web.Response(status=403)
|
|
output_dir = full_output_dir
|
|
|
|
filename = os.path.basename(filename)
|
|
file = os.path.join(output_dir, filename)
|
|
|
|
if os.path.isfile(file):
|
|
if 'preview' in request.rel_url.query:
|
|
with Image.open(file) as img:
|
|
preview_info = request.rel_url.query['preview'].split(';')
|
|
image_format = preview_info[0]
|
|
if image_format not in ['webp', 'jpeg'] or 'a' in request.rel_url.query.get('channel', ''):
|
|
image_format = 'webp'
|
|
|
|
quality = 90
|
|
if preview_info[-1].isdigit():
|
|
quality = int(preview_info[-1])
|
|
|
|
buffer = BytesIO()
|
|
if image_format in ['jpeg'] or request.rel_url.query.get('channel', '') == 'rgb':
|
|
img = img.convert("RGB")
|
|
img.save(buffer, format=image_format, quality=quality)
|
|
buffer.seek(0)
|
|
|
|
return web.Response(body=buffer.read(), content_type=f'image/{image_format}',
|
|
headers={"Content-Disposition": f"filename=\"{filename}\""})
|
|
|
|
if 'channel' not in request.rel_url.query:
|
|
channel = 'rgba'
|
|
else:
|
|
channel = request.rel_url.query["channel"]
|
|
|
|
if channel == 'rgb':
|
|
with Image.open(file) as img:
|
|
if img.mode == "RGBA":
|
|
r, g, b, a = img.split()
|
|
new_img = Image.merge('RGB', (r, g, b))
|
|
else:
|
|
new_img = img.convert("RGB")
|
|
|
|
buffer = BytesIO()
|
|
new_img.save(buffer, format='PNG')
|
|
buffer.seek(0)
|
|
|
|
return web.Response(body=buffer.read(), content_type='image/png',
|
|
headers={"Content-Disposition": f"filename=\"{filename}\""})
|
|
|
|
elif channel == 'a':
|
|
with Image.open(file) as img:
|
|
if img.mode == "RGBA":
|
|
_, _, _, a = img.split()
|
|
else:
|
|
a = Image.new('L', img.size, 255)
|
|
|
|
# alpha img
|
|
alpha_img = Image.new('RGBA', img.size)
|
|
alpha_img.putalpha(a)
|
|
alpha_buffer = BytesIO()
|
|
alpha_img.save(alpha_buffer, format='PNG')
|
|
alpha_buffer.seek(0)
|
|
|
|
return web.Response(body=alpha_buffer.read(), content_type='image/png',
|
|
headers={"Content-Disposition": f"filename=\"{filename}\""})
|
|
else:
|
|
return web.FileResponse(file, headers={"Content-Disposition": f"filename=\"{filename}\""})
|
|
|
|
return web.Response(status=404)
|
|
|
|
@routes.get("/view_metadata/{folder_name}")
|
|
async def view_metadata(request):
|
|
folder_name = request.match_info.get("folder_name", None)
|
|
if folder_name is None:
|
|
return web.Response(status=404)
|
|
if not "filename" in request.rel_url.query:
|
|
return web.Response(status=404)
|
|
|
|
filename = request.rel_url.query["filename"]
|
|
if not filename.endswith(".safetensors"):
|
|
return web.Response(status=404)
|
|
|
|
safetensors_path = folder_paths.get_full_path(folder_name, filename)
|
|
if safetensors_path is None:
|
|
return web.Response(status=404)
|
|
out = comfy.utils.safetensors_header(safetensors_path, max_size=1024*1024)
|
|
if out is None:
|
|
return web.Response(status=404)
|
|
dt = json.loads(out)
|
|
if not "__metadata__" in dt:
|
|
return web.Response(status=404)
|
|
return web.json_response(dt["__metadata__"])
|
|
|
|
@routes.get("/system_stats")
|
|
async def get_queue(request):
|
|
device = comfy.model_management.get_torch_device()
|
|
device_name = comfy.model_management.get_torch_device_name(device)
|
|
vram_total, torch_vram_total = comfy.model_management.get_total_memory(device, torch_total_too=True)
|
|
vram_free, torch_vram_free = comfy.model_management.get_free_memory(device, torch_free_too=True)
|
|
system_stats = {
|
|
"system": {
|
|
"os": os.name,
|
|
"python_version": sys.version,
|
|
"embedded_python": os.path.split(os.path.split(sys.executable)[0])[1] == "python_embeded"
|
|
},
|
|
"devices": [
|
|
{
|
|
"name": device_name,
|
|
"type": device.type,
|
|
"index": device.index,
|
|
"vram_total": vram_total,
|
|
"vram_free": vram_free,
|
|
"torch_vram_total": torch_vram_total,
|
|
"torch_vram_free": torch_vram_free,
|
|
}
|
|
]
|
|
}
|
|
return web.json_response(system_stats)
|
|
|
|
@routes.get("/prompt")
|
|
async def get_prompt(request):
|
|
return web.json_response(self.get_queue_info())
|
|
|
|
def node_info(node_class):
|
|
obj_class = nodes.NODE_CLASS_MAPPINGS[node_class]
|
|
info = {}
|
|
info['input'] = obj_class.INPUT_TYPES()
|
|
info['input_order'] = {key: list(value.keys()) for (key, value) in obj_class.INPUT_TYPES().items()}
|
|
info['output'] = obj_class.RETURN_TYPES
|
|
info['output_is_list'] = obj_class.OUTPUT_IS_LIST if hasattr(obj_class, 'OUTPUT_IS_LIST') else [False] * len(obj_class.RETURN_TYPES)
|
|
info['output_name'] = obj_class.RETURN_NAMES if hasattr(obj_class, 'RETURN_NAMES') else info['output']
|
|
info['name'] = node_class
|
|
info['display_name'] = nodes.NODE_DISPLAY_NAME_MAPPINGS[node_class] if node_class in nodes.NODE_DISPLAY_NAME_MAPPINGS.keys() else node_class
|
|
info['description'] = obj_class.DESCRIPTION if hasattr(obj_class,'DESCRIPTION') else ''
|
|
info['python_module'] = getattr(obj_class, "RELATIVE_PYTHON_MODULE", "nodes")
|
|
info['category'] = 'sd'
|
|
if hasattr(obj_class, 'OUTPUT_NODE') and obj_class.OUTPUT_NODE == True:
|
|
info['output_node'] = True
|
|
else:
|
|
info['output_node'] = False
|
|
|
|
if hasattr(obj_class, 'CATEGORY'):
|
|
info['category'] = obj_class.CATEGORY
|
|
|
|
if hasattr(obj_class, 'OUTPUT_TOOLTIPS'):
|
|
info['output_tooltips'] = obj_class.OUTPUT_TOOLTIPS
|
|
return info
|
|
|
|
@routes.get("/object_info")
|
|
async def get_object_info(request):
|
|
out = {}
|
|
for x in nodes.NODE_CLASS_MAPPINGS:
|
|
try:
|
|
out[x] = node_info(x)
|
|
except Exception as e:
|
|
logging.error(f"[ERROR] An error occurred while retrieving information for the '{x}' node.")
|
|
logging.error(traceback.format_exc())
|
|
return web.json_response(out)
|
|
|
|
@routes.get("/object_info/{node_class}")
|
|
async def get_object_info_node(request):
|
|
node_class = request.match_info.get("node_class", None)
|
|
out = {}
|
|
if (node_class is not None) and (node_class in nodes.NODE_CLASS_MAPPINGS):
|
|
out[node_class] = node_info(node_class)
|
|
return web.json_response(out)
|
|
|
|
@routes.get("/history")
|
|
async def get_history(request):
|
|
max_items = request.rel_url.query.get("max_items", None)
|
|
if max_items is not None:
|
|
max_items = int(max_items)
|
|
return web.json_response(self.prompt_queue.get_history(max_items=max_items))
|
|
|
|
@routes.get("/history/{prompt_id}")
|
|
async def get_history(request):
|
|
prompt_id = request.match_info.get("prompt_id", None)
|
|
return web.json_response(self.prompt_queue.get_history(prompt_id=prompt_id))
|
|
|
|
@routes.get("/queue")
|
|
async def get_queue(request):
|
|
queue_info = {}
|
|
current_queue = self.prompt_queue.get_current_queue()
|
|
queue_info['queue_running'] = current_queue[0]
|
|
queue_info['queue_pending'] = current_queue[1]
|
|
return web.json_response(queue_info)
|
|
|
|
@routes.post("/prompt")
|
|
async def post_prompt(request):
|
|
logging.info("got prompt")
|
|
resp_code = 200
|
|
out_string = ""
|
|
json_data = await request.json()
|
|
json_data = self.trigger_on_prompt(json_data)
|
|
|
|
if "number" in json_data:
|
|
number = float(json_data['number'])
|
|
else:
|
|
number = self.number
|
|
if "front" in json_data:
|
|
if json_data['front']:
|
|
number = -number
|
|
|
|
self.number += 1
|
|
|
|
if "prompt" in json_data:
|
|
prompt = json_data["prompt"]
|
|
valid = execution.validate_prompt(prompt)
|
|
extra_data = {}
|
|
if "extra_data" in json_data:
|
|
extra_data = json_data["extra_data"]
|
|
|
|
if "client_id" in json_data:
|
|
extra_data["client_id"] = json_data["client_id"]
|
|
if valid[0]:
|
|
prompt_id = str(uuid.uuid4())
|
|
outputs_to_execute = valid[2]
|
|
self.prompt_queue.put((number, prompt_id, prompt, extra_data, outputs_to_execute))
|
|
response = {"prompt_id": prompt_id, "number": number, "node_errors": valid[3]}
|
|
return web.json_response(response)
|
|
else:
|
|
logging.warning("invalid prompt: {}".format(valid[1]))
|
|
return web.json_response({"error": valid[1], "node_errors": valid[3]}, status=400)
|
|
else:
|
|
return web.json_response({"error": "no prompt", "node_errors": []}, status=400)
|
|
|
|
@routes.post("/queue")
|
|
async def post_queue(request):
|
|
json_data = await request.json()
|
|
if "clear" in json_data:
|
|
if json_data["clear"]:
|
|
self.prompt_queue.wipe_queue()
|
|
if "delete" in json_data:
|
|
to_delete = json_data['delete']
|
|
for id_to_delete in to_delete:
|
|
delete_func = lambda a: a[1] == id_to_delete
|
|
self.prompt_queue.delete_queue_item(delete_func)
|
|
|
|
return web.Response(status=200)
|
|
|
|
@routes.post("/interrupt")
|
|
async def post_interrupt(request):
|
|
nodes.interrupt_processing()
|
|
return web.Response(status=200)
|
|
|
|
@routes.post("/free")
|
|
async def post_free(request):
|
|
json_data = await request.json()
|
|
unload_models = json_data.get("unload_models", False)
|
|
free_memory = json_data.get("free_memory", False)
|
|
if unload_models:
|
|
self.prompt_queue.set_flag("unload_models", unload_models)
|
|
if free_memory:
|
|
self.prompt_queue.set_flag("free_memory", free_memory)
|
|
return web.Response(status=200)
|
|
|
|
@routes.post("/history")
|
|
async def post_history(request):
|
|
json_data = await request.json()
|
|
if "clear" in json_data:
|
|
if json_data["clear"]:
|
|
self.prompt_queue.wipe_history()
|
|
if "delete" in json_data:
|
|
to_delete = json_data['delete']
|
|
for id_to_delete in to_delete:
|
|
self.prompt_queue.delete_history_item(id_to_delete)
|
|
|
|
return web.Response(status=200)
|
|
|
|
# Internal route. Should not be depended upon and is subject to change at any time.
|
|
# TODO(robinhuang): Move to internal route table class once we refactor PromptServer to pass around Websocket.
|
|
@routes.post("/internal/models/download")
|
|
async def download_handler(request):
|
|
async def report_progress(filename: str, status: DownloadModelStatus):
|
|
await self.send_json("download_progress", status.to_dict())
|
|
|
|
data = await request.json()
|
|
url = data.get('url')
|
|
model_directory = data.get('model_directory')
|
|
model_filename = data.get('model_filename')
|
|
progress_interval = data.get('progress_interval', 1.0) # In seconds, how often to report download progress.
|
|
|
|
if not url or not model_directory or not model_filename:
|
|
return web.json_response({"status": "error", "message": "Missing URL or folder path or filename"}, status=400)
|
|
|
|
session = self.client_session
|
|
if session is None:
|
|
logging.error("Client session is not initialized")
|
|
return web.Response(status=500)
|
|
|
|
task = asyncio.create_task(download_model(lambda url: session.get(url), model_filename, url, model_directory, report_progress, progress_interval))
|
|
await task
|
|
|
|
return web.json_response(task.result().to_dict())
|
|
|
|
async def setup(self):
|
|
timeout = aiohttp.ClientTimeout(total=None) # no timeout
|
|
self.client_session = aiohttp.ClientSession(timeout=timeout)
|
|
|
|
def add_routes(self):
|
|
self.user_manager.add_routes(self.routes)
|
|
|
|
# Prefix every route with /api for easier matching for delegation.
|
|
# This is very useful for frontend dev server, which need to forward
|
|
# everything except serving of static files.
|
|
# Currently both the old endpoints without prefix and new endpoints with
|
|
# prefix are supported.
|
|
api_routes = web.RouteTableDef()
|
|
for route in self.routes:
|
|
# Custom nodes might add extra static routes. Only process non-static
|
|
# routes to add /api prefix.
|
|
if isinstance(route, web.RouteDef):
|
|
api_routes.route(route.method, "/api" + route.path)(route.handler, **route.kwargs)
|
|
self.app.add_routes(api_routes)
|
|
self.app.add_routes(self.routes)
|
|
|
|
for name, dir in nodes.EXTENSION_WEB_DIRS.items():
|
|
self.app.add_routes([
|
|
web.static('/extensions/' + urllib.parse.quote(name), dir),
|
|
])
|
|
|
|
self.app.add_routes([
|
|
web.static('/', self.web_root),
|
|
])
|
|
|
|
def get_queue_info(self):
|
|
prompt_info = {}
|
|
exec_info = {}
|
|
exec_info['queue_remaining'] = self.prompt_queue.get_tasks_remaining()
|
|
prompt_info['exec_info'] = exec_info
|
|
return prompt_info
|
|
|
|
async def send(self, event, data, sid=None):
|
|
if event == BinaryEventTypes.UNENCODED_PREVIEW_IMAGE:
|
|
await self.send_image(data, sid=sid)
|
|
elif isinstance(data, (bytes, bytearray)):
|
|
await self.send_bytes(event, data, sid)
|
|
else:
|
|
await self.send_json(event, data, sid)
|
|
|
|
def encode_bytes(self, event, data):
|
|
if not isinstance(event, int):
|
|
raise RuntimeError(f"Binary event types must be integers, got {event}")
|
|
|
|
packed = struct.pack(">I", event)
|
|
message = bytearray(packed)
|
|
message.extend(data)
|
|
return message
|
|
|
|
async def send_image(self, image_data, sid=None):
|
|
image_type = image_data[0]
|
|
image = image_data[1]
|
|
max_size = image_data[2]
|
|
if max_size is not None:
|
|
if hasattr(Image, 'Resampling'):
|
|
resampling = Image.Resampling.BILINEAR
|
|
else:
|
|
resampling = Image.ANTIALIAS
|
|
|
|
image = ImageOps.contain(image, (max_size, max_size), resampling)
|
|
type_num = 1
|
|
if image_type == "JPEG":
|
|
type_num = 1
|
|
elif image_type == "PNG":
|
|
type_num = 2
|
|
|
|
bytesIO = BytesIO()
|
|
header = struct.pack(">I", type_num)
|
|
bytesIO.write(header)
|
|
image.save(bytesIO, format=image_type, quality=95, compress_level=1)
|
|
preview_bytes = bytesIO.getvalue()
|
|
await self.send_bytes(BinaryEventTypes.PREVIEW_IMAGE, preview_bytes, sid=sid)
|
|
|
|
async def send_bytes(self, event, data, sid=None):
|
|
message = self.encode_bytes(event, data)
|
|
|
|
if sid is None:
|
|
sockets = list(self.sockets.values())
|
|
for ws in sockets:
|
|
await send_socket_catch_exception(ws.send_bytes, message)
|
|
elif sid in self.sockets:
|
|
await send_socket_catch_exception(self.sockets[sid].send_bytes, message)
|
|
|
|
async def send_json(self, event, data, sid=None):
|
|
message = {"type": event, "data": data}
|
|
|
|
if sid is None:
|
|
sockets = list(self.sockets.values())
|
|
for ws in sockets:
|
|
await send_socket_catch_exception(ws.send_json, message)
|
|
elif sid in self.sockets:
|
|
await send_socket_catch_exception(self.sockets[sid].send_json, message)
|
|
|
|
def send_sync(self, event, data, sid=None):
|
|
self.loop.call_soon_threadsafe(
|
|
self.messages.put_nowait, (event, data, sid))
|
|
|
|
def queue_updated(self):
|
|
self.send_sync("status", { "status": self.get_queue_info() })
|
|
|
|
async def publish_loop(self):
|
|
while True:
|
|
msg = await self.messages.get()
|
|
await self.send(*msg)
|
|
|
|
async def start(self, address, port, verbose=True, call_on_start=None):
|
|
runner = web.AppRunner(self.app, access_log=None)
|
|
await runner.setup()
|
|
ssl_ctx = None
|
|
scheme = "http"
|
|
if args.tls_keyfile and args.tls_certfile:
|
|
ssl_ctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER, verify_mode=ssl.CERT_NONE)
|
|
ssl_ctx.load_cert_chain(certfile=args.tls_certfile,
|
|
keyfile=args.tls_keyfile)
|
|
scheme = "https"
|
|
|
|
site = web.TCPSite(runner, address, port, ssl_context=ssl_ctx)
|
|
await site.start()
|
|
|
|
self.address = address
|
|
self.port = port
|
|
|
|
if verbose:
|
|
logging.info("Starting server\n")
|
|
logging.info("To see the GUI go to: {}://{}:{}".format(scheme, address, port))
|
|
if call_on_start is not None:
|
|
call_on_start(scheme, address, port)
|
|
|
|
def add_on_prompt_handler(self, handler):
|
|
self.on_prompt_handlers.append(handler)
|
|
|
|
def trigger_on_prompt(self, json_data):
|
|
for handler in self.on_prompt_handlers:
|
|
try:
|
|
json_data = handler(json_data)
|
|
except Exception as e:
|
|
logging.warning(f"[ERROR] An error occurred during the on_prompt_handler processing")
|
|
logging.warning(traceback.format_exc())
|
|
|
|
return json_data
|