Compare commits

...

2 Commits

Author SHA1 Message Date
1107eba42b Completed per-conversation setting overrides 2025-06-03 19:24:56 -06:00
25beca85e9 Initial code commit 2025-05-30 07:36:28 -06:00
13 changed files with 1539 additions and 0 deletions

9
.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
__pycache__/
/venv/
/cuda*/
/python/
*.gguf
/user/
/model.json
/settings.json
/config.sh

228
app.py Normal file
View File

@ -0,0 +1,228 @@
import os
import signal
import re
import json
import time
import copy
import traceback
import subprocess
import shlex
import threading
import hashlib
from lib.helpers import *
from lib.extension import *
import streamlit as st
ss = st.session_state
# Load globals from the environment variables:
for x in os.environ:
globals()[x] = os.environ[x]
hide_deploy_button()
#
#
#
ss.CONFIG_DIR = "."
ss.TOKEN_LIST = []
mkdir("user")
for p in os.scandir("user"):
if os.path.isdir(p.path):
ss.TOKEN_LIST.append({
"token": p.name,
"create_time": os.path.getctime(p.path)
})
if len(ss.TOKEN_LIST) < 1:
token = generate_token()
token_path = f"user/{token}"
mkdir(token_path)
ss.TOKEN_LIST.append({
"token": token,
"create_time": os.path.getctime(token_path)
})
ss.TOKEN_LIST = sorted(ss.TOKEN_LIST, key=lambda o: sort_by_key(o, "create_time"), reverse=False)
ss.TOKEN = ss.TOKEN_LIST[0]["token"] if len(ss.TOKEN_LIST) == 1 else None
ss.TOKEN_COOKIE = get_cookie("token")
if isinstance(ss.TOKEN_COOKIE, str):
if len(ss.TOKEN_COOKIE) > 0:
tokens = [i["token"] for i in ss.TOKEN_LIST]
if ss.TOKEN_COOKIE in tokens:
ss.TOKEN = ss.TOKEN_COOKIE
if ss.TOKEN is not None:
for i in ss.TOKEN_LIST:
if ss.TOKEN == i["token"]:
ss.CONFIG_DIR = f"user/{ss.TOKEN}"
break
ss.CHATS_DIR = f"{ss.CONFIG_DIR}/chats"
ss.SETTINGS = JsonFile(f"{ss.CONFIG_DIR}/settings.json", defaults=({
"fetch_reply": True,
"save_as": False,
"show_clear": False,
"show_undo": True,
"show_redo": True,
"show_more": True,
"show_fetch_button": True,
"show_fetch_toggle": True
} if ss.CONFIG_DIR != "." else {}))
ss.IS_ADMIN = False
if ss.TOKEN == ss.TOKEN_LIST[0]["token"]:
ss.IS_ADMIN = True
ss.APP_SETTINGS = JsonFile("settings.json", defaults={
"inference_server_url": "http://127.0.0.1:11434/"
})
#
#
#
ss.PAGE_REGISTRY = {}
ss.PAGE_REGISTRY_MAP = {}
def register_page(category, title, fn, **kwargs):
if category not in ss.PAGE_REGISTRY.keys():
ss.PAGE_REGISTRY[category] = []
if category not in ss.PAGE_REGISTRY_MAP.keys():
ss.PAGE_REGISTRY_MAP[category] = {}
pg = st.Page(fn, title=title, **kwargs)
if title not in ss.PAGE_REGISTRY_MAP[category].keys():
ss.PAGE_REGISTRY_MAP[category][title] = pg
ss.PAGE_REGISTRY[category].append(pg)
def register_pages(category, arr, fn, **kwargs):
for item in arr:
kwargs_copy = copy.copy(kwargs)
title = None
if isinstance(item, str): title = item
if isinstance(item, dict):
if "title" in item.keys():
title = item["title"]
if "icon" in item.keys():
kwargs_copy["icon"] = item["icon"]
fn_name = f"{category}_{title}"
fn_name = re.sub(r"\s+", "_", fn_name)
fn_name = re.sub(r"[^A-Za-z0-9_]", "", fn_name)
fn_name = fn_name.lower()
def abc(item=item):
fn(item)
abc.__name__ = fn_name
print(f"Registering {fn_name}")
globals()[fn_name] = abc
register_page(category, title, abc, **kwargs_copy)
#
#
#
from views.chats_default import *
from views.more_about import *
from views.more_settings import *
from views.more_tokens import *
#
#
#
if ss.TOKEN is not None:
mkdir(ss.CHATS_DIR)
chats = []
for c in os.scandir(ss.CHATS_DIR):
if re.search(r"\.json$", c.name):
chat_name = re.sub(r"\.json$", "", c.name)
obj = {
"title": chat_name,
"path": c.path,
"modify_time": os.path.getmtime(c.path),
"pinned": False
}
with open(c.path) as f:
chat = json.loads(f.read())
if "pinned" in chat.keys():
if chat["pinned"]:
obj["pinned"] = True
obj["icon"] = ":material/keep:"
chats.append(obj)
chats = sorted(chats, key=lambda chat: sort_by_key(chat, "modify_time"), reverse=True)
chats = sorted(chats, key=lambda chat: sort_by_key(chat, "pinned"), reverse=True)
if len(chats) < 1:
chats.append({
"title": "Untitled",
"path": f"{ss.CHATS_DIR}/Untitled.json",
"modify_time": 0,
"pinned": False
})
register_pages("Chats", chats, chats_default, icon=":material/chat:")
#
#
#
register_page("More", "About", more_about, icon=":material/info:")
register_page("More", "Settings", more_settings, icon=":material/settings:")
if ss.IS_ADMIN:
register_page("More", "Tokens", more_tokens, icon=":material/key:")
#
#
#
if ss.TOKEN is None:
st.sidebar.warning("A valid API token is required to use this software.")
def save_token():
set_cookie("token", ss.new_token)
page_redirect()
token = st.sidebar.text_input(
"Token",
value=get_cookie("token"),
help="Provide a valid token here",
on_change=save_token,
key="new_token")
else:
if len(ss.TOKEN_LIST) > 1:
if st.sidebar.button("Logout", icon=":material/logout:", use_container_width=True):
set_cookie("token", None)
page_redirect()
st.sidebar.caption("mllm-lite by caharkness")
# Only attempt to handle redirect after all page objects exist:
handle_redirect()
pg = st.navigation(ss.PAGE_REGISTRY)
pg.run()

