Improved memory management. (#5450)

* Less fragile memory management.

* Fix issue.

* Remove useless function.

* Prevent and detect some types of memory leaks.

* Run garbage collector when switching workflow if needed.

* Fix issue.
This commit is contained in:
comfyanonymous 2024-12-02 14:39:34 -05:00 committed by GitHub
parent 2d5b3e0078
commit 79d5ceae6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 119 additions and 120 deletions

View File

@ -23,6 +23,8 @@ from comfy.cli_args import args
import torch import torch
import sys import sys
import platform import platform
import weakref
import gc
class VRAMState(Enum): class VRAMState(Enum):
DISABLED = 0 #No vram present: no need to move models to vram DISABLED = 0 #No vram present: no need to move models to vram
@ -287,11 +289,27 @@ def module_size(module):
class LoadedModel: class LoadedModel:
def __init__(self, model): def __init__(self, model):
self.model = model self._set_model(model)
self.device = model.load_device self.device = model.load_device
self.weights_loaded = False
self.real_model = None self.real_model = None
self.currently_used = True self.currently_used = True
self.model_finalizer = None
self._patcher_finalizer = None
def _set_model(self, model):
self._model = weakref.ref(model)
if model.parent is not None:
self._parent_model = weakref.ref(model.parent)
self._patcher_finalizer = weakref.finalize(model, self._switch_parent)
def _switch_parent(self):
model = self._parent_model()
if model is not None:
self._set_model(model)
@property
def model(self):
return self._model()
def model_memory(self): def model_memory(self):
return self.model.model_size() return self.model.model_size()
@ -306,32 +324,23 @@ class LoadedModel:
return self.model_memory() return self.model_memory()
def model_load(self, lowvram_model_memory=0, force_patch_weights=False): def model_load(self, lowvram_model_memory=0, force_patch_weights=False):
patch_model_to = self.device
self.model.model_patches_to(self.device) self.model.model_patches_to(self.device)
self.model.model_patches_to(self.model.model_dtype()) self.model.model_patches_to(self.model.model_dtype())
load_weights = not self.weights_loaded # if self.model.loaded_size() > 0:
if self.model.loaded_size() > 0:
use_more_vram = lowvram_model_memory use_more_vram = lowvram_model_memory
if use_more_vram == 0: if use_more_vram == 0:
use_more_vram = 1e32 use_more_vram = 1e32
self.model_use_more_vram(use_more_vram) self.model_use_more_vram(use_more_vram, force_patch_weights=force_patch_weights)
else: real_model = self.model.model
try:
self.real_model = self.model.patch_model(device_to=patch_model_to, lowvram_model_memory=lowvram_model_memory, load_weights=load_weights, force_patch_weights=force_patch_weights)
except Exception as e:
self.model.unpatch_model(self.model.offload_device)
self.model_unload()
raise e
if is_intel_xpu() and not args.disable_ipex_optimize and 'ipex' in globals() and self.real_model is not None: if is_intel_xpu() and not args.disable_ipex_optimize and 'ipex' in globals() and real_model is not None:
with torch.no_grad(): with torch.no_grad():
self.real_model = ipex.optimize(self.real_model.eval(), inplace=True, graph_mode=True, concat_linear=True) real_model = ipex.optimize(real_model.eval(), inplace=True, graph_mode=True, concat_linear=True)
self.weights_loaded = True self.real_model = weakref.ref(real_model)
return self.real_model self.model_finalizer = weakref.finalize(real_model, cleanup_models)
return real_model
def should_reload_model(self, force_patch_weights=False): def should_reload_model(self, force_patch_weights=False):
if force_patch_weights and self.model.lowvram_patch_counter() > 0: if force_patch_weights and self.model.lowvram_patch_counter() > 0:
@ -344,18 +353,23 @@ class LoadedModel:
freed = self.model.partially_unload(self.model.offload_device, memory_to_free) freed = self.model.partially_unload(self.model.offload_device, memory_to_free)
if freed >= memory_to_free: if freed >= memory_to_free:
return False return False
self.model.unpatch_model(self.model.offload_device, unpatch_weights=unpatch_weights) self.model.detach(unpatch_weights)
self.model.model_patches_to(self.model.offload_device) self.model_finalizer.detach()
self.weights_loaded = self.weights_loaded and not unpatch_weights self.model_finalizer = None
self.real_model = None self.real_model = None
return True return True
def model_use_more_vram(self, extra_memory): def model_use_more_vram(self, extra_memory, force_patch_weights=False):
return self.model.partially_load(self.device, extra_memory) return self.model.partially_load(self.device, extra_memory, force_patch_weights=force_patch_weights)
def __eq__(self, other): def __eq__(self, other):
return self.model is other.model return self.model is other.model
def __del__(self):
if self._patcher_finalizer is not None:
self._patcher_finalizer.detach()
def use_more_memory(extra_memory, loaded_models, device): def use_more_memory(extra_memory, loaded_models, device):
for m in loaded_models: for m in loaded_models:
if m.device == device: if m.device == device:
@ -386,38 +400,8 @@ def extra_reserved_memory():
def minimum_inference_memory(): def minimum_inference_memory():
return (1024 * 1024 * 1024) * 0.8 + extra_reserved_memory() return (1024 * 1024 * 1024) * 0.8 + extra_reserved_memory()
def unload_model_clones(model, unload_weights_only=True, force_unload=True):
to_unload = []
for i in range(len(current_loaded_models)):
if model.is_clone(current_loaded_models[i].model):
to_unload = [i] + to_unload
if len(to_unload) == 0:
return True
same_weights = 0
for i in to_unload:
if model.clone_has_same_weights(current_loaded_models[i].model):
same_weights += 1
if same_weights == len(to_unload):
unload_weight = False
else:
unload_weight = True
if not force_unload:
if unload_weights_only and unload_weight == False:
return None
else:
unload_weight = True
for i in to_unload:
logging.debug("unload clone {} {}".format(i, unload_weight))
current_loaded_models.pop(i).model_unload(unpatch_weights=unload_weight)
return unload_weight
def free_memory(memory_required, device, keep_loaded=[]): def free_memory(memory_required, device, keep_loaded=[]):
cleanup_models_gc()
unloaded_model = [] unloaded_model = []
can_unload = [] can_unload = []
unloaded_models = [] unloaded_models = []
@ -454,6 +438,7 @@ def free_memory(memory_required, device, keep_loaded=[]):
return unloaded_models return unloaded_models
def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimum_memory_required=None, force_full_load=False): def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimum_memory_required=None, force_full_load=False):
cleanup_models_gc()
global vram_state global vram_state
inference_memory = minimum_inference_memory() inference_memory = minimum_inference_memory()
@ -466,11 +451,9 @@ def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimu
models = set(models) models = set(models)
models_to_load = [] models_to_load = []
models_already_loaded = []
for x in models: for x in models:
loaded_model = LoadedModel(x) loaded_model = LoadedModel(x)
loaded = None
try: try:
loaded_model_index = current_loaded_models.index(loaded_model) loaded_model_index = current_loaded_models.index(loaded_model)
except: except:
@ -478,51 +461,35 @@ def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimu
if loaded_model_index is not None: if loaded_model_index is not None:
loaded = current_loaded_models[loaded_model_index] loaded = current_loaded_models[loaded_model_index]
if loaded.should_reload_model(force_patch_weights=force_patch_weights): #TODO: cleanup this model reload logic
current_loaded_models.pop(loaded_model_index).model_unload(unpatch_weights=True)
loaded = None
else:
loaded.currently_used = True loaded.currently_used = True
models_already_loaded.append(loaded) models_to_load.append(loaded)
else:
if loaded is None:
if hasattr(x, "model"): if hasattr(x, "model"):
logging.info(f"Requested to load {x.model.__class__.__name__}") logging.info(f"Requested to load {x.model.__class__.__name__}")
models_to_load.append(loaded_model) models_to_load.append(loaded_model)
if len(models_to_load) == 0: for loaded_model in models_to_load:
devs = set(map(lambda a: a.device, models_already_loaded)) to_unload = []
for d in devs: for i in range(len(current_loaded_models)):
if d != torch.device("cpu"): if loaded_model.model.is_clone(current_loaded_models[i].model):
free_memory(extra_mem + offloaded_memory(models_already_loaded, d), d, models_already_loaded) to_unload = [i] + to_unload
free_mem = get_free_memory(d) for i in to_unload:
if free_mem < minimum_memory_required: current_loaded_models.pop(i).model.detach(unpatch_all=False)
logging.info("Unloading models for lowram load.") #TODO: partial model unloading when this case happens, also handle the opposite case where models can be unlowvramed.
models_to_load = free_memory(minimum_memory_required, d)
logging.info("{} models unloaded.".format(len(models_to_load)))
else:
use_more_memory(free_mem - minimum_memory_required, models_already_loaded, d)
if len(models_to_load) == 0:
return
logging.info(f"Loading {len(models_to_load)} new model{'s' if len(models_to_load) > 1 else ''}")
total_memory_required = {} total_memory_required = {}
for loaded_model in models_to_load: for loaded_model in models_to_load:
unload_model_clones(loaded_model.model, unload_weights_only=True, force_unload=False) #unload clones where the weights are different
total_memory_required[loaded_model.device] = total_memory_required.get(loaded_model.device, 0) + loaded_model.model_memory_required(loaded_model.device) total_memory_required[loaded_model.device] = total_memory_required.get(loaded_model.device, 0) + loaded_model.model_memory_required(loaded_model.device)
for loaded_model in models_already_loaded:
total_memory_required[loaded_model.device] = total_memory_required.get(loaded_model.device, 0) + loaded_model.model_memory_required(loaded_model.device)
for loaded_model in models_to_load:
weights_unloaded = unload_model_clones(loaded_model.model, unload_weights_only=False, force_unload=False) #unload the rest of the clones where the weights can stay loaded
if weights_unloaded is not None:
loaded_model.weights_loaded = not weights_unloaded
for device in total_memory_required: for device in total_memory_required:
if device != torch.device("cpu"): if device != torch.device("cpu"):
free_memory(total_memory_required[device] * 1.1 + extra_mem, device, models_already_loaded) free_memory(total_memory_required[device] * 1.1 + extra_mem, device)
for device in total_memory_required:
if device != torch.device("cpu"):
free_mem = get_free_memory(device)
if free_mem < minimum_memory_required:
models_l = free_memory(minimum_memory_required, device)
logging.info("{} models unloaded.".format(len(models_l)))
for loaded_model in models_to_load: for loaded_model in models_to_load:
model = loaded_model.model model = loaded_model.model
@ -544,17 +511,8 @@ def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimu
cur_loaded_model = loaded_model.model_load(lowvram_model_memory, force_patch_weights=force_patch_weights) cur_loaded_model = loaded_model.model_load(lowvram_model_memory, force_patch_weights=force_patch_weights)
current_loaded_models.insert(0, loaded_model) current_loaded_models.insert(0, loaded_model)
devs = set(map(lambda a: a.device, models_already_loaded))
for d in devs:
if d != torch.device("cpu"):
free_mem = get_free_memory(d)
if free_mem > minimum_memory_required:
use_more_memory(free_mem - minimum_memory_required, models_already_loaded, d)
return return
def load_model_gpu(model): def load_model_gpu(model):
return load_models_gpu([model]) return load_models_gpu([model])
@ -568,21 +526,35 @@ def loaded_models(only_currently_used=False):
output.append(m.model) output.append(m.model)
return output return output
def cleanup_models(keep_clone_weights_loaded=False):
def cleanup_models_gc():
do_gc = False
for i in range(len(current_loaded_models)):
cur = current_loaded_models[i]
if cur.real_model() is not None and cur.model is None:
logging.info("Potential memory leak detected with model {}, doing a full garbage collect, for maximum performance avoid circular references in the model code.".format(cur.real_model().__class__.__name__))
do_gc = True
break
if do_gc:
gc.collect()
soft_empty_cache()
for i in range(len(current_loaded_models)):
cur = current_loaded_models[i]
if cur.real_model() is not None and cur.model is None:
logging.warning("WARNING, memory leak with model {}. Please make sure it is not being referenced from somewhere.".format(cur.real_model().__class__.__name__))
def cleanup_models():
to_delete = [] to_delete = []
for i in range(len(current_loaded_models)): for i in range(len(current_loaded_models)):
#TODO: very fragile function needs improvement if current_loaded_models[i].real_model() is None:
num_refs = sys.getrefcount(current_loaded_models[i].model)
if num_refs <= 2:
if not keep_clone_weights_loaded:
to_delete = [i] + to_delete
#TODO: find a less fragile way to do this.
elif sys.getrefcount(current_loaded_models[i].real_model) <= 3: #references from .real_model + the .model
to_delete = [i] + to_delete to_delete = [i] + to_delete
for i in to_delete: for i in to_delete:
x = current_loaded_models.pop(i) x = current_loaded_models.pop(i)
x.model_unload()
del x del x
def dtype_size(dtype): def dtype_size(dtype):

