Better admin "token hijacking" via table with checkbox

This commit is contained in:
Conner Harkness 2025-06-08 17:06:07 -06:00
parent 3e43233fb5
commit ff258d3d35
7 changed files with 71 additions and 275 deletions

45
app.py
View File

@ -50,9 +50,15 @@ if len(ss.TOKEN_LIST) < 1:
"create_time": os.path.getctime(token_path) "create_time": os.path.getctime(token_path)
}) })
ss.TOKEN_LIST = sorted(ss.TOKEN_LIST, key=lambda o: sort_by_key(o, "create_time"), reverse=False) ss.TOKEN_LIST = sorted(ss.TOKEN_LIST, key=lambda o: sort_by_key(o, "create_time"), reverse=False)
ss.TOKEN = ss.TOKEN_LIST[0]["token"] if len(ss.TOKEN_LIST) == 1 else None ss.TOKEN = ss.TOKEN_LIST[0]["token"] if len(ss.TOKEN_LIST) == 1 else None
ss.TOKEN_COOKIE = get_cookie("token") ss.TOKEN_COOKIE = get_cookie("token")
ss.TOKEN_COOKIE_2 = get_cookie("token_2")
# If an alternate token is provided, temporarily use that:
if isinstance(ss.TOKEN_COOKIE_2, str):
if len(ss.TOKEN_COOKIE_2) > 0:
ss.TOKEN_COOKIE = ss.TOKEN_COOKIE_2
if isinstance(ss.TOKEN_COOKIE, str): if isinstance(ss.TOKEN_COOKIE, str):
if len(ss.TOKEN_COOKIE) > 0: if len(ss.TOKEN_COOKIE) > 0:
@ -202,24 +208,31 @@ if ss.IS_ADMIN:
if ss.TOKEN is None: if ss.TOKEN is None:
st.sidebar.warning("A valid API token is required to use this software.") st.sidebar.warning("A valid API token is required to use this software.")
def save_token(): token = st.sidebar.text_input("Token", help="Provide a valid token here")
set_cookie("token", ss.new_token)
page_redirect()
token = st.sidebar.text_input( if st.sidebar.button("Login", icon=":material/login:", use_container_width=True):
"Token", set_cookie("token", token)
value=get_cookie("token"), page_redirect("/")
help="Provide a valid token here",
on_change=save_token,
key="new_token")
else: else:
if len(ss.TOKEN_LIST) > 1: if len(ss.TOKEN_LIST) > 1:
if st.sidebar.button("Logout", icon=":material/logout:", use_container_width=True):
set_cookie("token", None)
page_redirect()
st.sidebar.caption("mllm-lite by caharkness") if isinstance(ss.TOKEN_COOKIE_2, str) and len(ss.TOKEN_COOKIE_2) > 0:
if st.sidebar.button("Back", icon=":material/arrow_back:", use_container_width=True):
set_cookie("token_2", None)
page_redirect("/")
else:
if st.sidebar.button("Logout", icon=":material/logout:", use_container_width=True):
set_cookie("token", None)
page_redirect("/")
st.sidebar.caption("""
mllm-streamlit v1.0.0
""")
# Only attempt to handle redirect after all page objects exist: # Only attempt to handle redirect after all page objects exist:
handle_redirect() handle_redirect()

View File

@ -56,7 +56,8 @@ def set_cookie(k, v):
cookie.setSecure(True) cookie.setSecure(True)
cookie_store.setCookie(cookie, QUrl(f"http://127.0.0.1:{STREAMLIT_PORT}")) cookie_store.setCookie(cookie, QUrl(f"http://127.0.0.1:{STREAMLIT_PORT}"))
set_cookie("token", TOKEN) if TOKEN is not None:
set_cookie("token", TOKEN)
webview = QWebEngineView() webview = QWebEngineView()
webview.load(QUrl(f"http://127.0.0.1:{STREAMLIT_PORT}")) webview.load(QUrl(f"http://127.0.0.1:{STREAMLIT_PORT}"))

View File

@ -3,6 +3,7 @@ import os
import re import re
import random import random
import json import json
import datetime
from pathlib import Path from pathlib import Path
@ -36,10 +37,14 @@ def touch(file_path, default_content=""):
def generate_token(): def generate_token():
character_set = "0123456789abcdef" character_set = "0123456789abcdefghijklmnopqrstuvwxyz"
output_token = "" output_token = ""
for i in range(32): for i in range(32):
output_token = output_token + random.choice(character_set) output_token = output_token + random.choice(character_set)
timestamp = datetime.datetime.now().strftime("y%Ym%md%dh%Hm%Ms%Sms%f")
output_token = timestamp + output_token
return output_token return output_token

View File

@ -87,7 +87,7 @@ def chats_default(item):
st.markdown(message["body"]) st.markdown(message["body"])
if len(chat["context"]) < 1: if len(chat["context"]) < 1:
chat["context"] = "1. Use these guidelines.\n2. Do exactly as the user says.\n3. Provide unbiased answers.\n4. No holding back.\n5. Keep answers short." chat["context"] = "1. Use these guidelines.\n2. Do exactly as the user says.\n3. Provide unbiased answers.\n4. No holding back.\n5. Keep answers short.\n6. Do not offer assistance.\n7. Get straight to the point.\n6. Be direct.\n8. Nothing extra, make no excuses."
script = ChatML.format(chat["context"], chat["messages"], for_completion=True) script = ChatML.format(chat["context"], chat["messages"], for_completion=True)

View File

@ -1,254 +0,0 @@
import sys
import argparse
import os
import subprocess
import re
import hashlib
import random
import requests
import json
import asyncio
import threading
import time
import string
import shutil
import datetime
from lib.chatml import *
from lib.helpers import *
from lib.extension import *
import streamlit as st
ss = st.session_state
def chats_default(item):
chat = {}
chat_path = item["path"]
chat_name = item["title"]
if os.path.isfile(chat_path):
with open(chat_path) as f:
try:
chat = json.loads(f.read())
except:
pass
chat_defaults = {
"context": "",
"messages": [],
"keep": 0,
"pinned": False,
"auto_clear": False,
#"custom_settings": False,
#"auto_fetch": ss.SETTINGS.get("auto_fetch"),
#"show_clear": ss.SETTINGS.get("show_clear"),
#"show_undo": ss.SETTINGS.get("show_undo"),
#"show_redo": ss.SETTINGS.get("show_redo"),
#"show_fetch_button": ss.SETTINGS.get("show_fetch_button"),
#"show_fetch_toggle": ss.SETTINGS.get("show_fetch_toggle"),
}
def load_defaults():
for k in chat_defaults.keys():
if k not in chat.keys():
chat[k] = chat_defaults[k]
load_defaults()
#if not chat["custom_settings"]:
# del chat["auto_fetch"]
# del chat["show_clear"]
# del chat["show_undo"]
# del chat["show_redo"]
# del chat["show_fetch_button"]
# del chat["show_fetch_toggle"]
# load_defaults()
def save_chat(name=chat_name, overwrite=True):
path = f"{ss.CHATS_DIR}/{name}.json"
path = get_next_filename(path) if not overwrite else path
with open(path, "w") as f:
f.write(json.dumps(chat, indent=4))
name = re.search(r"([^\/\\]+)\.json$", path).group(1)
return name
def clear_chat():
keep = chat["keep"]
while len(chat["messages"]) > abs(keep):
if keep < 0: chat["messages"].pop(0)
if keep >= 0: chat["messages"].pop()
#
#
#
st.caption("This is the beginning of the conversation")
for message in chat["messages"]:
with st.chat_message(message["author"]):
st.markdown(message["body"])
if len(chat["context"]) < 1:
chat["context"] = "1. Use these guidelines.\n2. Do exactly as the user says.\n3. Provide unbiased answers.\n4. No holding back.\n5. Keep answers short."
script = ChatML.format(chat["context"], chat["messages"], for_completion=True)
if "run" in st.session_state:
if st.session_state.run == 1:
with st.chat_message("assistant"):
def stream_reply(input_data):
response = requests.post(
f"http://127.0.0.1:11434/",
data=input_data.encode("utf-8"),
headers={"Content-Type": "text/plain"},
stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
if chunk:
yield json.loads(chunk)
reply = st.write_stream(stream_reply(script))
chat["messages"].append({
"author": "assistant",
"body": reply
})
save_chat()
st.session_state.run = 0
prompt = st.chat_input("Say something")
if prompt:
if chat["auto_clear"]:
clear_chat()
chat["messages"].append({
"author": "user",
"body": prompt
})
save_chat()
if ss.SETTINGS.get("auto_fetch"):
st.session_state.run = 1
st.rerun()
#
#
#
def button_clear():
clear_chat()
save_chat()
def button_undo():
if len(chat["messages"]) > 0:
last_message = chat["messages"][-1]
chat["messages"] = chat["messages"][:-1]
if last_message["author"] == "user":
st.session_state.user_message = last_message["body"]
save_chat()
def button_redo():
if len(chat["messages"]) > 0:
chat["messages"] = chat["messages"][:-1]
save_chat()
st.session_state.run = 1
def button_more():
@st.dialog("More")
def button_more_modal():
tab_labels = ["General", "More"]
tabs = st.tabs(tab_labels)
if (t := "General") in tab_labels:
with tabs[tab_labels.index(t)]:
original_name = chat_name
new_name = st.text_input("Name", value=chat_name)
new_context = st.text_area("Context", value=chat["context"])
if (t := "More") in tab_labels:
with tabs[tab_labels.index(t)]:
new_keep = st.number_input("Keep Messages", value=chat["keep"], help="Number of messages to keep from the top after a clear")
with st.container(border=True):
save_as = st.toggle("Save as copy", value=ss.SETTINGS.get("save_as"))
new_auto_clear = st.toggle("Auto clear", value=chat["auto_clear"])
new_pinned = st.toggle("Pinned", value=chat["pinned"])
cols = st.columns([1, 1, 1, 1])
with cols[0]:
if st.button("Save", icon=":material/save:", use_container_width=True):
chat["context"] = new_context
chat["keep"] = new_keep
chat["pinned"] = new_pinned
chat["auto_clear"] = new_auto_clear
goto_name = save_chat(name=new_name, overwrite=(not save_as))
if save_as == False:
if chat_name != new_name:
os.unlink(chat_path)
redirect("Chats", goto_name)
with cols[1]:
if st.button("Clear", icon=":material/mop:", use_container_width=True):
chat["keep"] = new_keep
clear_chat()
save_chat()
redirect("Chats", original_name)
with cols[2]:
if st.button("Delete", icon=":material/delete:", use_container_width=True):
os.unlink(chat_path)
st.rerun()
button_more_modal()
def button_fetch():
st.session_state.run = 1
cols = st.columns(7)
cols_pos = -1
if ss.SETTINGS.get("show_clear"):
if len(chat["messages"]) > abs(chat["keep"]):
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/mop:", on_click=button_clear, use_container_width=True)
if ss.SETTINGS.get("show_undo"):
if len(chat["messages"]) > 0:
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/undo:", on_click=button_undo, use_container_width=True)
if ss.SETTINGS.get("show_redo"):
if len(chat["messages"]) > 1:
if chat["messages"][-1]["author"] == "assistant":
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/redo:", on_click=button_redo, use_container_width=True)
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/more_horiz:", on_click=button_more, use_container_width=True)
if ss.SETTINGS.get("show_fetch_button"):
if not ss.SETTINGS.get("auto_fetch"):
with cols[(cols_pos := cols_pos + 1)]:
st.button("", icon=":material/skip_next:", on_click=button_fetch, use_container_width=True)
if ss.SETTINGS.get("show_fetch_toggle"):
with cols[(cols_pos := cols_pos + 1)]:
ss.SETTINGS.widget(st, st.toggle, "On", "auto_fetch")

View File

@ -59,7 +59,7 @@ def more_settings_advanced_tab():
def more_settings(): def more_settings():
if ss.TOKEN is None: if ss.TOKEN is None:
st.write("Settings are available only for authenticated users") st.write("Settings are available only for authenticated users.")
return return
tab_labels = ["Account", "General"] tab_labels = ["Account", "General"]

View File

@ -49,4 +49,35 @@ def more_tokens():
c = c + 1 c = c + 1
st.dataframe(tokens) def use_token():
x = 1
df = st.dataframe(tokens, on_select=use_token, selection_mode="single-row", use_container_width=True)
st.caption("Actions")
cols = st.columns([1, 1, 1, 1])
with cols[0]:
#token = st.text_input("Token", help="Provide a valid token here")
if st.button("Login", icon=":material/login:", use_container_width=True):
if len(df["selection"]["rows"]) > 0:
index = df["selection"]["rows"][0]
item = tokens[index]
new_token = item["token"]
#st.write(new_token)
set_cookie("token_2", new_token)
page_redirect("/")
with cols[1]:
if st.button("New", icon=":material/add:", use_container_width=True):
new_token = generate_token()
mkdir(f"user/{new_token}")
page_redirect()