354 lines
9.8 KiB
Python
354 lines
9.8 KiB
Python
import sys
|
|
import time
|
|
import json
|
|
import re
|
|
import random
|
|
import os
|
|
import base64
|
|
import io
|
|
|
|
from PIL import Image
|
|
|
|
# Using urllib to avoid the need to install "requests":
|
|
from urllib.request import Request
|
|
from urllib.request import urlopen
|
|
from urllib.error import HTTPError
|
|
from urllib.error import URLError
|
|
from urllib.parse import urlencode
|
|
|
|
# Global switch read by log(); the command-line entry point sets it to False
# when --quiet or --json is passed.
ENABLE_LOGGING = True
|
|
|
|
def log(input_str):
    """Print *input_str* to stdout when logging is enabled.

    Args:
        input_str: Any printable object (callers pass strings and exceptions).

    Logging is controlled by the module-level ENABLE_LOGGING flag.
    """
    # Plain truthiness check instead of comparing against True.
    if ENABLE_LOGGING:
        print(input_str)
|
|
|
|
# Used for building multipart/form-data messages below:
|
|
def add_bytes(a, b):
    """Return *a* with every item of *b* appended, in order.

    Args:
        a: The bytes value to extend.
        b: A single item or a list of items. ``str`` items are encoded with
            ``str.encode()`` (UTF-8 by default); anything else is
            concatenated as-is.

    Returns:
        The concatenation of *a* and all items of *b*.
    """
    pieces = b if isinstance(b, list) else [b]

    combined = a
    for piece in pieces:
        chunk = piece.encode() if isinstance(piece, str) else piece
        combined = combined + chunk

    return combined
|
|
|
|
# Merge dictionary b into a
|
|
def merge_dicts(a, b):
    """Recursively merge dictionary *b* into *a* and return *a*.

    Rules applied per key of *b*:

    * a ``None`` value removes the key from *a* (if present);
    * a dict value is merged recursively into the dict already stored in *a*
      (a fresh dict is created when *a* has no dict under that key);
    * any other value overwrites the entry in *a*.

    Args:
        a: Destination dictionary; it is modified in place.
        b: Dictionary of overrides, or ``None`` for "nothing to merge".

    Returns:
        *a* itself, after merging.
    """
    if b is None:
        return a

    for key, value in b.items():
        if value is None:
            # None is the "delete this key" marker.
            a.pop(key, None)
        elif isinstance(value, dict):
            target = a.get(key)
            if not isinstance(target, dict):
                # Merge into a fresh dict rather than storing b's dict
                # directly: the old code aliased it and then merged it into
                # itself, which raised KeyError when it contained a None.
                target = {}
                a[key] = target
            merge_dicts(target, value)
        else:
            a[key] = value

    return a
|
|
|
|
def png_to_jpg(quality, png_bytes):
    """Re-encode an in-memory image as JPEG and return the JPEG bytes.

    Args:
        quality: Value passed through as the ``quality`` option of the
            JPEG save call.
        png_bytes: Encoded source image bytes, opened with ``Image.open``.

    Returns:
        The JPEG-encoded image as ``bytes``.
    """
    source = io.BytesIO(png_bytes)
    target = io.BytesIO()

    with source, target:
        # Convert to RGB before saving as JPEG.
        rgb_image = Image.open(source).convert("RGB")
        rgb_image.save(target, format="JPEG", quality=quality)
        return target.getvalue()
|
|
|
|
def get_response(url, json_in=None, images_in=None, output_as=None):
    """Perform an HTTP request with urllib and return ``(output, error)``.

    Args:
        url: Target URL.
        json_in: When ``None`` a plain request is built with ``Request(url)``
            (no body, no custom headers). Otherwise the value is serialized
            with ``json.dumps`` and sent as a POST.
        images_in: Optional image attachment(s), only used when *json_in* is
            given. Either a list of dicts or a single dict, each with a
            ``"filename"`` and a ``"data"`` key. When present, the request is
            sent as ``multipart/form-data`` with the JSON in a
            ``payload_json`` part and one ``file{N}`` part per image.
        output_as: Pass the ``bytes`` type to get the raw response body.
            Otherwise the body is decoded and parsed as JSON, falling back to
            the decoded text when it is not valid JSON.

    Returns:
        A ``(output, error)`` tuple. On success ``error`` is ``None``. On
        ``HTTPError`` ``output`` is the decoded error body; on ``URLError``
        it is ``str(error)``. In both failure cases ``error`` is the caught
        exception.
    """
    output = None
    error = None

    # verb / byte counters are only used for the log line below.
    verb = "GET"
    bytes_sent = 0
    bytes_received = 0

    if json_in is None:
        req = Request(url)
    else:
        req_headers = {
            "User-Agent": "Mozilla/5.0 (X11; U; Linux i686) Gecko/20071127 Firefox/2.0.0.11",
            "Content-Type": "application/json",
        }
        req_data = json.dumps(json_in).encode()

        if images_in is not None:
            nl = "\r\n"
            boundary = "DataBoundary"
            req_headers["Content-Type"] = f"multipart/form-data; boundary={boundary}"

            # Accept a single image dict as well as a list. (The previous
            # code referenced an undefined name here and raised NameError.)
            if not isinstance(images_in, list):
                images_in = [images_in]

            # First part: the JSON payload.
            b = bytes()
            b = add_bytes(b, [f"--{boundary}", nl])
            b = add_bytes(b, ["Content-Disposition: form-data; name=\"payload_json\"", nl])
            b = add_bytes(b, ["Content-Type: application/json", nl, nl])
            b = add_bytes(b, [json.dumps(json_in), nl])

            # One additional part per image.
            for count, image in enumerate(images_in):
                filename = image["filename"]
                data = image["data"]

                b = add_bytes(b, [f"--{boundary}", nl])
                b = add_bytes(b, [f"Content-Disposition: form-data; name=\"file{count}\"; filename=\"{filename}\"", nl])
                b = add_bytes(b, ["Content-Type: application/octet-stream", nl, nl])
                # NOTE(review): the ";" is appended to the file payload
                # itself; kept as-is to leave the wire format unchanged, but
                # it looks unintended -- confirm against the receiving API.
                b = add_bytes(b, [data, ";", nl])

            b = add_bytes(b, [f"--{boundary}--", nl])
            req_data = b

        verb = "POST"
        bytes_sent = len(req_data)

        req = Request(
            url,
            req_data,
            headers=req_headers,
            method="POST",
        )

    try:
        with urlopen(req) as resp:
            resp_bytes = resp.read()
            bytes_received = len(resp_bytes)

            log(f"{verb}: {url} ({bytes_sent} bytes sent, {bytes_received} received)")

            if output_as == bytes:
                output = resp_bytes
            else:
                resp_text = resp_bytes.decode()

                try:
                    output = json.loads(resp_text)
                except ValueError:
                    # Not JSON (json.JSONDecodeError subclasses ValueError):
                    # hand back the plain text instead.
                    output = resp_text

    # HTTPError must be handled before URLError because it is a subclass.
    except HTTPError as e:
        output = e.read().decode()
        error = e
        log(e)
        log(output)

    except URLError as e:
        output = str(e)
        error = e
        log(e)
        log(output)

    return output, error
