From bca7775322b5cef97b6efb65f3129beefdb53c07 Mon Sep 17 00:00:00 2001 From: Zhenyu Pan <120090196@link.cuhk.edu.cn> Date: Sun, 20 Jul 2025 20:24:43 +0800 Subject: [PATCH] docs: add an example --- .../examples/python-rag_memo-stdio.md | 392 ++++++++++++++++++ .../examples/python-rag_memo-stdio.md | 382 +++++++++++++++++ 2 files changed, 774 insertions(+) create mode 100644 plugin-tutorial/examples/python-rag_memo-stdio.md create mode 100644 zh/plugin-tutorial/examples/python-rag_memo-stdio.md diff --git a/plugin-tutorial/examples/python-rag_memo-stdio.md b/plugin-tutorial/examples/python-rag_memo-stdio.md new file mode 100644 index 0000000..a9f96ae --- /dev/null +++ b/plugin-tutorial/examples/python-rag_memo-stdio.md @@ -0,0 +1,392 @@ +# Building a RAG-Based Memory Storage MCP Server in Python + +[Tutorial Code Repository](https://github.com/Dormiveglia-elf/rag_memo_mcp) + +## Introduction + +In this tutorial, we'll demonstrate how to build a simple RAG (Retrieval-Augmented Generation) based long-term memory storage MCP server using Python, and debug it using the [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) plugin. Once implemented, we'll be able to store, retrieve, and manage our memories through natural language interactions with large language models, without needing to write any specific query code. + +## 1. Setup + +The project structure is as follows: + +```bash +📦rag_memo_mcp + ┣ 📂memory_db/ # LanceDB database files, created during initialization + ┣ 📜server.py # MCP server implementation + ┣ 📜pyproject.toml # Project configuration file + ┣ 📜uv.lock # uv lockfile + ┗ ... +``` + +First, let's prepare the runtime environment. This project recommends using [uv](https://github.com/astral-sh/uv). (`uv` is a blazingly fast Python package manager that's beloved by those who use it. Of course, if you're a loyal fan of `pip` or other package managers, that works perfectly fine too.) + +```bash +# First download uv (Windows) +powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex" +# Or (macOS/Linux) +# curl -LsSf https://astral.sh/uv/install.sh | sh +``` + +```bash +# Project initialization +uv init rag_memo_mcp +cd rag_memo_mcp +# We recommend creating a virtual environment +uv venv +# Activate virtual environment (Windows) +.venv\Scripts\activate +# Or (macOS/Linux) +# source .venv/bin/activate + +# Install dependencies +uv add "mcp[cli]" lancedb pandas sentence-transformers +``` + +## 2. Understanding the Service Implementation + +Unlike traditional databases that require pre-installation and configuration, this project's core `MemoryStore` uses [LanceDB](https://lancedb.github.io/), a vector database that automatically creates and initializes itself in the `memory_db` directory when the server first starts, requiring no additional configuration. + +Let's dive into `server.py` to understand its implementation details. + +### 2.1 MemoryStore Core Class + +The `MemoryStore` class is the heart of memory storage and retrieval functionality. + +```python +class MemoryStore: +``` + +- **`initialize()`**: This method handles initialization. It connects to the LanceDB database (creating it if it doesn't exist), defines the memory table schema, and by default loads the `all-MiniLM-L6-v2` model for generating vector embeddings from text content. 
+ +```python +def __init__(self, db_path: str = "./memory_db"): + self.db_path = db_path + self.db = None + self.table = None + self.encoder = None + self._initialized = False + +async def initialize(self): + if self._initialized: + return + + self.encoder = SentenceTransformer("all-MiniLM-L6-v2") + + self.db = lancedb.connect(self.db_path) + + schema = pa.schema( + [ + pa.field("id", pa.string()), + pa.field("content", pa.string()), + pa.field("summary", pa.string()), + pa.field("tags", pa.list_(pa.string())), + pa.field("timestamp", pa.timestamp("us")), + pa.field("category", pa.string()), + pa.field("importance", pa.int32()), + pa.field( + "vector", pa.list_(pa.float32(), 384) + ), + ] + ) + + try: + self.table = self.db.open_table("memories") + except Exception: + self.table = self.db.create_table("memories", schema=schema) + + self._initialized = True +``` + +- **`store_memory()`**: When storing a new memory, this method generates a unique ID and timestamp for the memory content. If no summary is provided, it automatically generates a simple summary, then uses the pre-loaded model to convert the content into a vector, and finally stores all information (ID, content, summary, tags, timestamp, category, importance, vector) in the LanceDB table. + +```python +async def store_memory( + self, + content: str, + summary: Optional[str] = None, + tags: Optional[List[str]] = None, + category: str = "general", + importance: int = 5, +) -> str: + await self.initialize() + + memory_id = str(uuid.uuid4()) + timestamp = datetime.now(timezone.utc) + + if not summary: + summary = content[:100] + "..." if len(content) > 100 else content + + embedding = self._generate_embedding(content) + + data = [ + { + "id": memory_id, + "content": content, + "summary": summary, + "tags": tags or [], + "timestamp": timestamp, + "category": category, + "importance": importance, + "vector": embedding, + } + ] + + self.table.add(data) + + return memory_id +``` + +- **`search_memories()`**: This is the key to implementing RAG. When a query is made, this method converts the query text into a vector as well, then performs vector similarity search in LanceDB to find the most relevant memories. It also supports filtering by category and importance. + +```python +async def search_memories( + self, + query: str, + limit: int = 10, + category: Optional[str] = None, + min_importance: Optional[int] = None, +) -> List[Dict[str, Any]]: + await self.initialize() + query_embedding = self._generate_embedding(query) + + search_query = self.table.search(query_embedding) + + if limit: + search_query = search_query.limit(limit) + + filters = [] + if category: + filters.append(f"category = '{category}'") + if min_importance is not None: + filters.append(f"importance >= {min_importance}") + + if filters: + filter_str = " AND ".join(filters) + search_query = search_query.where(filter_str) + + results = search_query.to_pandas() + + memories = [] + for _, row in results.iterrows(): + memory = { + "id": row["id"], + "content": row["content"], + "summary": row["summary"], + "tags": row["tags"].tolist(), + "timestamp": row["timestamp"], + "category": row["category"], + "importance": int(row["importance"]), + "similarity_score": row.get( + "_distance", 0.0 + ), + } + memories.append(memory) + + return memories +``` + +### 2.2 MCP Server and Tools + +We use `FastMCP` to quickly build an MCP server and expose `MemoryStore` functionality as tools that large language models can call through the `@mcp.tool()` decorator. 
+ +- **`store_memory`**: **Take notes!** Store a memory. +- **`search_memories`**: **Let me think...** Search for relevant memories based on query content. +- **`get_memory`**: **Find by reference!** Retrieve a specific memory by ID. +- **`list_categories`**: **Organize by category!** List all memory categories. +- **`get_memory_stats`**: **Memory inventory!** Get statistics about the memory store, such as total count, counts by category, etc. + +```python +# Initialize memory store +memory_store = MemoryStore() + +# Create MCP server +mcp = FastMCP("RAG-based Memory MCP Server") + + +@mcp.tool() +async def store_memory( + content: str, + summary: Optional[str] = None, + tags: Optional[str] = None, + category: str = "general", + importance: int = 5, +) -> Dict[str, str]: + """ + Store content in memory. + + Args: + content: The content to store + summary: Optional summary (auto-generated if not provided) + tags: Comma-separated tags + category: Memory category (default: general) + importance: Importance level 1-10 (default: 5) + """ + try: + # Parse tags if provided + tag_list = [tag.strip() for tag in tags.split(",")] if tags else [] + + memory_id = await memory_store.store_memory( + content=content, + summary=summary, + tags=tag_list, + category=category, + importance=importance, + ) + + return { + "status": "success", + "memory_id": memory_id, + "message": f"Memory stored successfully with ID: {memory_id}", + } + except Exception as e: + return {"status": "error", "message": f"Failed to store memory: {str(e)}"} + + +@mcp.tool() +async def search_memories( + query: str, + limit: int = 10, + category: Optional[str] = None, + min_importance: Optional[int] = None, +) -> Dict[str, Any]: + """ + Search stored memories using semantic similarity. + + Args: + query: Search query + limit: Maximum number of results (default: 10) + category: Filter by category + min_importance: Minimum importance level + """ + try: + memories = await memory_store.search_memories( + query=query, limit=limit, category=category, min_importance=min_importance + ) + + return { + "status": "success", + "query": query, + "total_results": len(memories), + "memories": memories, + } + except Exception as e: + return {"status": "error", "message": f"Search failed: {str(e)}"} + + +@mcp.tool() +async def get_memory(memory_id: str) -> Dict[str, Any]: + """ + Retrieve a specific memory by its ID. + + Args: + memory_id: The unique identifier of the memory + """ + try: + memory = await memory_store.get_memory_by_id(memory_id) + + if memory: + return {"status": "success", "memory": memory} + else: + return { + "status": "error", + "message": f"Memory with ID {memory_id} not found", + } + except Exception as e: + return {"status": "error", "message": f"Failed to retrieve memory: {str(e)}"} + + +@mcp.tool() +async def list_categories() -> Dict[str, Any]: + try: + categories = await memory_store.list_categories() + return {"status": "success", "categories": categories} + except Exception as e: + return {"status": "error", "message": f"Failed to list categories: {str(e)}"} + + +@mcp.tool() +async def get_memory_stats() -> Dict[str, Any]: + try: + stats = await memory_store.get_stats() + return {"status": "success", "stats": stats} + except Exception as e: + return {"status": "error", "message": f"Failed to get stats: {str(e)}"} +``` + +The server startup code is at the end of `server.py`, which first initializes the `MemoryStore`, then runs the MCP server. 
+ +```python +if __name__ == "__main__": + # Initialize memory store on startup + async def init_memory(): + await memory_store.initialize() + + # Run initialization + asyncio.run(init_memory()) + + # Run MCP server + mcp.run() +``` + +## 3. Debugging with [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) + +### 3.1 Adding Workspace Connection + +Next, we'll debug using the [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) plugin. First, let's test if we can connect successfully. Here we choose `stdio`, set the working path to the project directory, then click `Connect`. In the log panel on the right, we can see that we've successfully connected. + +
+ +
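As an aside, the same stdio handshake can be exercised without the plugin: the `mcp` SDK we installed earlier also ships a client. Below is a minimal sketch (the file name `sanity_check.py` and the `uv run server.py` launch command are illustrative assumptions, adjust them to your setup); it should print the names of the five tools registered in section 2.2:

```python
# sanity_check.py (hypothetical helper, not part of the repo)
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # Launch server.py as a subprocess and speak MCP over stdin/stdout,
    # which is what the plugin's stdio connection does under the hood
    params = StdioServerParameters(command="uv", args=["run", "server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])


if __name__ == "__main__":
    asyncio.run(main())
```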

### 3.2 Testing Tools

After successful connection, let's test if the tools work properly.

1. **Store a little secret**: Create a new `Tool` tab and select the `store_memory` tool. For example, we input:
   - `content`: `Xiao Ming's birthday is 2025.6.18`
   - `category`: `birthday`
   - `importance`: `8`

   Click `Execute`, and if successful, it will return the stored memory ID, such as `bcc30f6c-979c-46d1-b34a-cd1a09242106`.
+ +
+ +2. **Retrieve a specific memory by ID**: + After successful storage, we use the returned memory ID `bcc30f6c-979c-46d1-b34a-cd1a09242106`, select the `get_memory` tool, and test if we can retrieve it from `LanceDB`. + +
+ +
+ +3. **List current memory categories**: + We call the `list_categories` tool to view all current memory categories. Since we only added one memory with the `birthday` category, the result should only contain this category. + +
+ +

4. **Get memory statistics**: 
   Next, we use the `get_memory_stats` tool to get statistics about the memory store, such as the total number of memories and the count in each category (these four calls are also replayed in the script after this list).
+ +
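Incidentally, the four calls above can be replayed from a short script instead of being clicked through one by one. Here is a sketch using the same `mcp` client API (the file name and launch command are again assumptions; we also assume FastMCP serializes each tool's returned dict as JSON text, so the memory ID can be parsed back out):

```python
# replay_tests.py (hypothetical helper, not part of the repo)
import asyncio
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    params = StdioServerParameters(command="uv", args=["run", "server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # 1. Store the secret, with the same arguments as in the screenshot
            stored = await session.call_tool(
                "store_memory",
                {
                    "content": "Xiao Ming's birthday is 2025.6.18",
                    "category": "birthday",
                    "importance": 8,
                },
            )
            memory_id = json.loads(stored.content[0].text)["memory_id"]

            # 2. Fetch the same memory back by its ID
            print(await session.call_tool("get_memory", {"memory_id": memory_id}))

            # 3 & 4. List categories, then pull overall statistics
            print(await session.call_tool("list_categories", {}))
            print(await session.call_tool("get_memory_stats", {}))


if __name__ == "__main__":
    asyncio.run(main())
```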

### 3.3 Large Language Model Interaction Testing

We intentionally "skipped" one tool above, `search_memories`, saving it for the LLM interaction test. Enter the interaction testing page (remember to set up the LLM's `api_key` and `base_url` first, following the [Connect to LLM tutorial](https://kirigaya.cn/openmcp/zh/plugin-tutorial/usage/connect-llm.html)). We can first disable all the other tools, keeping only the `search_memories` tool:
+ +
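Before handing the question to the model, you can also convince yourself that the retrieval path works by calling the tool directly. A minimal sketch in the same style (the query string is only an example):

```python
# direct_search.py (hypothetical helper; bypasses the LLM and calls the tool directly)
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    params = StdioServerParameters(command="uv", args=["run", "server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # The question itself is embedded and matched against stored vectors
            result = await session.call_tool(
                "search_memories",
                {"query": "When is Xiao Ming's birthday?", "limit": 3},
            )
            print(result.content[0].text)


if __name__ == "__main__":
    asyncio.run(main())
```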
+ +Then, we casually ask: + +
+ +
+ +Great! The large language model successfully helped me recall my friend Xiao Ming's birthday. Cheers! diff --git a/zh/plugin-tutorial/examples/python-rag_memo-stdio.md b/zh/plugin-tutorial/examples/python-rag_memo-stdio.md new file mode 100644 index 0000000..12c21d5 --- /dev/null +++ b/zh/plugin-tutorial/examples/python-rag_memo-stdio.md @@ -0,0 +1,382 @@ +# Python 实现基于 RAG 的记忆存储 MCP 服务器 + +[本期教程的代码](https://github.com/Dormiveglia-elf/rag_memo_mcp) + +## 前言 + +本篇教程,我们将演示如何使用 Python 构建一个简易的基于 RAG (Retrieval-Augmented Generation) 的长期记忆存储 MCP 服务器, 并通过 [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) 插件进行调试。 实现完成后,我们能够通过与大模型进行自然语言交互,轻松地存储、检索和管理我们的记忆,而无需编写任何特定的查询代码。 + +## 1. 准备 + +项目结构如下: + +```bash +📦rag_memo_mcp + ┣ 📂memory_db/ # LanceDB 数据库文件,初始化时会创建 + ┣ 📜server.py # MCP 服务器实现 + ┣ 📜pyproject.toml # 项目配置文件 + ┣ 📜uv.lock # uv lockfile + ┗ ... +``` + +首先,我们来准备运行环境。本项目推荐使用 [uv](https://github.com/astral-sh/uv)。(`uv` 是一个速度快得飞起的 Python 包管理器,用过都说好。当然,如果你是 `pip` 或者其他包管理器的忠实粉丝,也完全没问题) +```bash +# 首先下载 uv (Windows) +powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex" +# 或者 (macOS/Linux) +# curl -LsSf https://astral.sh/uv/install.sh | sh +``` + +```bash +# 项目初始化 +uv init rag_memo_mcp +cd rag_memo_mcp +# 建议创建一个虚拟环境 +uv venv +# 激活虚拟环境 (Windows) +.venv\Scripts\activate +# 或者 (macOS/Linux) +# source .venv/bin/activate + +# 安装依赖 +uv add "mcp[cli]" lancedb pandas sentence-transformers +``` + +## 2. 理解服务实现 + +与需要预先安装和配置的传统数据库不同,本项目的核心 `MemoryStore` 使用 [LanceDB](https://lancedb.github.io/),这是一个向量数据库,它会在服务器首次启动时自动在 `memory_db` 目录下创建并初始化,无需额外配置。 + +让我们深入 `server.py` 来理解其实现细节。 + +### 2.1 MemoryStore 核心类 + +`MemoryStore` 类是记忆存储和检索功能的核心。 +```python +class MemoryStore: +``` + +- **`initialize()`**: 这个方法负责初始化。它会连接到 LanceDB 数据库(如果不存在则创建),定义记忆表的 schema,并默认加载 `all-MiniLM-L6-v2` 用于将文本内容生成向量嵌入。 +```python +def __init__(self, db_path: str = "./memory_db"): + self.db_path = db_path + self.db = None + self.table = None + self.encoder = None + self._initialized = False + +async def initialize(self): + if self._initialized: + return + + self.encoder = SentenceTransformer("all-MiniLM-L6-v2") + + self.db = lancedb.connect(self.db_path) + + schema = pa.schema( + [ + pa.field("id", pa.string()), + pa.field("content", pa.string()), + pa.field("summary", pa.string()), + pa.field("tags", pa.list_(pa.string())), + pa.field("timestamp", pa.timestamp("us")), + pa.field("category", pa.string()), + pa.field("importance", pa.int32()), + pa.field( + "vector", pa.list_(pa.float32(), 384) + ), + ] + ) + + try: + self.table = self.db.open_table("memories") + except Exception: + self.table = self.db.create_table("memories", schema=schema) + + self._initialized = True +``` + +- **`store_memory()`**: 当需要存储一条新记忆时,此方法会被调用。它会为记忆内容生成一个唯一的ID和时间戳,如果未提供摘要,则自动生成一个简单的摘要,然后使用预加载的模型将内容转换为向量,最后将所有信息(ID, 内容, 摘要, 标签, 时间戳, 类别, 重要性, 向量)存入 LanceDB 表中。 +```python +async def store_memory( + self, + content: str, + summary: Optional[str] = None, + tags: Optional[List[str]] = None, + category: str = "general", + importance: int = 5, +) -> str: + await self.initialize() + + memory_id = str(uuid.uuid4()) + timestamp = datetime.now(timezone.utc) + + if not summary: + summary = content[:100] + "..." 
        if len(content) <= 100:
            summary = content

    embedding = self._generate_embedding(content)

    data = [
        {
            "id": memory_id,
            "content": content,
            "summary": summary,
            "tags": tags or [],
            "timestamp": timestamp,
            "category": category,
            "importance": importance,
            "vector": embedding,
        }
    ]

    self.table.add(data)

    return memory_id
```

- **`search_memories()`**: 这是实现 RAG 的关键。当提出一个查询时,此方法会将查询文本同样转换为向量,然后在 LanceDB 中执行向量相似度搜索,以找到最相关的记忆。它还支持按类别和重要性进行过滤。
```python
async def search_memories(
    self,
    query: str,
    limit: int = 10,
    category: Optional[str] = None,
    min_importance: Optional[int] = None,
) -> List[Dict[str, Any]]:
    await self.initialize()
    query_embedding = self._generate_embedding(query)

    search_query = self.table.search(query_embedding)

    if limit:
        search_query = search_query.limit(limit)

    filters = []
    if category:
        filters.append(f"category = '{category}'")
    if min_importance is not None:
        filters.append(f"importance >= {min_importance}")

    if filters:
        filter_str = " AND ".join(filters)
        search_query = search_query.where(filter_str)

    results = search_query.to_pandas()

    memories = []
    for _, row in results.iterrows():
        memory = {
            "id": row["id"],
            "content": row["content"],
            "summary": row["summary"],
            "tags": row["tags"].tolist(),
            "timestamp": row["timestamp"],
            "category": row["category"],
            "importance": int(row["importance"]),
            "similarity_score": row.get(
                "_distance", 0.0
            ),
        }
        memories.append(memory)

    return memories
```

### 2.2 MCP 服务器与工具

我们使用 `FastMCP` 来快速构建一个 MCP 服务器,并通过 `@mcp.tool()` 装饰器将 `MemoryStore` 的功能暴露为大模型可以调用的工具。

- **`store_memory`**: **记笔记!** 存储一条记忆。
- **`search_memories`**: **让我想想...** 根据查询内容搜索相关记忆。
- **`get_memory`**: **按图索骥!** 根据 ID 精确检索某条记忆。
- **`list_categories`**: **分门别类!** 列出所有记忆的分类。
- **`get_memory_stats`**: **记忆盘点!** 获取关于记忆库的统计信息,如总数、各分类数量等。

```python
# 初始化记忆存储
memory_store = MemoryStore()

# 创建 MCP 服务器
mcp = FastMCP("RAG-based Memory MCP Server")


@mcp.tool()
async def store_memory(
    content: str,
    summary: Optional[str] = None,
    tags: Optional[str] = None,
    category: str = "general",
    importance: int = 5,
) -> Dict[str, str]:
    """
    Store content in memory.

    Args:
        content: The content to store
        summary: Optional summary (auto-generated if not provided)
        tags: Comma-separated tags
        category: Memory category (default: general)
        importance: Importance level 1-10 (default: 5)
    """
    try:
        # Parse tags if provided
        tag_list = [tag.strip() for tag in tags.split(",")] if tags else []

        memory_id = await memory_store.store_memory(
            content=content,
            summary=summary,
            tags=tag_list,
            category=category,
            importance=importance,
        )

        return {
            "status": "success",
            "memory_id": memory_id,
            "message": f"Memory stored successfully with ID: {memory_id}",
        }
    except Exception as e:
        return {"status": "error", "message": f"Failed to store memory: {str(e)}"}


@mcp.tool()
async def search_memories(
    query: str,
    limit: int = 10,
    category: Optional[str] = None,
    min_importance: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Search stored memories using semantic similarity.
+ + Args: + query: Search query + limit: Maximum number of results (default: 10) + category: Filter by category + min_importance: Minimum importance level + """ + try: + memories = await memory_store.search_memories( + query=query, limit=limit, category=category, min_importance=min_importance + ) + + return { + "status": "success", + "query": query, + "total_results": len(memories), + "memories": memories, + } + except Exception as e: + return {"status": "error", "message": f"Search failed: {str(e)}"} + + +@mcp.tool() +async def get_memory(memory_id: str) -> Dict[str, Any]: + """ + Retrieve a specific memory by its ID. + + Args: + memory_id: The unique identifier of the memory + """ + try: + memory = await memory_store.get_memory_by_id(memory_id) + + if memory: + return {"status": "success", "memory": memory} + else: + return { + "status": "error", + "message": f"Memory with ID {memory_id} not found", + } + except Exception as e: + return {"status": "error", "message": f"Failed to retrieve memory: {str(e)}"} + + +@mcp.tool() +async def list_categories() -> Dict[str, Any]: + try: + categories = await memory_store.list_categories() + return {"status": "success", "categories": categories} + except Exception as e: + return {"status": "error", "message": f"Failed to list categories: {str(e)}"} + + +@mcp.tool() +async def get_memory_stats() -> Dict[str, Any]: + try: + stats = await memory_store.get_stats() + return {"status": "success", "stats": stats} + except Exception as e: + return {"status": "error", "message": f"Failed to get stats: {str(e)}"} +``` + + +服务器的启动代码位于 `server.py` 的末尾,它首先初始化 `MemoryStore`,然后运行 MCP 服务器。 + +```python +if __name__ == "__main__": + # 在启动时初始化记忆存储 + async def init_memory(): + await memory_store.initialize() + + # 运行初始化 + asyncio.run(init_memory()) + + # 运行 MCP 服务器 + mcp.run() +``` + +## 3. 通过 [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) 来进行调试 +### 3.1 添加工作区连接 + +接下来,我们通过 [openmcp](https://github.com/LSTM-Kirigaya/openmcp-client) 插件进行调试。首先测试是否能连接成功,这里选择 `stdio`,工作路径设置为项目所在的目录,然后点击 `Connect`。右边的日志栏里可以看到我们已经连接成功。 + +
+ +
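顺带一提,同样的 stdio 握手也可以不借助插件来验证:我们之前安装的 `mcp` SDK 自带客户端。下面是一个最小示意(文件名 `sanity_check.py` 与 `uv run server.py` 启动命令均为示意,请按实际情况调整),运行后应当打印出 2.2 节注册的五个工具名:

```python
# sanity_check.py(示意脚本,仓库中并不存在)
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # 以子进程方式启动 server.py,并通过 stdin/stdout 进行 MCP 通信,
    # 这正是插件 stdio 连接在底层做的事情
    params = StdioServerParameters(command="uv", args=["run", "server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])


if __name__ == "__main__":
    asyncio.run(main())
```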

### 3.2 测试工具

连接成功后,让我们先测试一下工具是否工作正常。

1. **存个小秘密**: 新建一个 `Tool` 标签页,选择 `store_memory` 工具。例如我们输入:
   - `content`: `小明的生日是 2025.6.18`
   - `category`: `birthday`
   - `importance`: `8`

   点击 `Execute`,如果成功会返回存储的记忆 ID,比如这里返回 `bcc30f6c-979c-46d1-b34a-cd1a09242106`。
+ +

2. **根据 ID 精确检索某条记忆**: 
   存储成功后,我们根据返回的记忆 ID `bcc30f6c-979c-46d1-b34a-cd1a09242106`,选择 `get_memory` 工具,测试是否能够从 `LanceDB` 里面检索出来。

3. **列出目前的记忆分类**: 
   我们调用 `list_categories` 工具来查看当前所有记忆的分类。由于我们只添加了一个 `birthday` 分类的记忆,所以返回结果中应该只包含这个分类。
+ +

4. **获取记忆统计数据**: 
   接着,我们使用 `get_memory_stats` 工具来获取记忆库的统计信息,例如总共有多少条记忆,以及每个分类下的记忆数量(这几次调用也可以用本列表之后的脚本一次性重放)。
+ +
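顺带一提,上面这四次工具调用也可以用一段脚本一次性重放,而不必逐个手动点击。下面是基于同一套 `mcp` 客户端 API 的示意脚本(文件名与启动命令同样只是假设;这里还假定 FastMCP 会把工具返回的 dict 序列化为 JSON 文本,因此可以从中解析出记忆 ID):

```python
# replay_tests.py(示意脚本,仓库中并不存在)
import asyncio
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    params = StdioServerParameters(command="uv", args=["run", "server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # 1. 存个小秘密,参数与截图中一致
            stored = await session.call_tool(
                "store_memory",
                {
                    "content": "小明的生日是 2025.6.18",
                    "category": "birthday",
                    "importance": 8,
                },
            )
            memory_id = json.loads(stored.content[0].text)["memory_id"]

            # 2. 根据 ID 把同一条记忆取回来
            print(await session.call_tool("get_memory", {"memory_id": memory_id}))

            # 3 & 4. 列出分类,再获取整体统计信息
            print(await session.call_tool("list_categories", {}))
            print(await session.call_tool("get_memory_stats", {}))


if __name__ == "__main__":
    asyncio.run(main())
```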
+ +### 3.3 大模型交互测试 +上面我们"遗漏"了一个工具 `search_memories` 没有测试,其实是特意把它留给了大模型交互测试。进入交互测试页面(记得事先参照[连接大模型教程](https://kirigaya.cn/openmcp/zh/plugin-tutorial/usage/connect-llm.html)设置好大模型的 `api_key` 和 `base_url`),我们可以先把其他的工具都取消配备,只保留 `search_memories` 这一个工具: +
+ +
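在把问题交给大模型之前,也可以直接调用工具来确认检索链路没有问题。下面是同样风格的最小示意(查询语句仅作示例):

```python
# direct_search.py(示意脚本;绕过大模型,直接调用工具)
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    params = StdioServerParameters(command="uv", args=["run", "server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # 问题本身会被转换为向量,并与已存储的向量做相似度匹配
            result = await session.call_tool(
                "search_memories",
                {"query": "小明的生日是什么时候?", "limit": 3},
            )
            print(result.content[0].text)


if __name__ == "__main__":
    asyncio.run(main())
```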
+ +然后,我们假装不经意地问一句: + +
+ +

好!大模型成功帮助我召回了我的朋友小明的生日,Cheers!