# Discord chat bot backed by a local llama.cpp (GGUF) model.
import sys

# Allow imports from the program directory and the bundled ./lib directory
# (the formatter module and third-party packages may live there).
sys.path.append(".")
sys.path.append("./lib")

# Standard library:
import asyncio
import datetime
import functools
import importlib
import json
import os
import re
import time

# Third-party (resolved via the paths appended above):
import discord
import requests
from llama_cpp import Llama
# Gateway intents: the bot needs guild, message, message-content, and
# reaction events to rebuild conversations and react to users.
intents = discord.Intents(messages=True, guilds=True, message_content=True, reactions=True)
client = discord.Client(intents=intents)

# Module-level shared state:
session_times = {}   # NOTE(review): appears unused in this file — confirm before removing
attention = {}       # session name -> time.perf_counter() of the last interaction
message_cache = {}   # message id -> fetched discord message (see get_message)
lock = False         # crude boolean mutex serializing LLM access (see get_response)

praise = 0           # running count of user praises, shown as the bot's presence text
print("Loading model...", end=" ")

# Default model settings. Written to model.json on first run so the user can
# edit them; the file on disk always wins afterwards.
model_settings_path = "model.json"
model_settings = {
    "model_path": None,
    "formatter": "chatml",
    "n_gpu_layers": -1,
    "n_ctx": 32768,
    "n_threads": 8,
    "max_tokens": 16384,
    "stop": ["<|im_end|>", "</s>", "<|im_start|>"],
    "repeat_penalty": 1.1,
    "temperature": 0.75,
    "default_context": "You are a nameless AI assistant with the programmed personality of Lain from the anime \"Serial Experiments Lain.\" You are to answer all of the user's questions as quickly and briefly as possible using advanced English and cryptic messaging. You are not to go into full length detail unless asked."
}

# Create the settings file with the defaults above on first run.
if not os.path.isfile(model_settings_path):
    with open(model_settings_path, "w") as f:
        json.dump(model_settings, f, indent=4)

# Load the (possibly user-edited) settings.
with open(model_settings_path) as f:
    model_settings = json.load(f)

# If no model path is configured, pick the first .gguf file in the program
# directory. Use scandir as a context manager so the iterator is closed.
if model_settings["model_path"] is None:
    with os.scandir(".") as entries:
        for entry in entries:
            if entry.path.endswith(".gguf"):
                model_settings["model_path"] = entry.path
                break

if model_settings["model_path"] is None:
    raise Exception("No .gguf model was found in the program directory. Please specify a model's relative or absolute path using the generated model.json configuration file.")

# The formatter module converts a message list into the model's prompt format
# (e.g. chatml.py); it is resolved via sys.path (".", "./lib").
formatter = importlib.import_module(model_settings["formatter"])

LLM = Llama(
    model_path=model_settings["model_path"],
    n_gpu_layers=model_settings["n_gpu_layers"],
    n_ctx=model_settings["n_ctx"],
    verbose=False,
    n_threads=model_settings["n_threads"])

print(f"Loaded model {model_settings['model_path']}")
class CommandExecutor:
    """Extracts a JSON command blob from model output and dispatches it.

    The model is expected to emit a JSON object such as
    ``{"command": "name", ...}``; the coroutine attribute of the same name on
    the supplied command-list object is awaited with the parsed blob.
    """

    # Fixed: the original defined process() without `self` and relied on being
    # called via the class; @staticmethod makes that contract explicit and
    # also works on instances.
    @staticmethod
    async def process(command_list, text_input):
        """Parse *text_input* as JSON and invoke the named command.

        Any text before the first '{' and after the last '}' is stripped so
        the model may wrap the blob in prose. Non-JSON input is ignored
        silently; any error raised by a command is printed and swallowed so a
        bad command can never crash the bot.
        """
        try:
            # Trim everything before the first '{' and after the last '}'.
            text_input = re.sub(r"^[^\{]{1,}", "", text_input)
            text_input = re.sub(r"[^\}]{1,}$", "", text_input)
            json_blob = json.loads(text_input)

            if "command" in json_blob.keys():
                command_name = json_blob["command"]
                if hasattr(command_list, command_name):
                    call_result = await getattr(command_list, command_name)(json_blob)

                    # Commands may return a status/result for the console.
                    if call_result is not None:
                        print(call_result)
        except ValueError:
            # json.loads failures (JSONDecodeError) land here: not a command.
            pass
        except Exception as x:
            # Best-effort execution: report, but never propagate.
            print(x)
def get_response(text):
    """Run a blocking completion on the shared LLM and return the generated text.

    Serializes access to the single ``Llama`` instance with the module-level
    boolean ``lock`` (spin-wait), streams tokens to stdout as they arrive,
    and returns the concatenated output. Returns "" if generation fails.
    Intended to be called off the event loop (see get_response_wrapper).
    """
    global lock
    global model_settings

    # Crude spin-lock: only one completion may run at a time.
    while lock:
        time.sleep(0.1)

    lock = True
    output = ""
    try:
        response = LLM(
            text,
            max_tokens=model_settings["max_tokens"],
            stop=model_settings["stop"],
            echo=False,
            repeat_penalty=model_settings["repeat_penalty"],
            temperature=model_settings["temperature"],
            stream=True)

        # Stream a buffered response: echo each token and accumulate it.
        for token in response:
            token_text = token["choices"][0]["text"]
            print(token_text, end="")
            output = output + token_text
    except Exception as x:
        # Best-effort: a failed generation yields an empty/partial reply
        # instead of crashing the bot, but log what went wrong. (The original
        # bare `except: pass` also swallowed KeyboardInterrupt silently.)
        print(x)
    finally:
        # Always release the lock so a failure can never deadlock the bot.
        lock = False

    return output
async def get_response_wrapper(text_in):
    """Run the blocking get_response() in the default thread-pool executor.

    Keeps the event loop responsive while the model generates.
    Uses asyncio.get_running_loop(): get_event_loop() inside a coroutine is
    deprecated since Python 3.10.
    """
    loop = asyncio.get_running_loop()
    text_out = await loop.run_in_executor(None, functools.partial(get_response, text=text_in))
    return text_out
async def get_message(channel, message_id):
    """Fetch a Discord message by id, memoized in the module-level cache.

    Avoids re-fetching reply targets on every history scan.
    NOTE(review): the cache is unbounded and never invalidated — edits or
    deletions after the first fetch are not reflected; confirm acceptable.
    """
    # Idiomatic membership test (the original used `in ...keys()`).
    if message_id in message_cache:
        return message_cache[message_id]

    reference = await channel.fetch_message(message_id)
    message_cache[message_id] = reference
    return reference
async def y_or_n(user_input, question):
    """Ask the model a yes/no *question* about a conversation.

    user_input: either a str (wrapped as a single user message) or a list of
    message dicts of the form {"author": ..., "body": ...}.
    Returns True for a Y answer, False for N.
    Raises Exception when the reply does not begin with Y or N (including an
    empty reply — the original indexed f_resp[0] directly, which raised
    IndexError on an empty string instead).
    """
    global formatter

    context = "Analyze the conversation and answer the question as accurately as possible. Do not provide any commentary or extra help, you are programmed to respond with a Y or N."

    messages = []

    if isinstance(user_input, list):
        messages.extend(user_input)

    if isinstance(user_input, str):
        messages.append({"author": "user", "body": user_input})

    messages.append({"author": "user", "body": question})
    messages.append({"author": "user", "body": "Answer with Y or N only, no explanation is wanted."})

    f_body = formatter.format(context, messages, for_completion=True)
    f_resp = await get_response_wrapper(f_body)

    # Tolerate surrounding whitespace; [:1] is "" (not an error) when empty.
    first = f_resp.strip()[:1].lower()

    if first == "y":
        return True

    if first == "n":
        return False

    raise Exception("Answer provided does not begin with Y or N.")
# When the Discord bot starts up successfully:
@client.event
async def on_ready():
    # Signal on stdout that the gateway connection is live.
    print("READY")
