@AlbertForest

Как запустить бесконечный цикл в django приложении?

Имеется django приложение. Приложение генерирует задачи посредством celery beat.
Задачи - ни что иное, как серии запросов к API сторонних сервисов.
- Запросы идут через прокси
- Запросы могут быть довольно долгими
- Размер серии определяется типом задачи, и может включать несколько десятков запросов.

Изначально все работало на celery, но непрактичность такого подхода довольно очевидна. Воркеры спали 90% рабочего времени (подключение к прокси и ожидания ответов).
Естественно, с увеличением кол-ва задач мне пришлось бы создавать неразумное количество воркеров, которые бы попросту расходовали ресурсы.
Выбор стал очевиден - переписать все на asyncio.
Но, на сколько мне известно celery и asyncio с коробки не особо дружат. Следовательно celery отпадает.
(Да, я слышал про celery-pool-asyncio, но на мой взгляд слишком муторно для такой простой задачи)

И так. Я написал асинхронный обработчик, который выглядит следующим образом (это не особо важно но вдруг кому-то интересно)
async def process_response_queue(queue: asyncio.Queue):
    """
    Получает результаты задач и перезаписывает статус в redis
    """
    while True:
        struct, result = await queue.get()
        SetTaskStatus(struct, result)

class TaskManager:
    """
    Управление задачами
    """
    def __init__(self):
        self.queue = asyncio.Queue()
        self.tasks: Dict[str, asyncio.Task] = {}

    async def Handler(self, **kwargs):
        """
        Отдает задачу на выполнение. Удаляет в случае ошибки. Результат помещает в queue
        """
        struct = kwargs.get('struct')

        try:
            # ApiClient просто отправляет запросы
            await self.queue.put((struct, await ApiClient(struct)))
            
        finally:
            del self.tasks[struct.get('key')]
            print(f"Task for {struct.get('key')} canceled")
        
    def start_processor(self):

        self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))

    def start_new_task(self, **kwargs):
        """
        # Создает новую задачу
        """
        task_ig = kwargs.get('struct').get('key')
        self.tasks[task_ig] = asyncio.create_task(self.Handler(**kwargs))
        
async def mainloop():

    default.REDIS_CLIENT.flushdb()
    task_manager = TaskManager()
    task_manager.start_processor()


    while True:
        for key in default.REDIS_CLIENT.scan_iter("task:*"):
            struct = json.loads(default.REDIS_CLIENT.get(key).decode())
            struct.get('key') = key.decode()
            
            if struct.get("status") == "created":
                account = await sync_to_async(Accounts.objects.get)(id=struct["account_id"])
                project = await sync_to_async((lambda x: x.cluster.project))(account)
                task_manager.start_new_task(account=account, project=project, struct=struct)
                struct["status"] = "pending"
                default.REDIS_CLIENT.set(key, json.dumps(struct))


Функция mainloop и ApiClient взаимодействует с Django ORM и вспомогательными модулями. Выносить все за пределы приложения мне бы крайне не хотелось, тк важно оставить возможность работать напрямую с моделями. (Кстати, может есть способ?)

Запускается он так:
def LoopStart():
    asyncio.run(mainloop())

Thread(target=LoopStart, daemon=True).start()


Бытует мнение, что asyncio + threading - это плохое решение.

Какие проблемы могут возникнуть при такой реализации?
Каким образом мне лучше поступить, если это действительно плохое решение?
  • Вопрос задан
  • 491 просмотр
Решения вопроса 1
sergey-gornostaev
@sergey-gornostaev Куратор тега Django
Седой и строгий
Я бы отказался от использования моделей Django, они не предназначены для использования в конкурентном окружении, и написал отдельный асинхронный микросервис, связанный с django-проектом через пару очередей.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы