SSE Event Bus とバックプレッシャー
概要
EventBus(packages/acp-bridge/src/eventBus.ts)は、セッションごとのインメモリ pub/sub であり、デーモンの GET /session/:id/events SSE ルートにイベントを供給します。各イベントに単調増加 id を割り当て、Last-Event-ID リプレイ用に最近のイベントを有界リングバッファに保持し、すべてのサブスクライバーへ公開されたイベントをファンアウトし、サブスクライバーごとのバックプレッシャー(キュー充填率 75% で警告、上限到達で排除)を適用し、SDK がファーストクラスのイベントとして扱う 2 つの合成ターミナルフレーム(client_evicted、slow_client_warning)を発行します。ただし、これらのフレームはバスによって id なし としてマークされるため、セッションごとのシーケンスのスロットを消費しません。
EventBus は現在 acp-bridge パッケージプライベートであり、セッションごとに 1 つのクローズドインスタンスを通じてブリッジファクトリーから利用されます。将来のリファクタリング(eventBus.ts の 150〜159 行目に記載)では、チャネル、デュアル出力、将来の WebSocket トランスポートが並列ストリームを実行する代わりに同じバスを通じてサブスクライブできるよう、トップレベルのビルディングブロックへ昇格される予定です。
責務
- セッションごとの単調増加イベント id を 1 から割り当てる。
subscribe-with-lastEventId時のリプレイ用に、最新ringSize件のイベントをバッファリングする。- 公開されたイベントを最大
maxSubscribersの同時サブスクライバーへファンアウトする。 - サブスクライバーごとの有界キューを適用し、オーバーフローしたサブスクライバーを合成
client_evictedターミナルフレームでドロップする。 - キュー充填率 75% でオーバーフローエピソードごとに 1 回
slow_client_warningを発行し、繰り返しの警告を防ぐため 37.5% のヒステリシスを設ける。 AbortSignal.abort()発生時にサブスクリプションを速やかに解除する。- バスクローズ時(例:セッション終了)にすべてのサブスクライバーをクリーンに閉じる。
publishから例外を投げない(「publish は常に安全に呼び出せる」というコントラクト)。
アーキテクチャ
| 定数 | 値 | 目的 |
|---|---|---|
EVENT_SCHEMA_VERSION | 1 | すべての BridgeEvent.v にスタンプされる。フレームの破壊的変更時にインクリメントされる。 |
DEFAULT_RING_SIZE | 8000 | セッションごとのリプレイリング。--event-ring-size でオペレーターが上書き可能。 |
DEFAULT_MAX_QUEUED | 256 | サブスクライバーごとのバックログ上限。 |
DEFAULT_MAX_SUBSCRIBERS | 64 | セッションごとのサブスクライバー上限。 |
WARN_THRESHOLD_RATIO | 0.75 | slow_client_warning をトリガーする maxQueued に対する割合。 |
WARN_RESET_RATIO | 0.375 | ヒステリシスの再アーム割合。 |
MAX_EVENT_RING_SIZE(bridge.ts 内) | 1_000_000 | タイポによるメモリ不足を防ぐための BridgeOptions.eventRingSize のソフト上限。 |
BridgeEvent
interface BridgeEvent {
id?: number; // monotonic per session; absent on synthetic terminal frames
v: 1; // EVENT_SCHEMA_VERSION
type: string; // one of the 43 known types or future-extensible
data: unknown; // payload (typed per-type by the SDK; see 09-event-schema.md)
originatorClientId?: string; // set when the event derives from a clientId-stamped request
}SubscribeOptions
interface SubscribeOptions {
lastEventId?: number; // replay from after this id (Last-Event-ID resume)
signal?: AbortSignal; // aborts the subscription promptly
maxQueued?: number; // per-subscriber backlog cap; default 256
}subscribe() は AsyncIterable<BridgeEvent> を返します。SSE ルートは for await でこれを消費します。登録は 同期的 です — subscribe() が返った時点でサブスクライバーはすでにアタッチされているため、コンシューマーの最初の next() と競合する publish() もデリバリーされます。
BoundedAsyncQueue
サブスクライバーごとのキューです。重要な動作が 2 つあります。
- ライブ上限はライブアイテムのみに適用される。
forcePush()で挿入されたアイテムはエントリーごとにforced: trueタグを持ち、maxSizeにカウントされません。これにより、Last-Event-IDリプレイパスが何百もの過去フレームを新しいサブスクライバーへ強制プッシュしても、直ちにライブ上限を超えて再開直後のサブスクライバーが排除されることを防ぎます。 liveCountはフィールドとして管理される(forcedInBufポジションからの導出ではない)。以前のポジションベースのヒューリスティックは、slow_client_warningがストリーム途中で強制プッシュを開始したときに壊れました(警告はリプレイのようにキューの先頭ではなく、末尾に追加されます)。エントリーごとのforcedタグはポジション非依存です。
push(value) はライブバックログが上限に達した場合に(ブロックや例外ではなく)false を返します — バスはこのシグナルを受けてサブスクライバーを排除します。forcePush(value) は上限をバイパスします。close({drain?: boolean}) はデフォルトで保留中のアイテムをドレインします。アボートパスは drain: false を渡して即座にドロップします。
ワークフロー
Publish
publish は例外を投げません。バスのシャットダウン中(シャットダウンパスは channel.kill() を await する前にセッションごとのバスをクローズします)に publish を呼び出しても、例外ではなく undefined を返します。これは、バスクローズとチャネルキルの間の短い窓で、エージェントがまだ sessionUpdate 通知を発行する可能性があるためです。
Subscribe + リプレイ(リングエビクション検出あり)
サブスクライブ時に subs.size >= maxSubscribers の場合、SubscriberLimitExceededError がスローされます — SSE ルートはこれをキャッチし、拒否されたクライアントに stream_error 合成フレームをシリアライズして返すため、クライアントが無言の空ストリームを受け取ることはありません。代わりに空のイテラブルを返すと、負荷下で「一部のクライアントはイベントを受け取れるが、他は受け取れない」状況をオペレーターが把握できなくなります。
リングエビクション → state_resync_required(リカバリーフロー)
コンシューマーが Last-Event-ID: N で再接続したとき、リングの最古のサバイバルイベントが id > N + 1 を持つ場合、[N+1, earliestInRing-1] の範囲のイベントはコンシューマーの再接続前にエビクトされています。単純なリプレイは非連続なサフィックスでサイレントに成功し、SDK リデューサーはストリームが連続しているかのようにデルタを適用し続け、その状態はデーモンの実態と乖離します — ターミナルシグナルも発生しません。
EventBus.subscribe() での実装:
- まず
opts.lastEventId >= this.nextIdを確認します。真の場合、クライアントカーソルは古いバスエポック(デーモン再起動 / EventBus の再構築)のものであるため、バスはreason: 'epoch_reset'を発行し、現在のリング全体をリプレイします。 - それ以外の場合は
earliestInRing = this.ring[0]?.idを計算します。 earliestInRing > opts.lastEventId + 1の場合、リプレイフレームの前に合成フレームを強制プッシュします:{ "v": 1, "type": "state_resync_required", "data": { "reason": "ring_evicted", "lastDeliveredId": <opts.lastEventId>, "earliestAvailableId": <earliestInRing> } }- 通常のリプレイループを続行します。
重要なコントラクト(#4360 レビューで修正された点):
idなし —client_evictedと同じスロットなしパターンにより、他のサブスクライバーが観察するセッションごとの単調シーケンスのスロットを占有しません。- ストリームはオープンのまま —
client_evicted(真のターミナル)とは異なり、state_resync_requiredはリカバリー志向です。リプレイとライブフレームはその後も流れ続けます。 - リデューサーは自動的にデルタをスキップする — SDK 側は
awaitingResync = trueにフリップし、コンシューマーコードがloadSessionを呼び出してフラグをクリアするまで、state_resync_required、ターミナルフレーム、フルステートスナップショットのみを適用します。RESYNC_PASSTHROUGH_TYPESについては09-event-schema.mdを参照してください。 - ネットワーク効率的 — フレームはワイヤー上に残るため、SDK は後で「何を見逃したか」の差分を計算できます。追加の再接続サイクルは不要です。
エビクションターミナルフロー
サブスクライバーのライブバックログが maxQueued にある状態で次の push() が false を返した場合:
sub.evicted = trueにマークする。client_evictedフレームをidなし で構築する —{ v: 1, type: 'client_evicted', data: { reason: 'queue_overflow', droppedAfter: <last delivered id> } }。queue.forcePush(evictionFrame)でコンシューマーイテレーターに 1 つのターミナルフレームを届ける。queue.close()でターミナルフレームの後にイテレーションが終了するようにする。sub.dispose()を呼び出す —subsから削除し、AbortSignalリスナーをデタッチする。このクリーンアップなしでは、停止したコンシューマーのクロージャーがAbortSignalのガベージコレクションまで生き続けます。
アボートフロー
AbortSignal.abort() → onAbort():
queue.close({drain: false})— バッファされたアイテムをドロップし、SSE ルートが誰も聴いていないソケットへのイベントシリアライズを継続しないようにする。dispose()—disposedフラグによって冪等。
サブスクライブ時にすでにアボート済みのシグナルは、イテレーターを返す前に onAbort() を同期的に呼び出します。
状態とライフサイクル
nextIdは 1 から始まり、インクリメントのみ行われます。lastEventIdゲッターはnextId - 1を返します。ringは有界です。満杯になるとシフトによるエビクションは O(n) です。ringSize=8000の場合、高負荷セッションでも数ミリ秒程度で、フレームごとのレイテンシバジェットを大きく下回ります。循環バッファへのリファクタリングは、プロファイリングでフラグが立つか、オペレーターが--event-ring-sizeを桁違いに増加させるまで延期されます。close()はclosedをフリップし、すべてのサブスクライバーのキューを閉じ、subsをクリアします。以降のpublish()/subscribe()はノーオペレーションになります(publishは undefined を返し、subscribeはemptyAsyncIterableを返します)。- 各セッションは 1 つの
EventBusを所有します。バスクローズはchannel.kill()の前に発生するため、シャットダウン中のインフライト publish は例外ではなく undefined を返します。
依存関係
packages/acp-bridge/src/bridge.tsで消費(BridgeClient.sessionUpdate/BridgeClient.extNotification→events.publish(...))。packages/cli/src/serve/server.tsで消費(SSE ルートハンドラー →events.subscribe(...)でBridgeEventを SSE ワイヤーフレームにフォーマット)。- 再エクスポートシム:
packages/cli/src/serve/event-bus.ts→@qwen-code/acp-bridge/eventBus。 - SDK コンシューマー:
packages/sdk-typescript/src/daemon/sse.ts(parseSseStream)、その後asKnownDaemonEvent(09-event-schema.md、13-sdk-daemon-client.md参照)。
設定
--event-ring-size <n>— セッションごとのリング深度。MAX_EVENT_RING_SIZE = 1_000_000でソフトキャップ。GET /session/:id/eventsの?maxQueued=Nクエリパラメーター。範囲は[16, 2048]。SDK クライアントはオプトイン前にcaps.features.slow_client_warningを事前確認します。BridgeOptions.eventRingSize(組み込み利用時にデーモンデフォルトを上書き)。- ケイパビリティタグ:
session_events、slow_client_warning、typed_event_schema。
注意事項と既知の制限
- 合成フレームには
idがない。Last-Event-IDレジュームを使用する SDK コンシューマーは id 付きフレームのみを記録します。slow_client_warning、client_evicted、state_resync_required、replay_completeはカーソルを進めず、セッションごとのシーケンス番号を消費しません。id 付きのライブフレーム間に実際のギャップがある場合は、プライベート合成フレームとして扱うのではなく、リングエビクション / エポックリセットの再同期パスを通じて処理してください。 client_evictedはサブスクライバーごとであり、セッションごとではありません。同じクライアントは再接続できます。BoundedAsyncQueueイテレーターは並行ドライバーには安全ではありません — 2 つの同時.next()呼び出しは同じイベントを巡って競合します。デーモンの使用は順次的(SSE ルートハンドラーのfor await ... of)なので、本番環境では安全です。- バスは現在パッケージプライベートです。チャネルと Web UI は、バスに直接アクセスするのではなく、デーモンの HTTP SSE ルートを通じてサブスクライブする必要があります。Stage 1.5 でこれが解消される予定です。
参照
packages/acp-bridge/src/eventBus.ts(ファイル全体)packages/acp-bridge/src/bridge.ts(publish サイト、特にBridgeClient.sessionUpdateと F3 パーミッションイベント)packages/cli/src/serve/server.ts(SSE ルートハンドラー —BridgeEventをワイヤー SSE にフォーマット)packages/sdk-typescript/src/daemon/sse.ts(クライアント側の SSE ワイヤーパーサー)- ワイヤーリファレンス:
../qwen-serve-protocol.md(Last-Event-ID再接続コントラクト)