import sys import time import json import re import random import os import base64 import io from PIL import Image # Using urllib to mitigate need to install "requests": from urllib.request import Request from urllib.request import urlopen from urllib.error import HTTPError from urllib.error import URLError from urllib.parse import urlencode ENABLE_LOGGING = True def log(input_str): if ENABLE_LOGGING == True: print(input_str) # Used for building multipart/form-data messages below: def add_bytes(a, b): new_bytes = a if not isinstance(b, list): b = [b] for i in b: if isinstance(i, str): new_bytes = new_bytes + i.encode() else: new_bytes = new_bytes + i return new_bytes # Merge dictionary b into a def merge_dicts(a, b): if b is None: return a output = a for k in b.keys(): if k not in output.keys(): if b[k] is not None: output[k] = b[k] if b[k] is None: if k in output.keys(): output.pop(k, None) if isinstance(b[k], dict): if isinstance(output[k], dict): output[k] = merge_dicts(output[k], b[k]) continue output[k] = b[k] if output[k] is None: del output[k] return output def png_to_jpg(quality, png_bytes): with io.BytesIO(png_bytes) as png: img = Image.open(png) with io.BytesIO() as jpg: img.convert("RGB").save(jpg, format="JPEG", quality=quality) return jpg.getvalue() def get_response(url, json_in=None, images_in=None, output_as=None): output = None error = None verb = "GET" bytes_sent = 0 bytes_received = 0 if json_in is None: req = Request(url) else: req_headers = { "User-Agent": "Mozilla/5.0 (X11; U; Linux i686) Gecko/20071127 Firefox/2.0.0.11", "Content-Type": "application/json" } req_data = json.dumps(json_in).encode() if images_in is not None: nl = "\r\n" boundary = "DataBoundary" req_headers["Content-Type"] = f"multipart/form-data; boundary={boundary}" b = bytes() b = add_bytes(b, [f"--{boundary}", nl]) b = add_bytes(b, [f"Content-Disposition: form-data; name=\"payload_json\"", nl]) b = add_bytes(b, [f"Content-Type: application/json", nl, nl]) b = add_bytes(b, [json.dumps(json_in), nl]) if not isinstance(images_in, list): images_in = [image_in] count = 0 for i in images_in: filename = i["filename"] data = i["data"] b = add_bytes(b, [f"--{boundary}", nl]) b = add_bytes(b, [f"Content-Disposition: form-data; name=\"file{count}\"; filename=\"{filename}\"", nl]) b = add_bytes(b, [f"Content-Type: application/octet-stream", nl, nl]) b = add_bytes(b, [data, ";", nl]) count = count + 1 b = add_bytes(b, [f"--{boundary}--", nl]) req_data = b verb = "POST" bytes_sent = len(req_data) req = Request( url, req_data, headers=req_headers, method="POST" ) try: with urlopen(req) as resp: resp_bytes = resp.read() bytes_received = len(resp_bytes) log(f"{verb}: {url} ({bytes_sent} bytes sent, {bytes_received} received)") if output_as == bytes: output = resp_bytes else: resp_text = resp_bytes.decode() try: resp_json = json.loads(resp_text) output = resp_json except: output = resp_text except HTTPError as e: output = e.read().decode() error = e log(e) log(output) except URLError as e: output = str(e) error = e log(e) log(output) return output, error def generate(options): for k in ["api_workflow", "api_url"]: if k not in options.keys(): cli_key = re.sub(r"_", "-", k) log(f"Error: The {k} key was not found in the provided options. Specify it by loading a JSON file using --options filename.json containing the {k} key with a string value or specify it directly as a command-line argument using --{cli_key} with a supplied value.") exit() api_workflow = options["api_workflow"] api_url = options["api_url"] api_url = re.sub(r"\/$", "", api_url) with open(api_workflow, "r") as f: workflow_text = f.read() for k in options.keys(): v = options[k] target_key = rf"__{k}__" keys_to_replace = re.findall(target_key, workflow_text) workflow_text = re.sub(target_key, str(v), workflow_text) if len(keys_to_replace) > 0: log(f"Replacing: {target_key} --> {v}") matches = re.findall(r"__[A-Za-z0-9_]{1,}__", workflow_text) for m in matches: unreplaced_key = str(m) log(f"Warning: key {unreplaced_key} remains unreplaced in json") # Validate the JSON or exit early: try: valid_json = json.loads(workflow_text) except Exception as x: log(x) log("JSON may be invalid") exit() stub, err = get_response(f"{api_url}/prompt", {"prompt": valid_json}) if err: log(err) return prompt_id = stub["prompt_id"] while True: history, err = get_response(f"{api_url}/history/{prompt_id}") if not err: if prompt_id in history.keys(): break time.sleep(1) # Read the output history anmd fetch each image: history = history[prompt_id] output_images = [] for o in history["outputs"]: for node_id in history["outputs"]: node_output = history["outputs"][node_id] files_output = [] if "images" in node_output: for image in node_output["images"]: url_params = urlencode({ "filename": image["filename"], "subfolder": image["subfolder"], "type": image["type"] }) img, error = get_response(f"{api_url}/view?{url_params}", output_as=bytes) if not error: output_images.append({ "filename": image["filename"], "data": img }) if "jpeg_quality" in options.keys(): jpeg_quality = int(options["jpeg_quality"]) new_output_images = [] for i in output_images: new_filename = re.sub(r"\.png$", ".jpg", i["filename"]) new_data = png_to_jpg(jpeg_quality, i["data"]) new_output_images.append({"filename": new_filename, "data": new_data}) output_images = new_output_images json_payload = {"content": ""} for k in ["content", "username"]: if k in options.keys(): json_payload[k] = options[k] if "webhook_url" in options.keys(): webhook_url = options["webhook_url"] results, error = get_response(webhook_url, json_payload, images_in=output_images) if "save_directory" in options.keys(): save_directory = options["save_directory"] os.makedirs(save_directory, exist_ok=True) if not re.search(r"\/$", save_directory): save_directory = save_directory + "/" for i in output_images: filename = i["filename"] data = i["data"] with open(save_directory + filename, "wb") as f: f.write(data) if "json" in options.keys(): if options["json"] == 1: output = {} output["images"] = [] for i in output_images: filename = i["filename"] data = i["data"] data_enc = base64.b64encode(data).decode("UTF-8") output["images"].append({"filename": filename, "data": data_enc}) print(json.dumps(output)) # Command-line interface: if __name__ == "__main__": args = sys.argv[1:] options = {} i = -1 for arg in args: i = i + 1 if re.match(r"^--", arg): try: k = re.sub(r"-", "_", args[i][2:]) options[k] = 1 if k in ["quiet", "json"]: ENABLE_LOGGING = False continue v = args[i + 1] if k == "options": with open(v, "r") as f: options_json_text = f.read() options_json = json.loads(options_json_text) options = merge_dicts(options, options_json) continue options[k] = v except: pass for k in options.keys(): v = options[k] if re.match(r"^random_", k): choices = v.split("|") options[k] = random.choice(choices) if "seed" not in options.keys(): options["seed"] = random.randint(1, 999_999_999_999_999) for k in options.keys(): v = options[k] if not isinstance(v, str): continue matches = re.findall(r"__[A-Za-z0-9_]{1,}__", v) for m in matches: seek_key = m.split("__")[1] if seek_key in options.keys(): v = re.sub(rf"{m}", options[seek_key], v) options[k] = v generate(options) exit()