@3FANG

Как итерировать асинхронный генератор, который уже находится в работе?

У меня есть асинхронный генератор (он вырван из контекста другого скрипта):
import asyncio
from time import time


PROXY_KEYS = {
    "http://1.1.1.1:9404": "ff4e67941cb08be2fbc164fbbd00",
    "http://1.1.1.1:9309": "9286f4c4de633ed5fd39420sff1d",
    "http://1.1.1.1:9551": "4c62aa15fa9743e55a1ff1e0696e",
}


async def get_proxy_and_key(proxy_and_key: dict):
    """
    Получить пару прокси-APIключ.

    Проходится циклом по всем парам, каждую он отдает на 10(9) запросов,
    если перед повторным использованием пары не прошла 1 секунда, то скрипт ожидает
    истечения этого времени для избежания ошибки Ratelimit exceed.
    """
    while True:
        t0 = time()
        for proxy, key in proxy_and_key.items():
            for _ in range(9):
                yield proxy, key
        end = round(time() - t0, 2)
        if end < 1:
            # можно сделать ожидание истечения оставшегося до 1 секунды времени,
            # но я поставил именно 1 сек, чтоб наверняка 
            await asyncio.sleep(1)


async def request(proxy_and_key_gen):
    print(await anext(proxy_and_key_gen))


async def main():
    tasks = []
    gen = get_proxy_and_key(PROXY_KEYS)
    for i in range(100):
        task = asyncio.create_task(request(gen))
        tasks.append(task)

    await asyncio.gather(*tasks)


asyncio.run(main())

(это демонстрационный код, для проверки работы асинхронного генератора)

Вывод получается такой:
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9404', 'ff4e67941cb08be2fbc164fbbd00')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9309', '9286f4c4de633ed5fd39420sff1d')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
('http://1.1.1.1:9551', '4c62aa15fa9743e55a1ff1e0696e')
Traceback (most recent call last):
  File "/home/russ/development/TON/test2.py", line 46, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/home/russ/development/TON/test2.py", line 44, in main
    await asyncio.gather(*tasks)
  File "/home/russ/development/TON/test2.py", line 34, in request
    print(await anext(proxy_and_key_gen))
RuntimeError: anext(): asynchronous generator is already running


Во время ожидания выполнения очередной задачи, когда генератор прошелся по всем парам по 9 раз и вернулся к первой паре, время выполнения было меньше 1 секунды, и соответственно, генератор выполнил инструкцию await asyncio.sleep(1). Как я понял, следующая задача вызвала объект генератора, но тот уже как бы работал (был в ожидании из-за asyncio.sleep(1)), что и вызвало ошибку.

Как это исправить?
  • Вопрос задан
  • 288 просмотров
Пригласить эксперта
Ответы на вопрос 1
@3FANG Автор вопроса
Для решения своей проблемы я воспользовался очередью(asyncio.Queue), т.е. я стал пользоваться не асинхронным генератором, а корутиной, которая вместо возврата значения (с помощью yield) добавляла его в очередь. А в другую корутину я уже передавал не асинхронный генератор, а эту самую очередь и потом из нее доставал значение.
Вот что вышло:
import asyncio
from time import time


PROXY_KEYS = {
    "http://1.1.1.1:9404": "ff4e67941cb08be2fbc164fbbd00",
    "http://1.1.1.1:9309": "9286f4c4de633ed5fd39420sff1d",
    "http://1.1.1.1:9551": "4c62aa15fa9743e55a1ff1e0696e",
}


async def get_proxy_and_key(proxy_and_key: dict, queue: asyncio.Queue):
    """
    Получить пару прокси-APIключ.

    Проходится циклом по всем парам, каждую он отдает на 10(9) запросов,
    если перед повторным использованием пары не прошла 1 секунда, то скрипт ожидает
    истечения этого времени для избежания ошибки Ratelimit exceed.
    """
    while True:
        t0 = time()
        for proxy, key in proxy_and_key.items():
            for _ in range(9):
                await queue.put((proxy, key))
        end = round(time() - t0, 2)
        if end < 1:
            # можно сделать ожидание истечения оставшегося до 1 секунды времени,
            # но я поставил именно 1 сек, чтоб наверняка 
            await asyncio.sleep(1)


async def request(queue: asyncio.Queue):
    while True:
        pair = await queue.get()
        # Какие-то манипуляции, но мы просто будем выводить на печать
        print(pair)


async def main():
    queue = asyncio.Queue(maxsize=100)
    queue_task = asyncio.create_task(get_proxy_and_key(PROXY_KEYS, queue))

    tasks = []
    for i in range(100):
        task = asyncio.create_task(request(queue))
        tasks.append(task)

    await asyncio.gather(queue_task, *tasks)


asyncio.run(main())
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы