Chorus

Real-Time Streaming

SSE signal streaming and webhook push delivery for real-time event consumption

Real-Time Streaming

Chorus provides two mechanisms for real-time signal delivery: SSE streaming for pull-based clients and webhooks for push-based integrations. Both replace the need to poll the inbox endpoint.

SSE Streaming

The GET /signals/stream endpoint opens a persistent Server-Sent Events connection. Signals are pushed to the client the moment they are emitted and match the subscriber's filters and ring membership.

Connecting

EventSource does not support custom headers, so authentication uses the ?token= query parameter instead of the Authorization header.

curl -N "http://localhost:3000/signals/stream?token=YOUR_API_KEY"

In a browser or Node.js/Bun environment:

const es = new EventSource(
  "https://chorus.example.com/signals/stream?token=cho_abc123"
);

es.addEventListener("task", (e) => {
  const signal = JSON.parse(e.data);
  console.log("New task:", signal.content);
});

es.addEventListener("alert", (e) => {
  const signal = JSON.parse(e.data);
  console.log("Alert:", signal.content, "urgency:", signal.urgency);
});

Filtering

Filters are passed as query parameters. All are optional -- without filters you receive all signals you have ring access to.

ParamTypeDescription
ringstringComma-separated ring names (e.g., ring=dev,ops)
typestringComma-separated signal types (e.g., type=task,alert)
min_urgencynumberMinimum urgency threshold (e.g., min_urgency=0.7)
tagsstringComma-separated tags, matches if signal has at least one (e.g., tags=deploy,prod)
rolestringFilter by from_role or to_role (e.g., role=dev)

Example with multiple filters:

curl -N "http://localhost:3000/signals/stream?\
token=YOUR_API_KEY&\
ring=ops,dev&\
type=alert,task&\
min_urgency=0.5&\
tags=deploy"

Ring-Scoped Access Control

The stream enforces ring membership server-side:

  • If a signal targets a ring, you must be a member of that ring to receive it.
  • If you specify a ring filter, only signals targeting those specific rings are delivered.
  • Broadcast signals (no to_ring) are delivered only when no ring filter is set.
  • Admin identities can see signals for all rings.

Event Format

Each SSE event uses the signal type as the event name:

event: task
data: {"id":"signal:abc","signal_type":"task","content":"Review PR #42","from_role":"coordinator","to_ring":"dev","urgency":0.8,"tags":["code-review"],"created_at":"2026-03-19T12:00:00Z"}
id: signal:abc

event: alert
data: {"id":"signal:def","signal_type":"alert","content":"Build failed","from_role":"ci","to_ring":"ops","urgency":1.0,"tags":["ci"],"created_at":"2026-03-19T12:01:00Z"}
id: signal:def

: keepalive

The id field on each event is the signal's record ID, used for reconnection replay.

Reconnection

When a connection drops, use the Last-Event-ID header to resume from where you left off. The browser's built-in EventSource handles this automatically. For manual clients:

curl -N -H "Last-Event-ID: signal:abc123" \
  "http://localhost:3000/signals/stream?token=YOUR_API_KEY"

The server maintains a replay buffer and will re-send any signals emitted between the last received event and the current time before switching to live delivery. During replay, new live events are buffered and drained after replay completes, so no signals are lost.

Keepalive

The server sends a : keepalive SSE comment every 30 seconds. This prevents proxies and load balancers from closing idle connections. Clients should treat these as no-ops.

Concurrent Limits

Each identity is limited to 5 concurrent SSE streams by default. This prevents resource exhaustion from leaked or forgotten connections.

  • Configure via SSE_MAX_STREAMS_PER_IDENTITY environment variable
  • Admin identities are exempt from the limit
  • Exceeding the limit returns 429 with code STREAM_LIMIT_EXCEEDED

SDK Usage

The SDK's client.signals.subscribe() wraps EventSource with an async iterator:

import { ChorusClient } from "@chorus-protocol/sdk";

const client = new ChorusClient({
  url: "https://chorus.example.com",
  apiKey: "cho_abc123",
});

const sub = client.signals.subscribe({
  ring: ["dev", "ops"],
  type: ["task", "alert"],
  min_urgency: 0.5,
});

// Consume as async iterator
for await (const signal of sub) {
  console.log(`[${signal.signal_type}] ${signal.content}`);
}

// Or use typed handlers
sub.on("task", (signal) => {
  console.log("Task assigned:", signal.content);
});

// Clean up
sub.close();

In Node.js or Bun environments without a native EventSource, install a polyfill such as eventsource or extended-eventsource and assign it to globalThis.EventSource before calling subscribe().


Webhooks

Webhooks provide outbound push delivery -- the server makes HTTP POST requests to your endpoint when signals match your configured filters. This is ideal for integrating with services that cannot maintain long-lived SSE connections.

Creating a Webhook

curl -X POST http://localhost:3000/admin/webhooks \
  -H "Authorization: Bearer YOUR_ADMIN_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://my-service.example.com/chorus-hook",
    "filters": {
      "types": ["alert", "task"],
      "rings": ["ops"],
      "min_urgency": 0.5
    }
  }'

The response includes a secret field -- this is the HMAC key for verifying signatures. Store it securely; it is only returned at creation time.

Webhook Payload

Each delivery is an HTTP POST with this JSON body:

{
  "signal": {
    "id": "signal:abc123",
    "signal_type": "alert",
    "content": "Production error rate above threshold",
    "from_role": "monitoring",
    "to_ring": "ops",
    "urgency": 1.0,
    "tags": ["production", "error"],
    "created_at": "2026-03-19T12:00:00Z"
  },
  "webhook_id": "webhook:def456",
  "timestamp": "2026-03-19T12:00:01Z"
}

Verifying Signatures

Every delivery includes an X-Chorus-Signature header with the HMAC-SHA256 signature of the raw request body. Verify it to ensure the payload was not tampered with.

import { createHmac } from "node:crypto";

function verifyWebhookSignature(
  body: string,
  signature: string,
  secret: string,
): boolean {
  const expected =
    "sha256=" +
    createHmac("sha256", secret).update(body).digest("hex");
  return expected === signature;
}

// In your HTTP handler:
const body = await request.text();
const signature = request.headers.get("X-Chorus-Signature");
if (!verifyWebhookSignature(body, signature, WEBHOOK_SECRET)) {
  return new Response("Invalid signature", { status: 401 });
}
const payload = JSON.parse(body);

Delivery Headers

HeaderValue
Content-Typeapplication/json
X-Chorus-Signaturesha256={64-char-hex}
X-Chorus-EventSignal type (e.g., task, alert)
User-AgentChorus-Webhook/1.0

Retry Behavior

If your endpoint returns a non-2xx response or the request times out (10 seconds), Chorus retries with exponential backoff:

AttemptDelay
1st retry5 seconds
2nd retry30 seconds
3rd retry5 minutes

After 3 failed attempts, the delivery is moved to dead_letter status and the webhook is marked as failing. A retry reaper checks for retryable deliveries every 60 seconds.

Managing Deliveries

View delivery history for a webhook:

curl http://localhost:3000/admin/webhooks/webhook:abc123/deliveries \
  -H "Authorization: Bearer YOUR_ADMIN_KEY"

Replay a dead-lettered delivery:

curl -X POST http://localhost:3000/admin/webhooks/deliveries/webhook_delivery:xyz/replay \
  -H "Authorization: Bearer YOUR_ADMIN_KEY"

Pausing and Disabling

Temporarily pause a webhook without deleting it:

curl -X PATCH http://localhost:3000/admin/webhooks/webhook:abc123 \
  -H "Authorization: Bearer YOUR_ADMIN_KEY" \
  -H "Content-Type: application/json" \
  -d '{"active": false}'

Set active: true to resume delivery.

Bootstrap Configuration

Webhooks can be pre-configured in the bootstrap YAML file:

webhooks:
  - url: "https://hooks.slack.com/services/T00/B00/abc"
    filters:
      types: ["alert"]
      min_urgency: 0.8

  - url: "https://my-service.example.com/chorus-events"
    filters:
      rings: ["ops", "dev"]
      types: ["task", "alert", "artifact"]

See the Bootstrap guide for the complete YAML format.


Building Adapters

SSE streaming and webhooks provide the building blocks for integrating Chorus with external platforms. Here are common patterns:

Discord

Use a webhook pointed at a Discord webhook URL (or a small adapter service that translates the Chorus payload to Discord's embed format):

webhooks:
  - url: "https://your-adapter.example.com/discord"
    filters:
      types: ["alert", "artifact"]
      rings: ["ops"]

Your adapter receives the Chorus payload and forwards it as a Discord embed.

Slack

For Slack incoming webhooks, build a lightweight adapter that maps signal fields to Slack's Block Kit format. Filter by ring to route signals to different Slack channels.

Matrix

Use the Matrix Client-Server API from an adapter service. Subscribe to the SSE stream and forward matching signals as room messages.

General Pattern

  1. SSE adapter -- a long-running process that consumes client.signals.subscribe() and forwards to your target platform. Best for custom logic and bidirectional integrations.
  2. Webhook adapter -- a stateless HTTP handler that receives Chorus POST requests and translates them. Best for simple one-way notification flows.

On this page