ValarMayar
@ValarMayar
Д У Р А Ч О К / недопрограммист графоман

Как организовать микро слипы в задаче celery?

Доброго времени суток.
Собственно вопрос такой.
Написал я бота на aiogram + бэкенд на django.
Бот выполняет рассылку по каналам телеграм.

Принцип работы:
В боте получаем запрос на запуск рассылки -> обрабатываем запрос на бэке и запускаем рассылку(понятное дело для этого был выбран celery). Аккаунт который получает команду на рассылку начинает её делать, но между каждым сообщением мне нужно сделать интервал 3 секунды и между каждыми 100 сообщениями 1000 секунд. (может быть несколько таких параллельных рассылок)

код выглядит примерно так:
While True:
      for g in groups[:100]:
          send_message(g, msg)
          time.sleep(3)
      time.sleep(1000)


Из ответов тут же на тостере я видел, что celery не допускает использования sleep(во всяком случае тогда другие задачи будут блокироваться).

По идее я могу создавать task с помощью celery beat с интервалом в 1000 секунд, но 3 секунду sleep между каждым сообщением не дают мне покоя, ибо мне нужна какая никакая конкуренциями между несколькими рассылками, что бы условные 2-7 секунд соблюдались для каждой из запущенных рассылок.

Буду крайне благодарен за пояснение.

PS возможно я вообще не в ту сторону смотрю или где то что то понял не так
  • Вопрос задан
  • 125 просмотров
Решения вопроса 2
@marazmiki
Укротитель питонов
Вынесите в отдельную задачу отправку одного сообщения в один чат. А во второй задаче, внутри цикла, вызывайте эти задачи с указанием ETA
Ответ написан
Комментировать
Assargin
@Assargin
Перед ответом смотрю наличие ✔ в ваших вопросах
Явное указание ETA ради лимитирования - так себе механизм. Если воркер вдруг умрёт, то после перезапуска новому воркеру придут на выполнение сразу все "опоздавшие" задания, и чем дольше воркер лежал, тем больше будет пачка. Также, сложно регулировать проставление абсолютного значения ETA, если вдруг придётся делать это одновременно для нескольких потоков рассылок, чтобы они, выполняясь параллельно, не выбирали весь лимит.

Ещё в Celery есть механизм rate limit на тип таска, правда, он работает только в пределах 1 воркера.

Т.е. в тех нечастых случаях, когда на 1 очередь запускается более 1 воркера + для случаев, когда лимит с типом таска матчится не 1:1 (например, лимит на API общий, а методов в этом API может быть 100500, каждый дёргается с помощью отдельного типа таска) - есть реализация на основе token bucket.
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
alternativshik
@alternativshik
нужен глобальный какой-то бакет, если будет запущено 10 рассылок одновременно, то между запросами не будет по 3 секунды слипа, даже если ты их поставишь в каждой таске. И на основе этого глобального какого-то счетчика надо рулить всеми рассылками.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы