# File listing metadata: 242 lines, 6.0 KiB, Python
import os
|
|
import signal
|
|
import re
|
|
import json
|
|
import time
|
|
import copy
|
|
import traceback
|
|
import subprocess
|
|
import shlex
|
|
import threading
|
|
import hashlib
|
|
|
|
from lib.helpers import *
|
|
from lib.extension import *
|
|
|
|
import streamlit as st
|
|
|
|
# Short alias for Streamlit's per-session state; used throughout this script.
ss = st.session_state

# Load globals from the environment variables:
# every environment variable becomes a module-level name holding its value.
# NOTE(review): a variable whose name matches something already bound in this
# module (for example `st` or `os`) would overwrite that binding.
for _env_name, _env_value in os.environ.items():
    globals()[_env_name] = _env_value

hide_deploy_button()
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
# Until a token is resolved, configuration lives in the working directory.
ss.CONFIG_DIR = "."
ss.TOKEN_LIST = []

mkdir("user")

# Every sub-directory of "user" is one token; its name is the token itself.
# NOTE(review): os.path.getctime is the inode-change time on POSIX, not the
# creation time, so the ordering built from it may shift over time - confirm
# this is acceptable, since the first token in the list is treated as admin.
ss.TOKEN_LIST.extend(
    {"token": _entry.name, "create_time": os.path.getctime(_entry.path)}
    for _entry in os.scandir("user")
    if os.path.isdir(_entry.path)
)

# First start: no token directory exists yet, so create an initial one.
if not ss.TOKEN_LIST:
    _initial_token = generate_token()
    _initial_dir = f"user/{_initial_token}"

    mkdir(_initial_dir)

    ss.TOKEN_LIST.append({
        "token": _initial_token,
        "create_time": os.path.getctime(_initial_dir),
    })

# Oldest "create_time" first (ascending order is the default).
ss.TOKEN_LIST = sorted(ss.TOKEN_LIST, key=lambda o: sort_by_key(o, "create_time"))
|
|
# With exactly one token there is nothing to choose from: use it directly.
# Otherwise the token has to come from a cookie (resolved below).
if len(ss.TOKEN_LIST) == 1:
    ss.TOKEN = ss.TOKEN_LIST[0]["token"]
else:
    ss.TOKEN = None

ss.TOKEN_COOKIE = get_cookie("token")
ss.TOKEN_COOKIE_2 = get_cookie("token_2")

# If an alternate token is provided, temporarily use that:
if isinstance(ss.TOKEN_COOKIE_2, str) and len(ss.TOKEN_COOKIE_2) > 0:
    ss.TOKEN_COOKIE = ss.TOKEN_COOKIE_2

_known_tokens = [_record["token"] for _record in ss.TOKEN_LIST]

# A cookie token is only accepted when it names an existing token directory.
if (
    isinstance(ss.TOKEN_COOKIE, str)
    and len(ss.TOKEN_COOKIE) > 0
    and ss.TOKEN_COOKIE in _known_tokens
):
    ss.TOKEN = ss.TOKEN_COOKIE

# Point the configuration directory at the resolved token's folder.
if ss.TOKEN is not None and ss.TOKEN in _known_tokens:
    ss.CONFIG_DIR = f"user/{ss.TOKEN}"
|
|
|
|
ss.CHATS_DIR = f"{ss.CONFIG_DIR}/chats"

# Per-user defaults are only applied once a token directory was resolved;
# with CONFIG_DIR still "." the settings file gets no defaults at all.
_user_defaults = {}
if ss.CONFIG_DIR != ".":
    _user_defaults = {
        "fetch_reply": True,
        "save_as": False,
        "show_clear": False,
        "show_undo": True,
        "show_redo": True,
        "show_more": True,
        "show_fetch_button": True,
        "show_fetch_toggle": True,
    }
ss.SETTINGS = JsonFile(f"{ss.CONFIG_DIR}/settings.json", defaults=_user_defaults)

# The first entry of the sorted token list is the admin token.
ss.IS_ADMIN = ss.TOKEN == ss.TOKEN_LIST[0]["token"]

# Application-wide settings, stored next to the script.
ss.APP_SETTINGS = JsonFile(
    "settings.json",
    defaults={"inference_server_url": "http://127.0.0.1:11434/"},
)
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
# Reset both registries each time this script executes:
#   PAGE_REGISTRY      category -> list of pages (handed to st.navigation)
#   PAGE_REGISTRY_MAP  category -> {title: page}
ss.PAGE_REGISTRY, ss.PAGE_REGISTRY_MAP = dict(), dict()
|
|
|
|
def register_page(category, title, fn, **kwargs):
    """Create a Streamlit page for ``fn`` and file it under ``category``.

    The page is appended to ``ss.PAGE_REGISTRY[category]`` and, if no page
    with the same ``title`` was registered in that category before, also
    stored in ``ss.PAGE_REGISTRY_MAP[category][title]``.

    Args:
        category: Key of the navigation section the page belongs to.
        title: Page title, forwarded to ``st.Page``.
        fn: Page source (here: a callable), forwarded to ``st.Page``.
        **kwargs: Extra keyword arguments forwarded to ``st.Page``.
    """
    # Make sure the category exists in both registries before building the page.
    section_pages = ss.PAGE_REGISTRY.setdefault(category, [])
    section_lookup = ss.PAGE_REGISTRY_MAP.setdefault(category, {})

    page = st.Page(fn, title=title, **kwargs)

    # The first page registered under a title keeps its place in the lookup;
    # later pages with the same title are still added to the section list.
    section_lookup.setdefault(title, page)
    section_pages.append(page)
|
|
|
|
def register_pages(category, arr, fn, **kwargs):
    """Register one page per entry of ``arr`` under ``category``.

    Each entry is either a plain title string or a dict that may carry a
    ``"title"`` and an ``"icon"`` key; a per-item icon overrides the one in
    ``kwargs``.  Every entry gets its own wrapper function that calls
    ``fn(item)``.  The wrapper is named after a sanitised, lower-cased
    ``"<category>_<title>"`` slug, stored in this module's globals under that
    name and passed to ``register_page``.

    Titles that differ only in case, punctuation or non-ASCII characters
    collapse to the same slug.  Streamlit infers the URL path of a callable
    page from its ``__name__`` and ``st.navigation`` rejects duplicate URL
    paths, so every member of such a colliding group gets a short hash of its
    title appended.  Names that do not collide are left exactly as before.
    Collisions are only detected within a single call.

    Args:
        category: Navigation section name; also the prefix of the slug.
        arr: Iterable of title strings or item dicts.
        fn: Callable invoked as ``fn(item)`` when the page runs.
        **kwargs: Default keyword arguments forwarded to ``register_page``.
    """
    # First pass: resolve title, per-page kwargs and slug, and count how many
    # entries share each slug so that collisions are known up front.
    planned = []
    slug_counts = {}
    for item in arr:
        page_kwargs = copy.copy(kwargs)

        title = None
        if isinstance(item, str):
            title = item
        elif isinstance(item, dict):
            title = item.get("title")
            if "icon" in item:
                page_kwargs["icon"] = item["icon"]

        slug = re.sub(r"\s+", "_", f"{category}_{title}")
        slug = re.sub(r"[^A-Za-z0-9_]", "", slug).lower()

        slug_counts[slug] = slug_counts.get(slug, 0) + 1
        planned.append((item, title, page_kwargs, slug))

    # Second pass: register in the original order.
    used_names = set()
    for item, title, page_kwargs, slug in planned:
        fn_name = slug
        if slug_counts[slug] > 1:
            # Hash the title (not the position) so the name stays the same
            # when the order of ``arr`` changes between runs.
            digest = hashlib.sha256(
                str(title).encode("utf-8", "backslashreplace")
            ).hexdigest()[:8]
            fn_name = f"{slug}_{digest}"

        # Identical titles hash identically; fall back to a running number.
        candidate = fn_name
        serial = 1
        while candidate in used_names:
            serial += 1
            candidate = f"{fn_name}_{serial}"
        fn_name = candidate
        used_names.add(fn_name)

        # Bind the current item as a default argument so each wrapper keeps
        # its own item instead of the loop's last one.
        def page_fn(item=item):
            fn(item)

        page_fn.__name__ = fn_name

        print(f"Registering {fn_name}")

        globals()[fn_name] = page_fn
        register_page(category, title, page_fn, **page_kwargs)
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
from views.chats_default import *
|
|
from views.more_about import *
|
|
from views.more_settings import *
|
|
from views.more_tokens import *
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
# Build the "Chats" navigation section from the JSON files in the chats
# directory of the active token.
if ss.TOKEN is not None:
    mkdir(ss.CHATS_DIR)

    chats = []
    for _chat_entry in os.scandir(ss.CHATS_DIR):
        if not re.search(r"\.json$", _chat_entry.name):
            continue

        _chat_item = {
            "title": re.sub(r"\.json$", "", _chat_entry.name),
            "path": _chat_entry.path,
            "modify_time": os.path.getmtime(_chat_entry.path),
            "pinned": False,
        }

        # Only the "pinned" flag is needed here.  A chat file that cannot be
        # read or parsed (for example because it is being written right now)
        # must not take the whole app down, so it is listed as unpinned.
        try:
            with open(_chat_entry.path) as _chat_file:
                _chat_data = json.load(_chat_file)
        except (OSError, ValueError) as _read_error:
            print(f"Could not read chat file {_chat_entry.path!r}: {_read_error}")
            _chat_data = None

        # Anything other than a JSON object cannot carry a "pinned" flag.
        if isinstance(_chat_data, dict) and _chat_data.get("pinned"):
            _chat_item["pinned"] = True
            _chat_item["icon"] = ":material/keep:"

        chats.append(_chat_item)

    # Pinned chats first, then most recently modified first.  One stable sort
    # on the (pinned, modify_time) pair yields the same order as sorting by
    # modify_time and then by pinned.
    chats = sorted(
        chats,
        key=lambda chat: (sort_by_key(chat, "pinned"), sort_by_key(chat, "modify_time")),
        reverse=True,
    )

    # Always offer at least one chat page.
    if not chats:
        chats.append({
            "title": "Untitled",
            "path": f"{ss.CHATS_DIR}/Untitled.json",
            "modify_time": 0,
            "pinned": False,
        })

    register_pages("Chats", chats, chats_default, icon=":material/chat:")
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
# Pages of the "More" section as (title, view function, icon) triples.
_more_pages = [
    ("About", more_about, ":material/info:"),
    ("Settings", more_settings, ":material/settings:"),
]

# The token management page is only registered for the admin.
if ss.IS_ADMIN:
    _more_pages.append(("Tokens", more_tokens, ":material/key:"))

for _more_title, _more_view, _more_icon in _more_pages:
    register_page("More", _more_title, _more_view, icon=_more_icon)
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
# Sidebar authentication controls.
if ss.TOKEN is None:
    # No token resolved: ask for one and store it in the "token" cookie.
    st.sidebar.warning("A valid API token is required to use this software.")

    _entered_token = st.sidebar.text_input("Token", help="Provide a valid token here")

    if st.sidebar.button("Login", icon=":material/login:", use_container_width=True):
        set_cookie("token", _entered_token)
        page_redirect("/")

elif len(ss.TOKEN_LIST) > 1:
    # With more than one token the user can leave the current one.
    _using_alternate = isinstance(ss.TOKEN_COOKIE_2, str) and len(ss.TOKEN_COOKIE_2) > 0

    if _using_alternate:
        # An alternate token is active: "Back" clears the "token_2" cookie.
        if st.sidebar.button("Back", icon=":material/arrow_back:", use_container_width=True):
            set_cookie("token_2", None)
            page_redirect("/")
    elif st.sidebar.button("Logout", icon=":material/logout:", use_container_width=True):
        # Regular session: "Logout" clears the "token" cookie.
        set_cookie("token", None)
        page_redirect("/")
|
|
|
|
|
|
|
|
|
|
|
|
# Version caption in the sidebar.
_version_caption = "\nmllm-streamlit v1.0.0\n"
st.sidebar.caption(_version_caption)

# Only attempt to handle redirect after all page objects exist:
handle_redirect()

# Build the navigation from every registered page and run the selected one.
_selected_page = st.navigation(ss.PAGE_REGISTRY)
_selected_page.run()
|