• Как использовать django-rq и rq_scheduler для запуска периодически повторяющихся задач?

    @gitinit Автор вопроса
    Я планировал запускать задачу каждые 2 секунды. Для этого нужно было указать 2 интервала. Первый интервал это как часто scheduler проверяет Redis и отправляет запланированные задания для исправления очередей. И 2-ой интервал это как часто задача должна выполняться. Чтобы быть уверенным в том, что задача запустится в нужное время, нужно убедиться что 1-ый интервал меньше или равен 2-ому и также является его делителем. Для запуска c помощью django_rq команды необходимо выполнить manage.py rqscheduler --interval 1
    apps.py
    import sys
    
    from django.apps import AppConfig
    from django_rq import get_scheduler
    
    
    class PrinterAppConfig(AppConfig):
        name = 'printer_app'
    
        def ready(self):
            from printer_app.async_tasks import streams_tasks
    
            if "rqscheduler" not in sys.argv:
                return
    
            scheduler = get_scheduler('print_check', interval=1)
    
            for job in scheduler.get_jobs():
                job.delete()
    
            streams_tasks(scheduler)

    async_tasks.py
    import django_rq
    import requests
    from django_rq import job
    
    from datetime import datetime, timedelta
    
    from checks.models import Printer, Check
    
    
    def new_checks():
        url = 'http://127.0.0.1:8000/new_checks/'
        headers = {'Api-Token': '0796859f206682d5fb185bcda09f0fa5',
                   'Api-Secret-Key': 'P2jg8WASSok8'}
    
        response = requests.get(url, headers=headers).json()  # словарь
        return response
    
    
    def streams_tasks(scheduler):
        scheduler.schedule(
            scheduled_time=datetime.utcnow(),
            func=new_checks,
            interval=2,  # в секундах
        )

    Ответ написан
    Комментировать