openmcp-document/zh/plugin-tutorial/examples/python-rag_memo-stdio.md
2025-07-20 20:24:43 +08:00

383 lines
13 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Python 实现基于 RAG 的记忆存储 MCP 服务器
[本期教程的代码](https://github.com/Dormiveglia-elf/rag_memo_mcp)
## 前言
本篇教程,我们将演示如何使用 Python 构建一个简易的基于 RAG (Retrieval-Augmented Generation) 的长期记忆存储 MCP 服务器, 并通过 [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) 插件进行调试。 实现完成后,我们能够通过与大模型进行自然语言交互,轻松地存储、检索和管理我们的记忆,而无需编写任何特定的查询代码。
## 1. 准备
项目结构如下:
```bash
📦rag_memo_mcp
┣ 📂memory_db/ # LanceDB 数据库文件,初始化时会创建
┣ 📜server.py # MCP 服务器实现
┣ 📜pyproject.toml # 项目配置文件
┣ 📜uv.lock # uv lockfile
┗ ...
```
首先,我们来准备运行环境。本项目推荐使用 [uv](https://github.com/astral-sh/uv)。(`uv` 是一个速度快得飞起的 Python 包管理器,用过都说好。当然,如果你是 `pip` 或者其他包管理器的忠实粉丝,也完全没问题)
```bash
# 首先下载 uv (Windows)
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
# 或者 (macOS/Linux)
# curl -LsSf https://astral.sh/uv/install.sh | sh
```
```bash
# 项目初始化
uv init rag_memo_mcp
cd rag_memo_mcp
# 建议创建一个虚拟环境
uv venv
# 激活虚拟环境 (Windows)
.venv\Scripts\activate
# 或者 (macOS/Linux)
# source .venv/bin/activate
# 安装依赖
uv add "mcp[cli]" lancedb pandas sentence-transformers
```
## 2. 理解服务实现
与需要预先安装和配置的传统数据库不同,本项目的核心 `MemoryStore` 使用 [LanceDB](https://lancedb.github.io/),这是一个向量数据库,它会在服务器首次启动时自动在 `memory_db` 目录下创建并初始化,无需额外配置。
让我们深入 `server.py` 来理解其实现细节。
### 2.1 MemoryStore 核心类
`MemoryStore` 类是记忆存储和检索功能的核心。
```python
class MemoryStore:
```
- **`initialize()`**: 这个方法负责初始化。它会连接到 LanceDB 数据库(如果不存在则创建),定义记忆表的 schema并默认加载 `all-MiniLM-L6-v2` 用于将文本内容生成向量嵌入。
```python
def __init__(self, db_path: str = "./memory_db"):
self.db_path = db_path
self.db = None
self.table = None
self.encoder = None
self._initialized = False
async def initialize(self):
if self._initialized:
return
self.encoder = SentenceTransformer("all-MiniLM-L6-v2")
self.db = lancedb.connect(self.db_path)
schema = pa.schema(
[
pa.field("id", pa.string()),
pa.field("content", pa.string()),
pa.field("summary", pa.string()),
pa.field("tags", pa.list_(pa.string())),
pa.field("timestamp", pa.timestamp("us")),
pa.field("category", pa.string()),
pa.field("importance", pa.int32()),
pa.field(
"vector", pa.list_(pa.float32(), 384)
),
]
)
try:
self.table = self.db.open_table("memories")
except Exception:
self.table = self.db.create_table("memories", schema=schema)
self._initialized = True
```
- **`store_memory()`**: 当需要存储一条新记忆时此方法会被调用。它会为记忆内容生成一个唯一的ID和时间戳如果未提供摘要则自动生成一个简单的摘要然后使用预加载的模型将内容转换为向量最后将所有信息ID, 内容, 摘要, 标签, 时间戳, 类别, 重要性, 向量)存入 LanceDB 表中。
```python
async def store_memory(
self,
content: str,
summary: Optional[str] = None,
tags: Optional[List[str]] = None,
category: str = "general",
importance: int = 5,
) -> str:
await self.initialize()
memory_id = str(uuid.uuid4())
timestamp = datetime.now(timezone.utc)
if not summary:
summary = content[:100] + "..." if len(content) > 100 else content
embedding = self._generate_embedding(content)
data = [
{
"id": memory_id,
"content": content,
"summary": summary,
"tags": tags or [],
"timestamp": timestamp,
"category": category,
"importance": importance,
"vector": embedding,
}
]
self.table.add(data)
return memory_id
```
- **`search_memories()`**: 这是实现 RAG 的关键。当提出一个查询时,此方法会将查询文本同样转换为向量,然后在 LanceDB 中执行向量相似度搜索,以找到最相关的记忆。它还支持按类别和重要性进行过滤。
```python
async def search_memories(
self,
query: str,
limit: int = 10,
category: Optional[str] = None,
min_importance: Optional[int] = None,
) -> List[Dict[str, Any]]:
await self.initialize()
query_embedding = self._generate_embedding(query)
search_query = self.table.search(query_embedding)
if limit:
search_query = search_query.limit(limit)
filters = []
if category:
filters.append(f"category = '{category}'")
if min_importance is not None:
filters.append(f"importance >= {min_importance}")
if filters:
filter_str = " AND ".join(filters)
search_query = search_query.where(filter_str)
results = search_query.to_pandas()
memories = []
for _, row in results.iterrows():
memory = {
"id": row["id"],
"content": row["content"],
"summary": row["summary"],
"tags": row["tags"].tolist(),
"timestamp": row["timestamp"],
"category": row["category"],
"importance": int(row["importance"]),
"similarity_score": row.get(
"_distance", 0.0
),
}
memories.append(memory)
return memories
```
### 2.2 MCP 服务器与工具
我们使用 `FastMCP` 来快速构建一个 MCP 服务器,并通过 `@mcp.tool()` 装饰器将 `MemoryStore` 的功能暴露为大模型可以调用的工具。
- **`store_memory`**: **记笔记!** 存储一条记忆。
- **`search_memories`**: **让我想想...** 根据查询内容搜索相关记忆。
- **`get_memory`**: **按图索骥!** 根据 ID 精确检索某条记忆。
- **`list_categories`**: **分门别类!** 列出所有记忆的分类。
- **`get_memory_stats`**: **记忆盘点!** 获取关于记忆库的统计信息,如总数、各分类数量等。
```python
# 初始化记忆存储
memory_store = MemoryStore()
# 创建 MCP 服务器
mcp = FastMCP("RAG-based Memory MCP Server")
@mcp.tool()
async def store_memory(
content: str,
summary: Optional[str] = None,
tags: Optional[str] = None,
category: str = "general",
importance: int = 5,
) -> Dict[str, str]:
"""
Store content in memory.
Args:
content: The content to store
summary: Optional summary (auto-generated if not provided)
tags: Comma-separated tags
category: Memory category (default: general)
importance: Importance level 1-10 (default: 5)
"""
try:
# Parse tags if provided
tag_list = [tag.strip() for tag in tags.split(",")] if tags else []
memory_id = await memory_store.store_memory(
content=content,
summary=summary,
tags=tag_list,
category=category,
importance=importance,
)
return {
"status": "success",
"memory_id": memory_id,
"message": f"Memory stored successfully with ID: {memory_id}",
}
except Exception as e:
return {"status": "error", "message": f"Failed to store memory: {str(e)}"}
@mcp.tool()
async def search_memories(
query: str,
limit: int = 10,
category: Optional[str] = None,
min_importance: Optional[int] = None,
) -> Dict[str, Any]:
"""
Search stored memories using semantic similarity.
Args:
query: Search query
limit: Maximum number of results (default: 10)
category: Filter by category
min_importance: Minimum importance level
"""
try:
memories = await memory_store.search_memories(
query=query, limit=limit, category=category, min_importance=min_importance
)
return {
"status": "success",
"query": query,
"total_results": len(memories),
"memories": memories,
}
except Exception as e:
return {"status": "error", "message": f"Search failed: {str(e)}"}
@mcp.tool()
async def get_memory(memory_id: str) -> Dict[str, Any]:
"""
Retrieve a specific memory by its ID.
Args:
memory_id: The unique identifier of the memory
"""
try:
memory = await memory_store.get_memory_by_id(memory_id)
if memory:
return {"status": "success", "memory": memory}
else:
return {
"status": "error",
"message": f"Memory with ID {memory_id} not found",
}
except Exception as e:
return {"status": "error", "message": f"Failed to retrieve memory: {str(e)}"}
@mcp.tool()
async def list_categories() -> Dict[str, Any]:
try:
categories = await memory_store.list_categories()
return {"status": "success", "categories": categories}
except Exception as e:
return {"status": "error", "message": f"Failed to list categories: {str(e)}"}
@mcp.tool()
async def get_memory_stats() -> Dict[str, Any]:
try:
stats = await memory_store.get_stats()
return {"status": "success", "stats": stats}
except Exception as e:
return {"status": "error", "message": f"Failed to get stats: {str(e)}"}
```
服务器的启动代码位于 `server.py` 的末尾,它首先初始化 `MemoryStore`,然后运行 MCP 服务器。
```python
if __name__ == "__main__":
# 在启动时初始化记忆存储
async def init_memory():
await memory_store.initialize()
# 运行初始化
asyncio.run(init_memory())
# 运行 MCP 服务器
mcp.run()
```
## 3. 通过 [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) 来进行调试
### 3.1 添加工作区连接
接下来,我们通过 [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) 插件进行调试。首先测试是否能连接成功,这里选择 `stdio`,工作路径设置为项目所在的目录,然后点击 `Connect`。右边的日志栏里可以看到我们已经连接成功。
<div align=center>
<img src="https://raw.githubusercontent.com/Dormiveglia-elf/rag_memo_mcp/main/images/rag_memo_mcp_1.png" style="width: 80%;"/>
</div>
### 3.2 测试工具
连接成功后,让我们先测试一下工具是否工作正常。
1. **存个小秘密**: 新建一个 `Tool` 标签页,选择 `store_memory` 工具。例如我们输入:
- `content`: `小明的生日是 2025.6.18`
- `category`: `birthday`
- `importance`: `8`
点击 `Execute`,如果成功会返回存储的记忆 ID比如这里返回 `bcc30f6c-979c-46d1-b34a-cd1a09242106`
<div align=center>
<img src="https://raw.githubusercontent.com/Dormiveglia-elf/rag_memo_mcp/main/images/rag_memo_mcp_2.png" style="width: 80%;"/>
</div>
2. **根据 ID 精确检索某条记忆**
存储成功后,我们根据返回的记忆 ID `bcc30f6c-979c-46d1-b34a-cd1a09242106`,选择 `get_memory` 工具,测试是否能够从 `Lancedb` 里面检索出来。
<img src="https://raw.githubusercontent.com/Dormiveglia-elf/rag_memo_mcp/main/images/rag_memo_mcp_3.png" style="width: 80%;"/>
3. **列出目前的记忆分类**:
我们调用 `list_categories` 工具来查看当前所有记忆的分类。由于我们只添加了一个 `birthday` 分类的记忆,所以返回结果中应该只包含这个分类。
<div align=center>
<img src="https://raw.githubusercontent.com/Dormiveglia-elf/rag_memo_mcp/main/images/rag_memo_mcp_4.png" style="width: 80%;"/>
</div>
4. **获取记忆统计数据**:
接着,我们使用 `get_memory_stats` 工具来获取记忆库的统计信息,例如总共有多少条记忆,以及每个分类下的记忆数量。
<div align=center>
<img src="https://raw.githubusercontent.com/Dormiveglia-elf/rag_memo_mcp/main/images/rag_memo_mcp_5.png" style="width: 80%;"/>
</div>
### 3.3 大模型交互测试
上面我们"遗漏"了一个工具 `search_memories` 没有测试,其实是特意把它留给了大模型交互测试。进入交互测试页面(记得事先参照[连接大模型教程](https://kirigaya.cn/openmcp/zh/plugin-tutorial/usage/connect-llm.html)设置好大模型的 `api_key``base_url`),我们可以先把其他的工具都取消配备,只保留 `search_memories` 这一个工具:
<div align=center>
<img src="https://raw.githubusercontent.com/Dormiveglia-elf/rag_memo_mcp/main/images/rag_memo_mcp_6.png" style="width: 80%;"/>
</div>
然后,我们假装不经意地问一句:
<div align=center>
<img src="https://raw.githubusercontent.com/Dormiveglia-elf/rag_memo_mcp/main/images/rag_memo_mcp_7.png" style="width: 80%;"/>
</div>
好! 大模型成功帮助我召回了我的朋友小明的生日, Cheers!