@gitinit

Как использовать django-rq и rq_scheduler для запуска периодически повторяющихся задач?

Если Вы использовали django-rq и rq_scheduler для запуска периодически повторяющихся задач, приведите мне пример вашего кода. Буду очень благодарен.
apps.py
import sys

from django.apps import AppConfig
from django_rq import get_scheduler


class PrinterAppConfig(AppConfig):
    name = 'printer_app'

    def ready(self):
        from printer_app.async_tasks import streams_tasks

        if "rqscheduler" not in sys.argv:
            return

        scheduler = get_scheduler('print_check')

        for job in scheduler.get_jobs():
            job.delete()

        streams_tasks(scheduler)

async_tasks.py
import django_rq
import requests
from django_rq import job

from datetime import datetime, timedelta

from checks.models import Printer, Check


def new_checks():
    url = 'http://127.0.0.1:8000/new_checks/'
    headers = {'Api-Token': '0796859f206682d5fb185bcda09f0fa5',
               'Api-Secret-Key': 'P2jg8WASSok8'}

    response = requests.get(url, headers=headers).json()  # словарь
    return response


def streams_tasks(scheduler):
    scheduler.schedule(
        scheduled_time=datetime.utcnow(),
        func=new_checks,
        interval=1,  # в секундах
    )

  • Вопрос задан
  • 1208 просмотров
Решения вопроса 2
@pyHammer
python-rq использовал для фоновой обработки, но вот периодические задания с этим cron справляется более чем отлично. Ни вижу ни одной причины заморачиваться. Пишите свою собственную manage.py команду и все, больше ничего не требуется
Ответ написан
Комментировать
@gitinit Автор вопроса
Я планировал запускать задачу каждые 2 секунды. Для этого нужно было указать 2 интервала. Первый интервал это как часто scheduler проверяет Redis и отправляет запланированные задания для исправления очередей. И 2-ой интервал это как часто задача должна выполняться. Чтобы быть уверенным в том, что задача запустится в нужное время, нужно убедиться что 1-ый интервал меньше или равен 2-ому и также является его делителем. Для запуска c помощью django_rq команды необходимо выполнить manage.py rqscheduler --interval 1
apps.py
import sys

from django.apps import AppConfig
from django_rq import get_scheduler


class PrinterAppConfig(AppConfig):
    name = 'printer_app'

    def ready(self):
        from printer_app.async_tasks import streams_tasks

        if "rqscheduler" not in sys.argv:
            return

        scheduler = get_scheduler('print_check', interval=1)

        for job in scheduler.get_jobs():
            job.delete()

        streams_tasks(scheduler)

async_tasks.py
import django_rq
import requests
from django_rq import job

from datetime import datetime, timedelta

from checks.models import Printer, Check


def new_checks():
    url = 'http://127.0.0.1:8000/new_checks/'
    headers = {'Api-Token': '0796859f206682d5fb185bcda09f0fa5',
               'Api-Secret-Key': 'P2jg8WASSok8'}

    response = requests.get(url, headers=headers).json()  # словарь
    return response


def streams_tasks(scheduler):
    scheduler.schedule(
        scheduled_time=datetime.utcnow(),
        func=new_checks,
        interval=2,  # в секундах
    )

Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
@elyasa
Попробуйте python daemon это в разы удобнее чем заморачиваться с редисом итд
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы