SSE Event Bus & Backpressure
Overview
EventBus (packages/acp-bridge/src/eventBus.ts) is the per-session in-memory pub/sub that feeds the daemon’s GET /session/:id/events SSE route. It assigns each event a monotonic id, buffers recent events in a bounded ring for Last-Event-ID replay, fans published events out to all subscribers, applies per-subscriber backpressure (warning at 75% queue fill, eviction at the cap), and emits two synthetic frames (the terminal client_evicted and the advisory slow_client_warning). The SDK treats both as first-class events, but the bus emits them without an id so they do not consume a slot in the per-session sequence.
EventBus is currently package-private to acp-bridge and consumed by the bridge factory through one closed-over instance per session. A future refactor (called out at lines 150–159 of eventBus.ts) will lift it to a top-level building block so channels, dual-output, and future WebSocket transports can subscribe through the same bus instead of running parallel streams.
Responsibilities
- Assign per-session monotonic event ids starting at 1.
- Buffer the last ringSize events for replay on subscribe-with-lastEventId.
- Fan published events out to ≤ maxSubscribers concurrent subscribers.
- Apply per-subscriber bounded queues; drop overflowing subscribers with a synthetic client_evicted terminal frame.
- Emit slow_client_warning once per overflow episode at 75% queue fill, with 37.5% hysteresis to prevent repeated warnings.
- Tear subscriptions down promptly on AbortSignal.abort().
- Cleanly close every subscriber on bus close (e.g. session teardown).
- Never throw from publish (the contract is “publish is always safe to call”).
Architecture
| Constant | Value | Purpose |
|---|---|---|
| EVENT_SCHEMA_VERSION | 1 | Stamped on every BridgeEvent.v; bumped on breaking frame changes. |
| DEFAULT_RING_SIZE | 8000 | Per-session replay ring. Operator override via --event-ring-size. |
| DEFAULT_MAX_QUEUED | 256 | Per-subscriber backlog cap. |
| DEFAULT_MAX_SUBSCRIBERS | 64 | Per-session subscriber cap. |
| WARN_THRESHOLD_RATIO | 0.75 | slow_client_warning trigger fraction of maxQueued. |
| WARN_RESET_RATIO | 0.375 | Hysteresis re-arm fraction. |
| MAX_EVENT_RING_SIZE (in bridge.ts) | 1_000_000 | Soft upper bound on BridgeOptions.eventRingSize to catch out-of-memory failures caused by typos. |
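With the default cap, the two ratios work out to a warning at 0.75 × 256 = 192 queued live items and a re-arm at 0.375 × 256 = 96. The following is a minimal sketch of that hysteresis, not the code in eventBus.ts: the WarnGate class, the flooring of thresholds, and the exact comparison operators are assumptions made for illustration.

```typescript
// Hypothetical sketch of the warn / re-arm hysteresis; not the real implementation.
const DEFAULT_MAX_QUEUED = 256;
const WARN_THRESHOLD_RATIO = 0.75;
const WARN_RESET_RATIO = 0.375;

class WarnGate {
  private armed = true;
  private readonly warnAt: number;
  private readonly resetAt: number;

  constructor(maxQueued: number = DEFAULT_MAX_QUEUED) {
    // Assumption: thresholds are floored to whole queue slots.
    this.warnAt = Math.floor(maxQueued * WARN_THRESHOLD_RATIO);
    this.resetAt = Math.floor(maxQueued * WARN_RESET_RATIO);
  }

  /** Returns true when a slow_client_warning should be emitted at this depth. */
  observe(liveCount: number): boolean {
    if (this.armed && liveCount >= this.warnAt) {
      this.armed = false; // one warning per overflow episode
      return true;
    }
    if (!this.armed && liveCount <= this.resetAt) {
      this.armed = true; // backlog drained far enough; re-arm
    }
    return false;
  }
}
```

The gap between the two thresholds is what stops a subscriber hovering around 192 items from receiving a warning on every publish.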
BridgeEvent
interface BridgeEvent {
  id?: number; // monotonic per session; absent on synthetic terminal frames
  v: 1; // EVENT_SCHEMA_VERSION
  type: string; // one of the 43 known types or future-extensible
  data: unknown; // payload (typed per-type by the SDK; see 09-event-schema.md)
  originatorClientId?: string; // set when the event derives from a clientId-stamped request
}
SubscribeOptions
interface SubscribeOptions {
  lastEventId?: number; // replay from after this id (Last-Event-ID resume)
  signal?: AbortSignal; // aborts the subscription promptly
  maxQueued?: number; // per-subscriber backlog cap; default 256
}
subscribe() returns an AsyncIterable<BridgeEvent>. The SSE route consumes it with for await. Registration is synchronous: by the time subscribe() returns, the subscriber is already attached, so a publish() that races with the consumer’s first next() is still delivered.
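To make the consumption side concrete, here is a rough sketch of how a route handler might turn the iterable into SSE wire frames. The formatSseFrame and pump helpers are hypothetical, and the choice to send an event: line and to serialize the whole BridgeEvent as the data payload is an assumption; the real handler in server.ts may lay out fields differently.

```typescript
interface BridgeEvent {
  id?: number;
  v: 1;
  type: string;
  data: unknown;
  originatorClientId?: string;
}

// Hypothetical helper: the real route's exact field layout may differ.
function formatSseFrame(event: BridgeEvent): string {
  const lines: string[] = [];
  // Synthetic frames carry no id, so no `id:` line is written and the
  // client's Last-Event-ID cursor is left untouched.
  if (event.id !== undefined) lines.push(`id: ${event.id}`);
  lines.push(`event: ${event.type}`);
  // JSON.stringify without indentation emits no raw newlines,
  // so a single data line is enough.
  lines.push(`data: ${JSON.stringify(event)}`);
  return lines.join('\n') + '\n\n';
}

// Drains a subscription into a writer, one SSE frame per event.
async function pump(
  events: AsyncIterable<BridgeEvent>,
  write: (chunk: string) => void,
): Promise<void> {
  for await (const event of events) write(formatSseFrame(event));
}
```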
BoundedAsyncQueue
The per-subscriber queue. Two pivotal behaviors:
- Live cap is on live items only. Items inserted via forcePush() carry a forced: true tag per entry and never count toward maxSize. This lets the Last-Event-ID replay path force-push hundreds of historical frames into a fresh subscriber without immediately tripping the live cap and evicting the just-resumed subscriber.
- liveCount is maintained as a field, not derived from forcedInBuf position. The earlier position-based heuristic broke when slow_client_warning started force-pushing mid-stream (warnings go to the BACK of the queue, not the front like replays). Per-entry forced tags are position-independent.
push(value) returns false (instead of blocking or throwing) when the live backlog is at the cap; the bus uses that signal to evict the subscriber. forcePush(value) bypasses the cap. close({drain?: boolean}) drains pending items by default; the abort path passes drain: false to drop them immediately.
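The two behaviors above can be sketched in a small queue. This is an illustrative reduction, not the real BoundedAsyncQueue: the class name BoundedQueueSketch, the single-waiter handoff, and the choice to return false from push() once closed are assumptions.

```typescript
// Hypothetical sketch; the real BoundedAsyncQueue may be structured differently.
interface Entry<T> { value: T; forced: boolean }

class BoundedQueueSketch<T> implements AsyncIterable<T> {
  private buf: Entry<T>[] = [];
  private liveCount = 0; // tracked as a field, independent of entry position
  private closed = false;
  private waiter: ((r: IteratorResult<T>) => void) | null = null;
  private readonly maxSize: number;

  constructor(maxSize: number) { this.maxSize = maxSize; }

  /** Live push: returns false when the live backlog is at the cap. */
  push(value: T): boolean {
    if (this.closed || this.liveCount >= this.maxSize) return false;
    this.enqueue({ value, forced: false });
    return true;
  }

  /** Forced push: bypasses the cap and never counts toward it. */
  forcePush(value: T): void {
    if (!this.closed) this.enqueue({ value, forced: true });
  }

  close(opts: { drain?: boolean } = {}): void {
    this.closed = true;
    if (opts.drain === false) { this.buf = []; this.liveCount = 0; }
    if (this.waiter) { // a parked consumer implies an empty buffer
      this.waiter({ value: undefined, done: true });
      this.waiter = null;
    }
  }

  private enqueue(entry: Entry<T>): void {
    if (this.waiter) { // consumer parked in next(): hand over directly
      const resolve = this.waiter;
      this.waiter = null;
      resolve({ value: entry.value, done: false });
      return;
    }
    this.buf.push(entry);
    if (!entry.forced) this.liveCount++;
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        const entry = this.buf.shift();
        if (entry) {
          if (!entry.forced) this.liveCount--;
          return Promise.resolve({ value: entry.value, done: false });
        }
        if (this.closed) return Promise.resolve({ value: undefined, done: true });
        return new Promise((resolve) => { this.waiter = resolve; });
      },
    };
  }
}
```

Because each entry carries its own forced tag, a forced frame can sit anywhere in the buffer (front for replays, back for warnings) without disturbing the live count.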
Workflow
Publish
publish never throws. A publish() that arrives after the bus has closed returns undefined rather than throwing. The shutdown path closes per-session buses before awaiting channel.kill(), so the agent may still emit sessionUpdate notifications in the small window between bus close and channel kill.
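A minimal sketch of the publish path under those rules follows. PublishSketch and Sink are hypothetical names, and returning the stamped event from a successful publish is an assumption; the source only states that a late publish returns undefined.

```typescript
interface BridgeEvent { id?: number; v: 1; type: string; data: unknown }

interface Sink { push(e: BridgeEvent): boolean }

// Hypothetical mini-bus showing only the publish path.
class PublishSketch {
  private nextId = 1;
  private closed = false;
  private readonly ringSize: number;
  readonly ring: BridgeEvent[] = [];
  readonly subs = new Set<Sink>();

  constructor(ringSize: number) { this.ringSize = ringSize; }

  publish(type: string, data: unknown): BridgeEvent | undefined {
    if (this.closed) return undefined; // late publish during shutdown: no-op
    const event: BridgeEvent = { id: this.nextId++, v: 1, type, data };
    this.ring.push(event);
    if (this.ring.length > this.ringSize) this.ring.shift(); // oldest event falls out
    for (const sub of [...this.subs]) {
      // A false return means the subscriber's live backlog is full;
      // the real bus runs its eviction flow at this point.
      if (!sub.push(event)) this.subs.delete(sub);
    }
    return event;
  }

  close(): void {
    this.closed = true;
    this.subs.clear();
  }
}
```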
Subscribe + replay (with ring-eviction detection)
If subs.size >= maxSubscribers at subscribe time, SubscriberLimitExceededError is thrown — the SSE route catches it and serializes a stream_error synthetic frame to the rejected client so they do not see a silent empty stream. Returning an empty iterable instead would leave operators without visibility into “some clients get events, some do not” under load.
Ring-eviction → state_resync_required (the recovery flow)
When a consumer reconnects with Last-Event-ID: N and the ring’s earliest surviving event has id > N + 1, the events in [N+1, earliestInRing-1] were evicted before the consumer reconnected. The naïve replay would silently succeed with a non-contiguous suffix, the SDK reducer would keep applying deltas as if the stream were contiguous, and its state would diverge from the daemon’s truth — with no terminal signal.
Implemented in EventBus.subscribe():
- First check opts.lastEventId >= this.nextId. If true, the client cursor is from an older bus epoch (daemon restart / EventBus reconstruction), so the bus emits reason: 'epoch_reset' and replays the whole current ring.
- Otherwise compute earliestInRing = this.ring[0]?.id.
- If earliestInRing > opts.lastEventId + 1, force-push a synthetic frame before the replay frames: { "v": 1, "type": "state_resync_required", "data": { "reason": "ring_evicted", "lastDeliveredId": <opts.lastEventId>, "earliestAvailableId": <earliestInRing> } }
- Continue the normal replay loop afterwards.
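The checks in EventBus.subscribe() can be expressed as a pure function over the ring, which makes the boundary cases easy to see. The planReplay helper is hypothetical, and two details are assumptions: that the epoch_reset signal is also carried in a state_resync_required frame, and that its payload contains only the reason.

```typescript
interface BridgeEvent { id?: number; v: 1; type: string; data: unknown }

// Hypothetical pure function mirroring the replay checks; returns the frames
// that would be force-pushed into a freshly resumed subscriber, in order.
function planReplay(
  ring: readonly BridgeEvent[],
  nextId: number,
  lastEventId: number,
): BridgeEvent[] {
  const resync = (data: Record<string, unknown>): BridgeEvent =>
    ({ v: 1, type: 'state_resync_required', data });

  // Cursor at or beyond nextId: it belongs to an older bus epoch.
  // Assumption: the frame type and payload shape used here.
  if (lastEventId >= nextId) {
    return [resync({ reason: 'epoch_reset' }), ...ring];
  }

  const out: BridgeEvent[] = [];
  const earliestInRing = ring[0]?.id;
  // Gap between the cursor and the oldest surviving event.
  if (earliestInRing !== undefined && earliestInRing > lastEventId + 1) {
    out.push(resync({
      reason: 'ring_evicted',
      lastDeliveredId: lastEventId,
      earliestAvailableId: earliestInRing,
    }));
  }
  // Normal replay: everything after the cursor.
  for (const e of ring) {
    if (e.id !== undefined && e.id > lastEventId) out.push(e);
  }
  return out;
}
```

Note the off-by-one: a cursor of 4 against a ring starting at id 5 is contiguous and produces no resync frame, while a cursor of 3 does.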
Critical contracts (and what the #4360 review corrected):
- No id: same no-slot pattern as client_evicted, so it does not occupy a slot in the per-session monotonic sequence other subscribers observe.
- Stream stays open: unlike client_evicted (genuinely terminal), state_resync_required is recovery-oriented. Replay + live frames continue flowing afterward.
- Reducer auto-skips deltas: the SDK side flips awaitingResync = true and applies only state_resync_required, the terminal frames, and full-state snapshots until consumer code calls loadSession and clears the flag. See 09-event-schema.md for RESYNC_PASSTHROUGH_TYPES.
- Network-friendly: frames stay on the wire so the SDK can compute a “what you missed” diff later if it wants to. No extra reconnect cycle is required.
Eviction terminal flow
When a subscriber’s live backlog is already at maxQueued and the next push() returns false:
- Mark sub.evicted = true.
- Construct the client_evicted frame without id: { v: 1, type: 'client_evicted', data: { reason: 'queue_overflow', droppedAfter: <last delivered id> } }.
- queue.forcePush(evictionFrame) so the consumer iterator sees one terminal frame.
- queue.close() so iteration unwinds after the terminal frame.
- Call sub.dispose(), which removes the subscriber from subs and detaches the AbortSignal listener; without this cleanup, stalled consumers’ closures remain live until AbortSignal garbage collection.
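The steps above can be sketched as a single delivery helper. The deliver function, the QueueLike and Subscriber shapes, and the lastDeliveredId bookkeeping field are hypothetical; only the order of operations is taken from the description.

```typescript
interface BridgeEvent { id?: number; v: 1; type: string; data: unknown }

interface QueueLike {
  push(e: BridgeEvent): boolean;
  forcePush(e: BridgeEvent): void;
  close(): void;
}

interface Subscriber {
  queue: QueueLike;
  evicted: boolean;
  lastDeliveredId: number; // hypothetical bookkeeping field
  dispose(): void;
}

// Hypothetical helper following the eviction steps in order.
function deliver(sub: Subscriber, event: BridgeEvent): void {
  if (sub.evicted) return; // nothing more is delivered after eviction
  if (sub.queue.push(event)) {
    if (event.id !== undefined) sub.lastDeliveredId = event.id;
    return;
  }
  sub.evicted = true;
  const evictionFrame: BridgeEvent = {
    v: 1, // no id: the frame takes no slot in the session sequence
    type: 'client_evicted',
    data: { reason: 'queue_overflow', droppedAfter: sub.lastDeliveredId },
  };
  sub.queue.forcePush(evictionFrame); // bypasses the cap: one terminal frame
  sub.queue.close();                  // iteration unwinds after that frame
  sub.dispose();                      // drop from subs, detach abort listener
}
```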
Abort flow
AbortSignal.abort() → onAbort():
- queue.close({drain: false}): drop buffered items so the SSE route does not keep serializing events to a socket nobody is listening to.
- dispose(): idempotent through a disposed flag.
Already-aborted signals at subscribe time call onAbort() synchronously before returning the iterator.
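A compact sketch of that wiring, assuming a hypothetical attachAbort helper and a removeFromSubs callback standing in for the bus-side bookkeeping:

```typescript
// Hypothetical wiring; QueueLike stands in for the per-subscriber queue.
interface QueueLike { close(opts?: { drain?: boolean }): void }

function attachAbort(
  queue: QueueLike,
  removeFromSubs: () => void,
  signal?: AbortSignal,
): () => void {
  let disposed = false;

  const dispose = (): void => {
    if (disposed) return; // idempotent: safe to call from several paths
    disposed = true;
    signal?.removeEventListener('abort', onAbort);
    removeFromSubs();
  };

  const onAbort = (): void => {
    queue.close({ drain: false }); // drop buffered items immediately
    dispose();
  };

  if (signal?.aborted) {
    onAbort(); // already aborted: tear down synchronously
  } else {
    signal?.addEventListener('abort', onAbort, { once: true });
  }
  return dispose;
}
```

Returning dispose lets the eviction and bus-close paths share the same cleanup without double-removing the subscriber.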
State & Lifecycle
- nextId starts at 1 and only ever increments. The lastEventId getter returns nextId - 1.
- ring is bounded; eviction-by-shift is O(n) once full. At ringSize=8000 that measures in low milliseconds on high-volume sessions, well below the per-frame latency budget. A circular-buffer refactor is deferred until profiling flags it or operators increase --event-ring-size by an order of magnitude.
- close() flips closed, closes every subscriber’s queue, and clears subs. Subsequent publish() / subscribe() are no-ops (publish returns undefined; subscribe returns emptyAsyncIterable).
- Each session owns one EventBus. Bus close happens before channel.kill() so in-flight publishes during shutdown return undefined rather than throwing.
Dependencies
- Consumed by packages/acp-bridge/src/bridge.ts (BridgeClient.sessionUpdate / BridgeClient.extNotification → events.publish(...)).
- Consumed by packages/cli/src/serve/server.ts (SSE route handler → events.subscribe(...), then formats BridgeEvent to SSE wire frames).
- Re-export shim: packages/cli/src/serve/event-bus.ts → @qwen-code/acp-bridge/eventBus.
- SDK consumer: packages/sdk-typescript/src/daemon/sse.ts (parseSseStream), then asKnownDaemonEvent (see 09-event-schema.md, 13-sdk-daemon-client.md).
Configuration
- --event-ring-size <n>: per-session ring depth; soft-capped at MAX_EVENT_RING_SIZE = 1_000_000.
- Subscriber ?maxQueued=N query parameter on GET /session/:id/events, range [16, 2048]. SDK clients pre-flight caps.features.slow_client_warning before opting in.
- BridgeOptions.eventRingSize (overrides daemon default for embedded usage).
- Capability tags: session_events, slow_client_warning, typed_event_schema.
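As an illustration of the maxQueued range, a route handler could validate the query parameter roughly like this. The parseMaxQueued helper and the bound constant names are hypothetical, and rejecting out-of-range values (rather than clamping them) is an assumption; the source only gives the permitted range and the default.

```typescript
const DEFAULT_MAX_QUEUED = 256;
const MIN_MAX_QUEUED = 16;   // lower bound of the documented range
const MAX_MAX_QUEUED = 2048; // upper bound of the documented range

// Hypothetical parser. Assumption: malformed or out-of-range values are
// rejected; the daemon may instead clamp or ignore them.
function parseMaxQueued(raw: string | null): number {
  if (raw === null) return DEFAULT_MAX_QUEUED; // parameter not supplied
  if (!/^\d+$/.test(raw)) {
    throw new RangeError(`maxQueued must be a positive integer, got "${raw}"`);
  }
  const n = Number(raw);
  if (n < MIN_MAX_QUEUED || n > MAX_MAX_QUEUED) {
    throw new RangeError(
      `maxQueued must be within [${MIN_MAX_QUEUED}, ${MAX_MAX_QUEUED}], got ${n}`,
    );
  }
  return n;
}
```

A handler would typically feed it the result of `new URL(requestUrl, base).searchParams.get('maxQueued')`, which is null when the parameter is absent.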
Caveats & Known Limits
- Synthetic frames have no id. SDK consumers using Last-Event-ID resume only record frames with ids; slow_client_warning, client_evicted, state_resync_required, and replay_complete do not advance the cursor and do not consume per-session sequence numbers. If two id-bearing live frames have a real gap, handle it through the ring-eviction / epoch-reset resync path rather than treating it as a private synthetic frame.
- client_evicted is per-subscriber, not per-session. The same client can reconnect.
- BoundedAsyncQueue iterator is not safe for concurrent drivers: two simultaneous .next() calls would race for the same event. Daemon usage is sequential (for await ... of in the SSE route handler), so this is safe in production.
- The bus is currently package-private; channels and the web UI must subscribe through the daemon’s HTTP SSE route, not by reaching into the bus directly. Stage 1.5 will lift this.
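On the consumer side, the first caveat amounts to a cursor that ignores id-less frames. The ResumeCursor class below is a hypothetical illustration, not the SDK's actual type:

```typescript
interface BridgeEvent { id?: number; v: 1; type: string; data: unknown }

// Hypothetical SDK-side cursor: only id-bearing frames move it.
class ResumeCursor {
  private last: number | undefined;

  observe(event: BridgeEvent): void {
    // Synthetic frames (no id) are seen by the consumer but never recorded.
    if (event.id !== undefined) this.last = event.id;
  }

  /** Headers to send on reconnect; empty before any id-bearing frame arrives. */
  headers(): Record<string, string> {
    return this.last === undefined ? {} : { 'Last-Event-ID': String(this.last) };
  }
}
```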
References
- packages/acp-bridge/src/eventBus.ts (entire file)
- packages/acp-bridge/src/bridge.ts (publish sites, esp. BridgeClient.sessionUpdate and the F3 permission events)
- packages/cli/src/serve/server.ts (SSE route handler; formats BridgeEvent to wire SSE)
- packages/sdk-typescript/src/daemon/sse.ts (SSE wire parser on the client side)
- Wire reference: ../qwen-serve-protocol.md (the Last-Event-ID reconnect contract).