openmcp-document/zh/plugin-tutorial/examples/python-rag_memo-stdio.md
2025-07-20 20:24:43 +08:00

13 KiB
Raw Permalink Blame History

Python 实现基于 RAG 的记忆存储 MCP 服务器

本期教程的代码

前言

本篇教程,我们将演示如何使用 Python 构建一个简易的基于 RAG (Retrieval-Augmented Generation) 的长期记忆存储 MCP 服务器, 并通过 openmcp 插件进行调试。 实现完成后,我们能够通过与大模型进行自然语言交互,轻松地存储、检索和管理我们的记忆,而无需编写任何特定的查询代码。

1. 准备

项目结构如下:

📦rag_memo_mcp
 ┣ 📂memory_db/      # LanceDB 数据库文件,初始化时会创建
 ┣ 📜server.py       # MCP 服务器实现
 ┣ 📜pyproject.toml  # 项目配置文件
 ┣ 📜uv.lock         # uv lockfile
 ┗ ...

首先,我们来准备运行环境。本项目推荐使用 uv。(uv 是一个速度快得飞起的 Python 包管理器,用过都说好。当然,如果你是 pip 或者其他包管理器的忠实粉丝,也完全没问题)

# 首先下载 uv (Windows)
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
# 或者 (macOS/Linux)
# curl -LsSf https://astral.sh/uv/install.sh | sh
# 项目初始化
uv init rag_memo_mcp
cd rag_memo_mcp
# 建议创建一个虚拟环境
uv venv
# 激活虚拟环境 (Windows)
.venv\Scripts\activate
# 或者 (macOS/Linux)
# source .venv/bin/activate

# 安装依赖
uv add "mcp[cli]" lancedb pandas sentence-transformers

2. 理解服务实现

与需要预先安装和配置的传统数据库不同,本项目的核心 MemoryStore 使用 LanceDB,这是一个向量数据库,它会在服务器首次启动时自动在 memory_db 目录下创建并初始化,无需额外配置。

让我们深入 server.py 来理解其实现细节。

2.1 MemoryStore 核心类

MemoryStore 类是记忆存储和检索功能的核心。

class MemoryStore:
  • initialize(): 这个方法负责初始化。它会连接到 LanceDB 数据库(如果不存在则创建),定义记忆表的 schema并默认加载 all-MiniLM-L6-v2 用于将文本内容生成向量嵌入。
def __init__(self, db_path: str = "./memory_db"):
    self.db_path = db_path
    self.db = None
    self.table = None
    self.encoder = None
    self._initialized = False

async def initialize(self):
    if self._initialized:
        return

    self.encoder = SentenceTransformer("all-MiniLM-L6-v2")

    self.db = lancedb.connect(self.db_path)

    schema = pa.schema(
        [
            pa.field("id", pa.string()),
            pa.field("content", pa.string()),
            pa.field("summary", pa.string()),
            pa.field("tags", pa.list_(pa.string())),
            pa.field("timestamp", pa.timestamp("us")),
            pa.field("category", pa.string()),
            pa.field("importance", pa.int32()),
            pa.field(
                "vector", pa.list_(pa.float32(), 384)
            ),
        ]
    )

    try:
        self.table = self.db.open_table("memories")
    except Exception:
        self.table = self.db.create_table("memories", schema=schema)

    self._initialized = True
  • store_memory(): 当需要存储一条新记忆时此方法会被调用。它会为记忆内容生成一个唯一的ID和时间戳如果未提供摘要则自动生成一个简单的摘要然后使用预加载的模型将内容转换为向量最后将所有信息ID, 内容, 摘要, 标签, 时间戳, 类别, 重要性, 向量)存入 LanceDB 表中。
async def store_memory(
    self,
    content: str,
    summary: Optional[str] = None,
    tags: Optional[List[str]] = None,
    category: str = "general",
    importance: int = 5,
) -> str:
    await self.initialize()

    memory_id = str(uuid.uuid4())
    timestamp = datetime.now(timezone.utc)

    if not summary:
        summary = content[:100] + "..." if len(content) > 100 else content

    embedding = self._generate_embedding(content)

    data = [
        {
            "id": memory_id,
            "content": content,
            "summary": summary,
            "tags": tags or [],
            "timestamp": timestamp,
            "category": category,
            "importance": importance,
            "vector": embedding,
        }
    ]

    self.table.add(data)

    return memory_id
  • search_memories(): 这是实现 RAG 的关键。当提出一个查询时,此方法会将查询文本同样转换为向量,然后在 LanceDB 中执行向量相似度搜索,以找到最相关的记忆。它还支持按类别和重要性进行过滤。
async def search_memories(
    self,
    query: str,
    limit: int = 10,
    category: Optional[str] = None,
    min_importance: Optional[int] = None,
) -> List[Dict[str, Any]]:
    await self.initialize()
    query_embedding = self._generate_embedding(query)

    search_query = self.table.search(query_embedding)

    if limit:
        search_query = search_query.limit(limit)

    filters = []
    if category:
        filters.append(f"category = '{category}'")
    if min_importance is not None:
        filters.append(f"importance >= {min_importance}")

    if filters:
        filter_str = " AND ".join(filters)
        search_query = search_query.where(filter_str)

    results = search_query.to_pandas()

    memories = []
    for _, row in results.iterrows():
        memory = {
            "id": row["id"],
            "content": row["content"],
            "summary": row["summary"],
            "tags": row["tags"].tolist(),
            "timestamp": row["timestamp"],
            "category": row["category"],
            "importance": int(row["importance"]),
            "similarity_score": row.get(
                "_distance", 0.0
            ),
        }
        memories.append(memory)

    return memories

2.2 MCP 服务器与工具

我们使用 FastMCP 来快速构建一个 MCP 服务器,并通过 @mcp.tool() 装饰器将 MemoryStore 的功能暴露为大模型可以调用的工具。

  • store_memory: 记笔记! 存储一条记忆。
  • search_memories: 让我想想... 根据查询内容搜索相关记忆。
  • get_memory: 按图索骥! 根据 ID 精确检索某条记忆。
  • list_categories: 分门别类! 列出所有记忆的分类。
  • get_memory_stats: 记忆盘点! 获取关于记忆库的统计信息,如总数、各分类数量等。
# 初始化记忆存储
memory_store = MemoryStore()

# 创建 MCP 服务器
mcp = FastMCP("RAG-based Memory MCP Server")


@mcp.tool()
async def store_memory(
    content: str,
    summary: Optional[str] = None,
    tags: Optional[str] = None,
    category: str = "general",
    importance: int = 5,
) -> Dict[str, str]:
    """
    Store content in memory.

    Args:
        content: The content to store
        summary: Optional summary (auto-generated if not provided)
        tags: Comma-separated tags
        category: Memory category (default: general)
        importance: Importance level 1-10 (default: 5)
    """
    try:
        # Parse tags if provided
        tag_list = [tag.strip() for tag in tags.split(",")] if tags else []

        memory_id = await memory_store.store_memory(
            content=content,
            summary=summary,
            tags=tag_list,
            category=category,
            importance=importance,
        )

        return {
            "status": "success",
            "memory_id": memory_id,
            "message": f"Memory stored successfully with ID: {memory_id}",
        }
    except Exception as e:
        return {"status": "error", "message": f"Failed to store memory: {str(e)}"}


@mcp.tool()
async def search_memories(
    query: str,
    limit: int = 10,
    category: Optional[str] = None,
    min_importance: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Search stored memories using semantic similarity.

    Args:
        query: Search query
        limit: Maximum number of results (default: 10)
        category: Filter by category
        min_importance: Minimum importance level
    """
    try:
        memories = await memory_store.search_memories(
            query=query, limit=limit, category=category, min_importance=min_importance
        )

        return {
            "status": "success",
            "query": query,
            "total_results": len(memories),
            "memories": memories,
        }
    except Exception as e:
        return {"status": "error", "message": f"Search failed: {str(e)}"}


@mcp.tool()
async def get_memory(memory_id: str) -> Dict[str, Any]:
    """
    Retrieve a specific memory by its ID.

    Args:
        memory_id: The unique identifier of the memory
    """
    try:
        memory = await memory_store.get_memory_by_id(memory_id)

        if memory:
            return {"status": "success", "memory": memory}
        else:
            return {
                "status": "error",
                "message": f"Memory with ID {memory_id} not found",
            }
    except Exception as e:
        return {"status": "error", "message": f"Failed to retrieve memory: {str(e)}"}


@mcp.tool()
async def list_categories() -> Dict[str, Any]:
    try:
        categories = await memory_store.list_categories()
        return {"status": "success", "categories": categories}
    except Exception as e:
        return {"status": "error", "message": f"Failed to list categories: {str(e)}"}


@mcp.tool()
async def get_memory_stats() -> Dict[str, Any]:
    try:
        stats = await memory_store.get_stats()
        return {"status": "success", "stats": stats}
    except Exception as e:
        return {"status": "error", "message": f"Failed to get stats: {str(e)}"}

服务器的启动代码位于 server.py 的末尾,它首先初始化 MemoryStore,然后运行 MCP 服务器。

if __name__ == "__main__":
    # 在启动时初始化记忆存储
    async def init_memory():
        await memory_store.initialize()

    # 运行初始化
    asyncio.run(init_memory())

    # 运行 MCP 服务器
    mcp.run()

3. 通过 openmcp 来进行调试

3.1 添加工作区连接

接下来,我们通过 openmcp 插件进行调试。首先测试是否能连接成功,这里选择 stdio,工作路径设置为项目所在的目录,然后点击 Connect。右边的日志栏里可以看到我们已经连接成功。

3.2 测试工具

连接成功后,让我们先测试一下工具是否工作正常。

  1. 存个小秘密: 新建一个 Tool 标签页,选择 store_memory 工具。例如我们输入:

    • content: 小明的生日是 2025.6.18
    • category: birthday
    • importance: 8

    点击 Execute,如果成功会返回存储的记忆 ID比如这里返回 bcc30f6c-979c-46d1-b34a-cd1a09242106

  1. 根据 ID 精确检索某条记忆 存储成功后,我们根据返回的记忆 ID bcc30f6c-979c-46d1-b34a-cd1a09242106,选择 get_memory 工具,测试是否能够从 Lancedb 里面检索出来。

  2. 列出目前的记忆分类: 我们调用 list_categories 工具来查看当前所有记忆的分类。由于我们只添加了一个 birthday 分类的记忆,所以返回结果中应该只包含这个分类。

  1. 获取记忆统计数据: 接着,我们使用 get_memory_stats 工具来获取记忆库的统计信息,例如总共有多少条记忆,以及每个分类下的记忆数量。

3.3 大模型交互测试

上面我们"遗漏"了一个工具 search_memories 没有测试,其实是特意把它留给了大模型交互测试。进入交互测试页面(记得事先参照连接大模型教程设置好大模型的 api_keybase_url),我们可以先把其他的工具都取消配备,只保留 search_memories 这一个工具:

然后,我们假装不经意地问一句:

好! 大模型成功帮助我召回了我的朋友小明的生日, Cheers!