Features/pulse-triggers

Pulse Triggers

Cron and webhook triggers are first-class Hydra instances. Their definitions live in the graph (cronTrigger_instance / webhookTrigger_instance). Their execution history lives under storage/sessions/pulse/ and is never written to the graph.

dominird owns the clock for crons. Python owns execution for both kinds.


Trigger Types

TypeObject keyHydra ID prefixFired by
CroncronTrigger_instanceCrdominird clock → POST /api/pulse/triggers/CronTrigger/{id}/test
WebhookwebhookTrigger_instanceWhExternal HTTP → POST /api/pulse/webhooks/{slug}

Ontology Schema

CronTrigger — cronTrigger_instance

PetioleBranchCardinalityRequiredNotes
hasNameTitle1yesDisplay name
hasDescriptionDescription1Human-readable purpose
isEnabledTriggerEnabled1yes"enabled" or "disabled"
hasCronExpressionCronExpression1yes5-field standard cron. dominird prepends 0 for seconds.
hasTimezoneTimezone1IANA tz (e.g. America/Los_Angeles). Not yet applied at runtime — stored for future use.
runsExpressionHydraExpressionmanyPython snippets executed in order on each fire
hasInputDefaultsTriggerInput1JSON object merged under pulse_input
hasAgentAssigneeagent_instancemanyAgents dispatched when mode includes agents
hasDispatchModeDispatchMode1See dispatch modes table below
hasAgentMaxRoundsAgentMaxRounds1Per-agent LLM round cap (default 8)
hasAgentTaskTemplateAgentTaskTemplate1Deprecated — do not use
hasModeratorAgent(resolved from body)1Agent ID that acts as moderator in round-table mode (2+ assignees)

WebhookTrigger — webhookTrigger_instance

All CronTrigger petioles except hasCronExpression / hasTimezone, plus:

PetioleBranchCardinalityRequiredNotes
hasRouteSlugWebhookRouteSlug1yesURL-safe slug; forms the public URL path
hasMethodWebhookMethod1yesHTTP method (typically POST)
hasAuthModeWebhookAuthMode1yesnone, shared_secret (see auth modes below)
hasSecretRefWebhookSecretRef1The literal secret value compared to X-Pulse-Secret header
hasSamplePayloadWebhookSamplePayload1Sample JSON payload for UI testing

Writing Trigger Instances

python
# Cron trigger — runs every day at 09:00
store.write_instance("cronTrigger_instance", {
    "hasName": "Daily Summary",
    "isEnabled": "enabled",
    "hasCronExpression": "0 9 * * *",
    "runsExpression": [
        "print('running daily summary')",
        "store.write_instance('note_Instance', {'hasBody': 'Daily summary ran.'})",
    ],
})

# Webhook trigger — receives external events
store.write_instance("webhookTrigger_instance", {
    "hasName": "GitHub Push",
    "isEnabled": "enabled",
    "hasRouteSlug": "github-push",
    "hasMethod": "POST",
    "hasAuthMode": "shared_secret",
    "hasSecretRef": "my-secret-token",
    "runsExpression": [
        "print(f'push received: {webhook_payload}')",
    ],
})

Cron Expression Format

dominird uses the cron Rust crate. It expects 6 fields (seconds first).
A 5-field user expression is normalized by prepending 0:

User input: "0 9 * * *" Stored as: "0 9 * * *" Fired as: "0 0 9 * * *" ← dominird prepends 0 for seconds

Standard 5-field cron syntax is fully supported:

FieldValues
Minute0–59
Hour0–23
Day of month1–31
Month1–12
Day of week0–7 (0 and 7 = Sunday)

Examples:

ExpressionMeaning
* * * * *Every minute
0 9 * * *Daily at 09:00 UTC
0 9 * * 1Every Monday at 09:00 UTC
*/15 * * * *Every 15 minutes
0 0 1 * *First of each month at midnight

dominird polls the Python backend every 30 seconds (tunable via DOMINIRD_PULSE_POLL_SECS). Minimum reliable cron granularity is 1 minute. Disable the scheduler with DOMINIRD_PULSE_SCHEDULER=0.

After a daemon restart, new triggers start their cursor at "now" — no burst of missed fires.


Webhook Auth Modes

hasAuthModeBehavior
noneNo authentication — any POST fires the trigger
shared_secretCompares X-Pulse-Secret header to hasSecretRef value; returns 401 on mismatch
signed_headerNot yet implemented — returns 501

Webhook URL pattern (after dominird proxy):

POST /api/pulse/webhooks/{slug}

Request handling:

  • Body is parsed as JSON if valid; falls back to raw UTF-8 string.
  • None if the body is empty.
  • All headers are captured and available in expressions as webhook_headers.

Hydra Expression Execution

Expressions from runsExpression are joined in order and executed as a single Python cell in the notebook kernel. The kernel has store (a HydraStore) pre-injected.

Injected variables

VariableTypeContent
pulse_contextdictFull context envelope (see keys below)
pulse_inputdictpulse_context["input"] — merged input defaults + fire-time input
webhook_payloadAnyParsed request body (None for cron fires)
webhook_headersdictRequest headers ({} for cron fires)
pulse_artifacts_dirstrWritable directory for output files. os.chdir is called here before your code runs.
storeHydraStorePre-initialized, scoped to the active workspace store

pulse_context keys

KeyTypeDescription
sourcestr"cron", "webhook", "test", or "replay"
store_idstrActive Hydra store name
trigger_typestr"CronTrigger" or "WebhookTrigger"
trigger_kindstr"cron" or "webhook"
trigger_idstrHydra instance ID of the trigger
inputdictMerged hasInputDefaults + fire-time input
webhook_payloadAnyPresent for webhook fires only
webhook_headersdictPresent for webhook fires only
fired_atstrISO-8601 UTC timestamp (cron fires only, from dominird)

Writing instances from an expression

python
# Write a new instance on every cron fire
report = store.write_instance("note_Instance", {
    "hasBody": f"Cron fired at {pulse_context.get('fired_at')}",
    "hasSource": "automation",
})

# Write an instance from webhook payload data
if webhook_payload and isinstance(webhook_payload, dict):
    name = webhook_payload.get("sender", {}).get("login", "unknown")
    store.write_instance("person_Instance", {
        "hasGivenName": name,
    })

Writing artifacts (files)

python
import json

# Files written here appear under the run's artifacts/ bucket
with open("output.json", "w") as f:
    json.dump({"status": "ok", "input": pulse_input}, f)

Accessing previous cron input defaults

python
# pulse_input merges hasInputDefaults with fire-time input
threshold = pulse_input.get("threshold", 10)

Dispatch Modes

hasDispatchMode controls how expressions and agents compose on each fire.

ModeBehavior
expressions_onlyRun runsExpression list; no agents. Default when no assignees.
agents_onlyDispatch hasAgentAssignee agents; skip expressions.
expressions_then_agentsExpressions first; agents receive expression stdout/result in pulse_event. Auto-inferred when both are present.
agents_then_expressionsAgents first; their results are injected into pulse_context["agent_results"] before expressions run.
parallelExpressions and agents start simultaneously via asyncio.gather.

Auto-inference (when hasDispatchMode is absent):

Has expressionsHas assigneesResolved mode
yesnoexpressions_only
noyesagents_only
yesyesexpressions_then_agents

Agent Dispatch

When assignees are present, each hasAgentAssignee value is a Hydra agent_instance ID.

Single assignee_run_single_agent via /apps/agent/execute.
2+ assignees_run_round_table via /apps/agent/round-table. The first assignee is the moderator unless hasModeratorAgent is set.

Agents receive a pulse_event envelope (not a fabricated task description):

json
{
  "trigger": {"name": "...", "id": "...", "kind": "cron", "type": "CronTrigger"},
  "source": "cron",
  "fired_at": "2026-04-30T02:00:00.000Z",
  "run_name": "run_042",
  "input": {"...": "..."},
  "expressions_result": {"stdout": "...", "result": null, "error": null}
}

hasAgentMaxRounds caps LLM rounds per agent (default 8). For round-table mode the cap is multiplied by the number of assignees.


Run Lifecycle

Each trigger fire creates a run under:

storage/sessions/pulse/{cron|webhook}/{trigger_id}/ session.json branches/ main/ branch.json run_001/ run.json ← status, timing, agent summary input.json ← pulse_context snapshot at fire time events.jsonl ← append-only lifecycle event log stdout.log stderr.log result.json ← final result after completion artifacts/ ← files written by expressions agents/ {agent_id}/ rounds/ ← LLM round JSON files artifacts/ ← files written by agent via write_artifact deliverables/ ← files promoted via promote_to_deliverable

Run statuses: runningsucceeded / failed / partial

partial means some (not all) agents failed while others succeeded.


HTTP API

All paths are under /api/pulse/ (proxied through dominird).

MethodPathPurpose
GET/pulse/healthKernel liveness + hydra_ok flag
GET/pulse/triggersList all enabled cron and webhook triggers
POST/pulse/triggers/{TriggerType}/{id}/testManual fire. Body: ExecuteBody.
GET/pulse/triggers/{TriggerType}/{id}/runsList runs for a trigger
GET/pulse/triggers/{TriggerType}/{id}/runs/{run_name}Single run with events
GET/pulse/triggers/{TriggerType}/{id}/runs/{run_name}/roundsAggregated rounds across all agents
GET/pulse/triggers/{TriggerType}/{id}/rounds/aggregateAll rounds across all runs
POST/pulse/webhooks/{slug}Inbound webhook endpoint
GET/pulse/eventsSSE stream of run_created / run_updated / run_completed
GET/pulse/files/{kind}/{trigger_id}List expression artifacts for latest run
GET/pulse/files/{kind}/{trigger_id}/{bucket}/{name}Serve an artifact file

ExecuteBody (manual test / cron fire)

python
{
    "source":      "test",        # "test", "cron", "webhook", "replay"
    "expressions": None,          # override trigger's runsExpression (optional)
    "input":       {},            # merged into pulse_input
    "timeout":     30,            # kernel execution timeout in seconds
    "branch":      "main",
    "persist":     True,
}

SSE events

Connect to GET /api/pulse/events for a server-sent event stream.

EventPayload keys
hellosubscriber_id, heartbeat_seconds
run_createdtrigger_type, trigger_id, run_name, status, source
run_updatedsame + intermediate status
run_completedsame + duration_ms, error, total_file_count

A : ping comment is sent every 20 s to keep the connection alive through proxies.


Validation Rules

The Pulse router enforces these before starting a run:

ConditionError
Mode is expressions_only and no expressions422 Trigger has no runnable expressions
Mode is agents_only and no assignees422 Trigger has no agent assignees for agents_only mode
source is not cron, webhook, test, or replay400
Webhook isEnabled is "disabled"409 Webhook trigger is disabled
Webhook hasAuthMode is shared_secret and header mismatch401 Invalid webhook secret
Webhook slug not found404

Recipes

Create a cron trigger that writes a Hydra instance daily

python
store.write_instance("cronTrigger_instance", {
    "hasName": "Daily Intake Check",
    "isEnabled": "enabled",
    "hasCronExpression": "0 8 * * *",
    "runsExpression": [
        """
from backend.repositories.hydra_repo import append_record
import datetime

append_record("log_Instance", {
    "id": f"daily-check-{datetime.date.today()}",
    "hasBody": {"@value": "Daily intake check ran"},
    "hasDate": {"@value": str(datetime.date.today())},
})
"""
    ],
})

Create a webhook trigger that processes an inbound payload

python
store.write_instance("webhookTrigger_instance", {
    "hasName": "Order Received",
    "isEnabled": "enabled",
    "hasRouteSlug": "order-received",
    "hasMethod": "POST",
    "hasAuthMode": "shared_secret",
    "hasSecretRef": "webhook-secret-abc123",
    "runsExpression": [
        """
order_id = (webhook_payload or {}).get("order_id")
customer = (webhook_payload or {}).get("customer_name", "Unknown")

if order_id:
    store.write_instance("task_Instance", {
        "hasName": f"Process order {order_id}",
        "hasAssignee": customer,
    })
    print(f"Created task for order {order_id}")
"""
    ],
})

Test a trigger manually via API

python
import httpx

resp = httpx.post(
    "http://127.0.0.1:9370/api/pulse/triggers/CronTrigger/Cr-00-abc123/test",
    json={"source": "test", "input": {"debug": True}, "timeout": 60},
)
run = resp.json()["run"]
print(run["run_name"], run["status"])

Fire a webhook from a test script

python
import httpx

resp = httpx.post(
    "http://127.0.0.1:9370/api/pulse/webhooks/order-received",
    json={"order_id": "ORD-001", "customer_name": "Alice"},
    headers={"X-Pulse-Secret": "webhook-secret-abc123"},
)
print(resp.json())

Dispatch an agent on cron fire

python
store.write_instance("cronTrigger_instance", {
    "hasName": "Weekly Report Agent",
    "isEnabled": "enabled",
    "hasCronExpression": "0 9 * * 1",       # Mondays at 09:00
    "hasAgentAssignee": ["Ag-00-reportBot"],  # agent instance ID
    "hasDispatchMode": "agents_only",
    "hasAgentMaxRounds": "12",
})

Combine expressions + agent

python
store.write_instance("webhookTrigger_instance", {
    "hasName": "Support Ticket Ingest",
    "isEnabled": "enabled",
    "hasRouteSlug": "support-ticket",
    "hasMethod": "POST",
    "hasAuthMode": "none",
    "runsExpression": [
        # Pre-process and normalize the payload
        """
ticket_id = (webhook_payload or {}).get("id")
priority = (webhook_payload or {}).get("priority", "normal")
print(f"ticket {ticket_id} priority={priority}")
"""
    ],
    "hasAgentAssignee": ["Ag-00-triageAgent"],
    "hasDispatchMode": "expressions_then_agents",
})

Environment Variables

VariableDefaultPurpose
DOMINIRD_PULSE_SCHEDULER(enabled)Set 0 / false / off to disable cron firing
DOMINIRD_PULSE_POLL_SECS30How often dominird polls Python for trigger list
PreviousWorkflowsNextConnecting MCP Servers