Skip to content

Стрим ответа от LLM¤

Redis Stream¤

В текущем документе описан концептуальный подход стриминга ответа от LLM пользователю с сохранением этого ответа в ContextDB в типе данных RedisStream. Подробности использования RedisStream в конексте задач RAG-слоя с примерами кода, описаны в Использование Redis Stream.

Сбор данных¤

Запрос пользователя приходит в API Gateway, который является единой точкой взаимодействия с LLM. API кладёт данные запроса в ContextDB (Redis) в RedisJSON и посылает событие шину о том, что получен новый Query от пользователя.

flowchart LR classDef edgeLabel background-color:#ffffff; subgraph DB["ContextDB (Redis)"] json@{ shape: docs, label: RedisJSON } end subgraph DBs["Исполнение сценария"] direction TB databus@{ shape: das, label: Data Bus } -.- sv@{ shape: rounded, label: Supervisor } end API@{ shape: rounded, label: API Gateway } User(((User))) --> API API e1@-- Query Данные --> DB API -. Новый Query .-> DBs class API TurquoisePastel2Drawio class User WhiteDrawio class DB WhiteArea class json GreenPastelDrawio class DBs WhiteArea class databus PurplePastelDrawio class sv PurplePastelDrawio

Во время выполнения сценария, данные в ContextDB для Query, обогащаются различными сервисами.

flowchart BT classDef edgeLabel background-color:#ffffff; subgraph DB["ContextDB (Redis)"] json@{ shape: docs, label: RedisJSON } end subgraph DBs["Исполнение сценария"] direction TB databus@{ shape: das, label: Data Bus } -.- sv@{ shape: rounded, label: Supervisor } end DBs -- Результаты поиска VectorSearch --> DB DBs -- Данные из источников --> DB DBs -- Выбор prompt для LLM --> DB class DB WhiteArea class json GreenPastelDrawio class DBs WhiteArea class databus PurplePastelDrawio class sv PurplePastelDrawio

Конечной точкой взаимодействия с LLM является Adapter LLM, который формирует из собранных данных формирует окончательный запрос в LLM.

flowchart LR classDef edgeLabel background-color:#ffffff; subgraph DB["ContextDB (Redis)"] json@{ shape: docs, label: RedisJSON } end subgraph DBs["Исполнение сценария"] direction TB databus@{ shape: das, label: Data Bus } -.- sv@{ shape: rounded, label: Supervisor } end API@{ shape: rounded, label: API Gateway } User(((User))) --> API API e1@-- Query Данные --> DB API -. Новый Query .-> DBs DB -- Query Данные --> Allm@{ shape: rounded, label: Adapter LLM } DBs -. Новый Query .-> Allm Allm -- Request --> llm@{ shape: cloud, label: LLM } class llm GrayPastelDrawio class Allm BluePastelDrawio class DB WhiteArea class json GreenPastelDrawio class DBs WhiteArea class databus PurplePastelDrawio class sv PurplePastelDrawio class API TurquoisePastel2Drawio class User WhiteDrawio

Ответ от LLM¤

Целиком¤

В текущей реализации ответ от LLM вычитывается адаптером польностью и кладётся в JSON в ContextDB, после чего API Gateway получает сигнал о том, что есть ответ и возвращает его пользователю.

flowchart RL classDef edgeLabel background-color:#ffffff; subgraph DB["ContextDB (Redis)"] json@{ shape: docs, label: RedisJSON } end subgraph DBs["Исполнение сценария"] direction TB databus@{ shape: das, label: Data Bus } -.- sv@{ shape: rounded, label: Supervisor } end llm@{ shape: cloud, label: LLM } Allm@{ shape: rounded, label: Adapter LLM } API@{ shape: rounded, label: API Gateway } llm e1@== Server Event Stream специфичный для модели==> Allm Allm -- JSON Данные Ответа --> DB DB -- JSON Данные Ответа --> API API -- LLM JSON --> User(((User))) Allm -. Есть Ответ .-> DBs DBs -. Есть Ответ .-> API e1@{ animate: true } class llm GrayPastelDrawio class Allm BluePastelDrawio class DB WhiteArea class rstream GreenPastelDrawio class DBs WhiteArea class databus PurplePastelDrawio class sv PurplePastelDrawio class API TurquoisePastel2Drawio class User WhiteDrawio

Стрим¤

Задача стоит в том, чтобы отдавать ответ пользователю в формате стрима. Предлагаю для этого воспользоваться RedisStream. Адаптер будет писать свой ответ в ContextDB в тип данных RedisStream. API Gateway будет слушать этот стрим и возвращать ответ пользователю. В случае потери соединения, либо для служебного использования, стрим может быть перечитан любое количетсво раз, а затем удалён, либо перенесён на постоянно хранение для аудита.

flowchart RL classDef edgeLabel background-color:#ffffff; subgraph DB["ContextDB (Redis)"] rstream@{ shape: bow-rect, label: Redis Stream } end subgraph DBs["Исполнение сценария"] direction TB databus@{ shape: das, label: Data Bus } -.- sv@{ shape: rounded, label: Supervisor } end llm@{ shape: cloud, label: LLM } Allm@{ shape: rounded, label: Adapter LLM } API@{ shape: rounded, label: API Gateway } llm e1@== Server Sent Events Stream специфичный для модели==> Allm Allm e2@== Формат SiburLLMStream ==> DB DB e3@== Формат SiburLLMStream ==> API API e4@== LLM Answer ==> User(((User))) Allm -. Start Stream .-> DBs Allm -. Finish Stream .-> DBs DBs -. Stream Is Started .-> API DBs -. Stream Is Finished .-> API e1@{ animate: true } e2@{ animate: true } e3@{ animate: true } e4@{ animate: true } class llm GrayPastelDrawio class Allm BluePastelDrawio class DB WhiteArea class rstream GreenPastelDrawio class DBs WhiteArea class databus PurplePastelDrawio class sv PurplePastelDrawio class API TurquoisePastel2Drawio class User WhiteDrawio

Цензурирование ответа¤

Прямая схема¤

flowchart RL classDef edgeLabel background-color:#ffffff; subgraph DB["ContextDB (Redis)"] rstream@{ shape: bow-rect, label: Redis Stream } end subgraph DBs["Исполнение сценария"] direction TB databus@{ shape: das, label: Data Bus } -.- sv@{ shape: rounded, label: Supervisor } end llm@{ shape: cloud, label: LLM } Allm@{ shape: rounded, label: Adapter LLM } API@{ shape: rounded, label: API Gateway } llm e1@== Server Sent Events Stream специфичный для модели==> Allm Allm e2@== Формат SiburLLMStream ==> Csr((Censor)) Csr e5@== Формат SiburLLMStream ==> DB Csr -. Stream Interrupted .-> DBs DB e3@== Формат SiburLLMStream ==> API API e4@== LLM Answer ==> User(((User))) Allm -. Start Stream .-> DBs Allm -. Finish Stream .-> DBs DBs -. Stream Is Started .-> API DBs -. Stream Is Finished .-> API e1@{ animate: true } e2@{ animate: true } e3@{ animate: true } e4@{ animate: true } e5@{ animate: true } class llm GrayPastelDrawio class Allm BluePastelDrawio class Csr RedPastelDrawio class DB WhiteArea class rstream GreenPastelDrawio class DBs WhiteArea class databus PurplePastelDrawio class sv PurplePastelDrawio class API TurquoisePastel2Drawio class User WhiteDrawio

Параллельная схема¤

flowchart RL classDef edgeLabel background-color:#ffffff; subgraph DB["ContextDB (Redis)"] direction TB rstream2@{ shape: bow-rect, label: Redis Stream Censored } ~~~ rstream1@{ shape: bow-rect, label: Redis Stream Raw } end subgraph DBs["Исполнение сценария"] direction TB databus@{ shape: das, label: Data Bus } -.- sv@{ shape: rounded, label: Supervisor } end llm@{ shape: cloud, label: LLM } Allm@{ shape: rounded, label: Adapter LLM } API@{ shape: rounded, label: API Gateway } llm e1@== Server Sent Events Stream специфичный для модели==> Allm Allm e2@== Формат SiburLLMStream ==> DB DB e5@==> Csr((Censor)) Csr e6@==> DB Csr -. Stream Interrupted .-> DBs DB e3@== Формат SiburLLMStream ==> API API e4@== LLM Answer ==> User(((User))) Allm -. Start Stream .-> DBs Allm -. Finish Stream .-> DBs DBs -. Stream Is Started .-> API DBs -. Stream Is Finished .-> API e1@{ animate: true } e2@{ animate: true } e3@{ animate: true } e4@{ animate: true } e5@{ animate: true } e6@{ animate: true } class llm GrayPastelDrawio class Allm BluePastelDrawio class Csr RedPastelDrawio class DB WhiteArea class rstream1,rstream2 GreenPastelDrawio class DBs WhiteArea class databus PurplePastelDrawio class sv PurplePastelDrawio class API TurquoisePastel2Drawio class User WhiteDrawio

Цензурирование¤

Цензурирование ответа от LLM можно производить на стриме, не дожидаясь его завершения. Маскировать/Демаскировать личные данные, запрещать ответ, так как тематика ответа вышла за допустимые пределы. Что угодно.

В том числе, можно реализовать крайний вариант, когда ответ вычитывается цензором полностью, а затем принимается решение о его допуске в API, после чего API стримит ответ пользователю, имитируя realtime поток. Только в этом случае рализовывать стрим от адаптера нет особого смысла.

Форматы взаимодействия между сервисами¤

Адаптер получает ответ специфичный для каждой модели. Помимо того, что форматов ответов много, формат ответа LLM в Serve Sent Events пересылает большое количество лишних данных. Для унификации ответа от LLM и экономии оперативной памяти, необходимо разработать свой формат данных для Stream (SiburLLMStream на диаграммах).

Мы не может отдавать данные в формате SiburLLMStream пользователю. "Пользователем" являются подключаемые системы, часто это будут коробочные решения. И эти решения гарантированно будут понимать формат ответа OpenAI моделей.

В связи с этим, на сторонео API Gateway необходимо будет преобразовывать SiburLLMStream в OpenAI SSE Stream. Таким образом, для пользователей извне, RAG-слой будет замаскирован (с точки зрения взаимодействия) под модель OpenAI.