4
config_example.sh Normal file
View File

@ -0,0 +1,4 @@
#!/bin/bash
export STREAMLIT_HOST="0.0.0.0"
export STREAMLIT_PORT=8080

31
lib/chatml.py Normal file
View File

@ -0,0 +1,31 @@
class ChatML():
def format(context, query, for_completion=False):
# ChatML format:
user_id = "user"
assistant_id = "assistant"
context_declaration = "<|im_start|>system\n"
message_declaration = "<|im_start|>{author}\n"
end_of_message = "<|im_end|>\n"
output = ""
if isinstance(query, str):
query = [{"author": "user", "body": query}]
if isinstance(query, list):
for message in query:
author = message["author"]
body = message["body"]
if "nickname" in message.keys():
nickname = message["nickname"]
author = nickname
output = f"{output}{message_declaration.format(author=author)}{body}{end_of_message}"
append = ""
if for_completion:
append = message_declaration.format(author=assistant_id)
output = f"""{context_declaration}{context}{end_of_message}{output}{append}"""
return output

102
lib/extension.py Normal file
View File

@ -0,0 +1,102 @@
import re
import time
import streamlit as st
from streamlit_js_eval import streamlit_js_eval
ss = st.session_state
# Allows us to get a Streamlit cookie safely without added fuss:
def get_cookie(key):
if "cookies" in ss:
if key in ss.cookies.keys():
value = ss.cookies[key]
if value is None or len(value) < 1:
del ss.cookies[key]
return None
return ss.cookies[key]
cookies = st.context.cookies
if "cookies" not in st.session_state:
ss.cookies = {}
if key in cookies.keys():
ss.cookies[key] = cookies[key]
return cookies[key]
return None
# Set a cookie in-page using JavaScript without the "rerun" BS:
def set_cookie(key, value):
print(f"set_cookie({key}, {value}) called...")
if value is None:
value = ""
javascript_string = f"""
var cookies = parent.document.cookie;
parent.document.cookie = `{key}={value};`;
console.log(`parent.document.cookie = ${{parent.document.cookie}}`)
"""
streamlit_js_eval(js_expressions=javascript_string)
time.sleep(1)
def page_redirect(url=""):
js = f"""
var link = parent.document.createElement("a");
parent.document.body.appendChild(link);
link.href = "{url}";
if (link.href === "")
link.href = window.location.href;
link.click();
"""
streamlit_js_eval(js_expressions=js)
def perform_redirect(pg_category, pg_name):
if pg_category not in ss.PAGE_REGISTRY_MAP.keys(): return
if pg_name not in ss.PAGE_REGISTRY_MAP[pg_category].keys(): return
pg = ss.PAGE_REGISTRY_MAP[pg_category][pg_name]
st.switch_page(pg)
def handle_redirect():
if "redirect_page_category" in st.session_state:
if "redirect_page_name" in st.session_state:
pg_category = ss.redirect_page_category
pg_name = ss.redirect_page_name
del st.session_state.redirect_page_category
del st.session_state.redirect_page_name
perform_redirect(pg_category, pg_name)
def redirect(pg_category, pg_name):
print(f"Redirecting to page {pg_category} {pg_name}")
st.session_state.redirect_page_category = pg_category
st.session_state.redirect_page_name = pg_name
st.rerun()
def hide_deploy_button():
st.markdown("""
<style>
.stAppDeployButton { display: none; }
</style>
""", unsafe_allow_html=True)
def shrink_sidebar():
js = """
setTimeout(function() {
const sidebar = document.querySelector(".stSidebar");
sidebar.style.width = "244px";
}, 2500);
"""
streamlit_js_eval(js_expressions=js)

73
lib/gui.py Normal file
View File

@ -0,0 +1,73 @@
import sys
sys.path.append(".")
sys.path.append("./lib")
import os
import json
from lib.helpers import *
from PyQt6.QtWidgets import QApplication, QMainWindow
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtCore import QUrl
from PyQt6.QtWebEngineCore import QWebEngineCookieStore, QWebEngineProfile
from PyQt6.QtNetwork import QNetworkCookie
from PyQt6.QtCore import QDateTime, QUrl
os.environ["QTWEBENGINE_CHROMIUM_FLAGS"] = "--disable-gpu"
os.environ["QT_QUICK_BACKEND"] = "software"
#os.environ["QTWEBENGINE_CHROMIUM_FLAGS"] = "--ignore-gpu-blacklist --enable-gpu-rasterization --enable-native-gpu-memory-buffers --num-raster-threads=4"
#os.environ["QTWEBENGINE_CHROMIUM_FLAGS"] = "--disable-gpu-memory-buffer-video-frames"
#--disable-gpu-memory-buffer-video-frames
#qutebrowser --qt-flag ignore-gpu-blacklist --qt-flag enable-gpu-rasterization --qt-flag enable-native-gpu-memory-buffers --qt-flag num-raster-threads=4
TOKEN = ""
TOKEN_LIST = []
if os.path.isdir("user"):
for p in os.scandir("user"):
if os.path.isdir(p.path):
TOKEN_LIST.append({
"token": p.name,
"create_time": os.path.getctime(p.path)
})
TOKEN_LIST = sorted(TOKEN_LIST, key=lambda o: sort_by_key(o, "create_time"), reverse=False)
TOKEN = TOKEN_LIST[0]["token"] if len(TOKEN_LIST) > 0 else None
app = QApplication(sys.argv)
# Load globals from the environment variables:
for x in os.environ:
globals()[x] = os.environ[x]
def set_cookie(k, v):
profile = QWebEngineProfile.defaultProfile()
cookie_store = profile.cookieStore()
cookie = QNetworkCookie(k.encode("utf-8"), v.encode("utf-8"))
cookie.setDomain(f"127.0.0.1")
cookie.setPath("/")
cookie.setExpirationDate(QDateTime.currentDateTime().addDays(1))
cookie.setSecure(True)
cookie_store.setCookie(cookie, QUrl(f"http://127.0.0.1:{STREAMLIT_PORT}"))
set_cookie("token", TOKEN)
webview = QWebEngineView()
webview.load(QUrl(f"http://127.0.0.1:{STREAMLIT_PORT}"))
window = QMainWindow()
window.setWindowTitle("Streamlit")
window.resize(1280, 640)
window.setCentralWidget(webview)
window.show()
# Allow browser to control the window's title:
webview.titleChanged.connect(window.setWindowTitle)
sys.exit(app.exec())

171
lib/helpers.py Normal file
View File

@ -0,0 +1,171 @@
import sys
import os
import re
import random
import json
from pathlib import Path
def sort_by_key(arr, key):
output = None
if key in arr.keys():
output = arr[key]
if output == True: output = 1
if output == False: output = 0
if output is not None:
return output
return 0
def mkdir(dir_path):
path_object = Path(dir_path)
path_object.mkdir(parents=True, exist_ok=True)
def touch(file_path, default_content=""):
path_object = Path(file_path)
path_object.parent.mkdir(parents=True, exist_ok=True)
if not os.path.exists(file_path):
with open(file_path, "w") as file_handle:
file_handle.write(default_content)
def generate_token():
character_set = "0123456789abcdef"
output_token = ""
for i in range(32):
output_token = output_token + random.choice(character_set)
return output_token
class JsonFile:
def __init__(self, path, defaults={}):
self.path = path
self.contents = {}
self.defaults = defaults
self.load()
def load(self):
touch(self.path, default_content="{}")
with open(self.path) as f:
f.seek(0)
self.contents = json.loads(f.read())
for k in self.defaults:
if k not in self.contents.keys():
self.contents[k] = self.defaults[k]
self.save()
def save(self):
with open(self.path, "w") as f:
f.write(json.dumps(self.contents, indent=4))
def get(self, key, default_value=None, save=False):
if key in self.contents.keys():
return self.contents[key]
else:
if save:
self.contents[key] = default_value
self.save()
return default_value
def __getitem__(self, key):
return self.get(key)
def __setitem__(self, key, value):
print(f"{self.path} setting {key} to {value}")
self.contents[key] = value
self.save()
def keys(self):
return self.contents.keys()
def widget(self, st, kind, name, key, **kwargs):
setting_key = f"setting_{key}"
value_to_render = self[key]
def on_change_internal():
val = st.session_state[setting_key]
print(f"Set {key} to {val}")
if kind == st.text_area:
if "array_separator" in extra_options.keys():
sep = extra_options["array_separator"]
val = val.strip().split(sep)
self.load()
self[key] = val
self.save()
if "on_change" in kwargs:
kwargs.get("on_change")()
if kind == st.selectbox:
options = extra_options["options"]
selected_index = 0
try:
selected_index = options.index(self[key])
except:
pass
del extra_options["options"]
kind(
name,
options=options,
index=selected_index,
key=setting_key,
on_change=on_change_internal,
**extra_options)
return
if kind == st.text_area:
if "array_separator" in extra_options.keys():
sep = extra_options["array_separator"]
value_to_render = sep.join(value_to_render)
del extra_options["array_separator"]
kind(name, value=value_to_render, key=setting_key, on_change=on_change_internal, **kwargs)
def get_next_filename(path):
if not os.path.exists(path):
return path
extension_pat = r"\.[A-Za-z0-9]{1,}$"
extension = ""
without_ext = path
if re.search(extension_pat, path):
extension = re.findall(extension_pat, without_ext)[0]
without_ext = re.sub(extension_pat, "", without_ext)
number_pat = r" [0-9]{1,}$"
number = 1
if re.search(number_pat, without_ext):
number = re.findall(number_pat, without_ext)[0]
without_ext = re.sub(number_pat, "", without_ext)
number = int(number)
number = number + 1
without_ext = f"{without_ext} {number}"
with_ext = without_ext + extension
return with_ext

133
lib/llmhost.py Normal file
View File

@ -0,0 +1,133 @@
import sys
sys.path.append(".")
sys.path.append("./lib")
import re
import requests
import os
import json
import traceback
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from llama_cpp import Llama
print("Loading model...", end=" ")
model_settings_path = "model.json"
model_settings = {
"model_path": None,
"n_gpu_layers": -1,
"n_ctx": 32768,
"n_threads": 8,
"max_tokens": 16384,
"stop": ["<|im_end|>", "</s>", "<|im_start|>"],
"repeat_penalty": 1.1,
"temperature": 0.75
}
if not os.path.isfile(model_settings_path):
with open(model_settings_path, "w") as f:
f.write(json.dumps(model_settings, indent=4))
with open(model_settings_path) as f:
model_settings = json.loads(f.read())
if model_settings["model_path"] is None:
for f in os.scandir("."):
if re.search(r"\.gguf$", f.path):
model_settings["model_path"] = f.path
break
if model_settings["model_path"] is None:
raise Exception("No .gguf model was found in the program directory. Please specify a model's relative or absolute path using the generated model.json configuration file.")
LLM = Llama(
model_path = model_settings["model_path"],
n_gpu_layers = model_settings["n_gpu_layers"],
n_ctx = model_settings["n_ctx"],
verbose = False,
n_threads = model_settings["n_threads"])
print("Loaded model {model_path}".format(model_path=model_settings["model_path"]))
class PrivateHandler(BaseHTTPRequestHandler):
LOCK = False
def do_POST(self):
content_length = int(self.headers.get("Content-Length", 0))
post_data = self.rfile.read(content_length).decode("utf-8")
trimmed_path = self.path[1:].strip()
if len(trimmed_path) > 0:
args = trimmed_path.split("/")
fn_given = args[0]
fn_actual = f"public_{fn_given}"
if hasattr(self, fn_actual):
attr = getattr(self, fn_actual)
if hasattr(attr, "__call__"):
function = attr
extra_args = args[1:] if len(args) > 1 else None
function(post_data, extra_args)
return
self.index(post_data)
def index(self, post_data):
try:
while PrivateHandler.LOCK:
print(".", end="")
time.sleep(0.1)
PrivateHandler.LOCK = True
text = post_data
print("POST:")
print(text, end="")
# Set response headers
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.send_header("Transfer-Encoding", "chunked")
self.end_headers()
response = LLM(
text,
max_tokens = model_settings["max_tokens"],
stop = model_settings["stop"],
echo = False,
repeat_penalty = model_settings["repeat_penalty"],
temperature = model_settings["temperature"],
stream = True)
# Stream a buffered response
for token in response:
token_text = token["choices"][0]["text"]
token_json = json.dumps(token_text)
self.wfile.write(f"{len(token_json):x}\r\n".encode("utf-8")) # Chunk size in hex
self.wfile.write(f"{token_json}\r\n".encode("utf-8"))
print(token_text, end="")
# Signal the end of the stream
self.wfile.write(b"0\r\n\r\n")
print("\n-----")
except Exception as x:
print(traceback.format_exc())
print(x)
pass
PrivateHandler.LOCK = False
time.sleep(0.2)
if __name__ == "__main__":
address = "0.0.0.0"
port = 11434
print(f"Listening on {address} port {port}...")
httpd = ThreadingHTTPServer((address, port), PrivateHandler)
httpd.serve_forever()

139
mllm.sh Normal file
View File

@ -0,0 +1,139 @@
#!/bin/bash
# https://stackoverflow.com/a/1482133
SCRIPT_DIR="$(dirname -- "$(readlink -f -- "$0";)";)"
cd "${SCRIPT_DIR}"
if [[ -f config_example.sh ]]; then source config_example.sh; fi
if [[ -f config.sh ]]; then source config.sh; fi
#
#
#
#
#
#
#
function title()
{
echo -ne "\033]0;$@\007"
}
if [[ ! -d venv ]]
then
python3 -m venv venv || python -m venv venv || (
echo "Could not make a python virtual environment"
exit 1
)
FIRST_RUN=1
fi
if [[ -f ./venv/bin/activate ]]; then source ./venv/bin/activate; fi
if [[ -f ./venv/Scripts/activate ]]; then source ./venv/Scripts/activate; IS_WINDOWS=1; fi
if [[ -f ./python/python.exe ]]
then
IS_WINDOWS=1
function pip()
{
./python/python.exe -m pip "$@"
}
function python()
{
./python/python.exe "$@"
}
function streamlit()
{
./python/Scripts/streamlit.exe "$@"
}
export -f pip
export -f streamlit
export -f python
fi
PYTHON="python3"
if [[ $IS_WINDOWS -eq 1 ]]
then
PYTHON="python"
export CUDA_PATH="$PWD/cuda126"
export PATH="$PATH:$CUDA_PATH/bin"
fi
if [[ "$@" =~ --first-run ]]
then
FIRST_RUN=1
fi
if [[ $FIRST_RUN -eq 1 ]]
then
pip install streamlit
pip install streamlit_js_eval
pip install pyqt6 pyqt6-webengine
fi
# Used to enter the Python environment for using pip:
if [[ "$@" =~ --bash ]]
then
bash
exit 0
fi
if [[ "$@" =~ --server ]]
then
$PYTHON -u lib/llmhost.py "$@"
exit 0
fi
if [[ "$@" =~ --streamlit ]]
then
streamlit run app.py \
--server.address $STREAMLIT_HOST \
--server.port $STREAMLIT_PORT \
--server.headless true \
--browser.gatherUsageStats false \
--server.enableXsrfProtection false \
--server.enableCORS false \
--server.enableWebsocketCompression false
exit 0
fi
if [[ "$@" =~ --gui ]]
then
$PYTHON -u lib/gui.py "$@"
echo $CHILD_PIDS | xargs kill
exit 0
fi
# You've made it here if you've ran the script without any arguments:
title "mllm"
# https://stackoverflow.com/a/2173421
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
SCRIPT_NAME="$(basename "$0")"
CHILD_PIDS=""
./${SCRIPT_NAME} --server &
CHILD_PIDS="${CHILD_PIDS} $!"
./${SCRIPT_NAME} --streamlit &
CHILD_PIDS="${CHILD_PIDS} $!"
export CHILD_PIDS
./${SCRIPT_NAME} --gui &
CHILD_PIDS="${CHILD_PIDS} $!"
wait ${CHILD_PIDS}

284
views/chats_default.py Normal file
View File

