1. Создать очередь (asyncio.Queue)
2. Парсер пишет в очередь, писарь csv читает очередь и пишет в неё.
3. Парсер и писарь должны запускаться одновременно, то есть, типа такого
q = asyncio.Queue() # с maxsize можно поиграться, в данном случае отставание писаря от парсера может быть не более чем на один элемент
await asyncio.gather(get_pages_data(q), csv_writer(q))
4. Парсеров, кажется, надо сделать больше одного. То есть, появляется второй asyncio.Queue (назовём его work_queue), куда падают lines из файла, эту очередь слушают N воркеров (скажем, 5 штук), получают элемент, работают с ним, затем пишут в result_queue, который слушает писарь csv и записывает результат в файлик
Псевдокод будет выглядеть так:
async def file_reader(work_q, n_parsers):
with open('all_links_1.txt', 'r') as f:
lines = [line.strip() for line in f.readlines()]
for line in lines:
await work_q.put(line)
for _ in range(n_parsers): # говорим парсерам, что работы больше нет
await work_q.put(None)
async def parser(work_q, results_q):
while True:
line = await work_q.get()
if line is None:
return
result = ... магия с походом в http ...
await results_q.put(result)
async def writer(results_q):
with open('companys.csv', 'w', newline='') as file: # возможно, открывать файл имеет смысл при каждом получении элемента и закрывать после записи, так файл всегда будет "целым", но процесс записи будет дольше
writer = csv.writer(file, delimiter=',')
while True:
result = await results_q.get()
if result is None:
return
writer.writerow([result['name'], result['phone'], result['edrpou']])
async def main():
work_queue = asyncio.Queue()
results_queue = asyncio.Queue(10) # парсер не должен ждать, пока писарь запишет в файл (хард может быть занят), поэтому небольшой буфер
n_parsers = 5
tasks = []
parsers = []
reader_task = asyncio.create_task(file_reader(work_queue, n_parsers))
tasks.append(tasks)
for _ in range(n_parsers):
parser_task = asyncio.create_task(parser(work_queue, results_queue))
tasks.append(parser_task)
parsers.append(parser_task)
tasks.append(asyncio.create_task(writer(results_queue)))
await asyncio.gather(*parsers) # ждём все парсеры
await results_queue.put(None) # говорим писарю, что больше ничего не будет
await asyncio.gather(*tasks) # дожидаемся все остальные таски (вернее, будет только одна — writer)
Однако нужно понимать, что порядок результатов при таком подходе не будет гарантированным или хоть сколько-то стабильным. Поэтому, если порядок важен, стоит писать в файл какую-то промежуточную структуру (пусть тот же csv, но с доп столбцом link) и под конец всех работ вычитывать её, сортировать и складывать уже в нужном порядке