maximkv25
@maximkv25
web-developer

Почему периодичность в celery не выполняется асинхронно?

Всем привет, разбираюсь с celery и есть проблема
Каждую минуту исполняется period task
# Список отправляемых сообщений
@periodic_task(
    run_every=(crontab(minute='*/1'))
)
def message_delayed():
    # if request.method == 'GET':
        mes = Message.objects.filter(status='scheduled').values('owner_id', 'message_id', 'timestamp')
        # получаем id сообщений которые соответствуют времени отправки
        send_dlist = {i['message_id']: get_token(i['owner_id']) for i in mes if
                      i['timestamp'] in range(round(time.time() - 60), round(time.time()))}
        print('TIME ', range(round(time.time() - 60), round(time.time())))
        print('SEND LIST ', send_dlist)


        user_id = 259
        user = UserProfile.objects.get(id=user_id)


        scheduled_req.delay(send_dlist)


@task
def scheduled_req(send_dlist):
    from api.models import Scheduled
    for i in send_dlist:
        url = 'http://######8080/api/v1/message/send'
        data = dict(token=send_dlist.get(i), message_id=i)
        req = requests.post(url, data=json.dumps(data))

        try:
            Scheduled.objects.create(response=req.json())
        except Exception as e:
            print(str(e))


Вообщем что-то я упустил, получается что селери ждет пока scheduled_req завершит отправку, вот логи
11-50 проверил что к отправке
11-51 тоже
11-52 получил сообщения которые нужно отправить
11-53 как видно задание получено, но принт не работает, как буд-то ждет пока предыдущие отправятся
не могу разобраться...

[2017-02-24 11:50:00,042: WARNING/PoolWorker-1] TIME
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] range(1487936940, 1487937000)
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:50:00,043: WARNING/PoolWorker-1] {}
[2017-02-24 11:50:00,057: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[00362657-8e66-4e0a-852e-50bfe71468ed]  
[2017-02-24 11:50:00,061: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[deb42bc9-5736-4b6e-97c6-6ffe7499d35a] succeeded in 0.021350613998947665s: None
[2017-02-24 11:50:00,064: INFO/PoolWorker-1] Task api.controllers.message.scheduled_req[00362657-8e66-4e0a-852e-50bfe71468ed] succeeded in 0.0007557420176453888s: None
[2017-02-24 11:51:00,027: INFO/MainProcess] Received task: api.controllers.message.message_delayed[3a265703-5083-4003-ad0f-a856fe729e1d]  
[2017-02-24 11:51:00,035: WARNING/PoolWorker-1] TIME
[2017-02-24 11:51:00,035: WARNING/PoolWorker-1] range(1487937000, 1487937060)
[2017-02-24 11:51:00,036: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:51:00,036: WARNING/PoolWorker-1] {}
[2017-02-24 11:51:00,040: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[2692fa0f-c6fa-43b8-bab8-c7b397e2c1bd]  
[2017-02-24 11:51:00,041: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[3a265703-5083-4003-ad0f-a856fe729e1d] succeeded in 0.010261352988891304s: None
[2017-02-24 11:51:00,044: INFO/PoolWorker-1] Task api.controllers.message.scheduled_req[2692fa0f-c6fa-43b8-bab8-c7b397e2c1bd] succeeded in 0.0005564790044445544s: None
[2017-02-24 11:52:00,043: INFO/MainProcess] Received task: api.controllers.message.message_delayed[b082c5f4-7ba5-41a3-be65-ea06c627dc53]  
[2017-02-24 11:52:00,072: WARNING/PoolWorker-1] TIME
[2017-02-24 11:52:00,072: WARNING/PoolWorker-1] range(1487937060, 1487937120)
[2017-02-24 11:52:00,073: WARNING/PoolWorker-1] SEND LIST
[2017-02-24 11:52:00,073: WARNING/PoolWorker-1] {'33_10': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_17': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_16': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_18': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_14': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_5': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_6': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_13': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_15': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_8': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_3': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_0': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_19': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_12': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_4': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_1': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_2': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_9': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_11': 'e276bd68-f9b7-11e6-a095-52540010ddb4', '33_7': 'e276bd68-f9b7-11e6-a095-52540010ddb4'}
[2017-02-24 11:52:00,079: INFO/MainProcess] Received task: api.controllers.message.scheduled_req[8003eb93-cf6a-4ee2-977c-bcd81b758486]  
[2017-02-24 11:52:00,080: INFO/PoolWorker-1] Task api.controllers.message.message_delayed[b082c5f4-7ba5-41a3-be65-ea06c627dc53] succeeded in 0.035682129993801937s: None
[2017-02-24 11:52:00,086: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:17,179: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:34,951: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:52:50,855: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:00,031: INFO/MainProcess] Received task: api.controllers.message.message_delayed[42b9e874-62de-4a0a-a976-02c1fe29f5c9]  
[2017-02-24 11:53:06,045: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:20,860: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:36,361: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:53:51,889: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:00,047: INFO/MainProcess] Received task: api.controllers.message.message_delayed[36a575ac-cade-481d-897f-f7bf722a4d15]  
[2017-02-24 11:54:13,885: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:30,652: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:54:46,091: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:00,008: INFO/MainProcess] Received task: api.controllers.message.message_delayed[a1ae26d3-61b7-402c-97a6-d798a89d7ad9]  
[2017-02-24 11:55:01,357: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:16,274: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:31,450: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:55:46,190: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:56:00,035: INFO/MainProcess] Received task: api.controllers.message.message_delayed[7f0e576a-0004-4837-a973-ce76e5ed5854]  
[2017-02-24 11:56:01,688: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
[2017-02-24 11:56:16,942: INFO/PoolWorker-1] Starting new HTTP connection (1): alpha
  • Вопрос задан
  • 502 просмотра
Пригласить эксперта
Ответы на вопрос 1
@iMrDron
В дополнение к ответу Максима, вспомнил что был на докладе где рассказывали как при помощи asyncio скачать какие-то данные со всего вк за сутки, посмотрите, может вам будет полезно для вашей реализации - https://youtu.be/8wvQGRJiKdY?t=1699

По сути вы можете начать с того что бы не сильно все прям менять, просто сделать таски целери на asyncio, что бы не 10к тасок, а 10 тасок, и каждая по 1000 реквестов делала при помощи asyncio. ну и если при этом будет 10 воркеров то по идее все будет одновременно как раз таки работать.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы