Real-Time Streaming
SSE signal streaming and webhook push delivery for real-time event consumption
Real-Time Streaming
Chorus provides two mechanisms for real-time signal delivery: SSE streaming for pull-based clients and webhooks for push-based integrations. Both replace the need to poll the inbox endpoint.
SSE Streaming
The GET /signals/stream endpoint opens a persistent Server-Sent Events connection. Signals are pushed to the client the moment they are emitted and match the subscriber's filters and ring membership.
Connecting
EventSource does not support custom headers, so authentication uses the ?token= query parameter instead of the Authorization header.
curl -N "http://localhost:3000/signals/stream?token=YOUR_API_KEY"
In a browser or Node.js/Bun environment:
const es = new EventSource(
"https://chorus.example.com/signals/stream?token=cho_abc123"
);
es.addEventListener("task", (e) => {
const signal = JSON.parse(e.data);
console.log("New task:", signal.content);
});
es.addEventListener("alert", (e) => {
const signal = JSON.parse(e.data);
console.log("Alert:", signal.content, "urgency:", signal.urgency);
});
Filtering
Filters are passed as query parameters. All are optional -- without filters you receive all signals you have ring access to.
| Param | Type | Description |
|---|---|---|
ring | string | Comma-separated ring names (e.g., ring=dev,ops) |
type | string | Comma-separated signal types (e.g., type=task,alert) |
min_urgency | number | Minimum urgency threshold (e.g., min_urgency=0.7) |
tags | string | Comma-separated tags, matches if signal has at least one (e.g., tags=deploy,prod) |
role | string | Filter by from_role or to_role (e.g., role=dev) |
Example with multiple filters:
curl -N "http://localhost:3000/signals/stream?\
token=YOUR_API_KEY&\
ring=ops,dev&\
type=alert,task&\
min_urgency=0.5&\
tags=deploy"
Ring-Scoped Access Control
The stream enforces ring membership server-side:
- If a signal targets a ring, you must be a member of that ring to receive it.
- If you specify a
ringfilter, only signals targeting those specific rings are delivered. - Broadcast signals (no
to_ring) are delivered only when no ring filter is set. - Admin identities can see signals for all rings.
Event Format
Each SSE event uses the signal type as the event name:
event: task
data: {"id":"signal:abc","signal_type":"task","content":"Review PR #42","from_role":"coordinator","to_ring":"dev","urgency":0.8,"tags":["code-review"],"created_at":"2026-03-19T12:00:00Z"}
id: signal:abc
event: alert
data: {"id":"signal:def","signal_type":"alert","content":"Build failed","from_role":"ci","to_ring":"ops","urgency":1.0,"tags":["ci"],"created_at":"2026-03-19T12:01:00Z"}
id: signal:def
: keepalive
The id field on each event is the signal's record ID, used for reconnection replay.
Reconnection
When a connection drops, use the Last-Event-ID header to resume from where you left off. The browser's built-in EventSource handles this automatically. For manual clients:
curl -N -H "Last-Event-ID: signal:abc123" \
"http://localhost:3000/signals/stream?token=YOUR_API_KEY"
The server maintains a replay buffer and will re-send any signals emitted between the last received event and the current time before switching to live delivery. During replay, new live events are buffered and drained after replay completes, so no signals are lost.
Keepalive
The server sends a : keepalive SSE comment every 30 seconds. This prevents proxies and load balancers from closing idle connections. Clients should treat these as no-ops.
Concurrent Limits
Each identity is limited to 5 concurrent SSE streams by default. This prevents resource exhaustion from leaked or forgotten connections.
- Configure via
SSE_MAX_STREAMS_PER_IDENTITYenvironment variable - Admin identities are exempt from the limit
- Exceeding the limit returns
429with codeSTREAM_LIMIT_EXCEEDED
SDK Usage
The SDK's client.signals.subscribe() wraps EventSource with an async iterator:
import { ChorusClient } from "@chorus-protocol/sdk";
const client = new ChorusClient({
url: "https://chorus.example.com",
apiKey: "cho_abc123",
});
const sub = client.signals.subscribe({
ring: ["dev", "ops"],
type: ["task", "alert"],
min_urgency: 0.5,
});
// Consume as async iterator
for await (const signal of sub) {
console.log(`[${signal.signal_type}] ${signal.content}`);
}
// Or use typed handlers
sub.on("task", (signal) => {
console.log("Task assigned:", signal.content);
});
// Clean up
sub.close();
In Node.js or Bun environments without a native EventSource, install a polyfill such as eventsource or extended-eventsource and assign it to globalThis.EventSource before calling subscribe().
Webhooks
Webhooks provide outbound push delivery -- the server makes HTTP POST requests to your endpoint when signals match your configured filters. This is ideal for integrating with services that cannot maintain long-lived SSE connections.
Creating a Webhook
curl -X POST http://localhost:3000/admin/webhooks \
-H "Authorization: Bearer YOUR_ADMIN_KEY" \
-H "Content-Type: application/json" \
-d '{
"url": "https://my-service.example.com/chorus-hook",
"filters": {
"types": ["alert", "task"],
"rings": ["ops"],
"min_urgency": 0.5
}
}'
The response includes a secret field -- this is the HMAC key for verifying signatures. Store it securely; it is only returned at creation time.
Webhook Payload
Each delivery is an HTTP POST with this JSON body:
{
"signal": {
"id": "signal:abc123",
"signal_type": "alert",
"content": "Production error rate above threshold",
"from_role": "monitoring",
"to_ring": "ops",
"urgency": 1.0,
"tags": ["production", "error"],
"created_at": "2026-03-19T12:00:00Z"
},
"webhook_id": "webhook:def456",
"timestamp": "2026-03-19T12:00:01Z"
}
Verifying Signatures
Every delivery includes an X-Chorus-Signature header with the HMAC-SHA256 signature of the raw request body. Verify it to ensure the payload was not tampered with.
import { createHmac } from "node:crypto";
function verifyWebhookSignature(
body: string,
signature: string,
secret: string,
): boolean {
const expected =
"sha256=" +
createHmac("sha256", secret).update(body).digest("hex");
return expected === signature;
}
// In your HTTP handler:
const body = await request.text();
const signature = request.headers.get("X-Chorus-Signature");
if (!verifyWebhookSignature(body, signature, WEBHOOK_SECRET)) {
return new Response("Invalid signature", { status: 401 });
}
const payload = JSON.parse(body);
Delivery Headers
| Header | Value |
|---|---|
Content-Type | application/json |
X-Chorus-Signature | sha256={64-char-hex} |
X-Chorus-Event | Signal type (e.g., task, alert) |
User-Agent | Chorus-Webhook/1.0 |
Retry Behavior
If your endpoint returns a non-2xx response or the request times out (10 seconds), Chorus retries with exponential backoff:
| Attempt | Delay |
|---|---|
| 1st retry | 5 seconds |
| 2nd retry | 30 seconds |
| 3rd retry | 5 minutes |
After 3 failed attempts, the delivery is moved to dead_letter status and the webhook is marked as failing. A retry reaper checks for retryable deliveries every 60 seconds.
Managing Deliveries
View delivery history for a webhook:
curl http://localhost:3000/admin/webhooks/webhook:abc123/deliveries \
-H "Authorization: Bearer YOUR_ADMIN_KEY"
Replay a dead-lettered delivery:
curl -X POST http://localhost:3000/admin/webhooks/deliveries/webhook_delivery:xyz/replay \
-H "Authorization: Bearer YOUR_ADMIN_KEY"
Pausing and Disabling
Temporarily pause a webhook without deleting it:
curl -X PATCH http://localhost:3000/admin/webhooks/webhook:abc123 \
-H "Authorization: Bearer YOUR_ADMIN_KEY" \
-H "Content-Type: application/json" \
-d '{"active": false}'
Set active: true to resume delivery.
Bootstrap Configuration
Webhooks can be pre-configured in the bootstrap YAML file:
webhooks:
- url: "https://hooks.slack.com/services/T00/B00/abc"
filters:
types: ["alert"]
min_urgency: 0.8
- url: "https://my-service.example.com/chorus-events"
filters:
rings: ["ops", "dev"]
types: ["task", "alert", "artifact"]
See the Bootstrap guide for the complete YAML format.
Building Adapters
SSE streaming and webhooks provide the building blocks for integrating Chorus with external platforms. Here are common patterns:
Discord
Use a webhook pointed at a Discord webhook URL (or a small adapter service that translates the Chorus payload to Discord's embed format):
webhooks:
- url: "https://your-adapter.example.com/discord"
filters:
types: ["alert", "artifact"]
rings: ["ops"]
Your adapter receives the Chorus payload and forwards it as a Discord embed.
Slack
For Slack incoming webhooks, build a lightweight adapter that maps signal fields to Slack's Block Kit format. Filter by ring to route signals to different Slack channels.
Matrix
Use the Matrix Client-Server API from an adapter service. Subscribe to the SSE stream and forward matching signals as room messages.
General Pattern
- SSE adapter -- a long-running process that consumes
client.signals.subscribe()and forwards to your target platform. Best for custom logic and bidirectional integrations. - Webhook adapter -- a stateless HTTP handler that receives Chorus POST requests and translates them. Best for simple one-way notification flows.