Skip to content

Instantly share code, notes, and snippets.

@qingant
Created November 25, 2024 15:26
Show Gist options
  • Save qingant/e9a847e6a60d09babcc10c61642f3b93 to your computer and use it in GitHub Desktop.
Save qingant/e9a847e6a60d09babcc10c61642f3b93 to your computer and use it in GitHub Desktop.
from email import message
from math import log
from wcferry import Wcf, WxMsg
from queue import Empty
import os
import json
import logging
import time
from front.context import Context
from engine.route import ai_engine
import threading
def get_recent_files(directory, limit=100):
    """
    Get the most recent `limit` files from the specified directory.

    :param directory: The directory to search for files.
    :param limit: The maximum number of recent files to return.
    :return: A list of file paths (each joined with `directory`) holding the
        `limit` most recently modified files, ordered from oldest to newest.
        An empty list when `directory` is not an existing directory.
    """
    # A missing path (or a path that is not a directory) simply has no files.
    if not os.path.isdir(directory):
        return []
    # Keep regular files only; sub-directories are ignored.
    candidates = (os.path.join(directory, name) for name in os.listdir(directory))
    files = [path for path in candidates if os.path.isfile(path)]
    # Newest first, so the slice below keeps the most recent `limit` entries.
    files.sort(key=os.path.getmtime, reverse=True)
    # Flip back to chronological order and return a real list (not a one-shot
    # iterator) so the result matches the documented return type.
    return files[:limit][::-1]
class RoomManager:
    """
    Store chat messages on disk, one JSON file per message, grouped by room.

    Files are laid out as `<path>/<room>/<message id>.json`.
    """

    # Root directory under which one sub-directory per room is created.
    path = os.path.join('.', 'rooms')

    def __init__(self):
        # exist_ok avoids the race between a separate exists() check and makedirs().
        os.makedirs(self.path, exist_ok=True)

    def get_messages(self, room):
        '''
        Get messages from old to new in `self.path/room/`

        Only the 20 most recently modified files of the room are considered,
        and of those only files ending in `.json` are loaded.

        :param room: Name of the room sub-directory.
        :return: A list with the parsed JSON content of each message file.
        '''
        room_path = os.path.join(self.path, room)
        messages = []
        for file_path in get_recent_files(room_path, 20):
            if not file_path.endswith('.json'):
                continue
            # Context manager so the handle is closed right after parsing.
            with open(file_path, encoding='utf-8') as f:
                messages.append(json.load(f))
        return messages

    def add_message(self, room, message):
        """
        Persist `message` as `<path>/<room>/<message["id"]>.json`.

        :param room: Name of the room sub-directory (created if missing).
        :param message: JSON-serialisable dict; its "id" entry names the file.
        """
        path = os.path.join(self.path, room)
        os.makedirs(path, exist_ok=True)
        file_path = os.path.join(path, f'{message["id"]}.json')
        # Overwrite rather than append: a second write with the same id would
        # otherwise leave two JSON documents in one file, which json.load()
        # in get_messages() cannot parse.
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(json.dumps(message) + '\n')
# Module-level wcferry client shared by every function in this script.
wcf = Wcf()
is_login = wcf.is_login()
print(f"is login:{is_login!s}")
# Message receiving must be enabled before loop() can poll for messages.
wcf.enable_receiving_msg()


def send_message(to, message):
    """Send the text `message` to the recipient `to` through the shared `wcf` client."""
    wcf.send_text(message, to)


# Shared on-disk message store used to record and replay room history.
rm = RoomManager()
def build_context(msg: WxMsg) -> dict:
    """
    Build the context dict for `msg` from the stored history of its room.

    Each stored message of `msg.roomid` becomes one `channel_context` entry
    whose text is the message content or, when the content is empty, a JSON
    list of the `type`/`file` fields of its blocks.

    :param msg: The incoming message to build the context for.
    :return: A dict with the room history (`channel_context`) plus the
        session, user and message fields taken from `msg`.
    """
    # NOTE(review): the original annotation was `list[Context]`, but a dict is
    # returned; if `Context` is the dict type expected here, annotate with it.
    history = rm.get_messages(msg.roomid)
    channel_context = []
    for message in history:
        text = message.get('content')
        if not text:
            # Stored incoming messages carry no 'blocks' key, and replies may
            # store blocks=None, so treat a missing/None value as "no blocks"
            # instead of failing on iteration.
            blocks = message.get('blocks') or []
            text = json.dumps([
                {k: v for k, v in b.items() if k in ('type', 'file')}
                for b in blocks
            ])
        channel_context.append({
            'text': text,
            # Messages written by this bot are replayed as assistant turns.
            'role': 'assistant' if message.get('_is_self') else 'user',
            'user_id': message.get('sender'),
            'user_name': message.get('sender'),
        })
    context = {
        'thread_context': [],
        'channel_context': channel_context,
        'source': 'dm',
        'session_id': msg.roomid,
        'user_id': msg.sender,
        'user_name': msg.sender,
        'client_message_id': msg.id,
        'message': msg.content,
    }
    return context
# Serialises reply callbacks so that concurrently running engine threads do
# not interleave their history writes and sends.
lock = threading.Lock()


def loop():
    """
    Poll for incoming messages and dispatch them to `ai_engine`.

    Every received message is printed and stored via `rm`. Direct messages,
    and group messages that mention this account, additionally start a
    thread running `ai_engine` with the built context and a reply callback
    bound to that message. Runs until `wcf.is_receiving_msg()` is false.
    """
    while wcf.is_receiving_msg():
        try:
            msg: WxMsg = wcf.get_msg()

            def send_message(message: str=None, blocks: list=None, msg=msg):
                # `msg` is bound as a default argument so this callback keeps
                # replying to its own message after the loop has moved on.
                with lock:
                    text = message
                    # Record the reply in the room history before sending it.
                    rm.add_message(
                        room=msg.roomid,
                        message={
                            'id': str(time.time()),
                            '_is_self': True,
                            'user_name': 'NeoBot',
                            'content': text,
                            'blocks': blocks,
                            'sender': wcf.get_self_wxid()
                        })
                    # In a group, @-mention the user being answered.
                    ats = msg.sender if msg.from_group() else None
                    if message:
                        wcf.send_text(text, msg.roomid, ats)
                    else:
                        # `blocks` defaults to None; treat that as "nothing to send".
                        for b in blocks or []:
                            if b['type'] == 'image':
                                path = os.path.abspath(b['file'])
                                wcf.send_image(path, msg.roomid)

            print(msg)
            print("msg:" + msg.content)
            print(msg.__dict__)
            print(msg.from_group(), msg.sender)
            rm.add_message(msg.roomid, msg.__dict__)
            # wcf.send_text(msg.content, msg.sender)
            mentioned = msg.from_group() and msg.is_at(wcf.get_self_wxid())
            if (not msg.from_group()) or mentioned:
                context = build_context(msg)
                # Pass the arguments through `kwargs` so they are bound now; a
                # lambda would look up `context`/`send_message` only when the
                # thread runs, by which time the next iteration may have
                # rebound them to a different message.
                threading.Thread(
                    target=ai_engine,
                    kwargs={
                        'context': context,
                        'send_message': send_message,
                    },
                ).start()
                print('Started')
        except Empty:
            continue  # Empty message
        except Exception as e:
            # Keep the receive loop alive: report the failure and carry on.
            print(f"Receiving message error: {e}")
            logging.exception(e)
if __name__ == "__main__":
    # Commented-out debugging calls kept for reference:
    # dbs = wcf.get_dbs()
    # print(dbs)
    # tables = wcf.get_tables('ChatMsg.db')
    # print(tables)
    # Run the receive loop when executed as a script.
    loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment