import sys
import argparse
import os
import subprocess
import re
import hashlib
import random
import requests
import json
import asyncio
import threading
import time
import string
import shutil
import datetime
from lib.chatml import *
from lib.helpers import *
from lib.extension import *
import streamlit as st

ss = st.session_state


def chats_default(item):
    """Render the chat page for a single stored conversation.

    The function loads the chat JSON file (if it exists), fills in missing
    keys with defaults, renders the message history, optionally streams a
    new assistant reply, handles the prompt input and draws the action
    button bar.

    Args:
        item: Mapping with at least the keys ``"path"`` (location of the
            chat JSON file) and ``"title"`` (chat name, used as the default
            file stem when saving).
    """
    chat = {}
    chat_path = item["path"]
    chat_name = item["title"]

    if os.path.isfile(chat_path):
        with open(chat_path, encoding="utf-8") as f:
            try:
                loaded = json.load(f)
            except ValueError:
                # Best effort: an unreadable/corrupt file yields an empty chat.
                # ValueError covers both JSONDecodeError and UnicodeDecodeError
                # without swallowing Streamlit's control-flow exceptions.
                loaded = None
        # Only a JSON object can act as the chat record; anything else
        # (list, string, ...) would break the key handling below.
        if isinstance(loaded, dict):
            chat = loaded

    chat_defaults = {
        "context": "",
        "messages": [],
        "keep": 0,
        "pinned": False,
        "auto_clear": False,
        #"custom_settings": False,
        #"auto_fetch": ss.SETTINGS.get("auto_fetch"),
        #"show_clear": ss.SETTINGS.get("show_clear"),
        #"show_undo": ss.SETTINGS.get("show_undo"),
        #"show_redo": ss.SETTINGS.get("show_redo"),
        #"show_fetch_button": ss.SETTINGS.get("show_fetch_button"),
        #"show_fetch_toggle": ss.SETTINGS.get("show_fetch_toggle"),
    }

    def load_defaults():
        """Add every default key that the loaded chat is missing."""
        for key, value in chat_defaults.items():
            chat.setdefault(key, value)

    load_defaults()

    #if not chat["custom_settings"]:
    #    del chat["auto_fetch"]
    #    del chat["show_clear"]
    #    del chat["show_undo"]
    #    del chat["show_redo"]
    #    del chat["show_fetch_button"]
    #    del chat["show_fetch_toggle"]
    #    load_defaults()

    def save_chat(name=chat_name, overwrite=True):
        """Write the chat to ``{ss.CHATS_DIR}/{name}.json``.

        Args:
            name: File stem to save under.
            overwrite: When false, the path is first passed through
                ``get_next_filename`` (presumably to pick an unused file
                name -- helper is defined elsewhere, TODO confirm).

        Returns:
            The stem (file name without directory and ``.json``) of the
            file that was actually written.
        """
        path = f"{ss.CHATS_DIR}/{name}.json"
        if not overwrite:
            path = get_next_filename(path)
        with open(path, "w", encoding="utf-8") as f:
            f.write(json.dumps(chat, indent=4))
        return re.search(r"([^\/\\]+)\.json$", path).group(1)

    def clear_chat():
        """Trim the history down to ``abs(chat["keep"])`` messages.

        A non-negative ``keep`` keeps the oldest messages, a negative one
        keeps the newest. Nothing happens when the history is already
        short enough.
        """
        keep = int(chat["keep"])  # slice bounds must be integers
        messages = chat["messages"]
        if keep < 0:
            # Drop everything except the last |keep| messages.
            del messages[:keep]
        else:
            # Drop everything after the first `keep` messages.
            del messages[keep:]

    def remove_chat_file():
        """Delete the chat's source file, tolerating one that never existed."""
        try:
            os.unlink(chat_path)
        except FileNotFoundError:
            # A chat that was never saved has no file; the goal (no file
            # left behind) is already met.
            pass

    # ------------------------------------------------------------------
    # Conversation history
    # ------------------------------------------------------------------

    st.caption("This is the beginning of the conversation")

    for message in chat["messages"]:
        with st.chat_message(message["author"]):
            st.markdown(message["body"])

    if len(chat["context"]) < 1:
        chat["context"] = (
            "1. Use these guidelines.\n"
            "2. Do exactly as the user says.\n"
            "3. Provide unbiased answers.\n"
            "4. No holding back.\n"
            "5. Keep answers short."
        )

    script = ChatML.format(chat["context"], chat["messages"], for_completion=True)

    def stream_reply(input_data):
        """Post the prompt script to the local endpoint and yield reply chunks.

        Raises:
            requests.RequestException: On connection problems or a non-2xx
                status.
        """
        # The context manager closes the streamed response even when the
        # consumer stops early or an error occurs.
        with requests.post(
            "http://127.0.0.1:11434/",
            data=input_data.encode("utf-8"),
            headers={"Content-Type": "text/plain"},
            stream=True,
        ) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
                if chunk:
                    # NOTE(review): assumes every received chunk is one
                    # complete JSON document -- confirm against the server.
                    yield json.loads(chunk)

    if "run" in ss and ss.run == 1:
        with st.chat_message("assistant"):
            try:
                reply = st.write_stream(stream_reply(script))
            except Exception:
                # Clear the flag before propagating; otherwise every rerun
                # would repeat the failing request before the input box is
                # rendered and the page could not recover.
                ss.run = 0
                raise
            chat["messages"].append({
                "author": "assistant",
                "body": reply
            })
            save_chat()
        ss.run = 0

    prompt = st.chat_input("Say something")
    if prompt:
        if chat["auto_clear"]:
            clear_chat()
        chat["messages"].append({
            "author": "user",
            "body": prompt
        })
        save_chat()
        if ss.SETTINGS.get("auto_fetch"):
            ss.run = 1
        # Rerun so the history loop above renders the message just saved
        # (and, with auto fetch enabled, the reply is requested).
        st.rerun()

    # ------------------------------------------------------------------
    # Button callbacks
    # ------------------------------------------------------------------

    def button_clear():
        """Trim the history according to ``keep`` and persist the result."""
        clear_chat()
        save_chat()

    def button_undo():
        """Remove the last message; stash its text when the user wrote it."""
        if chat["messages"]:
            last_message = chat["messages"][-1]
            chat["messages"] = chat["messages"][:-1]
            if last_message["author"] == "user":
                ss.user_message = last_message["body"]
            save_chat()

    def button_redo():
        """Drop the last message and request a fresh reply."""
        if chat["messages"]:
            chat["messages"] = chat["messages"][:-1]
            save_chat()
            ss.run = 1

    def button_more():
        """Open the "More" dialog with rename/context/keep/pin options."""

        @st.dialog("More")
        def button_more_modal():
            tab_general, tab_more = st.tabs(["General", "More"])

            with tab_general:
                original_name = chat_name
                new_name = st.text_input("Name", value=chat_name)
                new_context = st.text_area("Context", value=chat["context"])

            with tab_more:
                new_keep = st.number_input(
                    "Keep Messages",
                    value=chat["keep"],
                    help="Number of messages to keep from the top after a clear",
                )
                with st.container(border=True):
                    save_as = st.toggle("Save as copy", value=ss.SETTINGS.get("save_as"))
                    new_auto_clear = st.toggle("Auto clear", value=chat["auto_clear"])
                    new_pinned = st.toggle("Pinned", value=chat["pinned"])

            cols = st.columns([1, 1, 1, 1])

            with cols[0]:
                if st.button("Save", icon=":material/save:", use_container_width=True):
                    chat["context"] = new_context
                    chat["keep"] = new_keep
                    chat["pinned"] = new_pinned
                    chat["auto_clear"] = new_auto_clear
                    goto_name = save_chat(name=new_name, overwrite=(not save_as))
                    # A rename (not a copy) leaves the old file behind.
                    if not save_as and chat_name != new_name:
                        remove_chat_file()
                    redirect("Chats", goto_name)

            with cols[1]:
                if st.button("Clear", icon=":material/mop:", use_container_width=True):
                    chat["keep"] = new_keep
                    clear_chat()
                    save_chat()
                    redirect("Chats", original_name)

            with cols[2]:
                if st.button("Delete", icon=":material/delete:", use_container_width=True):
                    remove_chat_file()
                    st.rerun()

        button_more_modal()

    def button_fetch():
        """Request an assistant reply on the next script run."""
        ss.run = 1

    # ------------------------------------------------------------------
    # Button bar: visible buttons fill the columns from the left
    # ------------------------------------------------------------------

    slots = iter(st.columns(7))

    if ss.SETTINGS.get("show_clear"):
        if len(chat["messages"]) > abs(chat["keep"]):
            with next(slots):
                st.button("", icon=":material/mop:", on_click=button_clear, use_container_width=True)

    if ss.SETTINGS.get("show_undo"):
        if chat["messages"]:
            with next(slots):
                st.button("", icon=":material/undo:", on_click=button_undo, use_container_width=True)

    if ss.SETTINGS.get("show_redo"):
        if len(chat["messages"]) > 1:
            if chat["messages"][-1]["author"] == "assistant":
                with next(slots):
                    st.button("", icon=":material/redo:", on_click=button_redo, use_container_width=True)

    with next(slots):
        st.button("", icon=":material/more_horiz:", on_click=button_more, use_container_width=True)

    if ss.SETTINGS.get("show_fetch_button"):
        if not ss.SETTINGS.get("auto_fetch"):
            with next(slots):
                st.button("", icon=":material/skip_next:", on_click=button_fetch, use_container_width=True)

    if ss.SETTINGS.get("show_fetch_toggle"):
        with next(slots):
            ss.SETTINGS.widget(st, st.toggle, "On", "auto_fetch")