Skip to content

yaml_workflow.mcp_server

yaml_workflow.mcp_server

MCP server that exposes yaml-workflow pipelines as tools.

Usage

yaml-workflow serve-mcp --dir workflows/

Each workflow YAML file in the directory becomes an MCP tool. Workflow params become tool input parameters. Running a tool executes the workflow and returns results as JSON.

Functions

serve(directory: str, base_dir: str = 'runs') -> None async

Start the MCP server exposing workflows as tools.

Parameters:

Name Type Description Default
directory str

Path to directory containing workflow YAML files.

required
base_dir str

Base directory for workflow run workspaces.

'runs'
Source code in src/yaml_workflow/mcp_server.py
async def serve(directory: str, base_dir: str = "runs") -> None:
    """Start the MCP server exposing workflows as tools.

    Args:
        directory: Path to directory containing workflow YAML files.
        base_dir: Base directory for workflow run workspaces.
    """
    try:
        from mcp.server import Server
        from mcp.server.stdio import stdio_server
        from mcp.types import TextContent, Tool
    except ImportError:
        raise ImportError(
            "MCP server requires the 'mcp' package. "
            "Install it with: pip install 'yaml-workflow[mcp]'"
        )

    from .engine import WorkflowEngine

    server = Server("yaml-workflow")
    workflow_dir = directory

    @server.list_tools()
    async def list_tools() -> list:
        """Return available workflow tools."""
        workflows = _scan_workflows(workflow_dir)
        tools = []
        for wf in workflows:
            tool_name = wf["name"].lower().replace(" ", "_").replace("-", "_")
            tools.append(
                Tool(
                    name=tool_name,
                    description=wf["description"] or f"Run the {wf['name']} workflow",
                    inputSchema=_params_to_schema(wf["params"]),
                )
            )
        return tools

    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list:
        """Execute a workflow tool."""
        workflows = _scan_workflows(workflow_dir)

        # Find matching workflow
        target = None
        for wf in workflows:
            tool_name = wf["name"].lower().replace(" ", "_").replace("-", "_")
            if tool_name == name:
                target = wf
                break

        if target is None:
            return [
                TextContent(
                    type="text",
                    text=json.dumps({"error": f"Workflow '{name}' not found"}),
                )
            ]

        try:
            engine = WorkflowEngine(
                target["path"],
                base_dir=base_dir,
            )
            results = engine.run(**arguments)

            # Extract step outputs
            outputs = {}
            if results and results.get("outputs"):
                for step_name, step_data in results["outputs"].items():
                    if isinstance(step_data, dict) and "result" in step_data:
                        outputs[step_name] = step_data["result"]
                    else:
                        outputs[step_name] = step_data

            response = {
                "status": "success",
                "workflow": target["name"],
                "outputs": outputs,
            }
        except Exception as e:
            response = {
                "status": "failed",
                "workflow": target["name"],
                "error": str(e),
            }

        return [
            TextContent(
                type="text",
                text=json.dumps(response, indent=2, default=str),
            )
        ]

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream, write_stream, server.create_initialization_options()
        )