Задать вопрос
@Vladimyr1991
Младший бекенд разработчик

Как настроить 2 независимые очереди для обработки задач, запуская сервис на одном порте?

Как разграничить настройками очереди и выполнения воркеров так, чтобы короткие задачи не стояли в очереди из-за долгих задач?

Общее описание
Написан Restful бекенд на фласке. 3 эндпоинта. Работают асинхронно через один порт.
2 из них обрабатывают задачу быстро, 1 - долго. Нужно их разделить так, чтобы для коротких задач всегда был воркер который будет обрабатывать задачу, как только она приходит.
Могу запустить через 2 порта. Но внешний сервер шлет запросы на один порт.

Сейчас 2 независимые очереди, но задачи в них выполняются в общем порядке, то есть короткие задачи ждут своей очереди, как я понимаю воркеры у них общие.

Заранее спасибо за помощь.

Через Nginx настроен реверс прокси.

Настройки сервера и очереди сообщений:
from flask import Flask
from flask_restful import Api
from flask_sqlalchemy import SQLAlchemy
from flask_rest.celery_pack import make_celery
from flask_migrate import Migrate
from kombu import Queue, Exchange

app = Flask(__name__)

app.config.from_object('config')
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///mapping_status.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379',
    CELERY_QUEUES = (
    Queue('long', Exchange('long'), routing_key='long'),
    Queue('short', Exchange('short'), routing_key='short'),
    ),
    # CELERY ROUTES
    CELERY_ROUTES = {
    'flask_rest.flask_rest_release1.map_this': {'queue': 'long'},
    'flask_rest.flask_rest_release1.map_annul_dd': {'queue': 'short'},
    'flask_rest.flask_rest_release1.map_fns_dd': {'queue': 'short'}
    }
)

api = Api(app)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
celery = make_celery(app)

from flask_rest.flask_rest_release1 import long_task, short_task_1, short_task_2


api.add_resource(long_task, '/long_task')
api.add_resource(short_task_1, '/short_task_1')
api.add_resource(short_task_2, '/short_task_2')


настройки UWSGI
[uwsgi]
master = true
http-socket = 0.0.0.0:8000
chdir = /root/to/chdir
smart-attach-daemon = celery_worker/all_queue.pid celery -A flask_rest.celery worker -Q long, short -E -n program_with_tasks --pidfile celery_worker/all_queue.pid
touch-reload = uwsgi_dev_1.ini
workers = 1
processes = 2
chmod-socket = 777
wsgi-file = run.py
callable = app
task_acks_late = True
task_track_started = True
; daemonize = uwsgi_dev_1.log
CELERYD_MAX_TASKS_PER_CHILD = 1 
CELERY_CREATE_MISSING_QUEUES = True
  • Вопрос задан
  • 113 просмотров
Подписаться 2 Средний Комментировать
Решения вопроса 1
@Vladimyr1991 Автор вопроса
Младший бекенд разработчик
Почитал немного документации и решил методом научного тыка провести эксперимент и задачу решил. Оказалось, что в рамках одного .ini файла с настройками можно указать 2 раза запуск celery.

Отдельные настройки можно выполнить в строке запуска сельдерея.

[uwsgi]
master = true
http-socket = 0.0.0.0:8000
chdir = /app/mapping
smart-attach-daemon = celery_worker/long.pid celery -A flask_rest.celery worker -Q long -E -n worker_long --concurrency=3 --pidfile celery_worker/long.pid
smart-attach-daemon = celery_worker/short.pid celery -A flask_rest.celery worker -Q short -E -n worker_short --concurrency=1 --pidfile celery_worker/short.pid
touch-reload = uwsgi_prod.ini
workers = 4
processes = 2
chmod-socket = 777
wsgi-file = run.py
callable = app
task_acks_late = True
task_track_started = True
daemonize = uwsgi_prod.log

CELERYD_MAX_TASKS_PER_CHILD = 1 
CELERY_CREATE_MISSING_QUEUES = True


как то так...
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы