import sys
import argparse
import os
import subprocess
import re
import hashlib
import random
import requests
import json
import asyncio
import threading
import time
import string
import shutil
import datetime

from lib.chatml import *
from lib.helpers import *
from lib.extension import *

# Imported after the star imports so that `st` always refers to streamlit.
import streamlit as st

ss = st.session_state

# System prompt used for chats that have no context of their own.
# NOTE(review): the numbering repeats "6." -- left untouched because this text
# is sent to the model verbatim and is persisted into saved chats.
DEFAULT_CONTEXT = (
    "1. Use these guidelines.\n"
    "2. Do exactly as the user says.\n"
    "3. Provide unbiased answers.\n"
    "4. No holding back.\n"
    "5. Keep answers short.\n"
    "6. Do not offer assistance.\n"
    "7. Get straight to the point.\n"
    "6. Be direct.\n"
    "8. Nothing extra, make no excuses."
)


def chats_default(item):
    """Render the Streamlit page for a single chat.

    Loads the chat stored at ``item["path"]`` (if that file exists), fills in
    missing settings with defaults, draws the message history, the chat input
    and the toolbar buttons, and persists every change back to
    ``{ss.CHATS_DIR}/<name>.json``.

    Args:
        item: Mapping with at least the keys ``"path"`` (location of the chat
            JSON file) and ``"title"`` (the chat's display/file name).
    """
    chat_path = item["path"]
    chat_name = item["title"]

    # Best-effort load: an unreadable or malformed file yields an empty chat
    # that is then populated with defaults.
    chat = {}
    if os.path.isfile(chat_path):
        try:
            with open(chat_path, encoding="utf-8") as f:
                loaded = json.load(f)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            loaded = None
        if isinstance(loaded, dict):
            chat = loaded

    chat_defaults = {
        "context": "",
        "messages": [],
        "keep": 0,
        "pinned": False,
        "auto_clear": False,
        "auto_name": True,
        "hide_clear": False,
        "hide_undo": False,
        "hide_redo": False,
        "hide_fetch_button": False,
        "hide_fetch_toggle": False,
    }

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def load_defaults():
        """Add every default setting that the loaded chat is missing."""
        for key, value in chat_defaults.items():
            chat.setdefault(key, value)

    def save_chat(name=chat_name, overwrite=True):
        """Write the chat to ``{ss.CHATS_DIR}/{name}.json``.

        Args:
            name: File name (without extension) to save under.
            overwrite: When false, ``get_next_filename`` picks the path
                instead of reusing ``name`` directly.

        Returns:
            The name (file stem) the chat was actually saved under.
        """
        path = f"{ss.CHATS_DIR}/{name}.json"
        if not overwrite:
            path = get_next_filename(path)
        # Serialise before opening so a serialisation error cannot leave a
        # truncated file behind.
        payload = json.dumps(chat, indent=4)
        with open(path, "w", encoding="utf-8") as f:
            f.write(payload)
        return re.search(r"([^\/\\]+)\.json$", path).group(1)

    def remove_chat_file():
        """Delete the file this chat was loaded from, if it exists."""
        try:
            os.unlink(chat_path)
        except FileNotFoundError:
            # The chat was never written to disk; nothing to remove.
            pass

    def stream_llm_reply(raw_data):
        """Yield decoded JSON values streamed by the inference server.

        Args:
            raw_data: Prompt text, sent as the UTF-8 encoded request body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # The context manager releases the connection once the stream is
        # exhausted or the generator is closed.
        with requests.post(
            ss.APP_SETTINGS.get("inference_server_url"),
            data=raw_data.encode("utf-8"),
            headers={"Content-Type": "text/plain"},
            stream=True,
        ) as response:
            response.raise_for_status()
            # NOTE(review): assumes every received chunk is one complete JSON
            # document -- confirm against the server's framing.
            for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
                if chunk:
                    yield json.loads(chunk)

    def guess_chat_topic(input_chat):
        """Ask the model for a one/two word title based on the first message.

        Returns:
            The reply reduced to letters, digits and spaces, or ``"Untitled"``
            when there are no messages or nothing usable is left.
        """
        if len(input_chat["messages"]) < 1:
            return "Untitled"
        script = ChatML.format(
            DEFAULT_CONTEXT,
            [
                {"author": "user", "body": input_chat["messages"][0]["body"]},
                {
                    "author": "user",
                    "body": "Describe or summarize the previous message with ONE or TWO words:",
                },
            ],
            for_completion=True,
        )
        reply = "".join(stream_llm_reply(script))
        topic = re.sub(r"[^A-Za-z0-9 ]", "", reply).strip()
        # An empty topic would otherwise become a file called ".json".
        return topic or "Untitled"

    def clear_chat():
        """Trim the history down to ``abs(chat["keep"])`` messages.

        A non-negative ``keep`` retains the oldest messages; a negative
        ``keep`` retains the newest ones.
        """
        keep = chat["keep"]
        messages = chat["messages"]
        while len(messages) > abs(keep):
            if keep < 0:
                messages.pop(0)
            else:
                messages.pop()

    # ------------------------------------------------------------------
    # Page body
    # ------------------------------------------------------------------

    load_defaults()

    st.caption("This is the beginning of the conversation")

    for message in chat["messages"]:
        with st.chat_message(message["author"]):
            st.markdown(message["body"])

    if len(chat["context"]) < 1:
        chat["context"] = DEFAULT_CONTEXT

    script = ChatML.format(chat["context"], chat["messages"], for_completion=True)

    # A pending "run" flag means a reply has to be fetched on this pass.
    if "run" in ss and ss.run == 1:
        with st.chat_message("assistant"):
            reply = st.write_stream(stream_llm_reply(script))
        chat["messages"].append({"author": "assistant", "body": reply})
        save_chat()
        ss.run = 0

    prompt = st.chat_input("Say something")
    if prompt:
        if chat["auto_clear"]:
            clear_chat()
        chat["messages"].append({"author": "user", "body": prompt})
        save_chat()
        if ss.SETTINGS.get("fetch_reply"):
            ss.run = 1
        if len(chat["messages"]) == 1 and chat["auto_name"]:
            new_name = guess_chat_topic(chat)
            # Only rename when the title actually changes; saving under the
            # same name with overwrite=False would just create a duplicate.
            if new_name != chat_name:
                goto_name = save_chat(name=new_name, overwrite=False)
                remove_chat_file()
                redirect("Chats", goto_name)
        st.rerun()

    # ------------------------------------------------------------------
    # Toolbar callbacks
    # ------------------------------------------------------------------

    def button_clear():
        """Trim the history according to the chat's ``keep`` setting and save."""
        clear_chat()
        save_chat()

    def button_undo():
        """Drop the last message; a dropped user message is kept in session state."""
        if len(chat["messages"]) > 0:
            last_message = chat["messages"][-1]
            chat["messages"] = chat["messages"][:-1]
            if last_message["author"] == "user":
                ss.user_message = last_message["body"]
            save_chat()

    def button_redo():
        """Drop the last message and request a fresh reply."""
        if len(chat["messages"]) > 0:
            chat["messages"] = chat["messages"][:-1]
            save_chat()
            ss.run = 1

    def button_more():
        """Open the settings dialog for this chat."""

        @st.dialog(chat_name)
        def button_more_modal():
            tab_labels = ["General", "Advanced", "Interface"]
            tabs = st.tabs(tab_labels)

            # Placeholders created inside the tabs and filled further down,
            # once every input widget has produced its value.
            save_button_group = None
            action_button_group = None

            if (t := "General") in tab_labels:
                with tabs[tab_labels.index(t)]:
                    original_name = chat_name
                    new_name = st.text_input("Name", value=chat_name)
                    new_context = st.text_area("Context", value=chat["context"])
                    save_button_group = st.container()

            if (t := "Advanced") in tab_labels:
                with tabs[tab_labels.index(t)]:
                    new_keep = st.number_input(
                        "Keep Messages",
                        value=chat["keep"],
                        help="Number of messages to keep from the top after a clear",
                    )
                    with st.container(border=True):
                        new_auto_clear = st.toggle("Auto clear", value=chat["auto_clear"])
                        new_auto_name = st.toggle("Auto name", value=chat["auto_name"])
                        new_pinned = st.toggle("Pinned", value=chat["pinned"])
                    action_button_group = st.container()

            if (t := "Interface") in tab_labels:
                with tabs[tab_labels.index(t)]:
                    new_hide_clear = st.toggle("Hide clear", value=chat["hide_clear"])
                    new_hide_undo = st.toggle("Hide undo", value=chat["hide_undo"])
                    new_hide_redo = st.toggle("Hide redo", value=chat["hide_redo"])
                    new_hide_fetch_button = st.toggle(
                        "Hide fetch button", value=chat["hide_fetch_button"]
                    )
                    new_hide_fetch_toggle = st.toggle(
                        "Hide fetch toggle", value=chat["hide_fetch_toggle"]
                    )

            with action_button_group:
                cols = st.columns([1, 1, 1])
                with cols[0]:
                    if st.button("Clear", icon=":material/mop:", use_container_width=True):
                        # Apply the "keep" value currently shown in the dialog.
                        chat["keep"] = new_keep
                        clear_chat()
                        save_chat()
                        redirect("Chats", original_name)
                with cols[1]:
                    if st.button("Delete", icon=":material/delete:", use_container_width=True):
                        remove_chat_file()
                        st.rerun()

            def save_common():
                """Copy every value edited in the dialog into the chat."""
                chat["context"] = new_context
                chat["keep"] = new_keep
                chat["pinned"] = new_pinned
                chat["auto_clear"] = new_auto_clear
                chat["auto_name"] = new_auto_name
                chat["hide_clear"] = new_hide_clear
                chat["hide_undo"] = new_hide_undo
                chat["hide_redo"] = new_hide_redo
                chat["hide_fetch_button"] = new_hide_fetch_button
                chat["hide_fetch_toggle"] = new_hide_fetch_toggle

            with save_button_group:
                cols = st.columns([1, 1, 1])
                with cols[0]:
                    if st.button("Save", icon=":material/save:", use_container_width=True):
                        save_common()
                        goto_name = save_chat(name=new_name, overwrite=True)
                        if chat_name != new_name:
                            # Renamed: drop the file stored under the old name.
                            remove_chat_file()
                        redirect("Chats", goto_name)
                with cols[1]:
                    if st.button("Copy", icon=":material/file_copy:", use_container_width=True):
                        save_common()
                        goto_name = save_chat(name=new_name, overwrite=False)
                        redirect("Chats", goto_name)

        button_more_modal()

    def button_fetch():
        """Request a reply on the next script run."""
        ss.run = 1

    # ------------------------------------------------------------------
    # Toolbar: visible buttons are packed left-to-right into seven columns.
    # ------------------------------------------------------------------

    toolbar = iter(st.columns(7))
    messages = chat["messages"]

    if (
        not chat["hide_clear"]
        and ss.SETTINGS.get("show_clear")
        and len(messages) > abs(chat["keep"])
    ):
        with next(toolbar):
            st.button("", icon=":material/mop:", on_click=button_clear, use_container_width=True)

    if not chat["hide_undo"] and ss.SETTINGS.get("show_undo") and len(messages) > 0:
        with next(toolbar):
            st.button("", icon=":material/undo:", on_click=button_undo, use_container_width=True)

    if (
        not chat["hide_redo"]
        and ss.SETTINGS.get("show_redo")
        and len(messages) > 1
        and messages[-1]["author"] == "assistant"
    ):
        with next(toolbar):
            st.button("", icon=":material/redo:", on_click=button_redo, use_container_width=True)

    if ss.SETTINGS.get("show_more"):
        with next(toolbar):
            st.button(
                "", icon=":material/more_horiz:", on_click=button_more, use_container_width=True
            )

    if not chat["hide_fetch_button"] and ss.SETTINGS.get("show_fetch_button"):
        with next(toolbar):
            st.button(
                "", icon=":material/skip_next:", on_click=button_fetch, use_container_width=True
            )

    if not chat["hide_fetch_toggle"] and ss.SETTINGS.get("show_fetch_toggle"):
        with next(toolbar):
            ss.SETTINGS.widget(st, st.toggle, "On", "fetch_reply")