mcp 就是一个接口,能为大模型引入各种丰富的功能,目前公开的 mcp 服务很多,能实现很多复杂的操作。之前曾用好几篇帖子来记录mcp的使用,也尝试使用了一些开源的mcp server,除了新鲜感作祟,更多的是对于将AI用于具体工作的思考。可用了很多别人的工具,我也就开始考虑开发专属于自己的mcp服务,毕竟有的工作具有极强的针对性,不是公共工具可以轻松完成的。仔细研究下来,开发自己的mcp工具其实并不复杂。个人愚见,能用Python等脚本实现的功能,只要有一个输入和输出,似乎就可以利用mcp库与mcp client建立联系,从而为AI所用。于是我就以 dify 工作流为切入口,开发一个可以用 mcp 直接操纵 dify 工作流的工具。
以下服务依赖一些先导条件:
- 我采用本地部署的 dify 完成工作流(直接使用 dify 官方云服务也可以,接口改一下就行)。
- 本地配置 uv 、node.js 环境。
我们最重要的是需要这个库:
from mcp.server.fastmcp import FastMCP
其他库就根据你自己的脚本需求安装就好了。

搭建 dify 工作流
搭建工作流不是本教程的核心,我展示一下我搭建的一个基础工作流:

功能非常简单,就是用户输入,然后调用本地部署的 searxng 进行查询,然后再用大模型整合后输出。
在“访问 API”页面中可以查询 api 使用文档:

我们就是用本地的http://127.0.0.1/v1/chat-messages的端口进行信息获取。
搭建脚本环境
参考链接:
https://modelcontextprotocol.io/quickstart/server#weather-api-issues
安装 uv,在 power shell 中执行:
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
然后配置 uv 环境路径:
- Win + R,输入
sysdm.cpl
,按 Enter。 - 进入 “高级” → “环境变量”。
- 在 “用户变量” 部分找到
Path
,双击它。 - 点击“新建”,添加
C:\Users\{你的用户名}\.local\bin
。 - 确认后关闭窗口,并重新打开 PowerShell 终端。
重现打开 power shell,执行如下命令确认安装完成:
uv --version

创建 mcp 项目
在一个你想创建项目的文件夹,右键打开 power shell 执行如下命令:
uv init mcptool
cd mcptool
创建 uv 虚拟环境:
注意,确保安装 3.10 及以上版本的 Python,否则无法使用 mcp!
uv venv --python=python3.10 #mcp库依赖Python版本要大于3.10
.venv\Scripts\activate #激活这个虚拟环境
uv 环境和 conda 类似,由于是一个全新的虚拟环境,所以里面的 python 库都要重新安装。类似于 pip,直接使用 uv add
来添加需要的库:
uv add mcp httpx requests
创建一个执行脚本:
new-item mcptool.py
如此一来,便搭建好了一个可以配置脚本的基本环境。
函数定义
接下来就来构建具体的脚本。通过设置 FastMCP
和@mcp.tool()
注册mcp服务并设置异步处理函数。
mcp = FastMCP("mcptool")
@mcp.tool()
通过FastMCP("mcptool")
定义了当前 mcp 的服务框架名称,并通过装饰器@mcp.tool()
将普通函数注册为 FastMCP 工具。
接下来,我们只需要定义一个可以异步操作的函数接口供 mcp client 使用。例如,我们声明一个 send_dify_chat_request 函数,用来接收 dify 工作流返回的结果,那么在函数声明定义处可以这么写:
async def send_dify_chat_request(
query: str,
inputs: Optional[Dict[str, Any]] = None,
user_id: str = "user123",
api_key: Optional[str] = None,
api_url: Optional[str] = None,
response_mode: str = "streaming",
timeout: int = 30
) -> Dict[str, Any]:
"""
向Dify API发送聊天请求并处理响应
Args:
query: 用户的提问内容
inputs: 允许传入App定义的各变量值, 默认为空字典
user_id: 用户标识,用于定义终端用户身份,默认为"user123"
api_key: Dify API密钥,如不提供则使用默认值
api_url: Dify API URL,如不提供则使用默认值
response_mode: 响应模式,可选"streaming"或"blocking",默认为"streaming"
timeout: 请求超时时间(秒),默认为30
Returns:
Dict: 包含AI回答、使用情况等信息的字典
"""
通过 -> Dict[str, Any]:
来指定返回类型,并提供注解。下面的多行注释不仅是当前函数的运行提示,同样也是为诸如 Claude desktop 等 mcp client 工具标注当前 mcp 服务用法的注解。例如,把当前服务工具绑定到 Claude 中,可以在工具处查看这个 mcp 的用法:

