From 6ce06f4bf4d8c013dbb0f189b132b70bd71d09b3 Mon Sep 17 00:00:00 2001 From: Conner Harkness Date: Thu, 8 May 2025 07:39:52 -0600 Subject: [PATCH] Initial commit, basic functionality --- .gitignore | 4 + app.py | 353 +++++++++++++++++++++++++++++++++++++++++++++++++++++ app.sh | 31 +++++ 3 files changed, 388 insertions(+) create mode 100644 .gitignore create mode 100644 app.py create mode 100644 app.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45c8cdb --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/downloads/ +/venv/ +/workflows/ +*.json diff --git a/app.py b/app.py new file mode 100644 index 0000000..9136938 --- /dev/null +++ b/app.py @@ -0,0 +1,353 @@ +import sys +import time +import json +import re +import random +import os +import base64 +import io + +from PIL import Image + +# Using urllib to mitigate need to install "requests": +from urllib.request import Request +from urllib.request import urlopen +from urllib.error import HTTPError +from urllib.error import URLError +from urllib.parse import urlencode + +ENABLE_LOGGING = True + +def log(input_str): + if ENABLE_LOGGING == True: + print(input_str) + +# Used for building multipart/form-data messages below: +def add_bytes(a, b): + new_bytes = a + + if not isinstance(b, list): + b = [b] + + for i in b: + if isinstance(i, str): + new_bytes = new_bytes + i.encode() + else: new_bytes = new_bytes + i + + return new_bytes + +# Merge dictionary b into a +def merge_dicts(a, b): + if b is None: + return a + + output = a + + for k in b.keys(): + if k not in output.keys(): + if b[k] is not None: + output[k] = b[k] + + if b[k] is None: + if k in output.keys(): + output.pop(k, None) + + if isinstance(b[k], dict): + if isinstance(output[k], dict): + output[k] = merge_dicts(output[k], b[k]) + continue + + output[k] = b[k] + if output[k] is None: + del output[k] + + return output + +def png_to_jpg(quality, png_bytes): + with io.BytesIO(png_bytes) as png: + img = Image.open(png) + + with io.BytesIO() as jpg: + img.convert("RGB").save(jpg, format="JPEG", quality=quality) + + return jpg.getvalue() + +def get_response(url, json_in=None, images_in=None, output_as=None): + + output = None + error = None + + verb = "GET" + bytes_sent = 0 + bytes_received = 0 + + if json_in is None: + req = Request(url) + else: + req_headers = { + "User-Agent": "Mozilla/5.0 (X11; U; Linux i686) Gecko/20071127 Firefox/2.0.0.11", + "Content-Type": "application/json" + } + req_data = json.dumps(json_in).encode() + + if images_in is not None: + + nl = "\r\n" + boundary = "DataBoundary" + req_headers["Content-Type"] = f"multipart/form-data; boundary={boundary}" + + b = bytes() + b = add_bytes(b, [f"--{boundary}", nl]) + b = add_bytes(b, [f"Content-Disposition: form-data; name=\"payload_json\"", nl]) + b = add_bytes(b, [f"Content-Type: application/json", nl, nl]) + b = add_bytes(b, [json.dumps(json_in), nl]) + + if not isinstance(images_in, list): + images_in = [image_in] + + count = 0 + for i in images_in: + filename = i["filename"] + data = i["data"] + + b = add_bytes(b, [f"--{boundary}", nl]) + b = add_bytes(b, [f"Content-Disposition: form-data; name=\"file{count}\"; filename=\"{filename}\"", nl]) + b = add_bytes(b, [f"Content-Type: application/octet-stream", nl, nl]) + b = add_bytes(b, [data, ";", nl]) + count = count + 1 + + b = add_bytes(b, [f"--{boundary}--", nl]) + req_data = b + + verb = "POST" + bytes_sent = len(req_data) + + req = Request( + url, + req_data, + headers=req_headers, + method="POST" + ) + + try: + with urlopen(req) as resp: + resp_bytes = resp.read() + bytes_received = len(resp_bytes) + + log(f"{verb}: {url} ({bytes_sent} bytes sent, {bytes_received} received)") + + if output_as == bytes: + output = resp_bytes + else: + resp_text = resp_bytes.decode() + + try: + resp_json = json.loads(resp_text) + output = resp_json + except: + output = resp_text + + except HTTPError as e: + output = e.read().decode() + error = e + log(e) + log(output) + + except URLError as e: + output = str(e) + error = e + log(e) + log(output) + + return output, error + +def generate(options): + + for k in ["api_workflow", "api_url"]: + if k not in options.keys(): + cli_key = re.sub(r"_", "-", k) + log(f"Error: The {k} key was not found in the provided options. Specify it by loading a JSON file using --options filename.json containing the {k} key with a string value or specify it directly as a command-line argument using --{cli_key} with a supplied value.") + exit() + + api_workflow = options["api_workflow"] + api_url = options["api_url"] + + api_url = re.sub(r"\/$", "", api_url) + + with open(api_workflow, "r") as f: + workflow_text = f.read() + + for k in options.keys(): + v = options[k] + target_key = rf"__{k}__" + keys_to_replace = re.findall(target_key, workflow_text) + workflow_text = re.sub(target_key, str(v), workflow_text) + + if len(keys_to_replace) > 0: + log(f"Replacing: {target_key} --> {v}") + + matches = re.findall(r"__[A-Za-z0-9_]{1,}__", workflow_text) + + for m in matches: + unreplaced_key = str(m) + log(f"Warning: key {unreplaced_key} remains unreplaced in json") + + # Validate the JSON or exit early: + try: + valid_json = json.loads(workflow_text) + except Exception as x: + log(x) + log("JSON may be invalid") + exit() + + stub, err = get_response(f"{api_url}/prompt", {"prompt": valid_json}) + + if err: + log(err) + return + + prompt_id = stub["prompt_id"] + + while True: + history, err = get_response(f"{api_url}/history/{prompt_id}") + + if not err: + if prompt_id in history.keys(): + break + + time.sleep(1) + + + # Read the output history anmd fetch each image: + history = history[prompt_id] + output_images = [] + + for o in history["outputs"]: + for node_id in history["outputs"]: + node_output = history["outputs"][node_id] + files_output = [] + + if "images" in node_output: + for image in node_output["images"]: + url_params = urlencode({ + "filename": image["filename"], + "subfolder": image["subfolder"], + "type": image["type"] + }) + + img, error = get_response(f"{api_url}/view?{url_params}", output_as=bytes) + + if not error: + output_images.append({ + "filename": image["filename"], + "data": img + }) + + + + if "jpeg_quality" in options.keys(): + jpeg_quality = int(options["jpeg_quality"]) + + new_output_images = [] + for i in output_images: + new_filename = re.sub(r"\.png$", ".jpg", i["filename"]) + new_data = png_to_jpg(jpeg_quality, i["data"]) + new_output_images.append({"filename": new_filename, "data": new_data}) + + output_images = new_output_images + + + json_payload = {"content": ""} + + for k in ["content", "username"]: + if k in options.keys(): + json_payload[k] = options[k] + + if "webhook_url" in options.keys(): + webhook_url = options["webhook_url"] + results, error = get_response(webhook_url, json_payload, images_in=output_images) + + if "save_directory" in options.keys(): + save_directory = options["save_directory"] + + os.makedirs(save_directory, exist_ok=True) + + if not re.search(r"\/$", save_directory): + save_directory = save_directory + "/" + + for i in output_images: + filename = i["filename"] + data = i["data"] + with open(save_directory + filename, "wb") as f: + f.write(data) + + if "json" in options.keys(): + if options["json"] == 1: + output = {} + output["images"] = [] + + for i in output_images: + filename = i["filename"] + data = i["data"] + data_enc = base64.b64encode(data).decode("UTF-8") + output["images"].append({"filename": filename, "data": data_enc}) + + print(json.dumps(output)) + + +# Command-line interface: +if __name__ == "__main__": + args = sys.argv[1:] + options = {} + i = -1 + + for arg in args: + i = i + 1 + + if re.match(r"^--", arg): + try: + k = re.sub(r"-", "_", args[i][2:]) + options[k] = 1 + + if k in ["quiet", "json"]: + ENABLE_LOGGING = False + continue + + v = args[i + 1] + + if k == "options": + with open(v, "r") as f: + options_json_text = f.read() + options_json = json.loads(options_json_text) + options = merge_dicts(options, options_json) + continue + + options[k] = v + except: + pass + + for k in options.keys(): + v = options[k] + if re.match(r"^random_", k): + choices = v.split("|") + options[k] = random.choice(choices) + + if "seed" not in options.keys(): + options["seed"] = random.randint(1, 999_999_999_999_999) + + for k in options.keys(): + v = options[k] + + if not isinstance(v, str): + continue + + matches = re.findall(r"__[A-Za-z0-9_]{1,}__", v) + + for m in matches: + seek_key = m.split("__")[1] + if seek_key in options.keys(): + v = re.sub(rf"{m}", options[seek_key], v) + options[k] = v + + generate(options) + exit() diff --git a/app.sh b/app.sh new file mode 100644 index 0000000..4778e50 --- /dev/null +++ b/app.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +INSTALLATION=0 + +if [[ ! -d venv ]] +then + python3 -m venv venv || python -m venv venv || ( + echo "Could not create a Python virtual environment." + exit 1 + ) + + INSTALLATION=1 +fi + +if [[ -f ./venv/bin/activate ]]; then source ./venv/bin/activate; PYTHON=python3; fi +if [[ -f ./venv/Scripts/activate ]]; then source ./venv/Scripts/activate; PYTHON=python; fi + +if [[ $INSTALLATION -eq 1 ]] +then + pip install pillow +fi + +$PYTHON -u app.py $@ \ + --options "options.json" \ + --options "anime.json" \ + --positive "((modern art style)) 1girl close portrait, ruby red ((long flowing hair, straight hair)), ruby lips, pale skin, white futuristic armor, sci-fi tech augmentations, high saturation, black background" \ + --negative "((3d shading)), blurry, hazy, gritty, unclear, bad quality, disfigured, malformed, ((blue, gold, cyan)) gemstones" \ + --jpeg-quality 20 + + +# --positive "1girl close portrait, ruby red ((long flowing hair, straight hair)), ruby lips, pale skin, white futuristic armor, sci-fi tech augmentations, ((Xenoblade style)), slightly grayscale, graphite with color, high contrast"