ACP-over-HTTP — 再開可能なセッションイベントストリーム (Last-Event-ID)
ステータス: 本PRで設計および実装。
README.md§7 / “Resume cursor (ringLast-Event-ID)” の行で追跡されていた再開性のギャップを解消します。
問題
/acp Streamable-HTTP セッションイベントストリーム(Acp-Session-Id ヘッダーを伴う GET /acp)はライブのみです。SSE の id: シーケンスを出力せず、再接続時に Last-Event-ID リクエストヘッダーも無視します。
コントロールプレーンプロキシがターン中に長時間稼働する SSE 接続をアイドルタイムアウトで切断すると(デーモン自体は retry: 3000 を送信しますが、イングレスプロキシは長時間の SSE を頻繁に切断します)、クライアントは再接続して所有権を再取得しますが、ギャップ中にデーモンが生成したすべてのコンテンツフレームは失われます — agent_thought_chunk / agent_message_chunk を含む session/update 通知です。ターンは依然として終端状態に到達し(turn_complete が生成/合成される)、UI は空または切り詰められた本文で「完了」を表示します。同じプロンプトを再送信すると機能するため、これが決定的な証拠となります。損失はモデルではなくトランスポートのギャップで発生しています。
症状とフィールドの証拠は、統合ノートで §1.8 (sdk-known-issues.md) としてカタログ化されています。
既存の機能(およびこれが小規模な理由)
リプレイエンジンはすでに構築され、実戦で検証済みです。ギャップは /acp トランスポートがそれに接続されていないことだけです。
packages/acp-bridge/src/eventBus.ts:
- セッションごとの単調増加
id。1 から始まり(nextId、publish()で割り当て)。 - セッションごとの制限付きリングバッファ(
DEFAULT_RING_SIZE = 8000、オペレーターによるオーバーライドはqwen serve --event-ring-size)。 subscribeEvents(sessionId, { lastEventId, signal })は、ライブイベントが流れる前にid > lastEventIdのリングフレームをリプレイし、合成制御フレームreplay_complete、state_resync_required(リングからの追放 / デーモン再起動時のエポックリセット)、client_evicted、slow_client_warningを出力します。
REST 表面である GET /session/:id/events はすでにこれらすべてを消費しています。last-event-id を読み取り(server.ts → parseLastEventId)、それを subscribeEvents に渡し、各フレームを SSE の id: 行でシリアライズします(formatSseFrame)。バグは、/acp トランスポートがこれらを一切行っていないことです。
| 層 | REST /session/:id/events | /acp GET (現在) |
|---|---|---|
Last-Event-ID ヘッダーの読み取り | yes | no |
lastEventId を subscribeEvents に渡す | yes | no (dispatch.ts pumpSessionEvents) |
SSE id: 行の出力 | yes (formatSseFrame) | no (SseStream.send は data: のみを書き込む) |
acp-http/sse-stream.ts にもコメントでそう書かれています。“リングバッファの id: シーケンシングなし — 再開性は RFD フェーズ 4 であり、延期された”。 本 PR はこの延期を解消します。
ワイヤーレベルの決定 — SSE id: 行(ペイロード内の _meta ではない)
2つの SSE 表面は異なるペイロードを運びます。
- REST は
BridgeEventエンベロープ({ id, v, type, data, _meta })をストリーミングします。SDK パーサー(sdk-typescript/src/daemon/sse.ts)は JSON エンベロープのidフィールドからカーソルを抽出します(data:行のみを読み取ります)。 /acpは生の JSON-RPC 2.0 オブジェクト(session/update通知、session/request_permissionリクエスト、レスポンス)をストリーミングします。これらにはバースカーソルを運ぶエンベロープidがなく、JSON-RPC のidは別の意味(リクエスト ID)を持ちます。
したがって、/acp の再開カーソルは標準的な SSE id: 行となります。
- EventSource ネイティブです — 仕様に準拠した SSE クライアント(ベンダー提供の
AcpHttpTransportを含む)は、最後のid:を自動追跡し、再接続時にLast-Event-IDヘッダーとして自動送信します。 - JSON-RPC ペイロードをクリーンに保ちます(プロトコルフレームへの非標準の
_meta.qwen.eventId注入なし)。 - REST で
formatSseFrameがすでに出力しているものを反映するため、両方の表面で同じeventBusID と同じLast-Event-IDセマンティクスを共有します。
バス発信のフレームのみが id: を持ちます(session/update、session/request_permission、デーモンプッシュの通知)。セッションストリームに乗る JSON-RPC のレスポンス/リプライはバスイベント_ではなく_、id: を持ちません — これらはリング内になく、意図的にリプレイ追跡されません(飛行中のプロンプト_レスポンス_の損失は別途追跡されている §1.7 の懸念であり、ここでは対象外です。§1.8 は失われた_コンテンツ_フレームに関するもので、これらはすべてバスの session/update イベントです)。
合成終端フレーム(client_evicted、stream_error など)にはバス id がないため、id: 行を出力しません — REST と一致させ、クライアントが再開する単調シーケンスのスロットを消費しないようにします。
変更点
transport-stream.ts—send(message, id?: number)。オプションのidは SSE カーソル追跡用のバスイベント ID です。sse-stream.ts—send(message, id?)は、id !== undefinedの場合にdata:行の前にid: ${id}\nを付加します(formatSseFrameを反映)。ws-stream.ts—send(message, id?)はidを受け入れ無視します。WebSocket はステートフルな接続であり、SSE リプレイはありません(AcpWsTransport.supportsReplay = falseと整合)。connection-registry.ts—sendSession(sessionId, frame, id?)はidをstream.sendにスレッド化します。セッションごとのアタッチ前バッファは{ frame, id? }ペアを保存するため、バッファリングされたフレームはアタッチ時にフラッシュされてもカーソルを保持します(接続スコープのバッファは変更なし — これらのフレームはバス ID を持たない JSON-RPC レスポンスです)。dispatch.tstranslateEventは、バスイベントのすべてのsendSession/binding.stream.send呼び出しを通じてevent.idを渡します。pumpSessionEvents(conn, sessionId, signal, lastEventId?)はlastEventIdをsubscribeEventsに転送し、既存のリングリプレイを直接再利用します。
index.ts—GET /acpセッションストリームブランチはLast-Event-IDヘッダーを読み取り(厳密なparseLastEventIdを介し、REST と同じく10進数の数字のみを受け入れるルール)、それをpumpSessionEventsに渡します。
eventBus/ブリッジの変更はありません — エンジンはそのまま再利用されます。
再開を実際に機能させる(セッションストリームの猶予/再取得)
上記の id:/Last-Event-ID の配管は必要ですが、十分ではありません — それだけでは実際のフローで決して発火しません。以前は、セッション SSE ストリームがトランスポートレベルで閉じると、GET ハンドラーは完全な closeSessionStream テアダウンを実行していました。セッションを ownedSessions から削除し、飛行中のプロンプトを中止し、ブリッジクライアントをデタッチしていました。実際の EventSource/プロキシの順序(古いソケットが_最初に_閉じ、その後クライアントが再接続する)では、Last-Event-ID を持つ再接続は、カーソルが読み取られる前に所有権チェックによって 403 で拒否されます — そしてコンテンツを生成していたプロンプトはすでに中止されています。リプレイエンジンは再接続する対象を失います。
そのため、トランスポートレベルのセッションストリームクローズは、テアダウンではなくデタッチを行うようになりました(AcpConnection.detachSessionStream)。ストリームとそのイベントサブスクリプションのみを停止し、バインディング、所有権、飛行中のプロンプト、およびブリッジクライアントの登録を猶予ウィンドウ(SESSION_GRACE_MS、CONN_GRACE_MS を反映)の間存続させます。ウィンドウ内の再接続は再アタッチされ(attachSessionStream が猶予タイマーをクリア — 再取得)、リングリプレイがギャップを埋めます。再接続が到着しない場合、猶予タイマーが完全なテアダウンを実行し、暴走プロンプトのコストを制限します。明示的な session/close および接続テアダウン(destroy)の場合、完全なテアダウンは引き続き即時に実行されます。GET ハンドラーは stream.isClosed で分岐します。トランスポートクローズ → 猶予付きデタッチ。ストリームがまだ開いている間にポンプが終了する場合(サブプロセス完了 / イテレーターエラー) → 完全クローズ(ゾンビストリーム)。
これによって解除される2つのリプレイ正確性ガード
どちらも再開が実際に実行されるまで潜在しています。上記の猶予/再取得によってこれらが到達可能になるため、一緒に提供されます。
- 二重配信なし、かつサイレントな損失なし(バッファ ↔ リング)。 バッファリングされたバスイベントは EventBus リングにも_存在します_(ID を取得するためにそこにパブリッシュされたため)。したがって、再開時(
Last-Event-IDが存在する場合)、attachSessionStreamはカーソルを渡され、ID を持つバッファフレームを一切フラッシュしません — クライアントのカーソルで開始されるリングリプレイが、カーソル以降のすべてのバスイベントの単一の配信パスとなります。これは意図的に「バッファをフラッシュし、その後リプレイカーソルをその先に進める」では_ありません_。すでに死んだソケットに送信されたがクライアントに受信されなかったフレームは、バッファの ID よりも_下_であり、クライアントのカーソルより_上_の ID を持つため、カーソルをバッファの先に進めるとサイレントにドロップされます。リングにすべてのバスイベントを所有させることで、ギャップなく各イベントを正確に1回配信します。ID_なし_フレーム(replySession経由でルーティングされる JSON-RPC リプライ)はリングイベントではないため、リングはそれらを再配信しません — しかし、アタッチ時にもフラッシュしてはなりません。リプレイ前にフラッシュされたバッファリングされたsession/promptの_結果_は、それに先行するコンテンツチャンクよりも先に到着してしまいます(クライアントは本文の前に「完了」を見てしまう — §1.8 が修正するまさにその切り詰められた本文の失敗)。したがって、再開時、ID なしフレームは延期されます。バッファに残され、イベントポンプはリプレイが枯渇した後(replay_completeのみで、元のストリーム順序を保持して)それらをリリースします(flushBufferedSessionFrames)。重要なのは、state_resync_requiredでは行わないことです。EventBus はリプレイフレーム_の前に_そのフレームを出力し(その後最後にreplay_completeを出力する)、そこでフラッシュするとリプライがリプレイされたコンテンツより先に来てしまいます。ライブのみのケース(Last-Event-IDなし ⇒ リプレイなし ⇒replay_completeなし)は、ポンプのポストループセーフティフラッシュでカバーされます(Last-Event-IDなしの新規接続にはリングアンカーがないため、以前と同様にバッファ全体を即座に順序通りにフラッシュします)。 - リプレイ下での冪等な
permission_request。permission_requestは ID を持つリングイベントであるため、まだ回答されていないパーミッションより前のカーソルを持つ再接続はそれをリプレイします。translateEventは、2番目の ID + エントリを生成する代わりに、そのbridgeRequestIdに対して既存のconn.pendingエントリを再利用するようになりました(キャッチアップのために同じ送信 JSON-RPC ID を再送信) — 孤立した pending はなく、_meta.requestIdで重複排除するクライアントに対する二重プロンプトもありません。
parseLastEventId は、REST と /acp の両方の表面で使用される共有の serve/sse-last-event-id.ts に抽出されるため、それらの厳密な受け入れ/拒否ルールとオペレーターログが乖離することはありません。
後方互換性
Last-Event-IDを送信しない古いクライアント →lastEventIdはundefined→subscribeEventsは今日とまったく同じようにライブで開始されます。id:行の追加は後方互換性のある SSE です — フィールドを無視するクライアントは影響を受けません。EventSource ベースのクライアントは無料で追跡を開始します。- ベンダー提供の SDK
AcpHttpTransportは本 PR でリプレイをオプトインします —supportsReplay = trueを設定し、再接続時にLast-Event-IDを再送信するため、ギャップフレームはリングからリプレイされ、§1.8 のコンテンツ損失はデーモンの追加変更なしに解消されます(別の外部agent-webトランスポートの切り替えは延期されたままです — 「対象外」を参照)。デーモンの変更は、引き続きsupportsReplay = falseを報告しヘッダーを省略するコンシューマーに対しては不活性のままです。 - REST 表面は変更されていません。
テスト計画
sse-stream.test.ts—send(msg, 7)はdata:の前にid: 7\nを出力します。send(msg)(ID なし)はid:行を省略します。順序はid:→data:→ 空行です。transport.test.ts(/acpトランスポートを介したエンドツーエンド):- ライブの
session/updateフレームがid:行付きで到着するようになりました。 Last-Event-ID: Nを持つGET /acpはカーソルをsubscribeEventsに流します。ヘッダーなしの新しいストリームは今日と同様に動作します。- オーバーフローした
Last-Event-ID(>MAX_SAFE_INTEGER) → ライブのみ。 - 実際のクローズ後の再接続順序: 古い SSE を_最初に_クローズし、次に
Last-Event-IDで再接続します — 403 ではなく 200(所有権が保持される)をアサーションし、プロンプトが中止されない(猶予/再取得)ことを確認します。 - リプレイされた
permission_requestは pending エントリを再利用します(同じ送信 ID)。
- ライブの
connection-registry.test.ts— 非再開アタッチは各フレームのidをスレッド化してバッファ全体をフラッシュします。再開アタッチ(カーソル存在)は ID を持つフレームをスキップします(リングリプレイがそれらを所有)が、ID なし JSON-RPC リプライは引き続きフラッシュします。detachSessionStreamは猶予ウィンドウをまたいで所有権/プロンプトを保持し、期限切れでテアダウンします。ウィンドウ内の再接続は再取得します(保留中のテアダウンをキャンセル)。ws-stream.test.ts—send(msg, id)は ID を無視します。WS ワイヤーフレームはベアの JSON であり、SSEid:フレーミングが漏れ込むことはありません。
対象外(引き続き延期)
- WebSocket / HTTP/2 トランスポート。
- §1.7 クロス接続パーミッション解決(プロンプトをストリーミングしたものとは異なる
Acp-Connection-Idに POST された投票) — これは別の、セキュリティに敏感な懸念であり、独自のフォローアップとして追跡されています。本 PR はリプレイ下でのpermission_request変換を冪等にしますが(上記)、セッショングローバルの requestId 解決は追加しません。また、すでに解決されたパーミッションに対するレスポンスリプレイの冪等性も追加しません。クライアントが投票すると pending エントリは消費されるため、(まだリングに残っている)permission_requestをリプレイする後の再接続は、同じ_meta.requestIdでプロンプトを再送信します。準拠したクライアントはその ID で重複排除し(リプレイパスがすでに依存している契約)、残りの孤立した pending エントリはテアダウン時に回収されます — エージェントがストールすることはありません — しかし、記録された投票を再送信するために解決された結果をセッションごとの制限付き LRU に記録すること(重複排除しないクライアントに対する完全な冪等性)は、投票パスに解決済みパーミッション状態を追加するため、同じパーミッション調整のフォローアップに属します。 - セッションストリーム上の飛行中の_プロンプトレスポンス_の損失 — 回復されたコンテンツフレームはすべて
eventBusリングを流れます。JSON-RPC レスポンスはリングイベントではありません。 - 外部の
agent-webAcpHttpTransportにおけるコンシューマー側のsupportsReplay切り替え(別のリポジトリに存在し、本 PR によってブロック解除されます)。 - エクスポートされた SDK トランスポートを介したパーミッション投票。 エクスポートされた
AcpHttpTransport/AcpWsTransportはsession/request_permissionをpermission_requestイベントとして表面化しますが、SDK の投票 API(respondToPermission/respondToSessionPermission)は、ACP デーモンにハンドラーが存在しないsession/permissionリクエストにマッピングされます — デーモンは、送信_qwen_perm_NID をエコーする JSON-RPC _レスポンス_としてのみパーミッション投票を受け入れます。投票のラウンドトリップの配線は §1.7 パーミッション調整のフォローアップの一部です。関連する側面:サブスクライバーなしセッションリプライポンプ(ensureSessionReplyPump)は実際のGET /acpセッションストリームを開き、デーモンはこれをライブストリームとして扱います — そのため、リプライポンプのみがアタッチされている間に発生したエージェントのpermission_requestはそのストリームにルーティングされ、ポンプによってドロップされます(JSON-RPC レスポンスのみを転送するため)、メディエーターがハングします。一方、ストリームが全くない場合、デーモンはキャンセル拒否し、エージェントは続行します。デーモン側の「これは実際のコンシューマーか、それともリプライポンプか?」という区別と、SDK 側の処理(ローカルで拒否 / パーミッションコールバックに表面化)は、ポンプ自体が投票できないため、同じパーミッション調整のフォローアップに属します。パーミッション処理が必要なコンシューマーは、セッション RPC を発行する前にsubscribeEventsを開く必要があります(文書化された契約)。これにより、デーモンに実際のコンシューマーストリームが提供されます。 - エクスポートされた
AcpHttpTransportのsubscribeEventsループ内から発行されるセッション RPC。 セッション/acpストリームはシングルリーダーです。コンシューマーの非同期ジェネレーターがyield間で駐車されている間、リーダーはドレインしません。コンシューマーが独自のイベント処理ループ内からセッションルーティングされた RPC(session/set_model、session/promptなど)をawaitすると、sendRequestはバックグラウンドリプライポンプを抑制し(サブスクリプションが「アクティブ」であるため)、駐車されたジェネレーターはリプライを読み取りません — コンシューマーが次のイベントをプルするまで呼び出しがハングします。堅牢な修正は、セッションリーダーを常に JSON-RPC レスポンスをドレインし、イテレーターに対してDaemonEventのみをキューイングするバックグラウンドポンプにすることです。これはオプトインの新しくエクスポートされたトランスポートに対する構造的な変更であり、デフォルトの REST トランスポートには影響しないため、集中したフォローアップとして延期されます。 SESSION_STREAM_REPLY_METHODS⇄replySessionの乖離に対する自動ガード。 SDK のSESSION_STREAM_REPLY_METHODSセットは、dispatch.ts(別のパッケージ)にあるデーモンのreplySession(...)呼び出しサイトを反映する必要があります。ここに追加せずにそこにメソッドを追加すると、リプライポンプが開かず、サブスクライバーなしのsendRequestが中止されるまでハングします。どちらのパッケージの型システムもこれを強制しません。CI ガード(デーモンのセッションリプライメソッド名を抽出し、SDK セットと差分を取る軽量なスクリプトまたは vitest)が適切な修正ですが、パッケージ間の静的解析ツールはそれ自体が集中したタスクであり、単純な grep では済みません。正しいエクストラクターには軽量のデータフロー解析が必要だからです。session/promptのリプライはcase 'session/prompt'ブロック内で出力され_ない_ためです。プロンプトは非同期に開始され、そのreplySession(...)は後でプロンプト完了ハンドラー(別の呼び出しサイト)から発生するため、「どのcaseブロックにreplySessionが含まれているか」という単純なスキャンはsession/promptを誤って_除外_し、正しいセットに対してビルドを失敗させます。当面の間、セットは小さく安定しており、定数の JSDoc に不変条件が文書化されています。堅牢な長期的な修正は、dispatch.tsをスクレイピングするのではなく、デーモンにセッションルーティングされたメソッド名(共有の信頼できるソース)をアドバタイズさせることです。