FileSystem
from mcp.server.fastmcp import FastMCP
import os
import shutil
import time
import glob
import zipfile
from datetime import datetime
mcp = FastMCP("filesystem")
# 定义允许操作的目录
ALLOWED_DIR = "filesystem/folder"
# 确保目录存在
os.makedirs(ALLOWED_DIR, exist_ok=True)
def is_path_allowed(path: str) -> bool:
"""检查路径是否在允许的目录内"""
# 获取绝对路径
abs_path = os.path.abspath(path)
# 检查路径是否在允许的目录内
return abs_path.startswith(ALLOWED_DIR)
def ensure_path_allowed(path: str) -> str:
"""确保路径在允许的目录内,如果不是则抛出错误"""
if not is_path_allowed(path):
return f"错误:只允许操作 {ALLOWED_DIR} 目录下的文件"
return ""
@mcp.tool()
def list_files() -> str:
"""列出当前目录下的文件"""
return "\n".join(os.listdir(ALLOWED_DIR))
@mcp.tool()
def read_file(path: str) -> str:
"""读取文件内容"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, path)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
with open(full_path, "r") as f:
return f.read()
except Exception as e:
return f"读取文件出错: {str(e)}"
@mcp.tool()
def write_file(path: str, content: str) -> str:
"""写入文件内容"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, path)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
with open(full_path, "w") as f:
f.write(content)
return f"写入 {path} 完成"
except Exception as e:
return f"写入文件出错: {str(e)}"
@mcp.tool()
def delete_file(path: str) -> str:
"""删除文件"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, path)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
os.remove(full_path)
return f"删除 {path} 完成"
except Exception as e:
return f"删除文件出错: {str(e)}"
@mcp.tool()
def create_directory(path: str) -> str:
"""创建目录"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, path)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
os.makedirs(full_path, exist_ok=True)
return f"创建目录 {path} 完成"
except Exception as e:
return f"创建目录出错: {str(e)}"
@mcp.tool()
def delete_directory(path: str) -> str:
"""删除目录"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, path)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
shutil.rmtree(full_path)
return f"删除目录 {path} 完成"
except Exception as e:
return f"删除目录出错: {str(e)}"
@mcp.tool()
def list_directory(path: str = "") -> str:
"""列出指定目录下的所有文件和子目录"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, path)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
items = os.listdir(full_path)
result = []
for item in items:
item_path = os.path.join(full_path, item)
if os.path.isdir(item_path):
result.append(f"[目录] {item}")
else:
result.append(f"[文件] {item}")
return "\n".join(result) if result else f"目录 {path} 为空"
except Exception as e:
return f"列出目录内容出错: {str(e)}"
@mcp.tool()
def get_file_info(path: str) -> str:
"""获取文件信息(大小、修改时间等)"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, path)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
if not os.path.exists(full_path):
return f"文件 {path} 不存在"
stat = os.stat(full_path)
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
atime = datetime.fromtimestamp(stat.st_atime).strftime("%Y-%m-%d %H:%M:%S")
ctime = datetime.fromtimestamp(stat.st_ctime).strftime("%Y-%m-%d %H:%M:%S")
return f"文件: {path}\n大小: {size} 字节\n修改时间: {mtime}\n访问时间: {atime}\n创建时间: {ctime}"
except Exception as e:
return f"获取文件信息出错: {str(e)}"
@mcp.tool()
def search_files(pattern: str, directory: str = "") -> str:
"""按名称模式搜索文件"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, directory)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
search_pattern = os.path.join(full_path, pattern)
matches = glob.glob(search_pattern, recursive=True)
# 过滤出只在允许目录内的结果
allowed_matches = [m for m in matches if is_path_allowed(m)]
# 转换为相对路径
relative_matches = [os.path.relpath(m, ALLOWED_DIR) for m in allowed_matches]
if not relative_matches:
return f"未找到匹配 '{pattern}' 的文件"
return "\n".join(relative_matches)
except Exception as e:
return f"搜索文件出错: {str(e)}"
@mcp.tool()
def search_content(text: str, directory: str = "", file_pattern: str = "*.*") -> str:
"""在文件内容中搜索文本"""
# 构建完整路径
full_path = os.path.join(ALLOWED_DIR, directory)
# 检查路径
error = ensure_path_allowed(full_path)
if error:
return error
try:
search_pattern = os.path.join(full_path, file_pattern)
results = []
for file_path in glob.glob(search_pattern, recursive=True):
# 确保文件在允许的目录内
if not is_path_allowed(file_path):
continue
if os.path.isfile(file_path):
try:
with open(file_path, 'r', errors='ignore') as f:
content = f.read()
if text in content:
# 转换为相对路径
relative_path = os.path.relpath(file_path, ALLOWED_DIR)
results.append(relative_path)
except:
pass
if not results:
return f"未找到包含 '{text}' 的文件"
return "\n".join(results)
except Exception as e:
return f"搜索文件内容出错: {str(e)}"
@mcp.tool()
def copy_file(source: str, destination: str) -> str:
"""复制文件"""
# 构建完整路径
full_source = os.path.join(ALLOWED_DIR, source)
full_destination = os.path.join(ALLOWED_DIR, destination)
# 检查路径
error = ensure_path_allowed(full_source)
if error:
return error
error = ensure_path_allowed(full_destination)
if error:
return error
try:
# 确保目标目录存在
os.makedirs(os.path.dirname(full_destination), exist_ok=True)
shutil.copy2(full_source, full_destination)
return f"复制 {source} 到 {destination} 完成"
except Exception as e:
return f"复制文件出错: {str(e)}"
@mcp.tool()
def move_file(source: str, destination: str) -> str:
"""移动文件"""
# 构建完整路径
full_source = os.path.join(ALLOWED_DIR, source)
full_destination = os.path.join(ALLOWED_DIR, destination)
# 检查路径
error = ensure_path_allowed(full_source)
if error:
return error
error = ensure_path_allowed(full_destination)
if error:
return error
try:
# 确保目标目录存在
os.makedirs(os.path.dirname(full_destination), exist_ok=True)
shutil.move(full_source, full_destination)
return f"移动 {source} 到 {destination} 完成"
except Exception as e:
return f"移动文件出错: {str(e)}"
@mcp.tool()
def zip_files(zip_name: str, files: str) -> str:
"""压缩文件,files参数为以逗号分隔的文件列表"""
# 构建完整的zip文件路径
full_zip_path = os.path.join(ALLOWED_DIR, zip_name)
# 检查路径
error = ensure_path_allowed(full_zip_path)
if error:
return error
try:
file_list = [f.strip() for f in files.split(",")]
with zipfile.ZipFile(full_zip_path, 'w') as zipf:
for file in file_list:
full_file_path = os.path.join(ALLOWED_DIR, file)
# 检查每个文件路径
if not is_path_allowed(full_file_path):
return f"错误:文件 {file} 不在允许的目录范围内"
if os.path.exists(full_file_path):
# 在zip中使用相对路径
arcname = os.path.relpath(full_file_path, ALLOWED_DIR)
zipf.write(full_file_path, arcname)
return f"创建压缩文件 {zip_name} 完成"
except Exception as e:
return f"创建压缩文件出错: {str(e)}"
@mcp.tool()
def unzip_file(zip_name: str, extract_to: str = "") -> str:
"""解压缩文件"""
# 构建完整路径
full_zip_path = os.path.join(ALLOWED_DIR, zip_name)
full_extract_path = os.path.join(ALLOWED_DIR, extract_to)
# 检查路径
error = ensure_path_allowed(full_zip_path)
if error:
return error
error = ensure_path_allowed(full_extract_path)
if error:
return error
try:
# 检查解压后的文件是否会在允许的目录内
with zipfile.ZipFile(full_zip_path, 'r') as zipf:
# 确保所有解压后的文件都在允许的目录内
for zip_info in zipf.infolist():
target_path = os.path.join(full_extract_path, zip_info.filename)
if not is_path_allowed(target_path):
return f"错误:解压后的文件 {zip_info.filename} 将不在允许的目录范围内"
# 解压文件
zipf.extractall(full_extract_path)
return f"解压 {zip_name} 到 {extract_to} 完成"
except Exception as e:
return f"解压文件出错: {str(e)}"
if __name__ == "__main__":
mcp.run(transport="stdio")Cherry Studio配置
命令: /bin/bash
参数:
-c
cd ./filesystem && source .venv/bin/activate && python main.py
最后更新于