FileSystem

from mcp.server.fastmcp import FastMCP
import os
import shutil
import time
import glob
import zipfile
from datetime import datetime


mcp = FastMCP("filesystem")

# 定义允许操作的目录
ALLOWED_DIR = "filesystem/folder"

# 确保目录存在
os.makedirs(ALLOWED_DIR, exist_ok=True)

def is_path_allowed(path: str) -> bool:
    """检查路径是否在允许的目录内"""
    # 获取绝对路径
    abs_path = os.path.abspath(path)
    # 检查路径是否在允许的目录内
    return abs_path.startswith(ALLOWED_DIR)

def ensure_path_allowed(path: str) -> str:
    """确保路径在允许的目录内,如果不是则抛出错误"""
    if not is_path_allowed(path):
        return f"错误:只允许操作 {ALLOWED_DIR} 目录下的文件"
    return ""

@mcp.tool()
def list_files() -> str:
    """列出当前目录下的文件"""
    return "\n".join(os.listdir(ALLOWED_DIR))


@mcp.tool()
def read_file(path: str) -> str:
    """读取文件内容"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, path)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        with open(full_path, "r") as f:
            return f.read()
    except Exception as e:
        return f"读取文件出错: {str(e)}"
    
    
@mcp.tool()
def write_file(path: str, content: str) -> str:
    """写入文件内容"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, path)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        with open(full_path, "w") as f:
            f.write(content)
        return f"写入 {path} 完成"
    except Exception as e:
        return f"写入文件出错: {str(e)}"

@mcp.tool()
def delete_file(path: str) -> str:
    """删除文件"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, path)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        os.remove(full_path)
        return f"删除 {path} 完成"
    except Exception as e:
        return f"删除文件出错: {str(e)}"

@mcp.tool()
def create_directory(path: str) -> str:
    """创建目录"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, path)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        os.makedirs(full_path, exist_ok=True)
        return f"创建目录 {path} 完成"
    except Exception as e:
        return f"创建目录出错: {str(e)}"

@mcp.tool()
def delete_directory(path: str) -> str:
    """删除目录"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, path)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        shutil.rmtree(full_path)
        return f"删除目录 {path} 完成"
    except Exception as e:
        return f"删除目录出错: {str(e)}"

@mcp.tool()
def list_directory(path: str = "") -> str:
    """列出指定目录下的所有文件和子目录"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, path)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        items = os.listdir(full_path)
        result = []
        for item in items:
            item_path = os.path.join(full_path, item)
            if os.path.isdir(item_path):
                result.append(f"[目录] {item}")
            else:
                result.append(f"[文件] {item}")
        return "\n".join(result) if result else f"目录 {path} 为空"
    except Exception as e:
        return f"列出目录内容出错: {str(e)}"

@mcp.tool()
def get_file_info(path: str) -> str:
    """获取文件信息(大小、修改时间等)"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, path)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        if not os.path.exists(full_path):
            return f"文件 {path} 不存在"
        
        stat = os.stat(full_path)
        size = stat.st_size
        mtime = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
        atime = datetime.fromtimestamp(stat.st_atime).strftime("%Y-%m-%d %H:%M:%S")
        ctime = datetime.fromtimestamp(stat.st_ctime).strftime("%Y-%m-%d %H:%M:%S")
        
        return f"文件: {path}\n大小: {size} 字节\n修改时间: {mtime}\n访问时间: {atime}\n创建时间: {ctime}"
    except Exception as e:
        return f"获取文件信息出错: {str(e)}"

@mcp.tool()
def search_files(pattern: str, directory: str = "") -> str:
    """按名称模式搜索文件"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, directory)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        search_pattern = os.path.join(full_path, pattern)
        matches = glob.glob(search_pattern, recursive=True)
        
        # 过滤出只在允许目录内的结果
        allowed_matches = [m for m in matches if is_path_allowed(m)]
        
        # 转换为相对路径
        relative_matches = [os.path.relpath(m, ALLOWED_DIR) for m in allowed_matches]
        
        if not relative_matches:
            return f"未找到匹配 '{pattern}' 的文件"
        return "\n".join(relative_matches)
    except Exception as e:
        return f"搜索文件出错: {str(e)}"

@mcp.tool()
def search_content(text: str, directory: str = "", file_pattern: str = "*.*") -> str:
    """在文件内容中搜索文本"""
    # 构建完整路径
    full_path = os.path.join(ALLOWED_DIR, directory)
    # 检查路径
    error = ensure_path_allowed(full_path)
    if error:
        return error
    
    try:
        search_pattern = os.path.join(full_path, file_pattern)
        results = []
        
        for file_path in glob.glob(search_pattern, recursive=True):
            # 确保文件在允许的目录内
            if not is_path_allowed(file_path):
                continue
                
            if os.path.isfile(file_path):
                try:
                    with open(file_path, 'r', errors='ignore') as f:
                        content = f.read()
                        if text in content:
                            # 转换为相对路径
                            relative_path = os.path.relpath(file_path, ALLOWED_DIR)
                            results.append(relative_path)
                except:
                    pass
        
        if not results:
            return f"未找到包含 '{text}' 的文件"
        return "\n".join(results)
    except Exception as e:
        return f"搜索文件内容出错: {str(e)}"

@mcp.tool()
def copy_file(source: str, destination: str) -> str:
    """复制文件"""
    # 构建完整路径
    full_source = os.path.join(ALLOWED_DIR, source)
    full_destination = os.path.join(ALLOWED_DIR, destination)
    
    # 检查路径
    error = ensure_path_allowed(full_source)
    if error:
        return error
    error = ensure_path_allowed(full_destination)
    if error:
        return error
    
    try:
        # 确保目标目录存在
        os.makedirs(os.path.dirname(full_destination), exist_ok=True)
        shutil.copy2(full_source, full_destination)
        return f"复制 {source}{destination} 完成"
    except Exception as e:
        return f"复制文件出错: {str(e)}"

@mcp.tool()
def move_file(source: str, destination: str) -> str:
    """移动文件"""
    # 构建完整路径
    full_source = os.path.join(ALLOWED_DIR, source)
    full_destination = os.path.join(ALLOWED_DIR, destination)
    
    # 检查路径
    error = ensure_path_allowed(full_source)
    if error:
        return error
    error = ensure_path_allowed(full_destination)
    if error:
        return error
    
    try:
        # 确保目标目录存在
        os.makedirs(os.path.dirname(full_destination), exist_ok=True)
        shutil.move(full_source, full_destination)
        return f"移动 {source}{destination} 完成"
    except Exception as e:
        return f"移动文件出错: {str(e)}"

@mcp.tool()
def zip_files(zip_name: str, files: str) -> str:
    """压缩文件,files参数为以逗号分隔的文件列表"""
    # 构建完整的zip文件路径
    full_zip_path = os.path.join(ALLOWED_DIR, zip_name)
    
    # 检查路径
    error = ensure_path_allowed(full_zip_path)
    if error:
        return error
    
    try:
        file_list = [f.strip() for f in files.split(",")]
        with zipfile.ZipFile(full_zip_path, 'w') as zipf:
            for file in file_list:
                full_file_path = os.path.join(ALLOWED_DIR, file)
                
                # 检查每个文件路径
                if not is_path_allowed(full_file_path):
                    return f"错误:文件 {file} 不在允许的目录范围内"
                
                if os.path.exists(full_file_path):
                    # 在zip中使用相对路径
                    arcname = os.path.relpath(full_file_path, ALLOWED_DIR)
                    zipf.write(full_file_path, arcname)
        
        return f"创建压缩文件 {zip_name} 完成"
    except Exception as e:
        return f"创建压缩文件出错: {str(e)}"

@mcp.tool()
def unzip_file(zip_name: str, extract_to: str = "") -> str:
    """解压缩文件"""
    # 构建完整路径
    full_zip_path = os.path.join(ALLOWED_DIR, zip_name)
    full_extract_path = os.path.join(ALLOWED_DIR, extract_to)
    
    # 检查路径
    error = ensure_path_allowed(full_zip_path)
    if error:
        return error
    error = ensure_path_allowed(full_extract_path)
    if error:
        return error
    
    try:
        # 检查解压后的文件是否会在允许的目录内
        with zipfile.ZipFile(full_zip_path, 'r') as zipf:
            # 确保所有解压后的文件都在允许的目录内
            for zip_info in zipf.infolist():
                target_path = os.path.join(full_extract_path, zip_info.filename)
                if not is_path_allowed(target_path):
                    return f"错误:解压后的文件 {zip_info.filename} 将不在允许的目录范围内"
            
            # 解压文件
            zipf.extractall(full_extract_path)
        
        return f"解压 {zip_name}{extract_to} 完成"
    except Exception as e:
        return f"解压文件出错: {str(e)}"


if __name__ == "__main__":
    mcp.run(transport="stdio")

Cherry Studio配置

命令: /bin/bash

参数: -c cd ./filesystem && source .venv/bin/activate && python main.py

最后更新于