Имеется django приложение. Приложение генерирует задачи посредством celery beat.
Задачи - ни что иное, как серии запросов к API сторонних сервисов.
- Запросы идут через прокси
- Запросы могут быть довольно долгими
- Размер серии определяется типом задачи, и может включать несколько десятков запросов.
Изначально все работало на celery, но непрактичность такого подхода довольно очевидна. Воркеры спали 90% рабочего времени (подключение к прокси и ожидания ответов).
Естественно, с увеличением кол-ва задач мне пришлось бы создавать неразумное количество воркеров, которые бы попросту расходовали ресурсы.
Выбор стал очевиден - переписать все на asyncio.
Но, на сколько мне известно celery и asyncio с коробки не особо дружат. Следовательно celery отпадает.
(Да, я слышал про celery-pool-asyncio, но на мой взгляд слишком муторно для такой простой задачи)
И так. Я написал асинхронный обработчик, который выглядит следующим образом (это не особо важно но вдруг кому-то интересно)
async def process_response_queue(queue: asyncio.Queue):
"""
Получает результаты задач и перезаписывает статус в redis
"""
while True:
struct, result = await queue.get()
SetTaskStatus(struct, result)
class TaskManager:
"""
Управление задачами
"""
def __init__(self):
self.queue = asyncio.Queue()
self.tasks: Dict[str, asyncio.Task] = {}
async def Handler(self, **kwargs):
"""
Отдает задачу на выполнение. Удаляет в случае ошибки. Результат помещает в queue
"""
struct = kwargs.get('struct')
try:
# ApiClient просто отправляет запросы
await self.queue.put((struct, await ApiClient(struct)))
finally:
del self.tasks[struct.get('key')]
print(f"Task for {struct.get('key')} canceled")
def start_processor(self):
self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))
def start_new_task(self, **kwargs):
"""
# Создает новую задачу
"""
task_ig = kwargs.get('struct').get('key')
self.tasks[task_ig] = asyncio.create_task(self.Handler(**kwargs))
async def mainloop():
default.REDIS_CLIENT.flushdb()
task_manager = TaskManager()
task_manager.start_processor()
while True:
for key in default.REDIS_CLIENT.scan_iter("task:*"):
struct = json.loads(default.REDIS_CLIENT.get(key).decode())
struct.get('key') = key.decode()
if struct.get("status") == "created":
account = await sync_to_async(Accounts.objects.get)(id=struct["account_id"])
project = await sync_to_async((lambda x: x.cluster.project))(account)
task_manager.start_new_task(account=account, project=project, struct=struct)
struct["status"] = "pending"
default.REDIS_CLIENT.set(key, json.dumps(struct))
Функция mainloop и ApiClient взаимодействует с Django ORM и вспомогательными модулями. Выносить все за пределы приложения мне бы крайне не хотелось, тк важно оставить возможность работать напрямую с моделями. (Кстати, может есть способ?)
Запускается он так:
def LoopStart():
asyncio.run(mainloop())
Thread(target=LoopStart, daemon=True).start()
Бытует мнение, что asyncio + threading - это плохое решение.
Какие проблемы могут возникнуть при такой реализации?
Каким образом мне лучше поступить, если это действительно плохое решение?