Skip to Content
デザインDaemon Acp HTTPACP-over-HTTP — 再開可能なセッションイベントストリーム (Last-Event-ID)

ACP-over-HTTP — 再開可能なセッションイベントストリーム (Last-Event-ID)

ステータス: 本PRで設計および実装。 README.md §7 / “Resume cursor (ring Last-Event-ID)” の行で追跡されていた再開性のギャップを解消します。

問題

/acp Streamable-HTTP セッションイベントストリーム(Acp-Session-Id ヘッダーを伴う GET /acp)はライブのみです。SSE の id: シーケンスを出力せず、再接続時に Last-Event-ID リクエストヘッダーも無視します。

コントロールプレーンプロキシがターン中に長時間稼働する SSE 接続をアイドルタイムアウトで切断すると(デーモン自体は retry: 3000 を送信しますが、イングレスプロキシは長時間の SSE を頻繁に切断します)、クライアントは再接続して所有権を再取得しますが、ギャップ中にデーモンが生成したすべてのコンテンツフレームは失われますagent_thought_chunk / agent_message_chunk を含む session/update 通知です。ターンは依然として終端状態に到達し(turn_complete が生成/合成される)、UI は空または切り詰められた本文で「完了」を表示します。同じプロンプトを再送信すると機能するため、これが決定的な証拠となります。損失はモデルではなくトランスポートのギャップで発生しています。

症状とフィールドの証拠は、統合ノートで §1.8 (sdk-known-issues.md) としてカタログ化されています。

既存の機能(およびこれが小規模な理由)

リプレイエンジンはすでに構築され、実戦で検証済みです。ギャップは /acp トランスポートがそれに接続されていないことだけです。

packages/acp-bridge/src/eventBus.ts:

  • セッションごとの単調増加 id。1 から始まり(nextIdpublish() で割り当て)。
  • セッションごとの制限付きリングバッファ(DEFAULT_RING_SIZE = 8000、オペレーターによるオーバーライドは qwen serve --event-ring-size)。
  • subscribeEvents(sessionId, { lastEventId, signal }) は、ライブイベントが流れる前に id > lastEventId のリングフレームをリプレイし、合成制御フレーム replay_completestate_resync_required(リングからの追放 / デーモン再起動時のエポックリセット)、client_evictedslow_client_warning を出力します。

REST 表面である GET /session/:id/events はすでにこれらすべてを消費しています。last-event-id を読み取り(server.tsparseLastEventId)、それを subscribeEvents に渡し、各フレームを SSE の id: 行でシリアライズします(formatSseFrame)。バグは、/acp トランスポートがこれらを一切行っていないことです。

REST /session/:id/events/acp GET (現在)
Last-Event-ID ヘッダーの読み取りyesno
lastEventIdsubscribeEvents に渡すyesno (dispatch.ts pumpSessionEvents)
SSE id: 行の出力yes (formatSseFrame)no (SseStream.senddata: のみを書き込む)

acp-http/sse-stream.ts にもコメントでそう書かれています。“リングバッファの id: シーケンシングなし — 再開性は RFD フェーズ 4 であり、延期された”。 本 PR はこの延期を解消します。

ワイヤーレベルの決定 — SSE id: 行(ペイロード内の _meta ではない)

2つの SSE 表面は異なるペイロードを運びます。

  • REST は BridgeEvent エンベロープ{ id, v, type, data, _meta })をストリーミングします。SDK パーサー(sdk-typescript/src/daemon/sse.ts)は JSON エンベロープの id フィールドからカーソルを抽出します(data: 行のみを読み取ります)。
  • /acp生の JSON-RPC 2.0 オブジェクトsession/update 通知、session/request_permission リクエスト、レスポンス)をストリーミングします。これらにはバースカーソルを運ぶエンベロープ id がなく、JSON-RPC の id は別の意味(リクエスト ID)を持ちます。

したがって、/acp の再開カーソルは標準的な SSE id:となります。

  • EventSource ネイティブです — 仕様に準拠した SSE クライアント(ベンダー提供の AcpHttpTransport を含む)は、最後の id: を自動追跡し、再接続時に Last-Event-ID ヘッダーとして自動送信します。
  • JSON-RPC ペイロードをクリーンに保ちます(プロトコルフレームへの非標準の _meta.qwen.eventId 注入なし)。
  • REST で formatSseFrame がすでに出力しているものを反映するため、両方の表面で同じ eventBus ID と同じ Last-Event-ID セマンティクスを共有します。

バス発信のフレームのみが id: を持ちます(session/updatesession/request_permission、デーモンプッシュの通知)。セッションストリームに乗る JSON-RPC のレスポンス/リプライはバスイベント_ではなく_、id: を持ちません — これらはリング内になく、意図的にリプレイ追跡されません(飛行中のプロンプト_レスポンス_の損失は別途追跡されている §1.7 の懸念であり、ここでは対象外です。§1.8 は失われた_コンテンツ_フレームに関するもので、これらはすべてバスの session/update イベントです)。

合成終端フレーム(client_evictedstream_error など)にはバス id がないため、id: 行を出力しません — REST と一致させ、クライアントが再開する単調シーケンスのスロットを消費しないようにします。

変更点

  1. transport-stream.tssend(message, id?: number)。オプションの id は SSE カーソル追跡用のバスイベント ID です。
  2. sse-stream.tssend(message, id?) は、id !== undefined の場合に data: 行の前に id: ${id}\n を付加します(formatSseFrame を反映)。
  3. ws-stream.tssend(message, id?)id を受け入れ無視します。WebSocket はステートフルな接続であり、SSE リプレイはありません(AcpWsTransport.supportsReplay = false と整合)。
  4. connection-registry.tssendSession(sessionId, frame, id?)idstream.send にスレッド化します。セッションごとのアタッチ前バッファ{ frame, id? } ペアを保存するため、バッファリングされたフレームはアタッチ時にフラッシュされてもカーソルを保持します(接続スコープのバッファは変更なし — これらのフレームはバス ID を持たない JSON-RPC レスポンスです)。
  5. dispatch.ts
    • translateEvent は、バスイベントのすべての sendSession / binding.stream.send 呼び出しを通じて event.id を渡します。
    • pumpSessionEvents(conn, sessionId, signal, lastEventId?)lastEventIdsubscribeEvents に転送し、既存のリングリプレイを直接再利用します。
  6. index.tsGET /acp セッションストリームブランチは Last-Event-ID ヘッダーを読み取り(厳密な parseLastEventId を介し、REST と同じく10進数の数字のみを受け入れるルール)、それを pumpSessionEvents に渡します。

eventBus/ブリッジの変更はありません — エンジンはそのまま再利用されます。

再開を実際に機能させる(セッションストリームの猶予/再取得)

上記の id:/Last-Event-ID の配管は必要ですが、十分ではありません — それだけでは実際のフローで決して発火しません。以前は、セッション SSE ストリームがトランスポートレベルで閉じると、GET ハンドラーは完全な closeSessionStream テアダウンを実行していました。セッションを ownedSessions から削除し、飛行中のプロンプトを中止し、ブリッジクライアントをデタッチしていました。実際の EventSource/プロキシの順序(古いソケットが_最初に_閉じ、その後クライアントが再接続する)では、Last-Event-ID を持つ再接続は、カーソルが読み取られる前に所有権チェックによって 403 で拒否されます — そしてコンテンツを生成していたプロンプトはすでに中止されています。リプレイエンジンは再接続する対象を失います。

