Задать вопрос
Viji
@Viji
DevOps Engineer

Почему следующий код может иногда падать с runtime error?

Примерно в 30% код внизу падает (питон 3.9 последний)

import asyncio
import datetime
import colorama
import random


def main():
    # Changed this from the video due to changes in Python 3.10:
    # DeprecationWarning: There is no current event loop, loop = asyncio.get_event_loop()
    loop = asyncio.new_event_loop()

    t0 = datetime.datetime.now()
    print(colorama.Fore.WHITE + "App started.", flush=True)

    data = asyncio.Queue()

    task1 = loop.create_task(generate_data(20, data))
    task2 = loop.create_task(generate_data(20, data))
    task3 = loop.create_task(process_data(40, data))

    final_task = asyncio.gather(task1, task2, task3)
    loop.run_until_complete(final_task)

    dt = datetime.datetime.now() - t0
    print(colorama.Fore.WHITE + f"App exiting, total time: {dt.total_seconds():,.2f} sec.", flush=True)


async def generate_data(num: int, data: asyncio.Queue):
    for idx in range(1, num + 1):
        item = idx * idx
        await data.put((item, datetime.datetime.now()))

        print(colorama.Fore.YELLOW + f" -- generated item {idx}", flush=True)
        await asyncio.sleep(random.random() + 0.5)


async def process_data(num: int, data: asyncio.Queue):
    processed = 0
    while processed < num:
        item = await data.get()

        processed += 1
        value = item[0]
        t = item[1]
        dt = datetime.datetime.now() - t

        print(colorama.Fore.CYAN + f" +++ Processed value {value} after {dt.total_seconds():,.2f} sec.", flush=True)
        await asyncio.sleep(0.5)


if __name__ == '__main__':
    main()


c такой ошибкой
RuntimeError: Task <Task pending name='Task-3' coro=<process_data() running at async_program.py:40> cb=[_gather.<locals>._done_callback() at .pyenv/versions/3.9.13/lib/python3.9/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<generate_data() running at async_program.py:34> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4babcf3400>()]> cb=[_gather.<locals>._done_callback() at .pyenv/versions/3.9.13/lib/python3.9/asyncio/tasks.py:767]>
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<generate_data() running at async_program.py:34> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4babcf33a0>()]> cb=[_gather.<locals>._done_callback() at .pyenv/versions/3.9.13/lib/python3.9/asyncio/tasks.py:767]>

Process finished with exit code 1


программу взял с гитхаба, разбираюсь
  • Вопрос задан
  • 1104 просмотра
Подписаться 2 Простой Комментировать
Пригласить эксперта
Ответы на вопрос 1
Код [запускался на Python 3.8, 3.9, 3.10]
import asyncio
import datetime
import random
import colorama


async def main():
    t0 = datetime.datetime.now()
    print(colorama.Fore.WHITE + "App started.", flush=True)

    data = asyncio.Queue()

    task1 = asyncio.ensure_future(generate_data(20, data))
    task2 = asyncio.ensure_future(generate_data(20, data))
    task3 = asyncio.ensure_future(process_data(40, data))
    # Переименовано из "final_task" в "gather_result" т.к. gather возвращает результат (Список значений объектов)
    # В данном коде ничего не возвращается и можно удалить print и саму переменную
    gather_result = await asyncio.gather(task1, task2, task3)
    print(gather_result)
    dt = datetime.datetime.now() - t0
    print(colorama.Fore.WHITE + f"App exiting, total time: {dt.total_seconds():,.2f} sec.", flush=True)


async def generate_data(num: int, data: asyncio.Queue):
    for idx in range(1, num + 1):
        item = idx * idx  # equal idx ** 2 
        await data.put((item, datetime.datetime.now()))

        print(colorama.Fore.YELLOW + f" -- generated item {idx}", flush=True)
        await asyncio.sleep(random.random() + 0.5)


async def process_data(num: int, data: asyncio.Queue):
    processed = 0
    while processed < num:
        item = await data.get()

        processed += 1
        value = item[0]
        t = item[1]
        dt = datetime.datetime.now() - t

        print(colorama.Fore.CYAN + f" +++ Processed value {value} after {dt.total_seconds():,.2f} sec.", flush=True)
        await asyncio.sleep(0.5)


if __name__ == '__main__':
    asyncio.run(main())

Asyncio "Running Tasks Concurrently" gather
asyncio.gather на русском
Приведенный вами код актуален для Python 3.6 (Пункт 18.5.3.5.1. документации asyncio Python 3.6.15), но не для 3.7+
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы