Skip to Content
ДизайнDaemon Acp HTTPACP-over-HTTP — возобновляемый поток событий сессии (Last-Event-ID)

ACP-over-HTTP — возобновляемый поток событий сессии (Last-Event-ID)

Статус: дизайн + реализация в этом PR. Закрывает пробел с возобновляемостью, отслеживаемый как RFD Phase 4 в README.md §7 / строка “Resume cursor (ring Last-Event-ID)”.

Проблема

Поток событий сессии Streamable-HTTP для /acp (GET /acp с заголовком Acp-Session-Id) работает только в реальном времени (live-only): он не генерирует последовательность SSE id: и не учитывает заголовок запроса Last-Event-ID при переподключении.

Когда прокси плоскости управления (control-plane) закрывает долгоживущее SSE-соединение из-за простоя в середине хода (сама демон отправляет retry: 3000, а входные прокси часто обрывают длинные SSE), клиент переподключается и возвращает себе права владельца, но теряется каждый кадр контента, созданный демоном во время разрыва — уведомления session/update, содержащие agent_thought_chunk / agent_message_chunk. Ход всё равно достигает терминального состояния (генерируется / синтезируется turn_complete), поэтому UI показывает “done” с пустым или обрезанным телом. Повторная отправка того же промпта работает, что и выдает проблему: потеря происходит на транспортном уровне, а не в модели.

Симптомы и полевые доказательства каталогизированы в заметках по интеграции как §1.8 (sdk-known-issues.md).

Что уже есть (и почему это небольшое изменение)

Движок повторного воспроизведения (replay engine) уже создан и проверен в боях — пробел заключается лишь в том, что транспорт /acp не подключен к нему.

packages/acp-bridge/src/eventBus.ts:

  • Монотонный id для каждой сессии, начинающийся с 1 (nextId, присваивается в publish()).
  • Ограниченный кольцевой буфер для каждой сессии (DEFAULT_RING_SIZE = 8000, переопределение оператором через qwen serve --event-ring-size).
  • subscribeEvents(sessionId, { lastEventId, signal }) воспроизводит кадры из кольцевого буфера с id > lastEventId до того, как начнут поступать live-события, и генерирует синтетические управляющие кадры replay_complete, state_resync_required (вытеснение из кольца / сброс эпохи при перезапуске демона), client_evicted, slow_client_warning.

REST-поверхность GET /session/:id/events уже использует всё это: она читает last-event-id (server.tsparseLastEventId), передает его в subscribeEvents и сериализует каждый кадр с помощью строки SSE id: (formatSseFrame). Проблема в том, что транспорт /acp ничего из этого не делает:

СлойREST /session/:id/events/acp GET (сейчас)
читает заголовок Last-Event-IDданет
передает lastEventId в subscribeEventsданет (dispatch.ts pumpSessionEvents)
генерирует строку SSE id:да (formatSseFrame)нет (SseStream.send пишет только data:)

В acp-http/sse-stream.ts об этом даже сказано в комментарии: “no ring-buffer id: sequencing — resumability is RFD Phase 4, deferred.” Данный PR устраняет эту отсрочку.

Решение по протоколу — строка SSE id: (а не _meta внутри payload)

Две SSE-поверхности переносят разные payload:

  • REST потоки переносят конверты BridgeEvent ({ id, v, type, data, _meta }). Парсер SDK (sdk-typescript/src/daemon/sse.ts) извлекает курсор из поля id JSON-конверта (он читает только строки data:).
  • /acp потоки переносят сырые объекты JSON-RPC 2.0 (уведомления session/update, запросы session/request_permission, ответы). У них нет id конверта для передачи курсора шины, а id в JSON-RPC означает другое (id запроса).

Поэтому для /acp курсором возобновления является стандартная строка SSE id::

  • Она нативна для EventSource — SSE-клиент, соответствующий спецификации (включая вендорный AcpHttpTransport), автоматически отслеживает последний id: и автоматически отправляет его обратно в заголовке Last-Event-ID при переподключении.
  • Она сохраняет payload JSON-RPC чистым (без внедрения нестандартного _meta.qwen.eventId в кадры протокола).
  • Она зеркально отражает то, что formatSseFrame уже генерирует в REST, поэтому обе поверхности используют одинаковые id eventBus и одинаковую семантику Last-Event-ID.

Только кадры, созданные шиной, несут id: (session/update, session/request_permission, уведомления, отправляемые демоном). Ответы/реплики JSON-RPC, которые передаются в потоке сессии, не являются событиями шины и не несут id: — их нет в кольцевом буфере, и их отслеживание при воспроизведении намеренно не реализовано (потерянный ответ на промпт в процессе выполнения — это отдельная проблема §1.7, выходящая за рамки данного текста; §1.8 касается потерянных кадров контента, которые все являются событиями session/update шины).