そのため、トランスポートレベルのセッションストリームクローズは、テアダウンではなくデタッチを行うようになりました(AcpConnection.detachSessionStream)。ストリームとそのイベントサブスクリプションのみを停止し、バインディング、所有権、飛行中のプロンプト、およびブリッジクライアントの登録を猶予ウィンドウ(SESSION_GRACE_MSCONN_GRACE_MS を反映)の間存続させます。ウィンドウ内の再接続は再アタッチされ(attachSessionStream が猶予タイマーをクリア — 再取得)、リングリプレイがギャップを埋めます。再接続が到着しない場合、猶予タイマーが完全なテアダウンを実行し、暴走プロンプトのコストを制限します。明示的な session/close および接続テアダウン(destroy)の場合、完全なテアダウンは引き続き即時に実行されます。GET ハンドラーは stream.isClosed で分岐します。トランスポートクローズ → 猶予付きデタッチ。ストリームがまだ開いている間にポンプが終了する場合(サブプロセス完了 / イテレーターエラー) → 完全クローズ(ゾンビストリーム)。

これによって解除される2つのリプレイ正確性ガード

どちらも再開が実際に実行されるまで潜在しています。上記の猶予/再取得によってこれらが到達可能になるため、一緒に提供されます。

  • 二重配信なし、かつサイレントな損失なし(バッファ ↔ リング)。 バッファリングされたバスイベントは EventBus リングにも_存在します_(ID を取得するためにそこにパブリッシュされたため)。したがって、再開時(Last-Event-ID が存在する場合)、attachSessionStream はカーソルを渡され、ID を持つバッファフレームを一切フラッシュしません — クライアントのカーソルで開始されるリングリプレイが、カーソル以降のすべてのバスイベントの単一の配信パスとなります。これは意図的に「バッファをフラッシュし、その後リプレイカーソルをその先に進める」では_ありません_。すでに死んだソケットに送信されたがクライアントに受信されなかったフレームは、バッファの ID よりも_下_であり、クライアントのカーソルより_上_の ID を持つため、カーソルをバッファの先に進めるとサイレントにドロップされます。リングにすべてのバスイベントを所有させることで、ギャップなく各イベントを正確に1回配信します。ID_なし_フレーム(replySession 経由でルーティングされる JSON-RPC リプライ)はリングイベントではないため、リングはそれらを再配信しません — しかし、アタッチ時にもフラッシュしてはなりません。リプレイ前にフラッシュされたバッファリングされた session/prompt の_結果_は、それに先行するコンテンツチャンクよりも先に到着してしまいます(クライアントは本文の前に「完了」を見てしまう — §1.8 が修正するまさにその切り詰められた本文の失敗)。したがって、再開時、ID なしフレームは延期されます。バッファに残され、イベントポンプはリプレイが枯渇した後(replay_complete のみで、元のストリーム順序を保持して)それらをリリースします(flushBufferedSessionFrames)。重要なのは、state_resync_required では行わないことです。EventBus はリプレイフレーム_の前に_そのフレームを出力し(その後最後に replay_complete を出力する)、そこでフラッシュするとリプライがリプレイされたコンテンツより先に来てしまいます。ライブのみのケース(Last-Event-ID なし ⇒ リプレイなし ⇒ replay_complete なし)は、ポンプのポストループセーフティフラッシュでカバーされます(Last-Event-ID なしの新規接続にはリングアンカーがないため、以前と同様にバッファ全体を即座に順序通りにフラッシュします)。
  • リプレイ下での冪等な permission_request permission_request は ID を持つリングイベントであるため、まだ回答されていないパーミッションより前のカーソルを持つ再接続はそれをリプレイします。translateEvent は、2番目の ID + エントリを生成する代わりに、その bridgeRequestId に対して既存の conn.pending エントリを再利用するようになりました(キャッチアップのために同じ送信 JSON-RPC ID を再送信) — 孤立した pending はなく、_meta.requestId で重複排除するクライアントに対する二重プロンプトもありません。

parseLastEventId は、REST と /acp の両方の表面で使用される共有の serve/sse-last-event-id.ts に抽出されるため、それらの厳密な受け入れ/拒否ルールとオペレーターログが乖離することはありません。

後方互換性

  • Last-Event-ID を送信しない古いクライアントlastEventIdundefinedsubscribeEvents は今日とまったく同じようにライブで開始されます。
  • id: 行の追加は後方互換性のある SSE です — フィールドを無視するクライアントは影響を受けません。EventSource ベースのクライアントは無料で追跡を開始します。
  • ベンダー提供の SDK AcpHttpTransport は本 PR でリプレイをオプトインしますsupportsReplay = true を設定し、再接続時に Last-Event-ID を再送信するため、ギャップフレームはリングからリプレイされ、§1.8 のコンテンツ損失はデーモンの追加変更なしに解消されます(別の外部 agent-web トランスポートの切り替えは延期されたままです — 「対象外」を参照)。デーモンの変更は、引き続き supportsReplay = false を報告しヘッダーを省略するコンシューマーに対しては不活性のままです。
  • REST 表面は変更されていません。

テスト計画

  • sse-stream.test.tssend(msg, 7)data: の前に id: 7\n を出力します。send(msg)(ID なし)は id: 行を省略します。順序は id:data: → 空行です。
  • transport.test.ts/acp トランスポートを介したエンドツーエンド):
    • ライブの session/update フレームが id: 行付きで到着するようになりました。
    • Last-Event-ID: N を持つ GET /acp はカーソルを subscribeEvents に流します。ヘッダーなしの新しいストリームは今日と同様に動作します。
    • オーバーフローした Last-Event-ID(> MAX_SAFE_INTEGER) → ライブのみ。
    • 実際のクローズ後の再接続順序: 古い SSE を_最初に_クローズし、次に Last-Event-ID で再接続します — 403 ではなく 200(所有権が保持される)をアサーションし、プロンプトが中止されない(猶予/再取得)ことを確認します。
    • リプレイされた permission_request は pending エントリを再利用します(同じ送信 ID)。
  • connection-registry.test.ts — 非再開アタッチは各フレームの id をスレッド化してバッファ全体をフラッシュします。再開アタッチ(カーソル存在)は ID を持つフレームをスキップします(リングリプレイがそれらを所有)が、ID なし JSON-RPC リプライは引き続きフラッシュします。detachSessionStream は猶予ウィンドウをまたいで所有権/プロンプトを保持し、期限切れでテアダウンします。ウィンドウ内の再接続は再取得します(保留中のテアダウンをキャンセル)。
  • ws-stream.test.tssend(msg, id) は ID を無視します。WS ワイヤーフレームはベアの JSON であり、SSE id: フレーミングが漏れ込むことはありません。

対象外(引き続き延期)

  • WebSocket / HTTP/2 トランスポート。
  • §1.7 クロス接続パーミッション解決(プロンプトをストリーミングしたものとは異なる Acp-Connection-Id に POST された投票) — これは別の、セキュリティに敏感な懸念であり、独自のフォローアップとして追跡されています。本 PR はリプレイ下での permission_request 変換を冪等にしますが(上記)、セッショングローバルの requestId 解決は追加しません。また、すでに解決されたパーミッションに対するレスポンスリプレイの冪等性も追加しません。クライアントが投票すると pending エントリは消費されるため、(まだリングに残っている)permission_request をリプレイする後の再接続は、同じ _meta.requestId でプロンプトを再送信します。準拠したクライアントはその ID で重複排除し(リプレイパスがすでに依存している契約)、残りの孤立した pending エントリはテアダウン時に回収されます — エージェントがストールすることはありません — しかし、記録された投票を再送信するために解決された結果をセッションごとの制限付き LRU に記録すること(重複排除しないクライアントに対する完全な冪等性)は、投票パスに解決済みパーミッション状態を追加するため、同じパーミッション調整のフォローアップに属します。
  • セッションストリーム上の飛行中の_プロンプトレスポンス_の損失 — 回復されたコンテンツフレームはすべて eventBus リングを流れます。JSON-RPC レスポンスはリングイベントではありません。
  • 外部の agent-web AcpHttpTransport におけるコンシューマー側の supportsReplay 切り替え(別のリポジトリに存在し、本 PR によってブロック解除されます)。
  • エクスポートされた SDK トランスポートを介したパーミッション投票。 エクスポートされた AcpHttpTransport/AcpWsTransportsession/request_permissionpermission_request イベントとして表面化しますが、SDK の投票 API(respondToPermission / respondToSessionPermission)は、ACP デーモンにハンドラーが存在しない session/permission リクエストにマッピングされます — デーモンは、送信 _qwen_perm_N ID をエコーする JSON-RPC _レスポンス_としてのみパーミッション投票を受け入れます。投票のラウンドトリップの配線は §1.7 パーミッション調整のフォローアップの一部です。関連する側面:サブスクライバーなしセッションリプライポンプensureSessionReplyPump)は実際の GET /acp セッションストリームを開き、デーモンはこれをライブストリームとして扱います — そのため、リプライポンプのみがアタッチされている間に発生したエージェントの permission_request はそのストリームにルーティングされ、ポンプによってドロップされます(JSON-RPC レスポンスのみを転送するため)、メディエーターがハングします。一方、ストリームが全くない場合、デーモンはキャンセル拒否し、エージェントは続行します。デーモン側の「これは実際のコンシューマーか、それともリプライポンプか?」という区別と、SDK 側の処理(ローカルで拒否 / パーミッションコールバックに表面化)は、ポンプ自体が投票できないため、同じパーミッション調整のフォローアップに属します。パーミッション処理が必要なコンシューマーは、セッション RPC を発行する前に subscribeEvents を開く必要があります(文書化された契約)。これにより、デーモンに実際のコンシューマーストリームが提供されます。
  • エクスポートされた AcpHttpTransportsubscribeEvents ループ内から発行されるセッション RPC。 セッション /acp ストリームはシングルリーダーです。コンシューマーの非同期ジェネレーターが yield 間で駐車されている間、リーダーはドレインしません。コンシューマーが独自のイベント処理ループ内からセッションルーティングされた RPC(session/set_modelsession/prompt など)を await すると、sendRequest はバックグラウンドリプライポンプを抑制し(サブスクリプションが「アクティブ」であるため)、駐車されたジェネレーターはリプライを読み取りません — コンシューマーが次のイベントをプルするまで呼び出しがハングします。堅牢な修正は、セッションリーダーを常に JSON-RPC レスポンスをドレインし、イテレーターに対して DaemonEvent のみをキューイングするバックグラウンドポンプにすることです。これはオプトインの新しくエクスポートされたトランスポートに対する構造的な変更であり、デフォルトの REST トランスポートには影響しないため、集中したフォローアップとして延期されます。
  • SESSION_STREAM_REPLY_METHODSreplySession の乖離に対する自動ガード。 SDK の SESSION_STREAM_REPLY_METHODS セットは、dispatch.ts(別のパッケージ)にあるデーモンの replySession(...) 呼び出しサイトを反映する必要があります。ここに追加せずにそこにメソッドを追加すると、リプライポンプが開かず、サブスクライバーなしの sendRequest が中止されるまでハングします。どちらのパッケージの型システムもこれを強制しません。CI ガード(デーモンのセッションリプライメソッド名を抽出し、SDK セットと差分を取る軽量なスクリプトまたは vitest)が適切な修正ですが、パッケージ間の静的解析ツールはそれ自体が集中したタスクであり、単純な grep では済みません。正しいエクストラクターには軽量のデータフロー解析が必要だからです。session/prompt のリプライは case 'session/prompt' ブロック内で出力され_ない_ためです。プロンプトは非同期に開始され、その replySession(...) は後でプロンプト完了ハンドラー(別の呼び出しサイト)から発生するため、「どの case ブロックに replySession が含まれているか」という単純なスキャンは session/prompt を誤って_除外_し、正しいセットに対してビルドを失敗させます。当面の間、セットは小さく安定しており、定数の JSDoc に不変条件が文書化されています。堅牢な長期的な修正は、dispatch.ts をスクレイピングするのではなく、デーモンにセッションルーティングされたメソッド名(共有の信頼できるソース)をアドバタイズさせることです。
Last updated on