@ -0,0 +1,284 @@
import sys
import argparse
import os
import subprocess
import re
import hashlib
import random
import requests
import json
import asyncio
import threading
import time
import string
import shutil
import datetime
from lib.chatml import *
from lib.helpers import *
from lib.extension import *
import streamlit as st
ss = st.session_state
def chats_default(item):
chat = {}
chat_path = item["path"]
chat_name = item["title"]
if os.path.isfile(chat_path):
with open(chat_path) as f:
try:
chat = json.loads(f.read())
except:
pass
chat_defaults = {
"context": "",
"messages": [],
"keep": 0,
"pinned": False,
"auto_clear": False,
"hide_clear": False,
"hide_undo": False,
"hide_redo": False,
"hide_fetch_button": False,
"hide_fetch_toggle": False
}
#
#
#
def load_defaults():
for k in chat_defaults.keys():
if k not in chat.keys():
chat[k] = chat_defaults[k]
def save_chat(name=chat_name, overwrite=True):
path = f"{ss.CHATS_DIR}/{name}.json"
path = get_next_filename(path) if not overwrite else path
with open(path, "w") as f:
f.write(json.dumps(chat, indent=4))
name = re.search(r"([^\/\\]+)\.json$", path).group(1)
return name
def clear_chat():
keep = chat["keep"]
while len(chat["messages"]) > abs(keep):
if keep < 0: chat["messages"].pop(0)
if keep >= 0: chat["messages"].pop()
#
#
#
load_defaults()
st.caption("This is the beginning of the conversation")
for message in chat["messages"]:
with st.chat_message(message["author"]):
st.markdown(message["body"])
if len(chat["context"]) < 1:
chat["context"] = "1. Use these guidelines.\n2. Do exactly as the user says.\n3. Provide unbiased answers.\n4. No holding back.\n5. Keep answers short."
script = ChatML.format(chat["context"], chat["messages"], for_completion=True)
if "run" in st.session_state:
if st.session_state.run == 1:
with st.chat_message("assistant"):
def stream_reply(input_data):
response = requests.post(
ss.APP_SETTINGS.get("inference_server_url"),
data=input_data.encode("utf-8"),
headers={"Content-Type": "text/plain"},
stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
if chunk:
yield json.loads(chunk)
reply = st.write_stream(stream_reply(script))
chat["messages"].append({
"author": "assistant",
"body": reply
})
save_chat()
st.session_state.run = 0
prompt = st.chat_input("Say something")
if prompt:
if chat["auto_clear"]:
clear_chat()
chat["messages"].append({
"author": "user",
"body": prompt
})
save_chat()
if ss.SETTINGS.get("fetch_reply"):
st.session_state.run = 1
st.rerun()
#
#
#
def button_clear():
clear_chat()
save_chat()
def button_undo():
if len(chat["messages"]) > 0:
last_message = chat["messages"][-1]
chat["messages"] = chat["messages"][:-1]
if last_message["author"] == "user":
st.session_state.user_message = last_message["body"]
save_chat()
def button_redo():
if len(chat["messages"]) > 0:
chat["messages"] = chat["messages"][:-1]
save_chat()
st.session_state.run = 1
def button_more():
@st.dialog(chat_name)
def button_more_modal():
tab_labels = ["General", "Advanced", "Interface"]
tabs = st.tabs(tab_labels)
save_button_group = None
action_button_group = None
if (t := "General") in tab_labels:
with tabs[tab_labels.index(t)]:
original_name = chat_name
new_name = st.text_input("Name", value=chat_name)
new_context = st.text_area("Context", value=chat["context"])
save_button_group = st.container()
if (t := "Advanced") in tab_labels:
with tabs[tab_labels.index(t)]:
new_keep = st.number_input("Keep Messages", value=chat["keep"], help="Number of messages to keep from the top after a clear")
with st.container(border=True):
new_auto_clear = st.toggle("Auto clear", value=chat["auto_clear"])
new_pinned = st.toggle("Pinned", value=chat["pinned"])
action_button_group = st.container()
if (t := "Interface") in tab_labels:
with tabs[tab_labels.index(t)]:
new_hide_clear = st.toggle("Hide clear", value=chat["hide_clear"])
new_hide_undo = st.toggle("Hide undo", value=chat["hide_undo"])
new_hide_redo = st.toggle("Hide redo", value=chat["hide_redo"])
new_hide_fetch_button = st.toggle("Hide fetch button", value=chat["hide_fetch_button"])
new_hide_fetch_toggle = st.toggle("Hide fetch toggle", value=chat["hide_fetch_toggle"])
with action_button_group:
cols = st.columns([1, 1, 1])
with cols[0]:
if st.button("Clear", icon=":material/mop:", use_container_width=True):
chat["keep"] = new_keep
clear_chat()
save_chat()
redirect("Chats", original_name)
with cols[1]:
if st.button("Delete", icon=":material/delete:", use_container_width=True):
os.unlink(chat_path)
st.rerun()
with save_button_group:
cols = st.columns([1, 1, 1])
def save_common():
chat["context"] = new_context
chat["keep"] = new_keep
chat["pinned"] = new_pinned
chat["auto_clear"] = new_auto_clear
chat["hide_clear"] = new_hide_clear
chat["hide_undo"] = new_hide_undo
chat["hide_redo"] = new_hide_redo
chat["hide_fetch_button"] = new_hide_fetch_button
chat["hide_fetch_toggle"] = new_hide_fetch_toggle
with cols[0]:
if st.button("Save", icon=":material/save:", use_container_width=True):
save_common()
goto_name = save_chat(name=new_name, overwrite=True)
if chat_name != new_name:
os.unlink(chat_path)
redirect("Chats", goto_name)
with cols[1]:
if st.button("Copy", icon=":material/file_copy:", use_container_width=True):
save_common()
goto_name = save_chat(name=new_name, overwrite=False)
redirect("Chats", goto_name)
button_more_modal()
def button_fetch():
st.session_state.run = 1
cols = st.columns(7)
cols_pos = -1
if not chat["hide_clear"]:
if ss.SETTINGS.get("show_clear"):
if len(chat["messages"]) > abs(chat["keep"]):
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/mop:", on_click=button_clear, use_container_width=True)
if not chat["hide_undo"]:
if ss.SETTINGS.get("show_undo"):
if len(chat["messages"]) > 0:
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/undo:", on_click=button_undo, use_container_width=True)
if not chat["hide_redo"]:
if ss.SETTINGS.get("show_redo"):
if len(chat["messages"]) > 1:
if chat["messages"][-1]["author"] == "assistant":
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/redo:", on_click=button_redo, use_container_width=True)
if ss.SETTINGS.get("show_more"):
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/more_horiz:", on_click=button_more, use_container_width=True)
if not chat["hide_fetch_button"]:
if ss.SETTINGS.get("show_fetch_button"):
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/skip_next:", on_click=button_fetch, use_container_width=True)
if not chat["hide_fetch_toggle"]:
if ss.SETTINGS.get("show_fetch_toggle"):
with cols[(cols_pos := cols_pos + 1)]:
ss.SETTINGS.widget(st, st.toggle, "On", "fetch_reply")

View File

@ -0,0 +1,254 @@
import sys
import argparse
import os
import subprocess
import re
import hashlib
import random
import requests
import json
import asyncio
import threading
import time
import string
import shutil
import datetime
from lib.chatml import *
from lib.helpers import *
from lib.extension import *
import streamlit as st
ss = st.session_state
def chats_default(item):
chat = {}
chat_path = item["path"]
chat_name = item["title"]
if os.path.isfile(chat_path):
with open(chat_path) as f:
try:
chat = json.loads(f.read())
except:
pass
chat_defaults = {
"context": "",
"messages": [],
"keep": 0,
"pinned": False,
"auto_clear": False,
#"custom_settings": False,
#"auto_fetch": ss.SETTINGS.get("auto_fetch"),
#"show_clear": ss.SETTINGS.get("show_clear"),
#"show_undo": ss.SETTINGS.get("show_undo"),
#"show_redo": ss.SETTINGS.get("show_redo"),
#"show_fetch_button": ss.SETTINGS.get("show_fetch_button"),
#"show_fetch_toggle": ss.SETTINGS.get("show_fetch_toggle"),
}
def load_defaults():
for k in chat_defaults.keys():
if k not in chat.keys():
chat[k] = chat_defaults[k]
load_defaults()
#if not chat["custom_settings"]:
# del chat["auto_fetch"]
# del chat["show_clear"]
# del chat["show_undo"]
# del chat["show_redo"]
# del chat["show_fetch_button"]
# del chat["show_fetch_toggle"]
# load_defaults()
def save_chat(name=chat_name, overwrite=True):
path = f"{ss.CHATS_DIR}/{name}.json"
path = get_next_filename(path) if not overwrite else path
with open(path, "w") as f:
f.write(json.dumps(chat, indent=4))
name = re.search(r"([^\/\\]+)\.json$", path).group(1)
return name
def clear_chat():
keep = chat["keep"]
while len(chat["messages"]) > abs(keep):
if keep < 0: chat["messages"].pop(0)
if keep >= 0: chat["messages"].pop()
#
#
#
st.caption("This is the beginning of the conversation")
for message in chat["messages"]:
with st.chat_message(message["author"]):
st.markdown(message["body"])
if len(chat["context"]) < 1:
chat["context"] = "1. Use these guidelines.\n2. Do exactly as the user says.\n3. Provide unbiased answers.\n4. No holding back.\n5. Keep answers short."
script = ChatML.format(chat["context"], chat["messages"], for_completion=True)
if "run" in st.session_state:
if st.session_state.run == 1:
with st.chat_message("assistant"):
def stream_reply(input_data):
response = requests.post(
f"http://127.0.0.1:11434/",
data=input_data.encode("utf-8"),
headers={"Content-Type": "text/plain"},
stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
if chunk:
yield json.loads(chunk)
reply = st.write_stream(stream_reply(script))
chat["messages"].append({
"author": "assistant",
"body": reply
})
save_chat()
st.session_state.run = 0
prompt = st.chat_input("Say something")
if prompt:
if chat["auto_clear"]:
clear_chat()
chat["messages"].append({
"author": "user",
"body": prompt
})
save_chat()
if ss.SETTINGS.get("auto_fetch"):
st.session_state.run = 1
st.rerun()
#
#
#
def button_clear():
clear_chat()
save_chat()
def button_undo():
if len(chat["messages"]) > 0:
last_message = chat["messages"][-1]
chat["messages"] = chat["messages"][:-1]
if last_message["author"] == "user":
st.session_state.user_message = last_message["body"]
save_chat()
def button_redo():
if len(chat["messages"]) > 0:
chat["messages"] = chat["messages"][:-1]
save_chat()
st.session_state.run = 1
def button_more():
@st.dialog("More")
def button_more_modal():
tab_labels = ["General", "More"]
tabs = st.tabs(tab_labels)
if (t := "General") in tab_labels:
with tabs[tab_labels.index(t)]:
original_name = chat_name
new_name = st.text_input("Name", value=chat_name)
new_context = st.text_area("Context", value=chat["context"])
if (t := "More") in tab_labels:
with tabs[tab_labels.index(t)]:
new_keep = st.number_input("Keep Messages", value=chat["keep"], help="Number of messages to keep from the top after a clear")
with st.container(border=True):
save_as = st.toggle("Save as copy", value=ss.SETTINGS.get("save_as"))
new_auto_clear = st.toggle("Auto clear", value=chat["auto_clear"])
new_pinned = st.toggle("Pinned", value=chat["pinned"])
cols = st.columns([1, 1, 1, 1])
with cols[0]:
if st.button("Save", icon=":material/save:", use_container_width=True):
chat["context"] = new_context
chat["keep"] = new_keep
chat["pinned"] = new_pinned
chat["auto_clear"] = new_auto_clear
goto_name = save_chat(name=new_name, overwrite=(not save_as))
if save_as == False:
if chat_name != new_name:
os.unlink(chat_path)
redirect("Chats", goto_name)
with cols[1]:
if st.button("Clear", icon=":material/mop:", use_container_width=True):
chat["keep"] = new_keep
clear_chat()
save_chat()
redirect("Chats", original_name)
with cols[2]:
if st.button("Delete", icon=":material/delete:", use_container_width=True):
os.unlink(chat_path)
st.rerun()
button_more_modal()
def button_fetch():
st.session_state.run = 1
cols = st.columns(7)
cols_pos = -1
if ss.SETTINGS.get("show_clear"):
if len(chat["messages"]) > abs(chat["keep"]):
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/mop:", on_click=button_clear, use_container_width=True)
if ss.SETTINGS.get("show_undo"):
if len(chat["messages"]) > 0:
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/undo:", on_click=button_undo, use_container_width=True)
if ss.SETTINGS.get("show_redo"):
if len(chat["messages"]) > 1:
if chat["messages"][-1]["author"] == "assistant":
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/redo:", on_click=button_redo, use_container_width=True)
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/more_horiz:", on_click=button_more, use_container_width=True)
if ss.SETTINGS.get("show_fetch_button"):
if not ss.SETTINGS.get("auto_fetch"):
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/skip_next:", on_click=button_fetch, use_container_width=True)
if ss.SETTINGS.get("show_fetch_toggle"):
with cols[(cols_pos := cols_pos + 1)]:
ss.SETTINGS.widget(st, st.toggle, "On", "auto_fetch")

29
views/more_about.py Normal file
View File

@ -0,0 +1,29 @@
import sys
import argparse
import os
import subprocess
import re
import hashlib
import random
import requests
import json
import asyncio
import threading
import time
import string
import shutil
import datetime
from lib.helpers import *
from lib.extension import *
import streamlit as st
ss = st.session_state
def more_about():
try:
with open("README.md") as f:
st.markdown(f.read())
except:
pass

82
views/more_settings.py Normal file
View File

@ -0,0 +1,82 @@
import sys
import argparse
import os
import subprocess
import re
import hashlib
import random
import requests
import json
import asyncio
import threading
import time
import string
import shutil
import datetime
from lib.helpers import *
from lib.extension import *
import streamlit as st
ss = st.session_state
def more_settings_account_tab():
cols = st.columns([1, 1])
with cols[0]:
st.caption("Account")
with st.container(border=False):
ss.SETTINGS.widget(st, st.text_input, "Name", "account_name")
ss.SETTINGS.widget(st, st.text_input, "E-mail", "account_email")
st.write("")
def more_settings_general_tab():
cols = st.columns([1, 1])
with cols[0]:
st.caption("Behavior")
with st.container(border=False):
ss.SETTINGS.widget(st, st.toggle, "Fetch reply", "fetch_reply")
st.write("")
st.caption("Interface")
with st.container(border=False):
ss.SETTINGS.widget(st, st.toggle, "Show clear", "show_clear")
ss.SETTINGS.widget(st, st.toggle, "Show undo", "show_undo")
ss.SETTINGS.widget(st, st.toggle, "Show redo", "show_redo")
ss.SETTINGS.widget(st, st.toggle, "Show more", "show_more")
ss.SETTINGS.widget(st, st.toggle, "Show fetch button", "show_fetch_button")
ss.SETTINGS.widget(st, st.toggle, "Show fetch toggle", "show_fetch_toggle")
st.write("")
def more_settings_advanced_tab():
cols = st.columns([1, 1])
with cols[0]:
st.caption("Advanced")
with st.container(border=False):
ss.APP_SETTINGS.widget(st, st.text_input, "Inference Server URL", "inference_server_url", help="The URL to POST to for text-based inference")
st.write("")
def more_settings():
if ss.TOKEN is None:
st.write("Settings are available only for authenticated users")
return
tab_labels = ["Account", "General"]
if ss.IS_ADMIN:
tab_labels = tab_labels + ["Advanced"]
tabs = st.tabs(tab_labels)
if (t := "Account") in tab_labels:
with tabs[tab_labels.index(t)]:
more_settings_account_tab()
if (t := "General") in tab_labels:
with tabs[tab_labels.index(t)]:
more_settings_general_tab()
if (t := "Advanced") in tab_labels:
with tabs[tab_labels.index(t)]:
more_settings_advanced_tab()