Синтетические терминальные кадры (client_evicted, stream_error, …) не имеют id шины и поэтому не генерируют строку id: — в соответствии с REST, чтобы они не занимали слот в монотонной последовательности, с которой клиент возобновляет работу.

Изменения

  1. transport-stream.tssend(message, id?: number). Опциональный id — это id события шины для отслеживания курсора SSE.
  2. sse-stream.tssend(message, id?) добавляет id: ${id}\n перед строкой data:, если id !== undefined (зеркально formatSseFrame).
  3. ws-stream.tssend(message, id?) принимает и игнорирует id: WebSocket — это stateful-соединение, без SSE-воспроизведения (согласуется с AcpWsTransport.supportsReplay = false).
  4. connection-registry.tssendSession(sessionId, frame, id?) передает id в stream.send. Буфер до подключения для каждой сессии хранит пары { frame, id? }, чтобы буферизованный кадр сохранял свой курсор при сбросе (flush) во время подключения. (Буфер на уровне соединения не изменен — эти кадры являются ответами JSON-RPC без id шины.)
  5. dispatch.ts
    • translateEvent передает event.id через каждый вызов sendSession / binding.stream.send для событий шины.
    • pumpSessionEvents(conn, sessionId, signal, lastEventId?) перенаправляет lastEventId в subscribeEvents — напрямую переиспользуя существующее воспроизведение кольцевого буфера.
  6. index.ts — ветка session-stream для GET /acp читает заголовок Last-Event-ID (через строгий parseLastEventId, с тем же правилом принятия только десятичных цифр, что и в REST) и передает его в pumpSessionEvents.

Никаких изменений в eventBus/bridge — движок переиспользуется без изменений.

Заставляем возобновление реально работать (grace/reclaim для session-stream)

Механизм id:/Last-Event-ID, описанный выше, необходим, но недостаточен — сам по себе он никогда не срабатывает в реальном потоке. Ранее, когда SSE-поток сессии закрывался на транспортном уровне, обработчик GET запускал полный снос closeSessionStream: он удалял сессию из ownedSessions, прерывал выполняющийся промпт и отключал клиент bridge. В реальном порядке событий EventSource/прокси (старый сокет закрывается первым, затем клиент переподключается) это означает, что переподключение с Last-Event-ID отклоняется с кодом 403 при проверке прав владельца еще до того, как курсор будет прочитан — а промпт, генерирующий контент, уже прерван. Движку воспроизведения было бы не к чему переподключаться.

Поэтому закрытие session-stream на транспортном уровне теперь выполняет отключение (detach) вместо полного сноса (AcpConnection.detachSessionStream): оно останавливает только поток + его подписку на события и сохраняет привязку, права владельца, выполняющийся промпт и регистрацию клиента bridge активными в течение льготного окна (SESSION_GRACE_MS, зеркально CONN_GRACE_MS). Переподключение в пределах этого окна повторно подключает поток (attachSessionStream сбрасывает таймер льготного периода — reclaim), и воспроизведение кольцевого буфера заполняет пробел. Если переподключения не происходит, таймер льготного периода запускает полный снос — ограничивая стоимость “сбежавшего” промпта. Полный снос остается немедленным для явного session/close и для сноса соединения (destroy). Обработчик GET делает ветвление на основе stream.isClosed: закрытие транспорта → detach с льготным периодом; завершение pump, пока поток еще открыт (завершение подпроцесса / ошибка итератора) → полное закрытие (zombie stream).

Две проверки корректности воспроизведения, которые это разблокирует

Обе скрыты до тех пор, пока возобновление реально не запустится; grace/reclaim выше делает их достижимыми, поэтому они поставляются вместе:

  • Никакой двойной доставки И никаких скрытых потерь (буфер ↔ кольцо). Буферизованное событие шины также находится в кольце EventBus (оно было опубликовано там, чтобы получить свой id). Поэтому при возобновлении (при наличии Last-Event-ID) attachSessionStream получает курсор и вообще не сбрасывает (flush) буферизованные кадры с id — воспроизведение кольца (начатое с курсора клиента) является единственным путем доставки для каждого события шины после курсора. Это намеренно не схема “сбросить буфер, затем продвинуть курсор воспроизведения за его пределы”: кадр, отправленный в уже мертвый сокет, но так и не полученный клиентом, имеет id ниже id буфера, но выше курсора клиента, поэтому продвижение курсора за пределы буфера скрыто отбросит его. Передача кольцу всех событий шины доставляет каждое ровно один раз без пробелов. Кадры без id (ответы JSON-RPC, маршрутизируемые через replySession) не являются событиями кольца, поэтому кольцо не будет доставлять их повторно — но их также нельзя сбрасывать при подключении: буферизованный результат session/prompt, сброшенный до воспроизведения, придет раньше фрагментов контента, которые ему предшествовали (клиент видит “done” до тела — это именно та ошибка усеченного тела, которую исправляет §1.8). Поэтому при возобновлении кадры без id откладываются: они остаются в буфере, и pump событий освобождает их (flushBufferedSessionFrames), как только воспроизведение опустеет — только по событию replay_complete, сохраняя исходный порядок потока. Что критично, НЕ по событию state_resync_required: EventBus генерирует этот кадр до кадров воспроизведения (затем все равно генерирует replay_complete в конце), поэтому сброс по нему поместит ответ перед воспроизведенным контентом. Случай только live (нет Last-Event-ID ⇒ нет воспроизведения ⇒ нет replay_complete) покрывается безопасным сбросом pump после цикла. (Новое подключение без Last-Event-ID не имеет якоря кольца, поэтому оно сбрасывает весь буфер сразу, по порядку, как и раньше.)
  • Идемпотентный permission_request при воспроизведении. permission_request — это событие кольца с id, поэтому переподключение, курсор которого предшествует еще не отвеченному разрешению, воспроизводит его. translateEvent теперь переиспользует существующую запись conn.pending для этого bridgeRequestId (повторно отправляя тот же исходящий id JSON-RPC для синхронизации) вместо создания второго id + записи — никаких осиротевших pending, никаких двойных промптов для клиента, который дедуплицирует по _meta.requestId.

parseLastEventId вынесен в общий serve/sse-last-event-id.ts, используемый обеими поверхностями REST и /acp, чтобы их строгие правила принятия/отклонения и логирование оператора не могли разойтись.

Обратная совместимость

  • Старые клиенты, не отправляющие Last-Event-IDlastEventId равен undefinedsubscribeEvents запускается в live-режиме, ровно как сегодня.
  • Добавление строк id: — это обратно совместимое SSE — клиент, игнорирующий это поле, не затрагивается; клиент на базе EventSource начинает отслеживать его бесплатно.
  • Вендорный SDK AcpHttpTransport подключает воспроизведение в этом PR — он устанавливает supportsReplay = true и повторно отправляет Last-Event-ID при переподключении, поэтому кадры из пробела воспроизводятся из кольца, и потеря контента из §1.8 устраняется без необходимости дальнейших изменений в демоне. (Отдельный переключатель внешнего транспорта agent-web остается отложенным — см. “За рамками”.) Изменение в демоне остается инертным для любого потребителя, который по-прежнему сообщает supportsReplay = false и опускает заголовок.
  • Поверхность REST не затронута.

План тестирования

  • sse-stream.test.tssend(msg, 7) генерирует id: 7\n перед data:; send(msg) (без id) пропускает строку id:; порядок id:data: → пустая строка.
  • transport.test.ts (end-to-end через транспорт /acp):
    • live-кадры session/update теперь приходят со строкой id:;
    • GET /acp с Last-Event-ID: N передает курсор в subscribeEvents; новый поток без заголовка ведет себя как сегодня;
    • переполнение Last-Event-ID (> MAX_SAFE_INTEGER) → только live;
    • реальный порядок закрытия и переподключения: закрыть старый SSE первым, затем переподключиться с Last-Event-ID — утверждать 200, а не 403 (права владельца сохранены) и промпт не прерывается (grace/reclaim);
    • воспроизведенный permission_request переиспользует запись pending (тот же исходящий id).
  • connection-registry.test.ts — подключение без возобновления сбрасывает весь буфер, передавая id каждого кадра; подключение с возобновлением (при наличии курсора) пропускает кадры с id (ими владеет воспроизведение кольца), но все равно сбрасывает ответы JSON-RPC без id; detachSessionStream сохраняет права владельца/промпт в течение льготного окна, а затем выполняет снос по истечении срока; переподключение в пределах окна выполняет reclaim (отменяет ожидающий снос).
  • ws-stream.test.tssend(msg, id) игнорирует id: сетевой кадр WS — это чистый JSON, никакого SSE id: фрейминга не просачивается.

За рамками (все еще отложено)

  • Транспорты WebSocket / HTTP/2.
  • Разрешение разрешений между соединениями из §1.7 (голосование, отправленное через POST на другой Acp-Connection-Id, отличный от того, который транслировал промпт) — отдельная, чувствительная к безопасности проблема, отслеживаемая как собственный follow-up. Данный PR делает перевод permission_request идемпотентным при воспроизведении (см. выше), но не добавляет глобальное для сессии разрешение requestId. Он также не добавляет идемпотентность воспроизведения ответов для УЖЕ РАЗРЕШЕННОГО разрешения: как только клиент проголосовал, запись pending потребляется, поэтому последующее переподключение, которое воспроизводит (все еще находящийся в кольце) permission_request, повторно отправляет промпт с тем же _meta.requestId. Соответствующий спецификации клиент дедуплицирует по этому id (контракт, на который уже опирается путь воспроизведения), а остаточная осиротевшая запись pending убирается при снесе — агент никогда не зависает — но запись разрешенных результатов в ограниченный LRU для каждой сессии для повторной отправки записанного голоса (полная идемпотентность для клиентов без дедупликации) относится к этому же follow-up по координации разрешений, поскольку добавляет состояние разрешенного разрешения в путь голосования.
  • Потерянный ответ на промпт в процессе выполнения в потоке сессии — восстановленные кадры контента все проходят через кольцо eventBus; ответ JSON-RPC не является событием кольца.
  • Переключатель supportsReplay на стороне потребителя во внешнем AcpHttpTransport для agent-web (находится в другом репозитории; разблокируется данным PR).
  • Голосование за разрешения через экспортируемые транспорты SDK. Экспортируемые AcpHttpTransport/AcpWsTransport представляют session/request_permission как событие permission_request, но API голосования SDK (respondToPermission / respondToSessionPermission) маппятся на запрос session/permission, для которого у демона ACP нет обработчика — он принимает голосование за разрешение только как ответ JSON-RPC, повторяющий исходящий id _qwen_perm_N. Проведение цикла голосования является частью follow-up по координации разрешений из §1.7. Связанный аспект: pump ответов сессии без подписчиков (ensureSessionReplyPump) открывает реальный поток сессии GET /acp, который демон рассматривает как live-поток — поэтому permission_request агента, поднятый, когда подключен только pump ответов, МАРШРУТИЗИРУЕТСЯ в этот поток и отбрасывается pump (он пересылает только ответы JSON-RPC), подвешивая посредника, тогда как при полном отсутствии потока демон отменяет и отклоняет (cancel-denies), и агент продолжает работу. Как демоновское различие “это реальный потребитель или просто pump ответов?”, так и обработка на стороне SDK (отклонять локально / выводить в callback разрешения) относятся к тому же follow-up по координации разрешений, поскольку сам pump не может проголосовать. Потребителям, которым нужна обработка разрешений, следует открывать subscribeEvents перед выдачей RPC сессии (задокументированный контракт), что дает демону реальный поток потребителя.
  • RPC сессии, вызываемые изнутри цикла subscribeEvents в экспортируемом AcpHttpTransport. Поток сессии /acp имеет одного читателя: пока асинхронный генератор потребителя припаркован между yield, читатель не опустошает очередь. Если потребитель делает await для RPC, маршрутизируемого через сессию (session/set_model, session/prompt, …) изнутри своего собственного цикла обработки событий, sendRequest подавляет фоновый pump ответов (подписка “активна”), но припаркованный генератор никогда не читает ответ — вызов зависает, пока потребитель не извлечет следующее событие. Надежное исправление — сделать читатель сессии фоновым pump, который всегда опустошает ответы JSON-RPC и ставит в очередь только DaemonEvent для итератора; отложено как целенаправленный follow-up, поскольку это структурное изменение для opt-in, недавно экспортированного транспорта и не влияет на транспорт REST по умолчанию.
  • Автоматическая защита от расхождений SESSION_STREAM_REPLY_METHODSreplySession. Набор SESSION_STREAM_REPLY_METHODS SDK должен зеркально отражать места вызова replySession(...) демона в dispatch.ts (другой пакет); метод, добавленный там без добавления сюда, не открывает pump ответов, и sendRequest без подписчика для него зависает до прерывания. Система типов ни одного из пакетов это не обеспечивает. Защита в CI (легковесный скрипт или vitest, который извлекает имена методов ответов сессии демона и сравнивает их с набором SDK) — это правильное решение, но кросс-пакетный инструмент статического анализа — это своя собственная целенаправленная задача, и не тривиальный grep: правильному экстрактору нужен легкий анализ потока данных, потому что ответ session/prompt генерируется НЕ внутри его блока case 'session/prompt'. Промпт запускается асинхронно, и его replySession(...) срабатывает позже из обработчика завершения промпта (другое место вызова), поэтому наивное сканирование “какие блоки case содержат replySession” ошибочно ИСКЛЮЧИТ session/prompt и провалит сборку против правильного набора. Пока набор невелик и стабилен, а JSDoc у константы документирует инвариант; надежное долгосрочное решение — заставить демона анонсировать имена своих маршрутизируемых через сессию методов (единый источник истины), а не парсить dispatch.ts.
Last updated on