maximkv25
@maximkv25
web-developer

Почему периодичность в celery не выполняется асинхронно?

Всем привет, разбираюсь с celery и есть проблема
Каждую минуту исполняется period task
# Список отправляемых сообщений
@periodic_task(
    run_every=(crontab(minute='*/1'))
)
def message_delayed():
    # if request.method == 'GET':
        mes = Message.objects.filter(status='scheduled').values('owner_id', 'message_id', 'timestamp')
        # получаем id сообщений которые соответствуют времени отправки
        send_dlist = {i['message_id']: get_token(i['owner_id']) for i in mes if
                      i['timestamp'] in range(round(time.time() - 60), round(time.time()))}
        print('TIME ', range(round(time.time() - 60), round(time.time())))
        print('SEND LIST ', send_dlist)


        user_id = 259
        user = UserProfile.objects.get(id=user_id)


        scheduled_req.delay(send_dlist)


@task
def scheduled_req(send_dlist):
    from api.models import Scheduled
    for i in send_dlist:
        url = 'http://######8080/api/v1/message/send'
        data = dict(token=send_dlist.get(i), message_id=i)
        req = requests.post(url, data=json.dumps(data))

        try:
            Scheduled.objects.create(response=req.json())
        except Exception as e:
            print(str(e))


Вообщем что-то я упустил, получается что селери ждет пока scheduled_req завершит отправку, вот логи
11-50 проверил что к отправке
11-51 тоже
11-52 получил сообщения которые нужно отправить
11-53 как видно задание получено, но принт не работает, как буд-то ждет пока предыдущие отправятся
не могу разобраться...

[2017-02-24 11:50:00,042: WARNING/PoolWorker-1] TIME
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] range(1487936940, 1487937000)
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] {}
[2017-02-24 11:50:00,057: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[00362657-8e66-4e0a-852e-50bfe71468ed]  
[2017-02-24 11:50:00,061: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[deb42bc9-5736-4b6e-97c6-6ffe7499d35a] succeeded in 0.021350613998947665s: None
[2017-02-24 11:50:00,064: INFO/PoolWorker-1] Task api.controllers.message.scheduled_req[00362657-8e66-4e0a-852e-50bfe71468ed] succeeded in 0.0007557420176453888s: None
[2017-02-24 11:51:00,027: INFO/MainProcess] Received task: api.controllers.message.message_delayed[3a265703-5083-4003-ad0f-a856fe729e1d]  
[2017-02-24 11:51:00,035: WARNING/PoolWorker-1] TIME
[2017-02-24 11:51:00,035: WARNING/PoolWorker-1] range(1487937000, 1487937060)
[2017-02-24 11:51:00,036: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:51:00,036: WARNING/PoolWorker-1] {}
[2017-02-24 11:51:00,040: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[2692fa0f-c6fa-43b8-bab8-c7b397e2c1bd]  
[2017-02-24 11:51:00,041: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[3a265703-5083-4003-ad0f-a856fe729e1d] succeeded in 0.010261352988891304s: None
[2017-02-24 11:51:00,044: INFO/PoolWorker-1] Task api.controllers.message.scheduled_req[2692fa0f-c6fa-43b8-bab8-c7b397e2c1bd] succeeded in 0.0005564790044445544s: None
[2017-02-24 11:52:00,043: INFO/MainProcess] Received task: api.controllers.message.message_delayed[b082c5f4-7ba5-41a3-be65-ea06c627dc53]  
[2017-02-24 11:52:00,072: WARNING/PoolWorker-1] TIME
[2017-02-24 11:52:00,072: WARNING/PoolWorker-1] range(1487937060, 1487937120)
[2017-02-24 11:52:00,073: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:52:00,073: WARNING/PoolWorker-1] {'33_10': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_17': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_16': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_18': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_14': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_5': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_6': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_13': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_15': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_8': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_3': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_0': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_19': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_12': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_4': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_1': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_2': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_9': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_11': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_7': 'e276bd68-f9b7-11e6-a095-52540010ddb4'}
[2017-02-24 11:52:00,079: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[8003eb93-cf6a-4ee2-977c-bcd81b758486]  
[2017-02-24 11:52:00,080: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[b082c5f4-7ba5-41a3-be65-ea06c627dc53] succeeded in 0.035682129993801937s: None
[2017-02-24 11:52:00,086: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:17,179: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:34,951: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:50,855: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:00,031: INFO/MainProcess] Received task: api.controllers.message.message_delayed[42b9e874-62de-4a0a-a976-02c1fe29f5c9]  
[2017-02-24 11:53:06,045: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:20,860: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:36,361: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:51,889: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:00,047: INFO/MainProcess] Received task: api.controllers.message.message_delayed[36a575ac-cade-481d-897f-f7bf722a4d15]  
[2017-02-24 11:54:13,885: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:30,652: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:46,091: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:00,008: INFO/MainProcess] Received task: api.controllers.message.message_delayed[a1ae26d3-61b7-402c-97a6-d798a89d7ad9]  
[2017-02-24 11:55:01,357: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:16,274: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:31,450: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:46,190: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:56:00,035: INFO/MainProcess] Received task: api.controllers.message.message_delayed[7f0e576a-0004-4837-a973-ce76e5ed5854]  
[2017-02-24 11:56:01,688: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:56:16,942: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
  • Вопрос задан
  • 501 просмотр
Пригласить эксперта
Ответы на вопрос 1
@iMrDron
В дополнение к ответу Максима, вспомнил что был на докладе где рассказывали как при помощи asyncio скачать какие-то данные со всего вк за сутки, посмотрите, может вам будет полезно для вашей реализации - https://youtu.be/8wvQGRJiKdY?t=1699

По сути вы можете начать с того что бы не сильно все прям менять, просто сделать таски целери на asyncio, что бы не 10к тасок, а 10 тасок, и каждая по 1000 реквестов делала при помощи asyncio. ну и если при этом будет 10 воркеров то по идее все будет одновременно как раз таки работать.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы