Skip to content

yaml_workflow.tasks.notify_tasks

yaml_workflow.tasks.notify_tasks

Notification task supporting webhook, log, slack, and file channels.

Classes

Functions

notify_task(config: TaskConfig) -> Dict[str, Any]

Send a notification via one of several supported channels.

Parameters:

Name Type Description Default
config TaskConfig

Task configuration with namespace support.

required
Inputs

channel (str): Notification channel. Required. One of webhook, log, slack, file.

webhook channel::

channel: webhook
url: "https://hooks.example.com/..."
message: "Workflow completed"
method: POST          # optional, default POST
headers:              # optional extra headers
  X-Custom: value
payload:              # optional; used as-is; otherwise {"text": message}
  text: "..."

log channel::

channel: log
message: "Step completed"
level: info           # debug|info|warning|error, default: info
file: "notify.log"    # optional; appended as plain text

slack channel::

channel: slack
webhook_url: "{{ env.SLACK_WEBHOOK_URL }}"
message: "{{ workflow.name }} finished"
username: "yaml-workflow"     # optional
icon_emoji: ":robot_face:"    # optional
color: good                   # optional: good|warning|danger|#hex

file channel::

channel: file
file: "notifications.jsonl"
message: "Completed"
append: true          # default true
format: jsonl         # jsonl|text, default: jsonl

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: {"channel": channel, "status": "sent", "message": message}

Raises:

Type Description
TaskExecutionError

If the notification cannot be delivered.

Source code in src/yaml_workflow/tasks/notify_tasks.py
@register_task("notify")
def notify_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Send a notification via one of several supported channels.

    Args:
        config: Task configuration with namespace support.

    Inputs:
        channel (str): Notification channel. Required. One of ``webhook``,
            ``log``, ``slack``, ``file``.

        **webhook channel**::

            channel: webhook
            url: "https://hooks.example.com/..."
            message: "Workflow completed"
            method: POST          # optional, default POST
            headers:              # optional extra headers
              X-Custom: value
            payload:              # optional; used as-is; otherwise {"text": message}
              text: "..."

        **log channel**::

            channel: log
            message: "Step completed"
            level: info           # debug|info|warning|error, default: info
            file: "notify.log"    # optional; appended as plain text

        **slack channel**::

            channel: slack
            webhook_url: "{{ env.SLACK_WEBHOOK_URL }}"
            message: "{{ workflow.name }} finished"
            username: "yaml-workflow"     # optional
            icon_emoji: ":robot_face:"    # optional
            color: good                   # optional: good|warning|danger|#hex

        **file channel**::

            channel: file
            file: "notifications.jsonl"
            message: "Completed"
            append: true          # default true
            format: jsonl         # jsonl|text, default: jsonl

    Returns:
        Dict[str, Any]: ``{"channel": channel, "status": "sent", "message": message}``

    Raises:
        TaskExecutionError: If the notification cannot be delivered.
    """
    task_name = str(config.name or "notify_task")
    task_type = str(config.type or "notify")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config.context, config.workspace)

        processed = config.process_inputs()
        config.processed_inputs = processed

        channel = processed.get("channel", "")
        if not channel:
            raise ValueError("'channel' input is required for the notify task.")

        message = str(processed.get("message", ""))

        # Derive a workflow name for structured outputs
        workflow_name = ""
        if config.context:
            wf = config.context.get("workflow", {})
            if isinstance(wf, dict):
                workflow_name = str(wf.get("name", ""))

        # ----------------------------------------------------------------
        # Dispatch by channel
        # ----------------------------------------------------------------
        extra: Dict[str, Any] = {}

        if channel == "webhook":
            url = processed.get("url")
            if not url:
                raise ValueError("webhook channel requires 'url'.")
            method = processed.get("method", "POST")
            extra_headers = processed.get("headers", {}) or {}
            payload = processed.get("payload", None)
            send_result = _notify_webhook(
                url,
                message,
                method=method,
                extra_headers=extra_headers,
                payload=payload,
            )
            extra["status_code"] = send_result["status_code"]

        elif channel == "log":
            level = processed.get("level", "info")
            log_file = processed.get("file", None)
            _notify_log(message, level=level, file=log_file)

        elif channel == "slack":
            webhook_url = processed.get("webhook_url")
            if not webhook_url:
                raise ValueError("slack channel requires 'webhook_url'.")
            username = processed.get("username", "yaml-workflow")
            icon_emoji = processed.get("icon_emoji", ":robot_face:")
            color = processed.get("color", None)
            send_result = _notify_slack(
                webhook_url,
                message,
                username=username,
                icon_emoji=icon_emoji,
                color=color,
            )
            extra["status_code"] = send_result["status_code"]

        elif channel == "file":
            dest_file = processed.get("file")
            if not dest_file:
                raise ValueError("file channel requires 'file'.")
            append = bool(processed.get("append", True))
            fmt = processed.get("format", "jsonl")
            _notify_file(
                dest_file,
                message,
                workflow_name=workflow_name,
                append=append,
                fmt=fmt,
            )

        else:
            raise ValueError(
                f"Unsupported notification channel: '{channel}'. "
                "Supported channels: webhook, log, slack, file."
            )

        result: Dict[str, Any] = {
            "channel": channel,
            "status": "sent",
            "message": message,
            **extra,
        }
        log_task_result(logger, result)
        return result

    except (
        TaskExecutionError,
        TemplateError,
        urllib.error.HTTPError,
        urllib.error.URLError,
        OSError,
        ValueError,
    ) as e:
        if isinstance(e, urllib.error.HTTPError):
            try:
                error_body = e.read().decode("utf-8")
            except Exception:
                error_body = ""
            error_msg = (
                f"HTTP {e.code} {e.reason} sending notification to "
                f"{processed.get('url') or processed.get('webhook_url', '?')}: "
                f"{error_body}"
            )
            e = ValueError(error_msg)

        context = ErrorContext(
            step_name=task_name,
            task_type=task_type,
            error=e,
            task_config=config.step,
            template_context=config.context,
        )
        handle_task_error(context)
        return {}  # Unreachable