本次对话探讨了基于 wcferry
库的信息消息处理系统。主要功能包括:获取最近文件、管理聊天室信息、支持微信信息的发送接收等。系统通过自定义的消息结构体实现消息存储与处理,每条消息都以JSON格式存储,以支持快速检索和处理。改进建议主要集中在错误处理和性能优化方面。
from email import message
from math import log
from wcferry import Wcf, WxMsg
from queue import Empty
import os
import json
import logging
import time
from front.context import Context
from engine.route import ai_engine
import threading
def get_recent_files(directory, limit=100):
"""
Get the most recent `limit` files from the specified directory.
:param directory: The directory to search for files.
:param limit: The maximum number of recent files to return.
:return: A list of the most recent files.
"""
if not os.path.exists(directory):
return []
# Get all files in the directory
files = [os.path.join(directory, f) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
# Sort files by modification time in descending order
files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
# Return the most recent `limit` files
return reversed(files[:limit])
class RoomManager:
path = os.path.join('.', 'rooms')
def __init__(self):
if not os.path.exists(self.path):
os.makedirs(self.path)
def get_messages(self, room):
'''
Get messages from old to new in `self.path/room/`
'''
room_path = os.path.join(self.path, room)
files = [f for f in get_recent_files(room_path, 20) if f.endswith('.json')]
return [
json.load(open(f))
for f in files
]
def add_message(self, room, message):
path = os.path.join(self.path, room)
if not os.path.exists(path):
os.makedirs(path)
file_path = os.path.join(path, f'{message["id"]}.json')
with open(file_path, 'a') as f:
f.write(json.dumps(message) + '\n')
wcf = Wcf()
is_login = wcf.is_login()
print("is login:" + str(is_login))
wcf.enable_receiving_msg()
def send_message(to, message):
wcf.send_text(message, to)
rm = RoomManager()
def build_context(msg: WxMsg) -> list[Context]:
history = rm.get_messages(msg.roomid)
channel_context = [
{
'text': message['content'] or json.dumps([{k: v for k, v in b.items() if k in ('type', 'file')} for b in message.get('blocks')]),
'role': 'assistant' if message['_is_self'] else 'user',
'user_id': message.get('sender'),
'user_name': message.get('sender'),
}
for message in history
]
context = {
'thread_context': [],
'channel_context': channel_context,
'source': 'dm',
'session_id': msg.roomid,
'user_id': msg.sender,
'user_name': msg.sender,
'client_message_id': msg.id,
'message': msg.content,
}
return context
lock = threading.Lock()
def loop():
while wcf.is_receiving_msg():
try:
msg: WxMsg = wcf.get_msg()
def send_message(message: str=None, blocks: list=None, msg=msg):
with lock:
text = message
rm.add_message(
room=msg.roomid,
message={
'id': str(time.time()),
'_is_self': True,
'user_name': 'NeoBot',
'content': text,
'blocks': blocks,
'sender': wcf.get_self_wxid()
})
ats = msg.sender if msg.from_group() else None
if message:
wcf.send_text(text, msg.roomid, ats)
else:
for b in blocks:
if b['type'] == 'image':
path = os.path.abspath(b['file'])
wcf.send_image(path, msg.roomid)
print(msg)
print("msg:" + msg.content)
print(msg.__dict__)
print(msg.from_group(), msg.sender)
rm.add_message(msg.roomid, msg.__dict__)
# wcf.send_text(msg.content, msg.sender)
mentioned = msg.from_group() and msg.is_at(wcf.get_self_wxid())
if (not msg.from_group()) or mentioned:
context = build_context(msg)
threading.Thread(target=lambda: ai_engine(
context=context,
send_message=send_message
)).start()
print('Started')
except Empty:
continue # Empty message
except Exception as e:
print(f"Receiving message error: {e}")
logging.exception(e)
if __name__ == "__main__":
# dbs = wcf.get_dbs()
# print(dbs)
# tables = wcf.get_tables('ChatMsg.db')
# print(tables)
# wcf.
loop()
这段代码实现了一个基于 wcferry
库的微信消息处理系统,其中包括以下主要功能模块:
获取最近文件功能 (get_recent_files
)
- 提供一个工具方法,用于获取目录下最近修改的文件,支持限制返回文件数量。
- 使用文件修改时间排序。
房间消息管理器 (RoomManager
)
- 管理房间中的消息记录,支持添加消息和读取消息。
- 消息存储在指定目录中,格式为 JSON 文件。
与微信通讯的集成 (Wcf
)
- Wcf
用于和微信通信,支持消息发送和接收。
- 消息接收通过轮询实现,并支持对消息内容进行上下文处理。
上下文构建 (build_context
)
- 将历史消息和当前消息组合成统一的上下文,用于后续处理(例如 AI 分析)。
- 每条消息包括角色、内容、用户 ID 等信息。
主循环 (loop
)
- 持续监听微信消息,支持群聊消息的艾特检测和单聊处理。
- 使用多线程方式,将上下文传递给 AI 引擎(假设由 ai_engine
实现)。
AI 处理
- ai_engine
(从 engine.route
导入)处理上下文,并通过回调 send_message
发送响应。
消息的接收和存储
- 每条消息都会被存储到对应的房间目录中,文件名为消息 ID,格式为 JSON。
消息上下文构建
- 历史消息用于构建上下文,区分角色(用户/助手)以及消息内容。
群聊中艾特处理
- 如果是群聊消息且被艾特(is_at
),则触发 AI 引擎的响应逻辑。
消息发送
- 消息可以是纯文本,也可以包含图片块,调用 Wcf
的发送接口完成。
文件句柄未关闭
- 在读取 JSON 文件时,json.load(open(f))
可能导致文件句柄未关闭。建议使用 with open(f, 'r') as file
的形式。
异常处理增强
- 捕获异常的同时可以增加对特定错误(例如 JSON 解析失败)的处理。
轮询优化
- 当前消息接收使用简单轮询,可以考虑使用异步或长连接以降低资源占用。
日志记录
- 提供更详细的日志信息,如消息来源、上下文构建状态、AI 响应内容等,方便调试。
AI 引擎接口解耦
- 将 ai_engine
的接口进一步解耦,通过异步任务队列(如 Celery)处理上下文分析,提升扩展性。
这段代码实现了一个完整的微信消息管理系统,涵盖消息接收、存储、上下文处理和自动响应功能。代码结构清晰,但部分资源管理和异常处理可以进一步优化。如果需要更高效的消息监听,建议引入异步框架或改用事件驱动的方式。