SSE шина событий и противодавление
Обзор
EventBus (packages/acp-bridge/src/eventBus.ts) — это внутрисессионная pub/sub шина в памяти, которая питает SSE-маршрут GET /session/:id/events демона. Она присваивает каждому событию монотонный идентификатор, буферизирует последние события в ограниченном кольцевом буфере для воспроизведения через Last-Event-ID, рассылает опубликованные события всем подписчикам, применяет к каждому подписчику противодавление (предупреждение при заполнении очереди на 75%, вытеснение при достижении предела) и генерирует два синтетических терминальных кадра (client_evicted, slow_client_warning), которые SDK обрабатывает как события первого класса, но шина помечает без id, чтобы они не занимали слот в последовательности сессии.
EventBus в настоящее время является приватным для пакета acp-bridge и используется фабрикой моста через один замыкающий экземпляр на сессию. Будущий рефакторинг (указанный в строках 150–159 файла eventBus.ts) сделает его строительным блоком верхнего уровня, чтобы каналы, двойной вывод и будущие WebSocket-транспорты могли подписываться через ту же шину, а не через параллельные потоки.
Обязанности
- Назначать монотонные идентификаторы событий для каждой сессии, начиная с 1.
- Буферизировать последние
ringSizeсобытий для воспроизведения при подписке сlastEventId. - Рассылвать опубликованные события не более чем
maxSubscribersодновременным подписчикам. - Применять ограниченные очереди для каждого подписчика; отбрасывать переполнившихся подписчиков с синтетическим терминальным кадром
client_evicted. - Генерировать
slow_client_warningодин раз за эпизод переполнения при заполнении очереди на 75%, с гистерезисом 37,5% для предотвращения повторных предупреждений. - Немедленно отзывать подписки при
AbortSignal.abort(). - Аккуратно закрывать каждого подписчика при закрытии шины (например, при завершении сессии).
- Никогда не выбрасывать исключения из
publish(контракт — ‘вызов publish всегда безопасен’).
Архитектура
| Константа | Значение | Назначение |
|---|---|---|
EVENT_SCHEMA_VERSION | 1 | Проставляется в каждом BridgeEvent.v; увеличивается при изменениях, нарушающих совместимость. |
DEFAULT_RING_SIZE | 8000 | Кольцевой буфер воспроизведения для каждой сессии. Оператор может переопределить через --event-ring-size. |
DEFAULT_MAX_QUEUED | 256 | Максимальный размер очереди для каждого подписчика. |
DEFAULT_MAX_SUBSCRIBERS | 64 | Максимальное количество подписчиков на сессию. |
WARN_THRESHOLD_RATIO | 0.75 | Доля maxQueued, при которой срабатывает slow_client_warning. |
WARN_RESET_RATIO | 0.375 | Доля для повторного взведения гистерезиса. |
MAX_EVENT_RING_SIZE (в bridge.ts) | 1_000_000 | Мягкая верхняя граница для BridgeOptions.eventRingSize, чтобы выявить ошибки с памятью, вызванные опечатками. |
BridgeEvent
interface BridgeEvent {
id?: number; // monotonic per session; absent on synthetic terminal frames
v: 1; // EVENT_SCHEMA_VERSION
type: string; // one of the 43 known types or future-extensible
data: unknown; // payload (typed per-type by the SDK; see 09-event-schema.md)
originatorClientId?: string; // set when the event derives from a clientId-stamped request
}SubscribeOptions
interface SubscribeOptions {
lastEventId?: number; // replay from after this id (Last-Event-ID resume)
signal?: AbortSignal; // aborts the subscription promptly
maxQueued?: number; // per-subscriber backlog cap; default 256
}subscribe() возвращает AsyncIterable<BridgeEvent>. SSE-маршрут потребляет его через for await. Регистрация синхронна — к моменту возврата из subscribe() подписчик уже присоединен, поэтому publish(), вызываемый одновременно с первым next() потребителя, всё равно будет доставлен.
BoundedAsyncQueue
Очередь для каждого подписчика. Два ключевых поведения:
-
Ограничение применяется только к активным элементам. Элементы, вставленные через
forcePush(), получают тегforced: trueдля каждой записи и никогда не учитываются вmaxSize. Это позволяет пути воспроизведенияLast-Event-IDпринудительно вставить сотни исторических кадров в нового подписчика, не вызывая немедленного срабатывания ограничения и вытеснения только что возобновленного подписчика. -
liveCountподдерживается как поле, а не вычисляется из позицииforcedInBuf. Более ранняя эвристика, основанная на позиции, сломалась, когдаslow_client_warningстал принудительно вставляться в середину потока (предупреждения помещаются в КОНЕЦ очереди, а не в начало, как воспроизводимые). Тегиforcedдля каждой записи не зависят от позиции.
push(value) возвращает false (вместо блокировки или исключения), когда активный резерв достигает предела — шина использует этот сигнал для вытеснения подписчика. forcePush(value) обходит ограничение. close({drain?: boolean}) по умолчанию сливает ожидающие элементы; при прерывании передается drain: false, чтобы немедленно отбросить их.
Рабочий процесс
Публикация
publish никогда не выбрасывает исключений. Закрытие шины в середине публикации (путь завершения закрывает шины для каждой сессии перед ожиданием channel.kill()) возвращает undefined, а не выбрасывает исключение, поскольку агент всё ещё может отправлять уведомления sessionUpdate в небольшом окне между закрытием шины и убийством канала.
Подписка + воспроизведение (с обнаружением вытеснения из кольца)
Если subs.size >= maxSubscribers на момент подписки, выбрасывается SubscriberLimitExceededError — маршрут SSE перехватывает его и сериализует синтетический фрейм stream_error для отклоненного клиента, чтобы он не видел пустой поток в тишине. Возвращение пустой итерации вместо этого оставило бы операторов без возможности увидеть, что «некоторые клиенты получают события, а некоторые нет» под нагрузкой.
Вытеснение из кольца → state_resync_required (поток восстановления)
Когда потребитель переподключается с Last-Event-ID: N, а самое раннее сохраненное событие в кольце имеет id > N + 1, события в диапазоне [N+1, earliestInRing-1] были вытеснены до того, как потребитель переподключился. Наивное воспроизведение молча бы удалось с не непрерывным суффиксом, редуктор SDK продолжал бы применять дельты, как если бы поток был непрерывным, и его состояние разошлось бы с истиной демона — без терминального сигнала.
Реализовано в EventBus.subscribe():
- Сначала проверка
opts.lastEventId >= this.nextId. Если истина, курсор клиента из более старой эпохи шины (перезапуск демона / реконструкция EventBus), поэтому шина генерируетreason: 'epoch_reset'и воспроизводит все текущее кольцо. - В противном случае вычисляется
earliestInRing = this.ring[0]?.id. - Если
earliestInRing > opts.lastEventId + 1, принудительно отправляется синтетический фрейм перед фреймами воспроизведения:{ "v": 1, "type": "state_resync_required", "data": { "reason": "ring_evicted", "lastDeliveredId": <opts.lastEventId>, "earliestAvailableId": <earliestInRing> } } - После этого продолжается обычный цикл воспроизведения.
Критические контракты (и что было исправлено в ревью #4360):
- Нет
id— тот же шаблон без слота, что и уclient_evicted, поэтому он не занимает слот в монотонной последовательности для каждой сессии, которую видят другие подписчики. - Поток остается открытым — в отличие от
client_evicted(действительно терминального),state_resync_requiredориентирован на восстановление. Воспроизведение и живые фреймы продолжают поступать после него. - Редуктор автоматически пропускает дельты — сторона SDK переключает
awaitingResync = trueи применяет толькоstate_resync_required, терминальные фреймы и снапшоты полного состояния, пока код потребителя не вызоветloadSessionи не сбросит флаг. См.09-event-schema.mdдляRESYNC_PASSTHROUGH_TYPES. - Дружелюбно к сети — фреймы остаются на проводе, чтобы SDK мог вычислить «что вы пропустили» позже, если захочет. Дополнительный цикл переподключения не требуется.
Терминальный поток вытеснения
Когда живой бэклог подписчика достиг maxQueued, и следующий вызов push() возвращает false:
- Установить
sub.evicted = true. - Создать фрейм
client_evictedбезid—{ v: 1, type: 'client_evicted', data: { reason: 'queue_overflow', droppedAfter: <last delivered id> } }. queue.forcePush(evictionFrame)чтобы итератор потребителя увидел один терминальный фрейм.queue.close()чтобы итерация завершилась после терминального фрейма.- Вызвать
sub.dispose()— удаляет изsubsи отключает слушательAbortSignal; без этой очистки замыкания зависших потребителей остаются живыми до сборки мусораAbortSignal.
Поток прерывания
AbortSignal.abort() → onAbort():
queue.close({drain: false})— отбросить буферизированные элементы, чтобы SSE-маршрут не продолжал сериализовать события в сокет, который никто не слушает.dispose()— идемпотентен благодаря флагуdisposed.
Уже прерванные сигналы во время подписки вызывают onAbort() синхронно перед возвратом итератора.
Состояние и жизненный цикл
nextIdначинается с 1 и только увеличивается. ГеттерlastEventIdвозвращаетnextId - 1.ringограничен; вытеснение через сдвиг — O(n) после заполнения. ПриringSize=8000это занимает несколько миллисекунд на сессиях с высокой нагрузкой — значительно ниже бюджета задержки на фрейм. Рефакторинг на кольцевой буфер отложен до тех пор, пока профилирование не укажет на это, или операторы не увеличат--event-ring-sizeна порядок.close()переключаетclosed, закрывает очередь каждого подписчика и очищаетsubs. Последующие вызовыpublish()/subscribe()являются no-ops (publishвозвращает undefined;subscribeвозвращаетemptyAsyncIterable).- Каждая сессия владеет одним
EventBus. Закрытие шины происходит доchannel.kill(), чтобы выполняющиеся в процессе завершения публикации возвращали undefined, а не выбрасывали исключение.
Зависимости
- Потребляется в
packages/acp-bridge/src/bridge.ts(BridgeClient.sessionUpdate/BridgeClient.extNotification→events.publish(...)). - Потребляется в
packages/cli/src/serve/server.ts(обработчик SSE-маршрута →events.subscribe(...)затем форматируетBridgeEventв проволочные фреймы SSE). - Повторно экспортируемый шим:
packages/cli/src/serve/event-bus.ts→@qwen-code/acp-bridge/eventBus. - Потребитель SDK:
packages/sdk-typescript/src/daemon/sse.ts(parseSseStream), затемasKnownDaemonEvent(см.09-event-schema.md,13-sdk-daemon-client.md).
Конфигурация
--event-ring-size <n>— глубина кольца на сессию; мягкое ограничениеMAX_EVENT_RING_SIZE = 1_000_000.- Параметр запроса подписчика
?maxQueued=NнаGET /session/:id/events, диапазон[16, 2048]. Клиенты SDK предварительно проверяютcaps.features.slow_client_warningперед включением. BridgeOptions.eventRingSize(переопределяет значение по умолчанию демона для встроенного использования).- Теги возможностей:
session_events,slow_client_warning,typed_event_schema.
Оговорки и известные ограничения
- Синтетические фреймы не имеют
id. Потребители SDK, использующиеLast-Event-IDдля возобновления, записывают только фреймы с id;slow_client_warning,client_evicted,state_resync_requiredиreplay_completeне перемещают курсор и не потребляют порядковые номера сессии. Если между двумя живыми фреймами с id есть реальный разрыв, обрабатывайте его через путь повторной синхронизации при вытеснении из кольца/сбросе эпохи, а не рассматривайте как частный синтетический фрейм. client_evictedдействует для каждого подписчика, а не для сессии. Тот же клиент может переподключиться.- Итератор
BoundedAsyncQueueнебезопасен для параллельных потребителей — два одновременных вызова.next()будут конкурировать за одно событие. Использование в демоне последовательно (for await ... ofв обработчике SSE-маршрута), поэтому в production это безопасно. - В настоящее время шина является package-private; каналы и веб-интерфейс должны подписываться через HTTP SSE-маршрут демона, а не напрямую обращаться к шине. Этап 1.5 снимет это ограничение.
Ссылки
packages/acp-bridge/src/eventBus.ts(весь файл)packages/acp-bridge/src/bridge.ts(места публикации, особенноBridgeClient.sessionUpdateи события разрешений F3)packages/cli/src/serve/server.ts(обработчик SSE-маршрута — форматируетBridgeEventв проволочный SSE)packages/sdk-typescript/src/daemon/sse.ts(парсер проволочного SSE на стороне клиента)- Проволочный протокол:
../qwen-serve-protocol.md(контракт переподключенияLast-Event-ID).