View File

@ -139,6 +139,7 @@ class ModelPatcher:
self.offload_device = offload_device self.offload_device = offload_device
self.weight_inplace_update = weight_inplace_update self.weight_inplace_update = weight_inplace_update
self.patches_uuid = uuid.uuid4() self.patches_uuid = uuid.uuid4()
self.parent = None
if not hasattr(self.model, 'model_loaded_weight_memory'): if not hasattr(self.model, 'model_loaded_weight_memory'):
self.model.model_loaded_weight_memory = 0 self.model.model_loaded_weight_memory = 0
@ -149,6 +150,9 @@ class ModelPatcher:
if not hasattr(self.model, 'model_lowvram'): if not hasattr(self.model, 'model_lowvram'):
self.model.model_lowvram = False self.model.model_lowvram = False
if not hasattr(self.model, 'current_weight_patches_uuid'):
self.model.current_weight_patches_uuid = None
def model_size(self): def model_size(self):
if self.size > 0: if self.size > 0:
return self.size return self.size
@ -172,6 +176,7 @@ class ModelPatcher:
n.model_options = copy.deepcopy(self.model_options) n.model_options = copy.deepcopy(self.model_options)
n.backup = self.backup n.backup = self.backup
n.object_patches_backup = self.object_patches_backup n.object_patches_backup = self.object_patches_backup
n.parent = self
return n return n
def is_clone(self, other): def is_clone(self, other):
@ -464,6 +469,7 @@ class ModelPatcher:
self.model.lowvram_patch_counter += patch_counter self.model.lowvram_patch_counter += patch_counter
self.model.device = device_to self.model.device = device_to
self.model.model_loaded_weight_memory = mem_counter self.model.model_loaded_weight_memory = mem_counter
self.model.current_weight_patches_uuid = self.patches_uuid
def patch_model(self, device_to=None, lowvram_model_memory=0, load_weights=True, force_patch_weights=False): def patch_model(self, device_to=None, lowvram_model_memory=0, load_weights=True, force_patch_weights=False):
for k in self.object_patches: for k in self.object_patches:
@ -498,6 +504,7 @@ class ModelPatcher:
else: else:
comfy.utils.set_attr_param(self.model, k, bk.weight) comfy.utils.set_attr_param(self.model, k, bk.weight)
self.model.current_weight_patches_uuid = None
self.backup.clear() self.backup.clear()
if device_to is not None: if device_to is not None:
@ -568,21 +575,42 @@ class ModelPatcher:
self.model.model_loaded_weight_memory -= memory_freed self.model.model_loaded_weight_memory -= memory_freed
return memory_freed return memory_freed
def partially_load(self, device_to, extra_memory=0): def partially_load(self, device_to, extra_memory=0, force_patch_weights=False):
self.unpatch_model(unpatch_weights=False) unpatch_weights = self.model.current_weight_patches_uuid is not None and (self.model.current_weight_patches_uuid != self.patches_uuid or force_patch_weights)
# TODO: force_patch_weights should not unload + reload full model
used = self.model.model_loaded_weight_memory
self.unpatch_model(self.offload_device, unpatch_weights=unpatch_weights)
if unpatch_weights:
extra_memory += (used - self.model.model_loaded_weight_memory)
self.patch_model(load_weights=False) self.patch_model(load_weights=False)
full_load = False full_load = False
if self.model.model_lowvram == False: if self.model.model_lowvram == False and self.model.model_loaded_weight_memory > 0:
return 0 return 0
if self.model.model_loaded_weight_memory + extra_memory > self.model_size(): if self.model.model_loaded_weight_memory + extra_memory > self.model_size():
full_load = True full_load = True
current_used = self.model.model_loaded_weight_memory current_used = self.model.model_loaded_weight_memory
self.load(device_to, lowvram_model_memory=current_used + extra_memory, full_load=full_load) try:
self.load(device_to, lowvram_model_memory=current_used + extra_memory, force_patch_weights=force_patch_weights, full_load=full_load)
except Exception as e:
self.detach()
raise e
return self.model.model_loaded_weight_memory - current_used return self.model.model_loaded_weight_memory - current_used
def detach(self, unpatch_all=True):
self.model_patches_to(self.offload_device)
if unpatch_all:
self.unpatch_model(self.offload_device, unpatch_weights=unpatch_all)
return self.model
def current_loaded_device(self): def current_loaded_device(self):
return self.model.device return self.model.device
def calculate_weight(self, patches, weight, key, intermediate_dtype=torch.float32): def calculate_weight(self, patches, weight, key, intermediate_dtype=torch.float32):
print("WARNING the ModelPatcher.calculate_weight function is deprecated, please use: comfy.lora.calculate_weight instead") print("WARNING the ModelPatcher.calculate_weight function is deprecated, please use: comfy.lora.calculate_weight instead")
return comfy.lora.calculate_weight(patches, weight, key, intermediate_dtype=intermediate_dtype) return comfy.lora.calculate_weight(patches, weight, key, intermediate_dtype=intermediate_dtype)
def __del__(self):
self.detach(unpatch_all=False)

View File

@ -480,7 +480,7 @@ class PromptExecutor:
if self.caches.outputs.get(node_id) is not None: if self.caches.outputs.get(node_id) is not None:
cached_nodes.append(node_id) cached_nodes.append(node_id)
comfy.model_management.cleanup_models(keep_clone_weights_loaded=True) comfy.model_management.cleanup_models_gc()
self.add_message("execution_cached", self.add_message("execution_cached",
{ "nodes": cached_nodes, "prompt_id": prompt_id}, { "nodes": cached_nodes, "prompt_id": prompt_id},
broadcast=False) broadcast=False)

View File

@ -154,7 +154,6 @@ def prompt_worker(q, server):
if need_gc: if need_gc:
current_time = time.perf_counter() current_time = time.perf_counter()
if (current_time - last_gc_collect) > gc_collect_interval: if (current_time - last_gc_collect) > gc_collect_interval:
comfy.model_management.cleanup_models()
gc.collect() gc.collect()
comfy.model_management.soft_empty_cache() comfy.model_management.soft_empty_cache()
last_gc_collect = current_time last_gc_collect = current_time