mcp 就是一个接口,能为大模型引入各种丰富的功能,目前公开的 mcp 服务很多,能实现很多复杂的操作。之前曾用好几篇帖子来记录mcp的使用,也尝试使用了一些开源的mcp server,除了新鲜感作祟,更多的是对于将AI用于具体工作的思考。可用了很多别人的工具,我也就开始考虑开发专属于自己的mcp服务,毕竟有的工作具有极强的针对性,不是公共工具可以轻松完成的。仔细研究下来,开发自己的mcp工具其实并不复杂。个人愚见,能用Python等脚本实现的功能,只要有一个输入和输出,似乎就可以利用mcp库与mcp client建立联系,从而为AI所用。于是我就以 dify 工作流为切入口,开发一个可以用 mcp 直接操纵 dify 工作流的工具。

以下服务依赖一些先导条件:

  • 我采用本地部署的 dify 完成工作流(直接使用 dify 官方云服务也可以,接口改一下就行)。
  • 本地配置 uv 、node.js 环境。

我们最重要的是需要这个库:

from mcp.server.fastmcp import FastMCP

其他库就根据你自己的脚本需求安装就好了。

搭建 dify 工作流

搭建工作流不是本教程的核心,我展示一下我搭建的一个基础工作流:

功能非常简单,就是用户输入,然后调用本地部署的 searxng 进行查询,然后再用大模型整合后输出。

在“访问 API”页面中可以查询 api 使用文档:

我们就是用本地的http://127.0.0.1/v1/chat-messages的端口进行信息获取。

搭建脚本环境

参考链接:

https://modelcontextprotocol.io/quickstart/server#weather-api-issues

安装 uv,在 power shell 中执行:

powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

然后配置 uv 环境路径:

  • Win + R,输入 sysdm.cpl,按 Enter
  • 进入 “高级”“环境变量”
  • “用户变量” 部分找到 Path,双击它。
  • 点击“新建”,添加 C:\Users\{你的用户名}\.local\bin
  • 确认后关闭窗口,并重新打开 PowerShell 终端。

重现打开 power shell,执行如下命令确认安装完成:

uv --version

创建 mcp 项目

在一个你想创建项目的文件夹,右键打开 power shell 执行如下命令:

uv init mcptool
cd mcptool

创建 uv 虚拟环境:

注意,确保安装 3.10 及以上版本的 Python,否则无法使用 mcp!

uv venv --python=python3.10   #mcp库依赖Python版本要大于3.10
.venv\Scripts\activate        #激活这个虚拟环境

uv 环境和 conda 类似,由于是一个全新的虚拟环境,所以里面的 python 库都要重新安装。类似于 pip,直接使用 uv add 来添加需要的库:

uv add mcp httpx requests

创建一个执行脚本:

new-item mcptool.py

如此一来,便搭建好了一个可以配置脚本的基本环境。

函数定义

接下来就来构建具体的脚本。通过设置 FastMCP@mcp.tool()注册mcp服务并设置异步处理函数。

mcp = FastMCP("mcptool")

@mcp.tool()

通过FastMCP("mcptool")定义了当前 mcp 的服务框架名称,并通过装饰器@mcp.tool()将普通函数注册为 FastMCP 工具。

接下来,我们只需要定义一个可以异步操作的函数接口供 mcp client 使用。例如,我们声明一个 send_dify_chat_request 函数,用来接收 dify 工作流返回的结果,那么在函数声明定义处可以这么写:

async def send_dify_chat_request(
    query: str,
    inputs: Optional[Dict[str, Any]] = None,
    user_id: str = "user123",
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    response_mode: str = "streaming",
    timeout: int = 30
) -> Dict[str, Any]:
    """
    向Dify API发送聊天请求并处理响应
    
    Args:
        query: 用户的提问内容
        inputs: 允许传入App定义的各变量值, 默认为空字典
        user_id: 用户标识,用于定义终端用户身份,默认为"user123"
        api_key: Dify API密钥,如不提供则使用默认值
        api_url: Dify API URL,如不提供则使用默认值
        response_mode: 响应模式,可选"streaming"或"blocking",默认为"streaming"
        timeout: 请求超时时间(秒),默认为30
        
    Returns:
        Dict: 包含AI回答、使用情况等信息的字典
    """

通过 -> Dict[str, Any]:来指定返回类型,并提供注解。下面的多行注释不仅是当前函数的运行提示,同样也是为诸如 Claude desktop 等 mcp client 工具标注当前 mcp 服务用法的注解。例如,把当前服务工具绑定到 Claude 中,可以在工具处查看这个 mcp 的用法:

之后,我们可以在当前函数中编辑主体完成脚本逻辑。我只想获得 dify 工作流的输出,相对来说操作起来比较简单,但因为 dify 自身平台的限制,如果使用阻塞式输出,即当工作流全部输出时才获取结果,很有可能会达到 dify 的超时限制从而发出错误,所以当前脚本逻辑改为流式输出,但只在结果全部输出后才返回,提升系统的稳定性。

当前函数逻辑比较简单,并没有把 api-key 单独封装,而是直接预置在脚本里,未来为了安全性应当把 api-key 设置成变量单独存储。

if api_key is None:
    api_key = "你的dify API密钥"  # 使用默认API密钥

if api_url is None:
    api_url = "http://127.0.0.1/v1/chat-messages"  #本地部署的dify访问端口

设置发送到 dify 工作流的内容,以及设定输出格式:

data = {
    "inputs": inputs,
    "query": query,
    "response_mode": response_mode,
    "user": user_id
}

result = {
    "success": False,
    "answer": "",
    "usage_info": None,
    "error": None,
    "workflow_info": {}
}

返回的格式如下:

{
    "success": Boolean,  # 请求是否成功
    "answer": String,    # Dify API 返回的完整回答文本
    "usage_info": Object,  # 使用情况统计(令牌使用量等)
    "error": String,     # 如果发生错误,这里会包含错误信息
    "workflow_info": {   # 工作流相关信息
        "id": String,    # 工作流ID
        "status": String # 工作流状态(如 "succeeded"、"failed" 等)
    }
}

之后加上你想要的逻辑就可以了。

最后,声明函数入口:

if __name__ == "__main__":
    mcp.run(transport='stdio')

通过mcp.run(transport='stdio')激活FastMCP服务实例的运行时环境,并建立基于STDIO协议的通信通道。

完整程序

import requests
import json
import sys
from typing import Any, Optional, Dict
from requests.exceptions import RequestException, Timeout, ConnectionError
from mcp.server.fastmcp import FastMCP

# 初始化FastMCP服务器
mcp = FastMCP("mcptool")

@mcp.tool()
async def send_dify_chat_request(
    query: str,
    inputs: Optional[Dict[str, Any]] = None,
    user_id: str = "user123",
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    response_mode: str = "streaming",
    timeout: int = 30
) -> Dict[str, Any]:
    """
    向Dify API发送聊天请求并处理响应
    
    Args:
        query: 用户的提问内容
        inputs: 允许传入App定义的各变量值, 默认为空字典
        user_id: 用户标识,用于定义终端用户身份,默认为"user123"
        api_key: Dify API密钥,如不提供则使用默认值
        api_url: Dify API URL,如不提供则使用默认值
        response_mode: 响应模式,可选"streaming"或"blocking",默认为"streaming"
        timeout: 请求超时时间(秒),默认为30
        
    Returns:
        Dict: 包含AI回答、使用情况等信息的字典
    """
    # 设置API密钥和URL
    if api_key is None:
        api_key = "你的API key"  # 使用默认API密钥
    
    if api_url is None:
        api_url = "http://127.0.0.1/v1/chat-messages"  # 使用默认URL
    
    # 打印调试信息
    print(f"向Dify API发送请求: 查询={query}, 用户={user_id}")
    
    # 设置请求头
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    
    # 设置请求体
    if inputs is None:
        inputs = {}
        
    data = {
        "inputs": inputs,
        "query": query,
        "response_mode": response_mode,
        "user": user_id
    }
    
    result = {
        "success": False,
        "answer": "",
        "usage_info": None,
        "error": None,
        "workflow_info": {}
    }
    
    try:
        # 发送请求,添加超时设置
        print(f"发送请求到 {api_url}")
        response = requests.post(api_url, headers=headers, json=data, stream=True, timeout=timeout)

        # 处理流式响应,但只在全部接收后才输出
        if response.status_code == 200:
            full_answer = ""
            usage_info = None
            workflow_info = {}
            error_occurred = False
            
            try:
                for line in response.iter_lines():
                    if line:
                        line = line.decode('utf-8')
                        if line.startswith('data: '):
                            data_str = line[6:]  # 去掉 'data: ' 前缀
                            try:
                                data_json = json.loads(data_str)
                                event_type = data_json.get("event")
                                
                                # 处理不同类型的事件
                                if event_type == "message":
                                    full_answer += data_json.get("answer", "")
                                
                                # 在消息结束时获取使用信息
                                elif event_type == "message_end":
                                    if "metadata" in data_json:
                                        usage_info = data_json.get("metadata", {}).get("usage")
                                    
                                # 处理错误事件
                                elif event_type == "error":
                                    error_occurred = True
                                    error_message = data_json.get("error", "未知错误")
                                    print(f"API错误: {error_message}")
                                    result["error"] = f"API错误: {error_message}"
                                    break
                                    
                                # 处理工作流事件
                                elif event_type == "workflow_started":
                                    workflow_info["id"] = data_json.get("workflow_run_id")
                                    print(f"工作流开始: {workflow_info['id']}")
                                    
                                elif event_type == "workflow_finished":
                                    status = data_json.get("data", {}).get("status")
                                    print(f"工作流完成: {status}")
                                    workflow_info["status"] = status
                                    
                                    # 检查工作流是否失败
                                    if status == "failed":
                                        error_msg = data_json.get("data", {}).get("error", "未知错误")
                                        print(f"工作流执行失败: {error_msg}")
                                        result["error"] = f"工作流执行失败: {error_msg}"
                                        
                                # 处理节点事件
                                elif event_type in ["node_started", "node_finished"]:
                                    node_id = data_json.get("data", {}).get("node_id")
                                    node_status = data_json.get("data", {}).get("status", "running")
                                    
                                    if event_type == "node_finished" and node_status == "failed":
                                        error_msg = data_json.get("data", {}).get("error", "未知错误")
                                        print(f"节点 {node_id} 执行失败: {error_msg}")
                                        result["error"] = f"节点 {node_id} 执行失败: {error_msg}"
                                        
                                # 处理文件消息
                                elif event_type == "message_file":
                                    file_id = data_json.get("id")
                                    file_type = data_json.get("type")
                                    file_url = data_json.get("url")
                                    print(f"收到文件: ID={file_id}, 类型={file_type}, URL={file_url}")
                                    if "files" not in result:
                                        result["files"] = []
                                    result["files"].append({
                                        "id": file_id,
                                        "type": file_type,
                                        "url": file_url
                                    })
                                    
                                # 处理TTS(文字转语音)消息
                                elif event_type in ["tts_message", "tts_message_end"]:
                                    print(f"收到TTS事件: {event_type}")
                                    
                                # 处理消息替换事件
                                elif event_type == "message_replace":
                                    full_answer = data_json.get("answer", "")
                                    print("消息内容被替换(可能是由于内容审查)")
                                    
                            except json.JSONDecodeError as e:
                                error_message = f"JSON解析错误: {e}"
                                print(error_message)
                                result["error"] = error_message
                            except Exception as e:
                                error_message = f"处理消息时出错: {e}"
                                print(error_message)
                                result["error"] = error_message
            except Exception as e:
                error_message = f"读取响应流时出错: {e}"
                print(error_message)
                result["error"] = error_message
                
            # 全部收集完后,只有在没有错误时才输出结果
            if not error_occurred:
                print("AI回答:", full_answer)
                
                # 如果需要,还可以输出使用情况
                if usage_info:
                    print("\n使用情况:")
                    print(f"提示令牌: {usage_info.get('prompt_tokens', 'N/A')}")
                    print(f"完成令牌: {usage_info.get('completion_tokens', 'N/A')}")
                    print(f"总令牌: {usage_info.get('total_tokens', 'N/A')}")
            
            # 设置返回结果
            result["success"] = not error_occurred
            result["answer"] = full_answer
            result["usage_info"] = usage_info
            result["workflow_info"] = workflow_info
            
        else:
            # 处理HTTP错误代码
            error_message = "未知错误"
            try:
                error_data = response.json()
                error_message = error_data.get("error", {}).get("message", str(error_data))
            except (json.JSONDecodeError, ValueError, KeyError):
                error_message = response.text
                
            print(f"请求失败,状态码: {response.status_code}")
            print(f"错误信息: {error_message}")
            
            result["error"] = f"请求失败,状态码: {response.status_code}, 错误信息: {error_message}"
            
            # 处理常见的HTTP错误
            if response.status_code == 401:
                print("认证失败: 请检查您的API密钥是否正确")
                result["error_type"] = "authentication_error"
            elif response.status_code == 403:
                print("权限不足: 您的API密钥没有足够的权限执行此操作")
                result["error_type"] = "permission_error"
            elif response.status_code == 404:
                print("资源不存在: 请检查API端点URL是否正确")
                result["error_type"] = "not_found_error"
            elif response.status_code == 429:
                print("请求过多: 已超过API速率限制,请稍后再试")
                result["error_type"] = "rate_limit_error"
            elif 500 <= response.status_code < 600:
                print("服务器错误: 服务器端发生错误,请稍后再试")
                result["error_type"] = "server_error"

    except Timeout:
        error_message = "请求超时: 服务器没有在预期的时间内响应"
        print(error_message)
        result["error"] = error_message
        result["error_type"] = "timeout_error"
    except ConnectionError:
        error_message = "连接错误: 无法连接到服务器,请检查网络连接和服务器状态"
        print(error_message)
        result["error"] = error_message
        result["error_type"] = "connection_error"
    except RequestException as e:
        error_message = f"请求异常: {e}"
        print(error_message)
        result["error"] = error_message
        result["error_type"] = "request_error"
    except Exception as e:
        error_message = f"发生未预期的错误: {e}"
        print(error_message)
        result["error"] = error_message
        result["error_type"] = "unexpected_error"
    
    return result

# 直接运行时的示例用法
if __name__ == "__main__":
    # import argparse
    
    # # 检查是否需要通过命令行运行或作为MCP服务器运行
    # if len(sys.argv) > 1 and sys.argv[1] == "--server":
    #     print("启动MCP服务器...")
    #     # 启动MCP服务器
    #     mcp.run(transport='stdio')
    # else:
    #     # 作为命令行工具运行
    #     parser = argparse.ArgumentParser(description='向Dify API发送聊天请求')
    #     parser.add_argument('--query', type=str, help='用户的提问内容', default="gitbook是什么?怎么用?")
    #     parser.add_argument('--user', type=str, help='用户标识', default="user123")
    #     parser.add_argument('--input', type=str, help='输入变量,格式为JSON字符串', default="{}")
    #     parser.add_argument('--api-key', type=str, help='Dify API密钥')
    #     parser.add_argument('--api-url', type=str, help='Dify API URL')
    #     parser.add_argument('--server', action='store_true', help='以MCP服务器模式运行')
        
    #     args = parser.parse_args()
        
    #     if args.server:
    #         print("启动MCP服务器...")
    #         # 启动MCP服务器
    #         mcp.run(transport='stdio')
    #     else:
    #         # 处理输入参数
    #         try:
    #             inputs = json.loads(args.input)
    #         except json.JSONDecodeError:
    #             print("输入参数JSON格式错误,使用空字典作为默认值")
    #             inputs = {}
                
    #         # 同步调用异步函数
    #         import asyncio
            
    #         # 发送请求
    #         result = asyncio.run(send_dify_chat_request(
    #             query=args.query,
    #             inputs=inputs,
    #             user_id=args.user,
    #             api_key=args.api_key,
    #             api_url=args.api_url
    #         ))
            
    #         # 打印结果摘要
    #         if result["success"]:
    #             if not result.get("answer"):
    #                 print("请求成功,但没有获取到回答")
    #             # 注意:实际回答已经在函数内部打印出来了
    #         else:
    #             print(f"请求失败: {result.get('error', '未知错误')}")
    mcp.run(transport='stdio')

放入 Claude 桌面应用

在 claude_desktop_config.josn 中加入:

"mcptool":{
      "command":"uv",
      "args":[
        "--directory",
        "D:\\Desktop\\Claude-Files\\mcptool",   <---改为你自己存放脚本的位置
        "run",
        "mcptool.py"
      ]
    }

重启客户端就可以了。

实验结果:

在 chatwise 中使用

能在 Claude 桌面端使用就能在 chatwise 等带有 mcp client 功能的软件中使用,只需要改一下命令:

uv --directory D:\Desktop\Claude-Files\mcptool run mcptool.py

指定你的脚本位置以及运行方法就 OK 了。

在 Cherry studio 中使用

Cherry studio 使用 mcp 总是没有 Claude 和 chatwise 丝滑,但也能用,同样把上面的单独指令放入。