Всем привет! Написал парсера, который работает на потоках, с помощью модуля concurrent. Данные которые я парсю сворачиваю в список, превращаю в строку через json и добавляю в json файл; но заметил такое, что иногда(очень редко) как-будто добавляются одновременно две строки в файл(прям в тоже время) и каким-то образом строки, которые в одно и тоже время добавляются становится невалидными, типа строка на строку залазит и скобочки,запятые, кавычки путаются как бы. Думаю, что нужно сделать чтоб эти данные помещались в очередь, а потом уже в файл. Но, я не могу понять как работает очередь.
Очереди, они асинхронны? То есть если в потоки строка будет записываться в очередь, то она сразу будет доступна уже на выходе? У меня идет цикл по данным из файла. Нужно ли будет дожидаться окончания цикла, чтобы достать данные из очереди?
Вот краткий пример как я делаю:
def append_in_file(data):
with open('conf/result_store.json','a') as file_json:
try:
file_json.write(json.dumps(data)+'\n')
except Exception as e:
print('Error 7: ',e.args,item,offers)
def parser(item, index,proxy, list_ignore,window,length_data):
str_req = f"site.com/search?detailNum={item['idnp']}&make={item['brand']}"
list_item = []
u_a = get_user_agent()
headers = {'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'Host': 'emex.ru',
'User-Agent': u_a}
try:
item_req = requests.get(str_req, proxies=proxy,
headers=headers,
timeout=10)
content_str = item_req.content.decode('utf-8')
content_json = json.loads(content_str)['searchResult']
except:
list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
list_sort = []
if 'points' in content_json:
if content_json['points']['list']:
list_sort = sorted(content_json['points']['list'], key=lambda obj:obj['price']['value'])[0]
str_req += f"&locationId={list_sort['locationId']}&showAll=true"
else:
list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
else:
list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
if list_sort:
i = 0
while True:
i += 1
if i >= 5:
break
two_proxy = proxy
try:
item_req = requests.get(str_req, proxies=two_proxy,
headers=headers,
timeout=10)
content_str = item_req.content.decode('utf-8')
content_json = json.loads(content_str)['searchResult']
if content_json['originals']:
break
except Exception as e:
two_proxy = get_proxy()
list_sorted = []
# list_offer = []
try:
list_sorted = sorted(content_json['originals'][0]['offers'], key=lambda obj:obj['price']['value'])
except Exception as e:
print('Error1: ',e.args)
list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
if not list_sorted:
list_item.append([item['idnp'],'',item['alias'],item['min_count'],item['max_time_delivery'],'0',item['count_sell']])
else:
list_data = [item['idnp'],content_json['name'],item['alias'],item['min_count'],item['max_time_delivery']]
for i, offer in enumerate(list_sorted):
if i == 3:
break
if offer['rating']['code'].upper() in list_ignore:
continue
if int(offer['quantity']) >= int(item['min_count']) and int(offer['delivery']['value']) <= int(item['max_time_delivery']):
list_data.append(offer['price']['value'])
list_data.append(offer['rating']['code'])
list_data.append(item['count_sell'])
if not list_item:
append_in_file(list_data)
else:
append_in_file(list_item)
def start(window, file_input,list_ignore,count_threads,new_name):
wbf = Workbook()
ws = wbf.active
ws.append(['Артикул','Наименование','Бренд','Мин.кол-во штук','Макс.время доставки','Цена1','Склад1','Цена2','Склад2','Цена3','Склад3','Кол-во продаж'])
with open('conf/result_store.json','w') as file_write:
file_write.write('')
window['out_note'].Update(value='Загрузка данных из файла')
list_data = get_list_data(file_input)
window['out_note'].Update(value='Получение прокси')
proxy = get_proxy()
count_requests = 25
futures_to_url = []
with thx(max_workers=int(count_threads)) as executor:
for index, item in enumerate(list_data):
if index >= count_requests:
proxy = get_proxy()
count_requests += 25
futures_to_url.append(executor.submit(parser, item, index, proxy, list_ignore,window,len(list_data)))
try:
with open('conf/result_store.json','r',encoding='utf-8') as file_json:
data_json = [json.loads(item) for item in file_json]
for data in list_data:
for item in data_json:
if data['idnp'] in item[0] or data['idnp'] == item[0]:
ws.append(item)
break
wbf.save(f'{new_name}.xlsx')
except Exception as e:
window['out_note'].Update(value='Произошла ошибка с сохранение данных!')
with open('conf/log.info','w') as file_log:
file_log.write(e.args[0])
в функции start я получаю данные из файла, затем запускается цикл по этим данным. и соотвественно запускается поток для каждой итерации. Каждые 25 итерации меняется IP-адресс, парсер работает через Tor Browser, для получения IP нужно некоторое время.
Далее запускается в потоки функция parser, которая парсит, в исключениях, если происходит какая-то ошибка то в список добавляется данные в которых есть ошибки. Ну и т.д.
И затем вызывается функция append_in_file, которая добавляет данные в файл в виде json'a.
И вот вопрос: если я буду вместо вызова функции append_in_file добавлять переданный аргумент в очередь, она будет сразу же доступна где-то на выходе?
Я пробовал использовать метод as_completed, но почему-то у меня там что-то не получилось
Как мне использовать очереди для моего кода? Что нужно поменять?