ACP-over-HTTP — возобновляемый поток событий сессии (Last-Event-ID)
Статус: дизайн + реализация в этом PR. Закрывает пробел с возобновляемостью, отслеживаемый как RFD Phase 4 в
README.md§7 / строка “Resume cursor (ringLast-Event-ID)”.
Проблема
Поток событий сессии Streamable-HTTP для /acp (GET /acp с заголовком Acp-Session-Id) работает только в реальном времени (live-only): он не генерирует последовательность SSE id: и не учитывает заголовок запроса Last-Event-ID при переподключении.
Когда прокси плоскости управления (control-plane) закрывает долгоживущее SSE-соединение из-за простоя в середине хода (сама демон отправляет retry: 3000, а входные прокси часто обрывают длинные SSE), клиент переподключается и возвращает себе права владельца, но теряется каждый кадр контента, созданный демоном во время разрыва — уведомления session/update, содержащие agent_thought_chunk / agent_message_chunk. Ход всё равно достигает терминального состояния (генерируется / синтезируется turn_complete), поэтому UI показывает “done” с пустым или обрезанным телом. Повторная отправка того же промпта работает, что и выдает проблему: потеря происходит на транспортном уровне, а не в модели.
Симптомы и полевые доказательства каталогизированы в заметках по интеграции как §1.8 (sdk-known-issues.md).
Что уже есть (и почему это небольшое изменение)
Движок повторного воспроизведения (replay engine) уже создан и проверен в боях — пробел заключается лишь в том, что транспорт /acp не подключен к нему.
packages/acp-bridge/src/eventBus.ts:
- Монотонный
idдля каждой сессии, начинающийся с 1 (nextId, присваивается вpublish()). - Ограниченный кольцевой буфер для каждой сессии (
DEFAULT_RING_SIZE = 8000, переопределение оператором черезqwen serve --event-ring-size). subscribeEvents(sessionId, { lastEventId, signal })воспроизводит кадры из кольцевого буфера сid > lastEventIdдо того, как начнут поступать live-события, и генерирует синтетические управляющие кадрыreplay_complete,state_resync_required(вытеснение из кольца / сброс эпохи при перезапуске демона),client_evicted,slow_client_warning.
REST-поверхность GET /session/:id/events уже использует всё это: она читает last-event-id (server.ts → parseLastEventId), передает его в subscribeEvents и сериализует каждый кадр с помощью строки SSE id: (formatSseFrame). Проблема в том, что транспорт /acp ничего из этого не делает:
| Слой | REST /session/:id/events | /acp GET (сейчас) |
|---|---|---|
читает заголовок Last-Event-ID | да | нет |
передает lastEventId в subscribeEvents | да | нет (dispatch.ts pumpSessionEvents) |
генерирует строку SSE id: | да (formatSseFrame) | нет (SseStream.send пишет только data:) |
В acp-http/sse-stream.ts об этом даже сказано в комментарии: “no ring-buffer id: sequencing — resumability is RFD Phase 4, deferred.” Данный PR устраняет эту отсрочку.
Решение по протоколу — строка SSE id: (а не _meta внутри payload)
Две SSE-поверхности переносят разные payload:
- REST потоки переносят конверты
BridgeEvent({ id, v, type, data, _meta }). Парсер SDK (sdk-typescript/src/daemon/sse.ts) извлекает курсор из поляidJSON-конверта (он читает только строкиdata:). /acpпотоки переносят сырые объекты JSON-RPC 2.0 (уведомленияsession/update, запросыsession/request_permission, ответы). У них нетidконверта для передачи курсора шины, аidв JSON-RPC означает другое (id запроса).
Поэтому для /acp курсором возобновления является стандартная строка SSE id::
- Она нативна для EventSource — SSE-клиент, соответствующий спецификации (включая вендорный
AcpHttpTransport), автоматически отслеживает последнийid:и автоматически отправляет его обратно в заголовкеLast-Event-IDпри переподключении. - Она сохраняет payload JSON-RPC чистым (без внедрения нестандартного
_meta.qwen.eventIdв кадры протокола). - Она зеркально отражает то, что
formatSseFrameуже генерирует в REST, поэтому обе поверхности используют одинаковые ideventBusи одинаковую семантикуLast-Event-ID.
Только кадры, созданные шиной, несут id: (session/update, session/request_permission, уведомления, отправляемые демоном). Ответы/реплики JSON-RPC, которые передаются в потоке сессии, не являются событиями шины и не несут id: — их нет в кольцевом буфере, и их отслеживание при воспроизведении намеренно не реализовано (потерянный ответ на промпт в процессе выполнения — это отдельная проблема §1.7, выходящая за рамки данного текста; §1.8 касается потерянных кадров контента, которые все являются событиями session/update шины).
Синтетические терминальные кадры (client_evicted, stream_error, …) не имеют id шины и поэтому не генерируют строку id: — в соответствии с REST, чтобы они не занимали слот в монотонной последовательности, с которой клиент возобновляет работу.
Изменения
transport-stream.ts—send(message, id?: number). Опциональныйid— это id события шины для отслеживания курсора SSE.sse-stream.ts—send(message, id?)добавляетid: ${id}\nперед строкойdata:, еслиid !== undefined(зеркальноformatSseFrame).ws-stream.ts—send(message, id?)принимает и игнорируетid: WebSocket — это stateful-соединение, без SSE-воспроизведения (согласуется сAcpWsTransport.supportsReplay = false).connection-registry.ts—sendSession(sessionId, frame, id?)передаетidвstream.send. Буфер до подключения для каждой сессии хранит пары{ frame, id? }, чтобы буферизованный кадр сохранял свой курсор при сбросе (flush) во время подключения. (Буфер на уровне соединения не изменен — эти кадры являются ответами JSON-RPC без id шины.)dispatch.tstranslateEventпередаетevent.idчерез каждый вызовsendSession/binding.stream.sendдля событий шины.pumpSessionEvents(conn, sessionId, signal, lastEventId?)перенаправляетlastEventIdвsubscribeEvents— напрямую переиспользуя существующее воспроизведение кольцевого буфера.
index.ts— ветка session-stream дляGET /acpчитает заголовокLast-Event-ID(через строгийparseLastEventId, с тем же правилом принятия только десятичных цифр, что и в REST) и передает его вpumpSessionEvents.
Никаких изменений в eventBus/bridge — движок переиспользуется без изменений.
Заставляем возобновление реально работать (grace/reclaim для session-stream)
Механизм id:/Last-Event-ID, описанный выше, необходим, но недостаточен — сам по себе он никогда не срабатывает в реальном потоке. Ранее, когда SSE-поток сессии закрывался на транспортном уровне, обработчик GET запускал полный снос closeSessionStream: он удалял сессию из ownedSessions, прерывал выполняющийся промпт и отключал клиент bridge. В реальном порядке событий EventSource/прокси (старый сокет закрывается первым, затем клиент переподключается) это означает, что переподключение с Last-Event-ID отклоняется с кодом 403 при проверке прав владельца еще до того, как курсор будет прочитан — а промпт, генерирующий контент, уже прерван. Движку воспроизведения было бы не к чему переподключаться.
Поэтому закрытие session-stream на транспортном уровне теперь выполняет отключение (detach) вместо полного сноса (AcpConnection.detachSessionStream): оно останавливает только поток + его подписку на события и сохраняет привязку, права владельца, выполняющийся промпт и регистрацию клиента bridge активными в течение льготного окна (SESSION_GRACE_MS, зеркально CONN_GRACE_MS). Переподключение в пределах этого окна повторно подключает поток (attachSessionStream сбрасывает таймер льготного периода — reclaim), и воспроизведение кольцевого буфера заполняет пробел. Если переподключения не происходит, таймер льготного периода запускает полный снос — ограничивая стоимость “сбежавшего” промпта. Полный снос остается немедленным для явного session/close и для сноса соединения (destroy). Обработчик GET делает ветвление на основе stream.isClosed: закрытие транспорта → detach с льготным периодом; завершение pump, пока поток еще открыт (завершение подпроцесса / ошибка итератора) → полное закрытие (zombie stream).
Две проверки корректности воспроизведения, которые это разблокирует
Обе скрыты до тех пор, пока возобновление реально не запустится; grace/reclaim выше делает их достижимыми, поэтому они поставляются вместе:
- Никакой двойной доставки И никаких скрытых потерь (буфер ↔ кольцо). Буферизованное событие шины также находится в кольце EventBus (оно было опубликовано там, чтобы получить свой id). Поэтому при возобновлении (при наличии
Last-Event-ID)attachSessionStreamполучает курсор и вообще не сбрасывает (flush) буферизованные кадры с id — воспроизведение кольца (начатое с курсора клиента) является единственным путем доставки для каждого события шины после курсора. Это намеренно не схема “сбросить буфер, затем продвинуть курсор воспроизведения за его пределы”: кадр, отправленный в уже мертвый сокет, но так и не полученный клиентом, имеет id ниже id буфера, но выше курсора клиента, поэтому продвижение курсора за пределы буфера скрыто отбросит его. Передача кольцу всех событий шины доставляет каждое ровно один раз без пробелов. Кадры без id (ответы JSON-RPC, маршрутизируемые черезreplySession) не являются событиями кольца, поэтому кольцо не будет доставлять их повторно — но их также нельзя сбрасывать при подключении: буферизованный результатsession/prompt, сброшенный до воспроизведения, придет раньше фрагментов контента, которые ему предшествовали (клиент видит “done” до тела — это именно та ошибка усеченного тела, которую исправляет §1.8). Поэтому при возобновлении кадры без id откладываются: они остаются в буфере, и pump событий освобождает их (flushBufferedSessionFrames), как только воспроизведение опустеет — только по событиюreplay_complete, сохраняя исходный порядок потока. Что критично, НЕ по событиюstate_resync_required: EventBus генерирует этот кадр до кадров воспроизведения (затем все равно генерируетreplay_completeв конце), поэтому сброс по нему поместит ответ перед воспроизведенным контентом. Случай только live (нетLast-Event-ID⇒ нет воспроизведения ⇒ нетreplay_complete) покрывается безопасным сбросом pump после цикла. (Новое подключение безLast-Event-IDне имеет якоря кольца, поэтому оно сбрасывает весь буфер сразу, по порядку, как и раньше.) - Идемпотентный
permission_requestпри воспроизведении.permission_request— это событие кольца с id, поэтому переподключение, курсор которого предшествует еще не отвеченному разрешению, воспроизводит его.translateEventтеперь переиспользует существующую записьconn.pendingдля этогоbridgeRequestId(повторно отправляя тот же исходящий id JSON-RPC для синхронизации) вместо создания второго id + записи — никаких осиротевших pending, никаких двойных промптов для клиента, который дедуплицирует по_meta.requestId.
parseLastEventId вынесен в общий serve/sse-last-event-id.ts, используемый обеими поверхностями REST и /acp, чтобы их строгие правила принятия/отклонения и логирование оператора не могли разойтись.
Обратная совместимость
- Старые клиенты, не отправляющие
Last-Event-ID→lastEventIdравенundefined→subscribeEventsзапускается в live-режиме, ровно как сегодня. - Добавление строк
id:— это обратно совместимое SSE — клиент, игнорирующий это поле, не затрагивается; клиент на базе EventSource начинает отслеживать его бесплатно. - Вендорный SDK
AcpHttpTransportподключает воспроизведение в этом PR — он устанавливаетsupportsReplay = trueи повторно отправляетLast-Event-IDпри переподключении, поэтому кадры из пробела воспроизводятся из кольца, и потеря контента из §1.8 устраняется без необходимости дальнейших изменений в демоне. (Отдельный переключатель внешнего транспортаagent-webостается отложенным — см. “За рамками”.) Изменение в демоне остается инертным для любого потребителя, который по-прежнему сообщаетsupportsReplay = falseи опускает заголовок. - Поверхность REST не затронута.
План тестирования
sse-stream.test.ts—send(msg, 7)генерируетid: 7\nпередdata:;send(msg)(без id) пропускает строкуid:; порядокid:→data:→ пустая строка.transport.test.ts(end-to-end через транспорт/acp):- live-кадры
session/updateтеперь приходят со строкойid:; GET /acpсLast-Event-ID: Nпередает курсор вsubscribeEvents; новый поток без заголовка ведет себя как сегодня;- переполнение
Last-Event-ID(>MAX_SAFE_INTEGER) → только live; - реальный порядок закрытия и переподключения: закрыть старый SSE первым, затем переподключиться с
Last-Event-ID— утверждать 200, а не 403 (права владельца сохранены) и промпт не прерывается (grace/reclaim); - воспроизведенный
permission_requestпереиспользует запись pending (тот же исходящий id).
- live-кадры
connection-registry.test.ts— подключение без возобновления сбрасывает весь буфер, передаваяidкаждого кадра; подключение с возобновлением (при наличии курсора) пропускает кадры с id (ими владеет воспроизведение кольца), но все равно сбрасывает ответы JSON-RPC без id;detachSessionStreamсохраняет права владельца/промпт в течение льготного окна, а затем выполняет снос по истечении срока; переподключение в пределах окна выполняет reclaim (отменяет ожидающий снос).ws-stream.test.ts—send(msg, id)игнорирует id: сетевой кадр WS — это чистый JSON, никакого SSEid:фрейминга не просачивается.
За рамками (все еще отложено)
- Транспорты WebSocket / HTTP/2.
- Разрешение разрешений между соединениями из §1.7 (голосование, отправленное через POST на другой
Acp-Connection-Id, отличный от того, который транслировал промпт) — отдельная, чувствительная к безопасности проблема, отслеживаемая как собственный follow-up. Данный PR делает переводpermission_requestидемпотентным при воспроизведении (см. выше), но не добавляет глобальное для сессии разрешение requestId. Он также не добавляет идемпотентность воспроизведения ответов для УЖЕ РАЗРЕШЕННОГО разрешения: как только клиент проголосовал, запись pending потребляется, поэтому последующее переподключение, которое воспроизводит (все еще находящийся в кольце)permission_request, повторно отправляет промпт с тем же_meta.requestId. Соответствующий спецификации клиент дедуплицирует по этому id (контракт, на который уже опирается путь воспроизведения), а остаточная осиротевшая запись pending убирается при снесе — агент никогда не зависает — но запись разрешенных результатов в ограниченный LRU для каждой сессии для повторной отправки записанного голоса (полная идемпотентность для клиентов без дедупликации) относится к этому же follow-up по координации разрешений, поскольку добавляет состояние разрешенного разрешения в путь голосования. - Потерянный ответ на промпт в процессе выполнения в потоке сессии — восстановленные кадры контента все проходят через кольцо
eventBus; ответ JSON-RPC не является событием кольца. - Переключатель
supportsReplayна стороне потребителя во внешнемAcpHttpTransportдляagent-web(находится в другом репозитории; разблокируется данным PR). - Голосование за разрешения через экспортируемые транспорты SDK. Экспортируемые
AcpHttpTransport/AcpWsTransportпредставляютsession/request_permissionкак событиеpermission_request, но API голосования SDK (respondToPermission/respondToSessionPermission) маппятся на запросsession/permission, для которого у демона ACP нет обработчика — он принимает голосование за разрешение только как ответ JSON-RPC, повторяющий исходящий id_qwen_perm_N. Проведение цикла голосования является частью follow-up по координации разрешений из §1.7. Связанный аспект: pump ответов сессии без подписчиков (ensureSessionReplyPump) открывает реальный поток сессииGET /acp, который демон рассматривает как live-поток — поэтомуpermission_requestагента, поднятый, когда подключен только pump ответов, МАРШРУТИЗИРУЕТСЯ в этот поток и отбрасывается pump (он пересылает только ответы JSON-RPC), подвешивая посредника, тогда как при полном отсутствии потока демон отменяет и отклоняет (cancel-denies), и агент продолжает работу. Как демоновское различие “это реальный потребитель или просто pump ответов?”, так и обработка на стороне SDK (отклонять локально / выводить в callback разрешения) относятся к тому же follow-up по координации разрешений, поскольку сам pump не может проголосовать. Потребителям, которым нужна обработка разрешений, следует открыватьsubscribeEventsперед выдачей RPC сессии (задокументированный контракт), что дает демону реальный поток потребителя. - RPC сессии, вызываемые изнутри цикла
subscribeEventsв экспортируемомAcpHttpTransport. Поток сессии/acpимеет одного читателя: пока асинхронный генератор потребителя припаркован междуyield, читатель не опустошает очередь. Если потребитель делаетawaitдля RPC, маршрутизируемого через сессию (session/set_model,session/prompt, …) изнутри своего собственного цикла обработки событий,sendRequestподавляет фоновый pump ответов (подписка “активна”), но припаркованный генератор никогда не читает ответ — вызов зависает, пока потребитель не извлечет следующее событие. Надежное исправление — сделать читатель сессии фоновым pump, который всегда опустошает ответы JSON-RPC и ставит в очередь толькоDaemonEventдля итератора; отложено как целенаправленный follow-up, поскольку это структурное изменение для opt-in, недавно экспортированного транспорта и не влияет на транспорт REST по умолчанию. - Автоматическая защита от расхождений
SESSION_STREAM_REPLY_METHODS⇄replySession. НаборSESSION_STREAM_REPLY_METHODSSDK должен зеркально отражать места вызоваreplySession(...)демона вdispatch.ts(другой пакет); метод, добавленный там без добавления сюда, не открывает pump ответов, иsendRequestбез подписчика для него зависает до прерывания. Система типов ни одного из пакетов это не обеспечивает. Защита в CI (легковесный скрипт или vitest, который извлекает имена методов ответов сессии демона и сравнивает их с набором SDK) — это правильное решение, но кросс-пакетный инструмент статического анализа — это своя собственная целенаправленная задача, и не тривиальный grep: правильному экстрактору нужен легкий анализ потока данных, потому что ответsession/promptгенерируется НЕ внутри его блокаcase 'session/prompt'. Промпт запускается асинхронно, и егоreplySession(...)срабатывает позже из обработчика завершения промпта (другое место вызова), поэтому наивное сканирование “какие блокиcaseсодержатreplySession” ошибочно ИСКЛЮЧИТsession/promptи провалит сборку против правильного набора. Пока набор невелик и стабилен, а JSDoc у константы документирует инвариант; надежное долгосрочное решение — заставить демона анонсировать имена своих маршрутизируемых через сессию методов (единый источник истины), а не парситьdispatch.ts.