@lemonlimelike

Как работают очереди в Python?

Всем привет! Написал парсера, который работает на потоках, с помощью модуля concurrent. Данные которые я парсю сворачиваю в список, превращаю в строку через json и добавляю в json файл; но заметил такое, что иногда(очень редко) как-будто добавляются одновременно две строки в файл(прям в тоже время) и каким-то образом строки, которые в одно и тоже время добавляются становится невалидными, типа строка на строку залазит и скобочки,запятые, кавычки путаются как бы. Думаю, что нужно сделать чтоб эти данные помещались в очередь, а потом уже в файл. Но, я не могу понять как работает очередь.
Очереди, они асинхронны? То есть если в потоки строка будет записываться в очередь, то она сразу будет доступна уже на выходе? У меня идет цикл по данным из файла. Нужно ли будет дожидаться окончания цикла, чтобы достать данные из очереди?

Вот краткий пример как я делаю:

def append_in_file(data):
    with open('conf/result_store.json','a') as file_json:
        try:
            file_json.write(json.dumps(data)+'\n')
        except Exception as e:
            print('Error 7: ',e.args,item,offers)


def parser(item, index,proxy, list_ignore,window,length_data):
    str_req = f"site.com/search?detailNum={item['idnp']}&make={item['brand']}"
    list_item = []
    
    u_a = get_user_agent()
    headers = {'Accept': 'application/json, text/plain, */*',
               'Accept-Encoding': 'gzip, deflate, br',
               'Accept-Language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7',
               'Cache-Control': 'max-age=0',
               'Connection': 'keep-alive',
               'Host': 'emex.ru',
               'User-Agent': u_a}
    try:
        item_req = requests.get(str_req, proxies=proxy,
	                            headers=headers,
	                            timeout=10)
        content_str = item_req.content.decode('utf-8')
        content_json = json.loads(content_str)['searchResult']
    except:
        list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
    
    list_sort = []
    if 'points' in content_json:
        if content_json['points']['list']:
            list_sort = sorted(content_json['points']['list'], key=lambda obj:obj['price']['value'])[0]
            str_req += f"&locationId={list_sort['locationId']}&showAll=true"
        else:
            list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
    else:
        list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
    
    if list_sort:
        i = 0
        while True:
            i += 1
            if i >= 5:
                break
            two_proxy = proxy
            try:
                item_req = requests.get(str_req, proxies=two_proxy,
                                        headers=headers,
                                        timeout=10)
                content_str = item_req.content.decode('utf-8')
                content_json = json.loads(content_str)['searchResult']
                if content_json['originals']:
                    break
            except Exception as e:
                two_proxy = get_proxy()


        list_sorted = []
        # list_offer = []
        try:
            list_sorted = sorted(content_json['originals'][0]['offers'], key=lambda obj:obj['price']['value'])
        except Exception as e:
            print('Error1: ',e.args)
            list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])

        if not list_sorted:
            list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
        else:
            list_data = [item['idnp'],content_json['name'],item['alias'],item['min_count'],item['max_time_delivery']]
            for i, offer in enumerate(list_sorted):
                if i == 3:
                    break
                if offer['rating']['code'].upper() in list_ignore:
                    continue
                if int(offer['quantity']) >= int(item['min_count']) and int(offer['delivery']['value']) <= int(item['max_time_delivery']):
                    list_data.append(offer['price']['value'])
                    list_data.append(offer['rating']['code'])

        list_data.append(item['count_sell'])
    if not list_item:
        append_in_file(list_data)
    else:
        append_in_file(list_item)


def start(window, file_input,list_ignore,count_threads,new_name):
    wbf = Workbook()
    ws = wbf.active
    ws.append(['Артикул','Наименование','Бренд','Мин.кол-во штук','Макс.время доставки','Цена1','Склад1','Цена2','Склад2','Цена3','Склад3','Кол-во продаж'])
    with open('conf/result_store.json','w') as file_write:
        file_write.write('')
    window['out_note'].Update(value='Загрузка данных из файла')
    list_data = get_list_data(file_input)
    window['out_note'].Update(value='Получение прокси')
    proxy = get_proxy()
    count_requests = 25
    futures_to_url = []
    with thx(max_workers=int(count_threads)) as executor:
        for index, item in enumerate(list_data):
            if index >= count_requests:
                proxy = get_proxy()
                count_requests += 25
            futures_to_url.append(executor.submit(parser, item, index, proxy, list_ignore,window,len(list_data)))
       
    try:
        with open('conf/result_store.json','r',encoding='utf-8') as file_json:
            data_json = [json.loads(item) for item in file_json]
        for data in list_data:
            for item in data_json:
                if data['idnp'] in item[0] or data['idnp'] == item[0]:
                    ws.append(item)
                    break
               
        wbf.save(f'{new_name}.xlsx')
    except Exception as e:
        window['out_note'].Update(value='Произошла ошибка с сохранение данных!')
        with open('conf/log.info','w') as file_log:
            file_log.write(e.args[0])


в функции start я получаю данные из файла, затем запускается цикл по этим данным. и соотвественно запускается поток для каждой итерации. Каждые 25 итерации меняется IP-адресс, парсер работает через Tor Browser, для получения IP нужно некоторое время.
Далее запускается в потоки функция parser, которая парсит, в исключениях, если происходит какая-то ошибка то в список добавляется данные в которых есть ошибки. Ну и т.д.
И затем вызывается функция append_in_file, которая добавляет данные в файл в виде json'a.

И вот вопрос: если я буду вместо вызова функции append_in_file добавлять переданный аргумент в очередь, она будет сразу же доступна где-то на выходе?

Я пробовал использовать метод as_completed, но почему-то у меня там что-то не получилось

Как мне использовать очереди для моего кода? Что нужно поменять?
  • Вопрос задан
  • 702 просмотра
Пригласить эксперта
Ответы на вопрос 2
dimonchik2013
@dimonchik2013
non progredi est regredi
concurrent конечно асинхронный модуль
Ответ написан
Комментировать
@Stqs
senior software developer
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы