Как сервисы будут хранить данные?#

Окружение#

Для того, чтобы сообразить как удобнее и эффективнее всего хранить данные сервисов RAG-слоя в redis, поднял redis-stack.

> docker pull redis/redis-stack
> docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest

Почему redis-stack? Оне предоставляет расширенный набор возможностей. В частности тип данных JSON, а также хранение многомерных векторов с векторным поиском. Лицензия для внутреннего использования бесплатна (если вдруг России снова станет не начхать на лицензии). Есть вебморда для удобного доступа к данным. Да и в целом Redis двигает себя на рынке как эффективное решение для построение real-time приложений на базе AI. В redis-stack есть все фичи. Подробности тут:

Поработаем руками и подумаем#

Python зависимости#

!pip install redis[hiredis]
Collecting redis[hiredis]
  Obtaining dependency information for redis[hiredis] from https://files.pythonhosted.org/packages/e8/02/89e2ed7e85db6c93dfa9e8f691c5087df4e3551ab39081a4d7c6d1f90e05/redis-6.4.0-py3-none-any.whl.metadata
  Downloading redis-6.4.0-py3-none-any.whl.metadata (10 kB)
Collecting hiredis>=3.2.0 (from redis[hiredis])
  Obtaining dependency information for hiredis>=3.2.0 from https://files.pythonhosted.org/packages/52/5f/1148e965df1c67b17bdcaef199f54aec3def0955d19660a39c6ee10a6f55/hiredis-3.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading hiredis-3.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.2 kB)
Downloading hiredis-3.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (170 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 170.1/170.1 kB 1.8 MB/s eta 0:00:0031m1.6 MB/s eta 0:00:01
?25hDownloading redis-6.4.0-py3-none-any.whl (279 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 279.8/279.8 kB 5.5 MB/s eta 0:00:003 MB/s eta 0:00:01
?25hInstalling collected packages: redis, hiredis
Successfully installed hiredis-3.2.1 redis-6.4.0

[notice] A new release of pip is available: 23.2.1 -> 25.2
[notice] To update, run: python3.11 -m pip install --upgrade pip

Инициируем клиент#

import redis

r = redis.Redis()

Сгенерируем строковое значение большого размера#

import string
import secrets

size = 5 * 1024 * 1024

data = bytearray(size)
view = memoryview(data)
for position in range(size):
    view[position] = ord(secrets.choice(string.printable))

long_5Mb_string = data.decode('utf8')

String#

# Имитация того, что в значении сериализованный JSON размером 25Мб
r.set('long_string', long_5Mb_string * 5)
res = r.get('long_string')
len(res)/1024/1024
25.0

Можно хранить всё в строках. Ключи будут полностью составными, например:

chat_id:request_id:files
chat_id:request_id:query

Способ медленный и неудобный в использовании. Невозможно удалить все данные запроса или чата одним махом, нужен перебор всех ключей и сравнение с шаблоном из логики образования ключей. Не годится.

Hash#

# Составной ключ chat_id::request_id
# В работе будут ключи длинее
key = f'{secrets.token_urlsafe(nbytes=12)}::{secrets.token_urlsafe(nbytes=12)}'
print(key)
dPuDmlaCU8eLci0G::KuGU5HTh_QURDay2
# В query лежит строка на 10Мб (имитация сериализованного в строку JSON)
r.hset(key, mapping={'user': 'username', 'query': long_5Mb_string * 2, 'grade': 12345})
3
result = r.hgetall(key)
print(result.keys())
# Не буду печатать значения, объём большой
dict_keys([b'grade', b'user', b'query'])
r.hget(key, 'grade')
b'12345'
# Другой сервис добавил свой JSON на 15Мб
r.hset(key, 'files', data.decode('utf8') * 3)
1

Удобнее, чем строка. Часть логики по составлению ключей переложили на тип данных, но не всю. Ключи остаются составными, нужен перебор всех ключей для удаления. Подходит плохо

Set#

Не подходит совсем

List, Sorted set#

Хуче, чем Hash. В логике сервисов необходимо учитывать порядок значений. Ключ остаётся составным, как в Hash. Не подходит

Vector set#

Не подходит для хранения данных сервисов RAG-слоя, тем не менее, эта структура данных заслуживает отдельного внимания и изучения из кода. Советую почитать вот это:

Было бы интересно построить какие-нибудь фичи на Redis Vector set. Например для временного хранения векторов документов, залитых пользователем. Или, например, как векторную базу данных для горячих данных. Надо подумать. Ещё надо попросить Алексея рассказать рассматривал ли он Redis как векторную базу данных. Вряд ли он расссматривал этот вариант всерьёз, но, быть может, успел узнать что-то интересное.

Streams#

Структура данных, которая ведёт себя как append-only log. Из стрима можно читать, наподобие tail -f любым количеством читателей. Понятное, дело, что в общем случае для данных сервисов RAG-слоя это не подходит. Но зато на Redis Stream можно реализовать интерактивный ответ от LLM.

Вот смотрите, API в ближайшей реализации может отдать ответ из LLM только целиком, так как только после того, как AdapterLLM получит полный ответ от LLM, он кладёт его в БД Контекста и сообщает об успешно выполненной задаче Супервизору (через шину).

flowchart LR


LLM(((LLM))) ---> Allm([Adapter LLM]) 
-- LLM Answer --> DB[( DBContext )]
Allm -. DoneMessage .-> DBus@{ shape: cyl, label: Шина }
-. HasAnswerMessage .-> API
DB -- LLM Answer --> API -- LLM Answer --> User((User))

Теперь, с использованием Redis Stream, Adapter’у начинает читать ответ из LLM стримом, при этом сразу сообщает в шину, что ответ есть, а сами данные льёт в Redis Stream. API получает сообщение о том, что появился ответ в виде стрима, начинает читать стрим из Redis и отдавать его пользователю (это вариант, где клиент пользователя ожидает ответа в открытом соединении).

flowchart LR
  
  
  LLM(((LLM))) e1@==> Allm([Adapter LLM])
  Allm e2@== LLM Answer ==> DB[( DBContex / Redis / Stream )]
  DB e3@== LLM Answer ==> API
  API e4@== LLM Answer ==> User((User))
  Allm -. HasStreamMessage .-> DBus@{ shape: cyl, label: Шина }
  DBus@{ shape: cyl, label: Шина } -. HasStreamMessage .-> API
  
  e1@{ animate: true }
  e2@{ animate: true }
  e3@{ animate: true }
  e4@{ animate: true }
  

О завершении чтения адаптером ответа данных тоже необходимо сообщать, чтобы АПИ знало, что данные закончились. Сразу оговорюсь, что это упрощённая схема и мимо Супервизора сообщения от сервисов не ходят.

flowchart LR
  
  
  LLM(((LLM))) e1@==> Allm([Adapter LLM])
  Allm e2@== LLM Answer ==> DB[( DBContex / Redis / Stream )]
  DB e3@== LLM Answer ==> API
  API e4@== LLM Answer ==> User((User))
  Allm -. HasStreamMessage .-> DBus@{ shape: cyl, label: Шина }
  Allm -. DoneMessage .-> DBus@{ shape: cyl, label: Шина }
  DBus@{ shape: cyl, label: Шина } -. HasStreamMessage .-> API
  DBus@{ shape: cyl, label: Шина } -. StreamFinishedMessage .-> API
  
  e1@{ animate: true }
  e2@{ animate: true }
  e3@{ animate: true }
  e4@{ animate: true }
  

Таким образом мы получаем ответ от LLM, который хранится в БД и его можно прочитать любое количество раз, при этом, пока он пишется, его можно отдавать пользователю в режиме stream. При этом, при обрыве соединения и переподключении, пользователь получит либо весь записанны ответ целиком + стрим с дописыванием, либо дописывание с того места, где он остановился читать, простор для реализации идей.

Всё остальное Event-Driven, для чего задумывался Redis Stream, любую событийно ориентированную логику, в нашем технологическом стеке лучше строить на RabbitMQ, там больше возможностей.

Geospatial#

Удобно для нахождения гео меток в указанном радиусе или в заданном прямоугольнике. Нам без надобности.

Bitmaps, Bitfields#

Не для данных RAG-сервисов конечно, но надо подумать

JSON#

Целевой тип данных, то ради чего затевалось “Поработать руками и подумать”. Предполагаю, что можно забыть составные ключи и использовать Redis, как документоориентированную БД со всеми преимуществами запросов по JSON (Redis Query Engine). Давайте попробуем.

Предположительная схема данных одного запроса#

import time

chat_id, _, request_id = key.partition('::')

answer_stream_key = secrets.token_urlsafe(nbytes=12)
full_json_data = {
    'request_id': request_id,
    'chat_id': chat_id,
    'created_at': time.time(),
    'query': long_5Mb_string,
    'external_search_files': ['url1', 'url2'],  # Формат содержимого нужно уточнить
    'vector_search_files': [
        # Формат содержимого нужно уточнять. Внизу пример для проверки работы вложенности. Можно взять за основу.
        {
            'file_name': 'abc',
            'content_location': {'chapter': 15, 'paragraph': 10},
            'content': long_5Mb_string[:1024],
        },
        {
            'file_name': 'some rules',
            'content_location': {'chapter': 2, 'paragraph': 8},
            'content': long_5Mb_string[500:1024*2],
        },
    ], 
    'prompt': long_5Mb_string[1000:3000],
    'optimized_prompt': long_5Mb_string[1000:2000],
    'answer_stream_key': answer_stream_key,
}
from copy import deepcopy

# Подготовим документы

# Скопируем данные из примера
data1 = deepcopy(full_json_data)
data2 = deepcopy(full_json_data)
data3 = deepcopy(full_json_data)
data4 = deepcopy(full_json_data)

# Разные запросы одного чата
data1['request_id'] = secrets.token_urlsafe(nbytes=12)
data2['request_id'] = secrets.token_urlsafe(nbytes=12)
data3['request_id'] = secrets.token_urlsafe(nbytes=12)

# Тут запрос из другого чата
data4['request_id'] = secrets.token_urlsafe(nbytes=12)
data4['chat_id'] = str(secrets.token_bytes(nbytes=12))
data4['vector_search_files'][0]['file_name'] = 'qwerty'  # Другой файл
# Cоздаём объект под ключом 'requests' в корневом JSON Redis,
# в котором будут лежать данные запросов пользователей
r.json().set('requests', '$', {})
True
# Запишем полные данные
print(r.json().set(data1['request_id'], '$.requests', data1))
print(r.json().set(data2['request_id'], '$.requests', data2))
print(r.json().set(data3['request_id'], '$.requests', data3))
print(r.json().set(data4['request_id'], '$.requests', data4))

# Данные лежать в корневом JSON'е Redis под общим ключом 'reqeusts', 
# далее под ключом ID записи. Я за ID записи взял request_id,
# это просто пример, ID можно генерировать, на логику это не повлияет.
global_redis_json = {
    'requests': {
        'id1': {}, # data1
        'id2': {}, # data2
        # ...
    }
}
---------------------------------------------------------------------------
ResponseError                             Traceback (most recent call last)
Cell In[24], line 2
      1 # Запишем полные данные
----> 2 print(r.json().set(data1['request_id'], '$.requests', data1))
      3 print(r.json().set(data2['request_id'], '$.requests', data2))
      4 print(r.json().set(data3['request_id'], '$.requests', data3))

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/commands/json/commands.py:255, in JSONCommands.set(self, name, path, obj, nx, xx, decode_keys)
    253 elif xx:
    254     pieces.append("XX")
--> 255 return self.execute_command("JSON.SET", *pieces)

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/client.py:621, in Redis.execute_command(self, *args, **options)
    620 def execute_command(self, *args, **options):
--> 621     return self._execute_command(*args, **options)

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/client.py:632, in Redis._execute_command(self, *args, **options)
    630     self.single_connection_lock.acquire()
    631 try:
--> 632     return conn.retry.call_with_retry(
    633         lambda: self._send_command_parse_response(
    634             conn, command_name, *args, **options
    635         ),
    636         lambda _: self._close_connection(conn),
    637     )
    638 finally:
    639     if self._single_connection_client:

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/retry.py:105, in Retry.call_with_retry(self, do, fail)
    103 while True:
    104     try:
--> 105         return do()
    106     except self._supported_errors as error:
    107         failures += 1

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/client.py:633, in Redis._execute_command.<locals>.<lambda>()
    630     self.single_connection_lock.acquire()
    631 try:
    632     return conn.retry.call_with_retry(
--> 633         lambda: self._send_command_parse_response(
    634             conn, command_name, *args, **options
    635         ),
    636         lambda _: self._close_connection(conn),
    637     )
    638 finally:
    639     if self._single_connection_client:

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/client.py:604, in Redis._send_command_parse_response(self, conn, command_name, *args, **options)
    600 """
    601 Send a command and parse the response
    602 """
    603 conn.send_command(*args, **options)
--> 604 return self.parse_response(conn, command_name, **options)

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/client.py:651, in Redis.parse_response(self, connection, command_name, **options)
    649         options.pop(NEVER_DECODE)
    650     else:
--> 651         response = connection.read_response()
    652 except ResponseError:
    653     if EMPTY_RESPONSE in options:

File ~/.pyenv/versions/3.11.5/envs/jupyter/lib/python3.11/site-packages/redis/connection.py:672, in AbstractConnection.read_response(self, disable_decoding, disconnect_on_error, push_request)
    670 if isinstance(response, ResponseError):
    671     try:
--> 672         raise response
    673     finally:
    674         del response  # avoid creating ref cycles

ResponseError: new objects must be created at the root

Посмотрел в доку, судя по всему такая схема не подойдёт, надо новые объекты создавать в корневом. Значит писать будем в корень, под идентификатором, начинающимся, например со слова request, чтобы не путаться.

# Запишем полные данные

def get_root_key(some_data: dict) -> str:
    return f'request:{some_data["request_id"]}'

print(r.json().set(get_root_key(data1), '$', data1))
print(r.json().set(get_root_key(data2), '$', data2))
print(r.json().set(get_root_key(data3), '$', data3))
print(r.json().set(get_root_key(data4), '$', data4))

# Данные лежат в корневом JSON'е Redis под ключами 'request:{ID}' 
# Я за ID записи взял request_id, это просто пример,
# ID можно генерировать, на логику это не повлияет.
global_redis_json = {
    'request:id1': {}, # data1
    'request:id2': {}, # data2
        # ...
}
True
True
True
True

image.png

В браузере видно, что ключи, формируемые таким образом Redis группируются. Не зря, сделал как в документации. Верной дорогой идём, товарищи.

Теперь попробуем поделать запросы.

# Все данные по request_id
target_key = get_root_key(data2)

data = r.json().get(target_key)
print('Keys:', data.keys())
print()
print(f'chat_id = {data["chat_id"]}')
print(f'query = {repr(data["query"][:100])}...')  # размер большой, печатаю кусочек
Keys: dict_keys(['request_id', 'chat_id', 'created_at', 'query', 'external_search_files', 'vector_search_files', 'prompt', 'optimized_prompt', 'answer_stream_key'])

chat_id = dPuDmlaCU8eLci0G
query = "!\t^GC8,@O,W7\\vY8GRhCYKW&\x0cr\\^Or[;0sGo~J%0'r( uLi\x0b}7ChQH.$AS%uxn.`CVx;\r\nc`_,{tC=ow`U{o2\\N_-r'bI!Yi c1R"...
# Все запросы чата
print(r.json().get('$', '*'))
None
# Все устаревшие запросы
rotten_at = time.time() - 60 * 60 * 24   # Сутки назад
print(r.json().get('$', f'$..created_at > {rotten_at}'))
None

Нужно время, чтобы разобраться с самими запросами и с тем, как удобнее всего положить данные. В документации есть хорошие примеры запросов по спискам. Судя по всему надо переложить все объекты запросов в список. В этом случае структура получится примерно такая:

global_redis_json = {
    'requests': [
        {}, # data1
        {}, # data2
        # ...
    ]
}

Я полагаю, что ход моих мыслей ясен, JSON - рабочий вариант, осталось найти рабочие примеры. Текущую версию документа я отправляю на рассмотрение коллегам, сам продолжу,

Попытка номер 3, документы запросов в списке#

Почистил Redis, теперь заново положим данные.

# Теперь под requests список
r.json().set('requests', '$', [])
True
# Запишем полные данные

# Эффективность arrappend: O(1)
print(r.json().arrappend('requests', '$', data1))
print(r.json().arrappend('requests', '$', data2))
print(r.json().arrappend('requests', '$', data3))
print(r.json().arrappend('requests', '$', data4))

# Данные лежат в корневом JSON'е Redis под ключами 'request:{ID}' 
# Я за ID записи взял request_id, это просто пример,
# ID можно генерировать, на логику это не повлияет.
global_redis_json = {
    'requests':[
        {}, # data1
        {}, # data2
        # ...
    ]
}
[1]
[2]
[3]
[4]
# Все запросы
all_requests = r.json().get('requests')

print(len(all_requests))
for item in all_requests:
    print(item['request_id'])
4
VIFO3UEJXtjlYfE4
uZ6rXWoBem0dB2ow
v6WF1yX6ROyFwq0K
vxQDEx_Dfz2_PuN3
# Все данные одного запроса по request_id
requests = r.json().get('requests', f'$..[?(@.request_id == \'{data2["request_id"]}\')]')
print(f'Type: {type(requests)}')
print(f'Length: {len(requests)}')
print(f'Type: {type(requests[0])}')
print(f'request_id: {requests[0]["request_id"]}')
print()
print(requests[0].keys())
Type: <class 'list'>
Length: 1
Type: <class 'dict'>
request_id: uZ6rXWoBem0dB2ow

dict_keys(['request_id', 'chat_id', 'created_at', 'query', 'external_search_files', 'vector_search_files', 'prompt', 'optimized_prompt', 'answer_stream_key'])
# Все запросы одного чата
requests = r.json().get('requests', f'$..[?(@.chat_id == \'{data2["chat_id"]}\')]')
print(f'Type: {type(requests)}')
print(f'Length: {len(requests)}')
print()
for num in range(len(requests)):
    print(f'Type: {type(requests[num])}')
    print(f'chat_id: {requests[num]["chat_id"]}, request_id: {requests[num]["request_id"]}')
print()
print(requests[0].keys())
Type: <class 'list'>
Length: 3

Type: <class 'dict'>
chat_id: dPuDmlaCU8eLci0G, request_id: VIFO3UEJXtjlYfE4
Type: <class 'dict'>
chat_id: dPuDmlaCU8eLci0G, request_id: uZ6rXWoBem0dB2ow
Type: <class 'dict'>
chat_id: dPuDmlaCU8eLci0G, request_id: v6WF1yX6ROyFwq0K

dict_keys(['request_id', 'chat_id', 'created_at', 'query', 'external_search_files', 'vector_search_files', 'prompt', 'optimized_prompt', 'answer_stream_key'])
# Неполные данные (Отдельные ключи)
# Попробуем выбрать все файлы всех запросов одного чата
data_query = f'$..[?(@.chat_id == \'{data2["chat_id"]}\')].vector_search_files'
print(f'{data_query=}')
print()
requests = r.json().get('requests', data_query)
print(f'Type: {type(requests)}')
print(f'Length: {len(requests)}')
print()

from pprint import pprint

pprint(requests[0])
data_query="$..[?(@.chat_id == 'dPuDmlaCU8eLci0G')].vector_search_files"

Type: <class 'list'>
Length: 3

[{'content': '!\t^GC8,@O,W7\\vY8GRhCYKW&\x0c'
             "r\\^Or[;0sGo~J%0'r( uLi\x0b"
             '}7ChQH.$AS%uxn.`CVx;\r\n'
             "c`_,{tC=ow`U{o2\\N_-r'bI!Yi c1Ry?q8b\x0c"
             ' PJ%HD*o@_fYf{@19*xZ(};kdw{TMa968<2WKumFoKa;RM\x0c'
             '!+9=4@L/WG0X/6P%dol$5yl*z#dIGcI/<j[I4#+Nwtt(9f_i4rnX\\\x0c'
             't?7zz!=Krd!exI;\n'
             ' ;vdi+O w7&X@[3NIKS {U25K6XVjmeUVz\r'
             '-30Y`g%e\\^.Kd63:7^4.#K;Sx:h!1S:495A^FCyP\x0c'
             'R?um,W@#|BL|@D5GTULTeLP!\r'
             'HCj\n'
             '|>\x0b'
             '-_.6OtD\n'
             '7ItK{.}Q:\n'
             '_vVoN3f>nCg-)-BhJVyse*]+iC+@?c"!Pvt+ Y4Vs1lYs+Z=t0+w<IkT&\'P=>|\n'
             'mxl+seFAb6vIk#-T]L/\\S/28dWS-\x0c'
             '!A?K(scbcj^h6;"\r'
             '.Y~&<VYIx1Wi;6j)z9Kn>d@=8ko0IB#}*i\n'
             '4kCG32ov1>\tMP_}\n'
             ");\t6'J(Udux9KjY,)\n"
             '@>C,|d!`\x0c'
             'L6jZ-9sYI\n'
             'v-4ojw:V<&ku{8)8mZJ@b?jzNd$,\x0b'
             'LQDmeXA\\\n'
             '\r'
             '):o!3[US7vF.Bs}4X=.~M;z]`FKvhu1=CC\r'
             'U/l\n'
             '/!=.i:QeTF-\\$tox\\sqycspk2X\x0c'
             'P`,X/o)E<1g*-G\x0c'
             "gFs5hSvLNO:c*le:;?1Zg>}a& 'npY$GdN:}}AX`Y\tSR7a&3tIO "
             '"*Ii5$zRi,/hcYQ4|s@Ro#R0WMK?#%F7A&rDD9V*\'Y(\n'
             '4_l(\x0b'
             '\x0c'
             '?:37@0.waSDuJ8!`.A.^@W-xi\x0c'
             '\x0b'
             '_1&\x0c'
             'XyETY!#C"c`%<pii&-8EVJF|2.mF4e#I?@/]3GM]b=xzOA)"L6U:&kPPCo^U$d)ra3qv_4^VJ#/VL8#xBE?^\r'
             'sMxUCdV+hcLNdjVd^mdi?;}!kh,]Q\\t2/U$BDR<Z~!nxAu8h"oM%L^q|[=9Z>fq:V$p)@Ek<cWU4sZeQEP{y4*i\t'
             'sa3v!/\x0b'
             '0"/b%z^{$i ZrUu/CG&e:iUdD726nfK<n,Y*K#.\\~wWpu"',
  'content_location': {'chapter': 15, 'paragraph': 10},
  'file_name': 'abc'},
 {'content': '1>\tMP_}\n'
             ");\t6'J(Udux9KjY,)\n"
             '@>C,|d!`\x0c'
             'L6jZ-9sYI\n'
             'v-4ojw:V<&ku{8)8mZJ@b?jzNd$,\x0b'
             'LQDmeXA\\\n'
             '\r'
             '):o!3[US7vF.Bs}4X=.~M;z]`FKvhu1=CC\r'
             'U/l\n'
             '/!=.i:QeTF-\\$tox\\sqycspk2X\x0c'
             'P`,X/o)E<1g*-G\x0c'
             "gFs5hSvLNO:c*le:;?1Zg>}a& 'npY$GdN:}}AX`Y\tSR7a&3tIO "
             '"*Ii5$zRi,/hcYQ4|s@Ro#R0WMK?#%F7A&rDD9V*\'Y(\n'
             '4_l(\x0b'
             '\x0c'
             '?:37@0.waSDuJ8!`.A.^@W-xi\x0c'
             '\x0b'
             '_1&\x0c'
             'XyETY!#C"c`%<pii&-8EVJF|2.mF4e#I?@/]3GM]b=xzOA)"L6U:&kPPCo^U$d)ra3qv_4^VJ#/VL8#xBE?^\r'
             'sMxUCdV+hcLNdjVd^mdi?;}!kh,]Q\\t2/U$BDR<Z~!nxAu8h"oM%L^q|[=9Z>fq:V$p)@Ek<cWU4sZeQEP{y4*i\t'
             'sa3v!/\x0b'
             '0"/b%z^{$i ZrUu/CG&e:iUdD726nfK<n,Y*K#.\\~wWpu"+DBi=</[g&wu.gn\r'
             'A2t_~]^S\n'
             'W!"kt1A+\\xI)mJhV<U3L5F7O\tsFa\\zljW\r'
             '_n\x0b'
             'LF\r'
             "w'u1+j__Tc&9cLgSXMp7\x0c"
             'i\n'
             'Y|0.?2+F)TU+_=W(:-@@+!:D.m$bl_-\\E\x0c'
             '\t^1+ 2W*}}CgEL\n'
             'ja}WuPS ;Qw3TX\r'
             '.lu3e&8q\\b:3\x0c'
             'yA\x0b'
             '&gbPu&}*]=;Brq*`+|f<!\n'
             '\telr}45Dn,do:Q_|yY9|,YX7iVb~\x0c'
             'CX;1c\x0b'
             'Fpp_I3g~8T5YHxNh]`d[KXXekwDjlc=p("X2eij=[x\'n7|+BI`$WO_!DO|dM1A\r'
             'eT#kW>\x0b'
             "fuAfb?x'Ltsky+~\\3U;eSV##cYM]!+Q,Iw \x0b"
             'egq[RU9.gvst\t!<s_\r'
             'Xe(M;om(KTa{\x0c'
             "^i'\n"
             'TGPHYo,ib`?K.fHKWt(;4[a:=u,K8F"F3=B"@\t&i_p.wn]g\t'
             "~O}*P!h~e/sb_n'Badt\x0c"
             "a5nDAAh?jk1ccl\\*I%f^9Vbo4Jy*j)v\\Zv`'?\tmTr^F`#7vMQ\n"
             'xSWmBhMZnXh?"4x_(/\x0b'
             'f9F\\>V?Z=B3paC\n'
             'zb@ =U|(7Pjp@$`yXkFLt&8w_WX#}&)(s5AM,7^+;d\n'
             '\\b\\Ci\r'
             '>vYj"%v`YWlG\t1/hA$3(hfpmy\tHEVwMWW.0\t>zH;P[U%>dB,*^Z@m4U(\t'
             "B_J1)f.{f&!VN\\1A|'M_]qH{P\x0c"
             'j5-lK&\\%#0*hS\\NZTl wjark\r'
             'ZI;\n'
             'n\r'
             ']? Xv1R&4o*#*X&2T,!*kBiHlvCN9K\ti\\z>I\t'
             '&gdKa@;1YAg[I]/?H3U@+DODWVoJLvx*6pA^z(5c_}w,w~`9,KS+WX\t'
             '4XP/%({(!tT[=7RTn6!Az5eB2pRIm)chFe*+8"3\'UMT1d\r'
             'Fx^J7*O~^>&7t.\x0b'
             'wH8/I\x0c'
             'Jkqf=QgJG\r'
             '~p0LbJ\r'
             '*m#N)J:&{/*.\tRM\\|MdW#+XQ;\x0b'
             'lC|kX?\tB\x0c'
             '_8_",;v*H![!Uax]bkc\x0b'
             'HFI4S6ZqEzvXj\x0b'
             '\n'
             "MMo[F[<5$TqG57re5M o%;U}'5^@T~tcs4\n"
             '\\wmo}@U4\r'
             '+/^AX}}]El"b\\_/\r'
             ';3SE$.[\r'
             ";7+ki=~J@'S91}*1uV}d/lIkj1=5)",
  'content_location': {'chapter': 2, 'paragraph': 8},
  'file_name': 'some rules'}]
# Устаревшие запросы
# Получим request_id всех устаревших запросов
rotten_at = time.time() - 60 * 60 * 24   # Сутки назад (так должно быть, но это сейчас не отработает, всё свежее)
rotten_at_10seconds = time.time() - 10   # Всё, что старше 10 секунд назад

data_query1 = f'$..[?(@.created_at < {rotten_at})].request_id'
data_query2 = f'$..[?(@.created_at < {rotten_at_10seconds})].request_id'
print(f'{data_query=}')
print()
print(r.json().get('requests', data_query1))  # не успело протухнуть
print(r.json().get('requests', data_query2))  # тут должны быть все документы через 10 секунд
data_query='$..[?(@.created_at < 1758892457.1518474)].request_id'

[]
['VIFO3UEJXtjlYfE4', 'uZ6rXWoBem0dB2ow', 'v6WF1yX6ROyFwq0K', 'vxQDEx_Dfz2_PuN3']

Найти в документации:#

Блокирование JSON при поиске

Время жизни записей

С точки зрения масштабирования Redis

Сравнение скоростей двух подходво (JSON vs консервативный подход)

Я проверяю блокировку до понедельника. Если блокировки нет, то берём в реализацию.