# When the Discord bot sees a new message anywhere:
@client.event
async def on_message(msg):
    """Main chat handler.

    Rebuilds a short transcript of the recent conversation between this user
    and the bot, decides whether the bot is being addressed (explicit
    @mention, display name in the text, or within the 5-minute attention
    window), and if so generates and sends an LLM reply.
    """
    global praise

    # Never respond to our own messages.
    if msg.author.id == client.user.id:
        return

    # Rebuild the recent conversation (oldest first) from channel history.
    messages = []
    msg_history = [message async for message in msg.channel.history(limit=50)]
    msg_history.reverse()

    for m in msg_history:

        # Resolve the replied-to message (cached) when this one is a reply.
        reference = None
        if m.reference is not None:
            reference = await get_message(msg.channel, m.reference.message_id)

        # Ignore messages from other users:
        if m.author.id not in [msg.author.id, client.user.id]:
            continue

        # Ignore bot's replies to other users:
        if m.author.id == client.user.id:
            if reference is None or reference.author.id != msg.author.id:
                continue

        # Age of the message in seconds (timezone-aware UTC arithmetic).
        now = datetime.datetime.now(datetime.timezone.utc)
        then = m.created_at
        age = now - then
        age = age.total_seconds()

        # Ignore messages older than 10 minutes:
        if age > 10 * 60:
            continue

        # Tag each transcript entry with the chat role the formatter expects.
        if m.author.id == client.user.id:
            messages.append({
                "author": "assistant",
                "body": m.content
            })
            continue

        messages.append({
            "author": "user",
            "body": m.content
        })

    # Keep the message script short:
    while len(messages) > 25:
        del messages[0]

    # Ensure the first message is always from the user:
    while True:
        if len(messages) > 0:
            if messages[0]["author"] == "assistant":
                del messages[0]
                continue
        break

    # Begin processing the message:
    scrubbed_message = msg.content

    chl = msg.channel
    user_name = msg.author.name
    user_nickname = msg.author.display_name  # NOTE(review): unused below — confirm
    user_discriminator = msg.author.discriminator
    user_id = msg.author.id  # NOTE(review): unused below — confirm

    paying_attention = False  # should the bot reply to this message?
    bot_mentioned = False     # was the bot explicitly addressed?

    guild_id = chl.guild.id
    guild = client.get_guild(guild_id)
    guild_name = guild.name  # NOTE(review): unused below — confirm
    bot_member = guild.get_member(client.user.id)
    bot_name = bot_member.display_name
    # Per-user attention key; the discriminator keeps legacy names unique.
    session_name = f"{user_name}_{user_discriminator}"

    # An explicit @mention wakes the bot; strip the mention token from the
    # text so it is not fed to the model.
    if client.user.id in msg.raw_mentions:
        paying_attention = True
        bot_mentioned = True
        exclusion = f"<@{client.user.id}>"
        scrubbed_message = re.sub(exclusion, "", scrubbed_message)
        scrubbed_message = scrubbed_message.strip()

    # Saying the bot's display name anywhere also counts as addressing it.
    bot_name_lower = bot_name.lower()
    message_lower = scrubbed_message.lower()

    if bot_name_lower in message_lower:
        paying_attention = True
        bot_mentioned = True

    forget = False  # NOTE(review): set below but never read — confirm intent

    # Stay attentive for 5 minutes after the last interaction with this user.
    if session_name in attention.keys():
        time_last = attention[session_name]
        time_diff = time.perf_counter() - time_last
        if time_diff < 60 * 5:
            paying_attention = True
        else: forget = True
    else: forget = True

    # Refresh the attention timestamp on any explicit address.
    if bot_mentioned:
        attention[session_name] = time.perf_counter()

    if paying_attention:
        attention[session_name] = time.perf_counter()

        # The channel topic, when set, overrides the default system prompt.
        context = model_settings["default_context"]
        if chl.topic is not None:
            context = chl.topic

        # A "be forgetful" instruction in the prompt trims the history down
        # to the newest message only.
        if await y_or_n(context, "Does my previous message tell you to be forgetful?"):
            while len(messages) > 1:
                del messages[0]

        # Disabled experimental command-execution path (kept for reference):
        """
        if await y_or_n([messages[-1]], "Am I asking you to do something you don't know how to do?"):
            if os.path.isfile("commands.py"):
                import commands
                importlib.reload(commands)

                command_list = commands.CommandList()
                command_list.chl = chl

                context = ""
                with open("commands.py") as f:
                    context = f.read()

                f_body = get_messages_as_text(context, [messages[-1]], for_completion=True)
                f_resp = await get_response_wrapper(f_body)

                await CommandExecutor.process(command_list, f_resp)
                return
        """

        # Echo the exchange to the console (the reply streams token-by-token
        # from get_response after the second print).
        print(f"{user_name}: {msg.content}")
        print(f"{bot_name}: ", end="")

        f_body = formatter.format(context, messages, for_completion=True)
        f_resp = await get_response_wrapper(f_body)

        print("")

        # Praise detection bumps the counter shown in the bot's presence.
        if await y_or_n([messages[-1]], "Did I just praise you?"):
            praise = praise + 1
            await client.change_presence(activity=discord.CustomActivity(name=f"{praise} praises"))

        await chl.send(f_resp, reference=msg)
if __name__ == "__main__":
    # Read the token:
    with open("token.txt") as f:
        # strip() guards against the trailing newline most editors append,
        # which would otherwise be sent as part of the token and make the
        # Discord login fail.
        token = f.read().strip()

    # Start the Discord bot using the token:
    client.run(token)