174 lines
6.3 KiB
Python
174 lines
6.3 KiB
Python
import os.path
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
import multiprocessing
|
|
import time
|
|
import re
|
|
|
|
re_special = re.compile(r'([\\()])')
|
|
|
|
def get_deepbooru_tags(pil_image):
|
|
"""
|
|
This method is for running only one image at a time for simple use. Used to the img2img interrogate.
|
|
"""
|
|
from modules import shared # prevents circular reference
|
|
|
|
try:
|
|
create_deepbooru_process(shared.opts.interrogate_deepbooru_score_threshold, create_deepbooru_opts())
|
|
return get_tags_from_process(pil_image)
|
|
finally:
|
|
release_process()
|
|
|
|
|
|
OPT_INCLUDE_RANKS = "include_ranks"
|
|
def create_deepbooru_opts():
|
|
from modules import shared
|
|
|
|
return {
|
|
"use_spaces": shared.opts.deepbooru_use_spaces,
|
|
"use_escape": shared.opts.deepbooru_escape,
|
|
"alpha_sort": shared.opts.deepbooru_sort_alpha,
|
|
OPT_INCLUDE_RANKS: shared.opts.interrogate_return_ranks,
|
|
}
|
|
|
|
|
|
def deepbooru_process(queue, deepbooru_process_return, threshold, deepbooru_opts):
|
|
model, tags = get_deepbooru_tags_model()
|
|
while True: # while process is running, keep monitoring queue for new image
|
|
pil_image = queue.get()
|
|
if pil_image == "QUIT":
|
|
break
|
|
else:
|
|
deepbooru_process_return["value"] = get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts)
|
|
|
|
|
|
def create_deepbooru_process(threshold, deepbooru_opts):
|
|
"""
|
|
Creates deepbooru process. A queue is created to send images into the process. This enables multiple images
|
|
to be processed in a row without reloading the model or creating a new process. To return the data, a shared
|
|
dictionary is created to hold the tags created. To wait for tags to be returned, a value of -1 is assigned
|
|
to the dictionary and the method adding the image to the queue should wait for this value to be updated with
|
|
the tags.
|
|
"""
|
|
from modules import shared # prevents circular reference
|
|
context = multiprocessing.get_context("spawn")
|
|
shared.deepbooru_process_manager = context.Manager()
|
|
shared.deepbooru_process_queue = shared.deepbooru_process_manager.Queue()
|
|
shared.deepbooru_process_return = shared.deepbooru_process_manager.dict()
|
|
shared.deepbooru_process_return["value"] = -1
|
|
shared.deepbooru_process = context.Process(target=deepbooru_process, args=(shared.deepbooru_process_queue, shared.deepbooru_process_return, threshold, deepbooru_opts))
|
|
shared.deepbooru_process.start()
|
|
|
|
|
|
def get_tags_from_process(image):
|
|
from modules import shared
|
|
|
|
shared.deepbooru_process_return["value"] = -1
|
|
shared.deepbooru_process_queue.put(image)
|
|
while shared.deepbooru_process_return["value"] == -1:
|
|
time.sleep(0.2)
|
|
caption = shared.deepbooru_process_return["value"]
|
|
shared.deepbooru_process_return["value"] = -1
|
|
|
|
return caption
|
|
|
|
|
|
def release_process():
|
|
"""
|
|
Stops the deepbooru process to return used memory
|
|
"""
|
|
from modules import shared # prevents circular reference
|
|
shared.deepbooru_process_queue.put("QUIT")
|
|
shared.deepbooru_process.join()
|
|
shared.deepbooru_process_queue = None
|
|
shared.deepbooru_process = None
|
|
shared.deepbooru_process_return = None
|
|
shared.deepbooru_process_manager = None
|
|
|
|
def get_deepbooru_tags_model():
|
|
import deepdanbooru as dd
|
|
import tensorflow as tf
|
|
import numpy as np
|
|
this_folder = os.path.dirname(__file__)
|
|
model_path = os.path.abspath(os.path.join(this_folder, '..', 'models', 'deepbooru'))
|
|
if not os.path.exists(os.path.join(model_path, 'project.json')):
|
|
# there is no point importing these every time
|
|
import zipfile
|
|
from basicsr.utils.download_util import load_file_from_url
|
|
load_file_from_url(
|
|
r"https://github.com/KichangKim/DeepDanbooru/releases/download/v3-20211112-sgd-e28/deepdanbooru-v3-20211112-sgd-e28.zip",
|
|
model_path)
|
|
with zipfile.ZipFile(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip"), "r") as zip_ref:
|
|
zip_ref.extractall(model_path)
|
|
os.remove(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip"))
|
|
|
|
tags = dd.project.load_tags_from_project(model_path)
|
|
model = dd.project.load_model_from_project(
|
|
model_path, compile_model=False
|
|
)
|
|
return model, tags
|
|
|
|
|
|
def get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts):
|
|
import deepdanbooru as dd
|
|
import tensorflow as tf
|
|
import numpy as np
|
|
|
|
alpha_sort = deepbooru_opts['alpha_sort']
|
|
use_spaces = deepbooru_opts['use_spaces']
|
|
use_escape = deepbooru_opts['use_escape']
|
|
include_ranks = deepbooru_opts['include_ranks']
|
|
|
|
width = model.input_shape[2]
|
|
height = model.input_shape[1]
|
|
image = np.array(pil_image)
|
|
image = tf.image.resize(
|
|
image,
|
|
size=(height, width),
|
|
method=tf.image.ResizeMethod.AREA,
|
|
preserve_aspect_ratio=True,
|
|
)
|
|
image = image.numpy() # EagerTensor to np.array
|
|
image = dd.image.transform_and_pad_image(image, width, height)
|
|
image = image / 255.0
|
|
image_shape = image.shape
|
|
image = image.reshape((1, image_shape[0], image_shape[1], image_shape[2]))
|
|
|
|
y = model.predict(image)[0]
|
|
|
|
result_dict = {}
|
|
|
|
for i, tag in enumerate(tags):
|
|
result_dict[tag] = y[i]
|
|
|
|
unsorted_tags_in_theshold = []
|
|
result_tags_print = []
|
|
for tag in tags:
|
|
if result_dict[tag] >= threshold:
|
|
if tag.startswith("rating:"):
|
|
continue
|
|
unsorted_tags_in_theshold.append((result_dict[tag], tag))
|
|
result_tags_print.append(f'{result_dict[tag]} {tag}')
|
|
|
|
# sort tags
|
|
result_tags_out = []
|
|
sort_ndx = 0
|
|
if alpha_sort:
|
|
sort_ndx = 1
|
|
|
|
# sort by reverse by likelihood and normal for alpha, and format tag text as requested
|
|
unsorted_tags_in_theshold.sort(key=lambda y: y[sort_ndx], reverse=(not alpha_sort))
|
|
for weight, tag in unsorted_tags_in_theshold:
|
|
tag_outformat = tag
|
|
if use_spaces:
|
|
tag_outformat = tag_outformat.replace('_', ' ')
|
|
if use_escape:
|
|
tag_outformat = re.sub(re_special, r'\\\1', tag_outformat)
|
|
if include_ranks:
|
|
tag_outformat = f"({tag_outformat}:{weight:.3f})"
|
|
|
|
result_tags_out.append(tag_outformat)
|
|
|
|
print('\n'.join(sorted(result_tags_print, reverse=True)))
|
|
|
|
return ', '.join(result_tags_out)
|