之后,我们可以在当前函数中编辑主体完成脚本逻辑。我只想获得 dify 工作流的输出,相对来说操作起来比较简单,但因为 dify 自身平台的限制,如果使用阻塞式输出,即当工作流全部输出时才获取结果,很有可能会达到 dify 的超时限制从而发出错误,所以当前脚本逻辑改为流式输出,但只在结果全部输出后才返回,提升系统的稳定性。
当前函数逻辑比较简单,并没有把 api-key 单独封装,而是直接预置在脚本里,未来为了安全性应当把 api-key 设置成变量单独存储。
if api_key is None:
api_key = "你的dify API密钥" # 使用默认API密钥
if api_url is None:
api_url = "http://127.0.0.1/v1/chat-messages" #本地部署的dify访问端口
设置发送到 dify 工作流的内容,以及设定输出格式:
data = {
"inputs": inputs,
"query": query,
"response_mode": response_mode,
"user": user_id
}
result = {
"success": False,
"answer": "",
"usage_info": None,
"error": None,
"workflow_info": {}
}
返回的格式如下:
{
"success": Boolean, # 请求是否成功
"answer": String, # Dify API 返回的完整回答文本
"usage_info": Object, # 使用情况统计(令牌使用量等)
"error": String, # 如果发生错误,这里会包含错误信息
"workflow_info": { # 工作流相关信息
"id": String, # 工作流ID
"status": String # 工作流状态(如 "succeeded"、"failed" 等)
}
}
之后加上你想要的逻辑就可以了。
最后,声明函数入口:
if __name__ == "__main__":
mcp.run(transport='stdio')
通过mcp.run(transport='stdio')
激活FastMCP服务实例的运行时环境,并建立基于STDIO协议的通信通道。
完整程序
import requests
import json
import sys
from typing import Any, Optional, Dict
from requests.exceptions import RequestException, Timeout, ConnectionError
from mcp.server.fastmcp import FastMCP
# 初始化FastMCP服务器
mcp = FastMCP("mcptool")
@mcp.tool()
async def send_dify_chat_request(
query: str,
inputs: Optional[Dict[str, Any]] = None,
user_id: str = "user123",
api_key: Optional[str] = None,
api_url: Optional[str] = None,
response_mode: str = "streaming",
timeout: int = 30
) -> Dict[str, Any]:
"""
向Dify API发送聊天请求并处理响应
Args:
query: 用户的提问内容
inputs: 允许传入App定义的各变量值, 默认为空字典
user_id: 用户标识,用于定义终端用户身份,默认为"user123"
api_key: Dify API密钥,如不提供则使用默认值
api_url: Dify API URL,如不提供则使用默认值
response_mode: 响应模式,可选"streaming"或"blocking",默认为"streaming"
timeout: 请求超时时间(秒),默认为30
Returns:
Dict: 包含AI回答、使用情况等信息的字典
"""
# 设置API密钥和URL
if api_key is None:
api_key = "你的API key" # 使用默认API密钥
if api_url is None:
api_url = "http://127.0.0.1/v1/chat-messages" # 使用默认URL
# 打印调试信息
print(f"向Dify API发送请求: 查询={query}, 用户={user_id}")
# 设置请求头
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# 设置请求体
if inputs is None:
inputs = {}
data = {
"inputs": inputs,
"query": query,
"response_mode": response_mode,
"user": user_id
}
result = {
"success": False,
"answer": "",
"usage_info": None,
"error": None,
"workflow_info": {}
}
try:
# 发送请求,添加超时设置
print(f"发送请求到 {api_url}")
response = requests.post(api_url, headers=headers, json=data, stream=True, timeout=timeout)
# 处理流式响应,但只在全部接收后才输出
if response.status_code == 200:
full_answer = ""
usage_info = None
workflow_info = {}
error_occurred = False
try:
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:] # 去掉 'data: ' 前缀
try:
data_json = json.loads(data_str)
event_type = data_json.get("event")
# 处理不同类型的事件
if event_type == "message":
full_answer += data_json.get("answer", "")
# 在消息结束时获取使用信息
elif event_type == "message_end":
if "metadata" in data_json:
usage_info = data_json.get("metadata", {}).get("usage")
# 处理错误事件
elif event_type == "error":
error_occurred = True
error_message = data_json.get("error", "未知错误")
print(f"API错误: {error_message}")
result["error"] = f"API错误: {error_message}"
break
# 处理工作流事件
elif event_type == "workflow_started":
workflow_info["id"] = data_json.get("workflow_run_id")
print(f"工作流开始: {workflow_info['id']}")
elif event_type == "workflow_finished":
status = data_json.get("data", {}).get("status")
print(f"工作流完成: {status}")
workflow_info["status"] = status
# 检查工作流是否失败
if status == "failed":
error_msg = data_json.get("data", {}).get("error", "未知错误")
print(f"工作流执行失败: {error_msg}")
result["error"] = f"工作流执行失败: {error_msg}"
# 处理节点事件
elif event_type in ["node_started", "node_finished"]:
node_id = data_json.get("data", {}).get("node_id")
node_status = data_json.get("data", {}).get("status", "running")
if event_type == "node_finished" and node_status == "failed":
error_msg = data_json.get("data", {}).get("error", "未知错误")
print(f"节点 {node_id} 执行失败: {error_msg}")
result["error"] = f"节点 {node_id} 执行失败: {error_msg}"
# 处理文件消息
elif event_type == "message_file":
file_id = data_json.get("id")
file_type = data_json.get("type")
file_url = data_json.get("url")
print(f"收到文件: ID={file_id}, 类型={file_type}, URL={file_url}")
if "files" not in result:
result["files"] = []
result["files"].append({
"id": file_id,
"type": file_type,
"url": file_url
})
# 处理TTS(文字转语音)消息
elif event_type in ["tts_message", "tts_message_end"]:
print(f"收到TTS事件: {event_type}")
# 处理消息替换事件
elif event_type == "message_replace":
full_answer = data_json.get("answer", "")
print("消息内容被替换(可能是由于内容审查)")
except json.JSONDecodeError as e:
error_message = f"JSON解析错误: {e}"
print(error_message)
result["error"] = error_message
except Exception as e:
error_message = f"处理消息时出错: {e}"
print(error_message)
result["error"] = error_message
except Exception as e:
error_message = f"读取响应流时出错: {e}"
print(error_message)
result["error"] = error_message
# 全部收集完后,只有在没有错误时才输出结果
if not error_occurred:
print("AI回答:", full_answer)
# 如果需要,还可以输出使用情况
if usage_info:
print("\n使用情况:")
print(f"提示令牌: {usage_info.get('prompt_tokens', 'N/A')}")
print(f"完成令牌: {usage_info.get('completion_tokens', 'N/A')}")
print(f"总令牌: {usage_info.get('total_tokens', 'N/A')}")
# 设置返回结果
result["success"] = not error_occurred
result["answer"] = full_answer
result["usage_info"] = usage_info
result["workflow_info"] = workflow_info
else:
# 处理HTTP错误代码
error_message = "未知错误"
try:
error_data = response.json()
error_message = error_data.get("error", {}).get("message", str(error_data))
except (json.JSONDecodeError, ValueError, KeyError):
error_message = response.text
print(f"请求失败,状态码: {response.status_code}")
print(f"错误信息: {error_message}")
result["error"] = f"请求失败,状态码: {response.status_code}, 错误信息: {error_message}"
# 处理常见的HTTP错误
if response.status_code == 401:
print("认证失败: 请检查您的API密钥是否正确")
result["error_type"] = "authentication_error"
elif response.status_code == 403:
print("权限不足: 您的API密钥没有足够的权限执行此操作")
result["error_type"] = "permission_error"
elif response.status_code == 404:
print("资源不存在: 请检查API端点URL是否正确")
result["error_type"] = "not_found_error"
elif response.status_code == 429:
print("请求过多: 已超过API速率限制,请稍后再试")
result["error_type"] = "rate_limit_error"
elif 500 <= response.status_code < 600:
print("服务器错误: 服务器端发生错误,请稍后再试")
result["error_type"] = "server_error"
except Timeout:
error_message = "请求超时: 服务器没有在预期的时间内响应"
print(error_message)
result["error"] = error_message
result["error_type"] = "timeout_error"
except ConnectionError:
error_message = "连接错误: 无法连接到服务器,请检查网络连接和服务器状态"
print(error_message)
result["error"] = error_message
result["error_type"] = "connection_error"
except RequestException as e:
error_message = f"请求异常: {e}"
print(error_message)
result["error"] = error_message
result["error_type"] = "request_error"
except Exception as e:
error_message = f"发生未预期的错误: {e}"
print(error_message)
result["error"] = error_message
result["error_type"] = "unexpected_error"
return result
# 直接运行时的示例用法
if __name__ == "__main__":
# import argparse
# # 检查是否需要通过命令行运行或作为MCP服务器运行
# if len(sys.argv) > 1 and sys.argv[1] == "--server":
# print("启动MCP服务器...")
# # 启动MCP服务器
# mcp.run(transport='stdio')
# else:
# # 作为命令行工具运行
# parser = argparse.ArgumentParser(description='向Dify API发送聊天请求')
# parser.add_argument('--query', type=str, help='用户的提问内容', default="gitbook是什么?怎么用?")
# parser.add_argument('--user', type=str, help='用户标识', default="user123")
# parser.add_argument('--input', type=str, help='输入变量,格式为JSON字符串', default="{}")
# parser.add_argument('--api-key', type=str, help='Dify API密钥')
# parser.add_argument('--api-url', type=str, help='Dify API URL')
# parser.add_argument('--server', action='store_true', help='以MCP服务器模式运行')
# args = parser.parse_args()
# if args.server:
# print("启动MCP服务器...")
# # 启动MCP服务器
# mcp.run(transport='stdio')
# else:
# # 处理输入参数
# try:
# inputs = json.loads(args.input)
# except json.JSONDecodeError:
# print("输入参数JSON格式错误,使用空字典作为默认值")
# inputs = {}
# # 同步调用异步函数
# import asyncio
# # 发送请求
# result = asyncio.run(send_dify_chat_request(
# query=args.query,
# inputs=inputs,
# user_id=args.user,
# api_key=args.api_key,
# api_url=args.api_url
# ))
# # 打印结果摘要
# if result["success"]:
# if not result.get("answer"):
# print("请求成功,但没有获取到回答")
# # 注意:实际回答已经在函数内部打印出来了
# else:
# print(f"请求失败: {result.get('error', '未知错误')}")
mcp.run(transport='stdio')
放入 Claude 桌面应用
在 claude_desktop_config.josn 中加入:
"mcptool":{
"command":"uv",
"args":[
"--directory",
"D:\\Desktop\\Claude-Files\\mcptool", <---改为你自己存放脚本的位置
"run",
"mcptool.py"
]
}
重启客户端就可以了。
实验结果:

在 chatwise 中使用
能在 Claude 桌面端使用就能在 chatwise 等带有 mcp client 功能的软件中使用,只需要改一下命令:
uv --directory D:\Desktop\Claude-Files\mcptool run mcptool.py
指定你的脚本位置以及运行方法就 OK 了。


在 Cherry studio 中使用
Cherry studio 使用 mcp 总是没有 Claude 和 chatwise 丝滑,但也能用,同样把上面的单独指令放入。