|
|
|
|
def generate(options):
    """Run a workflow through the HTTP API and deliver the resulting images.

    Steps:

    1. Read the workflow file and replace every ``__key__`` placeholder with
       ``str(options[key])``.
    2. POST the workflow to ``{api_url}/prompt`` and poll
       ``{api_url}/history/{prompt_id}`` once per second until the prompt id
       appears in the history.
    3. Download every image listed in the outputs via ``{api_url}/view``.
    4. Optionally convert to JPEG, post to a webhook, save to a directory
       and/or print the images as base64 JSON on stdout.

    Args:
        options: Dictionary of settings. Required keys: ``api_workflow``
            (path of the workflow file) and ``api_url``. Optional keys read
            here: ``jpeg_quality``, ``content``, ``username``,
            ``webhook_url``, ``save_directory`` and ``json`` (JSON output is
            produced only when the value equals ``1``).

    Returns:
        ``None``.

    Raises:
        SystemExit: With status 1 when a required key is missing or the
            workflow is not valid JSON after placeholder substitution.
    """
    # Both keys are mandatory; report the first missing one and stop.
    for k in ["api_workflow", "api_url"]:
        if k not in options:
            cli_key = k.replace("_", "-")
            log(f"Error: The {k} key was not found in the provided options. Specify it by loading a JSON file using --options filename.json containing the {k} key with a string value or specify it directly as a command-line argument using --{cli_key} with a supplied value.")
            # sys.exit rather than the site-provided exit(), with a non-zero
            # status so callers can detect the failure.
            sys.exit(1)

    api_workflow = options["api_workflow"]
    api_url = options["api_url"]

    # Drop a single trailing slash so URL paths can be appended uniformly.
    api_url = re.sub(r"\/$", "", api_url)

    with open(api_workflow, "r") as f:
        workflow_text = f.read()

    # Literal substitution: re.sub would interpret backslashes in the value
    # (and regex metacharacters in the key), corrupting or rejecting values
    # such as Windows paths.
    for k, v in options.items():
        target_key = f"__{k}__"
        if target_key in workflow_text:
            workflow_text = workflow_text.replace(target_key, str(v))
            log(f"Replacing: {target_key} --> {v}")

    for unreplaced_key in re.findall(r"__[A-Za-z0-9_]{1,}__", workflow_text):
        log(f"Warning: key {unreplaced_key} remains unreplaced in json")

    # Validate the JSON or exit early:
    try:
        valid_json = json.loads(workflow_text)
    except ValueError as x:
        log(x)
        log("JSON may be invalid")
        sys.exit(1)

    stub, err = get_response(f"{api_url}/prompt", {"prompt": valid_json})

    if err:
        log(err)
        return

    # get_response falls back to plain text for non-JSON bodies; guard
    # against that instead of failing on the subscript below.
    if not isinstance(stub, dict) or "prompt_id" not in stub:
        log(f"Error: unexpected response from {api_url}/prompt: {stub}")
        return

    prompt_id = stub["prompt_id"]

    # Poll until the prompt shows up in the history.
    while True:
        history, err = get_response(f"{api_url}/history/{prompt_id}")

        if not err and isinstance(history, dict) and prompt_id in history:
            break

        time.sleep(1)

    # Read the output history and fetch each image:
    history = history[prompt_id]
    output_images = []

    # Single pass over the output nodes. The previous nested loop iterated
    # the same mapping twice, fetching and appending every image once per
    # output node.
    for node_output in history["outputs"].values():
        if "images" not in node_output:
            continue

        for image in node_output["images"]:
            url_params = urlencode({
                "filename": image["filename"],
                "subfolder": image["subfolder"],
                "type": image["type"],
            })

            img, error = get_response(f"{api_url}/view?{url_params}", output_as=bytes)

            if not error:
                output_images.append({
                    "filename": image["filename"],
                    "data": img,
                })

    if "jpeg_quality" in options:
        jpeg_quality = int(options["jpeg_quality"])

        # Every image is re-encoded; only a trailing ".png" is renamed.
        output_images = [
            {
                "filename": re.sub(r"\.png$", ".jpg", i["filename"]),
                "data": png_to_jpg(jpeg_quality, i["data"]),
            }
            for i in output_images
        ]

    json_payload = {"content": ""}

    for k in ["content", "username"]:
        if k in options:
            json_payload[k] = options[k]

    if "webhook_url" in options:
        # Failures are logged inside get_response; the result is not used.
        get_response(options["webhook_url"], json_payload, images_in=output_images)

    if "save_directory" in options:
        save_directory = options["save_directory"]

        os.makedirs(save_directory, exist_ok=True)

        for i in output_images:
            # The filename comes from the API response; keep only its final
            # component so it cannot point outside save_directory.
            target_path = os.path.join(save_directory, os.path.basename(i["filename"]))
            with open(target_path, "wb") as f:
                f.write(i["data"])

    if options.get("json") == 1:
        output = {
            "images": [
                {
                    "filename": i["filename"],
                    "data": base64.b64encode(i["data"]).decode("UTF-8"),
                }
                for i in output_images
            ]
        }

        print(json.dumps(output))
|
|
|
|
|
|
# Command-line interface:
|
|
if __name__ == "__main__":
    # Parse "--some-key value" pairs into options["some_key"] = "value".
    # A flag with no following argument keeps the placeholder value 1.
    args = sys.argv[1:]
    options = {}

    for i, arg in enumerate(args):
        if not arg.startswith("--"):
            continue

        k = arg[2:].replace("-", "_")
        options[k] = 1

        # Value-less switches; both silence logging.
        if k in ["quiet", "json"]:
            ENABLE_LOGGING = False
            continue

        # Trailing flag without a value: keep the placeholder 1.
        if i + 1 >= len(args):
            continue

        v = args[i + 1]

        if k == "options":
            # Load a JSON options file and merge it into what was parsed so
            # far. Problems are reported instead of being swallowed, but
            # parsing continues as before.
            try:
                with open(v, "r") as f:
                    options_json = json.load(f)
            except (OSError, ValueError) as e:
                log(f"Warning: could not load options file {v}: {e}")
                continue

            if isinstance(options_json, dict):
                options = merge_dicts(options, options_json)
            elif options_json is not None:
                log(f"Warning: options file {v} does not contain a JSON object; ignoring it")
            continue

        options[k] = v

    # random_* options hold "a|b|c" choice lists; pick one at random.
    # Non-string values (e.g. a flag given without a value) are left alone
    # instead of crashing on .split().
    for k, v in list(options.items()):
        if k.startswith("random_") and isinstance(v, str):
            options[k] = random.choice(v.split("|"))

    if "seed" not in options:
        options["seed"] = random.randint(1, 999_999_999_999_999)

    # Expand __key__ references inside string option values using the other
    # options. str.replace with str(...) is used because the referenced
    # value may be a non-string (the generated seed is an int) and must not
    # be interpreted as a regex replacement template.
    for k, v in list(options.items()):
        if not isinstance(v, str):
            continue

        for m in re.findall(r"__[A-Za-z0-9_]{1,}__", v):
            seek_key = m.split("__")[1]
            if seek_key in options:
                v = v.replace(m, str(options[seek_key]))

        options[k] = v

    generate(options)
    sys.exit()
|