По отдельности сбор категорий и информации по одной ссылке происходит.
Когда объединяю код, то данные в ссылке на странице
отсутствуют.
import requests
import logging
import time
import aiohttp
import asyncio
import asyncpg
from datetime import datetime
from urllib.parse import urlparse
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("data_collection.log"),
logging.StreamHandler()
]
)
def extract_child_data(children, data_list):
"""Извлечение данных из дочерних элементов."""
for child in children:
try:
category_name = child.get('name', 'Unknown')
category_url = child.get('url', '')
shard = child.get('shard', '')
query = child.get('query', '')
data_list.append({
'category_name': category_name,
'category_url': category_url,
'shard': shard,
'query': query
})
# Рекурсивно обработать вложенных потомков
if 'childs' in child:
extract_child_data(child['childs'], data_list)
except Exception as e:
logging.warning(f"Ошибка при обработке элемента: {child}. Ошибка: {e}")
def get_with_retries(url, headers, retries=3):
"""Отправка GET-запроса с повторными попытками в случае ошибки."""
for attempt in range(retries):
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # Проверка на HTTP-ошибки
return response
except requests.RequestException as e:
logging.warning(f"Попытка {attempt + 1}/{retries} не удалась: {e}")
if attempt < retries - 1:
time.sleep(2 ** attempt) # Экспоненциальная задержка
raise requests.RequestException(f"Не удалось получить данные после {retries} попыток")
def get_catalogs_wb():
"""Получение списка категорий с Wildberries."""
url = 'https://static-basket-01.wbbasket.ru/vol0/data/main-menu-ru-ru-v3.json'
headers = {
"Accept": "*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
"Connection": "keep-alive"
}
try:
response = get_with_retries(url, headers)
if response.text.strip() == "":
logging.error("Сервер вернул пустой ответ.")
return []
data = response.json()
except requests.RequestException as e:
logging.error(f"Ошибка загрузки данных: {e}")
return []
except ValueError as e:
logging.error(f"Ошибка преобразования в JSON: {e}. Ответ сервера: {response.text[:500]}")
return []
# Анализ данных...
data_list = []
for d in data:
if 'childs' in d:
extract_child_data(d['childs'], data_list)
logging.info(f"Успешно извлечено {len(data_list)} записей.")
return data_list
async def get_category(shard, query, page=1):
"""Запрос данных с конкретной страницы."""
url = f'https://catalog.wb.ru/catalog/{shard}/v2/catalog?ab_testing=false&appType=1&cat={query}&curr=rub&dest=-1257786&hide_dtype=10&lang=ru&sort=popular&spp=30&page={page}'
# url = f'https://catalog.wb.ru/catalog/{shard}/v2/catalog?appType=1&curr=rub&lang=ru&page={page}&{query}'
headers = {
"Accept": "*/*",
"Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, ssl=False) as response:
if response.status == 200:
return await response.json()
else:
logging.error(f"Ошибка при запросе страницы {page}: статус {response.status}")
return None
except Exception as e:
logging.error(f"Ошибка подключения к API на странице {page}: {e}")
return None
async def prepare_items(response):
"""Обработка данных на одной странице."""
if not response:
logging.warning("Ответ от API пустой. Список товаров не будет обработан.")
return []
products_raw = response.get('data', {}).get('products', [])
products = []
for product in products_raw:
sizes = product.get('sizes', [])
first_size = sizes[0] if sizes else None
price = first_size.get('price', {}) if first_size else {}
products.append({
'id': product.get('id'),
'brand': product.get('brand'),
'name': product.get('name'),
'feedbacks': product.get('feedbacks', 0),
'nmReviewRating': product.get('nmReviewRating', 0.0),
'sale': product.get('sale', 0),
'priceU': price.get('basic', 0) / 100,
'salePriceU': price.get('total', 0) / 100
})
return products
async def save_to_database(products):
"""Сохранение данных в базу."""
if not products:
logging.warning("Список товаров пустой. Данные не будут записаны в базу.")
return
try:
conn = await asyncpg.connect(
user='postgres',
password='',
database='',
host='localhost',
port=5432
)
await conn.execute('''
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
product_id TEXT UNIQUE,
brand TEXT,
name TEXT,
feedbacks INTEGER,
nmReviewRating REAL,
sale INTEGER,
priceU REAL,
salePriceU REAL
)
''')
prepared_products = [
(
str(p['id']),
p['brand'],
p['name'],
p['feedbacks'],
p['nmReviewRating'],
p['sale'],
p['priceU'],
p['salePriceU']
)
for p in products
]
await conn.executemany('''
INSERT INTO products (product_id, brand, name, feedbacks, nmReviewRating, sale, priceU, salePriceU)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (product_id) DO UPDATE
SET brand = EXCLUDED.brand,
name = EXCLUDED.name,
feedbacks = EXCLUDED.feedbacks,
nmReviewRating = EXCLUDED.nmReviewRating,
sale = EXCLUDED.sale,
priceU = EXCLUDED.priceU,
salePriceU = EXCLUDED.salePriceU;
''', prepared_products)
logging.info(f"Данные успешно записаны в базу. Количество записей: {len(products)}.")
await conn.close()
except Exception as e:
logging.error(f"Ошибка записи данных в базу: {e}")
async def fetch_all_pages(shard, query):
"""Сбор данных из категории."""
page = 1
all_products = []
while True:
logging.info(f"Получение данных из категории {shard}, страница {page}.")
response = await get_category(shard, query, page)
if not response or not response.get('data', {}).get('products', []):
logging.info(f"Данные на странице {page} отсутствуют для категории {shard}. Завершение сбора.")
break
products = await prepare_items(response)
all_products.extend(products)
page += 1
return all_products
async def main():
catalogs = get_catalogs_wb()
start_time = datetime.now()
logging.info("Начало сбора данных.")
print(f"Начало сбора данных: {start_time}")
total_products = 0 # Счётчик общего количества товаров
for catalog in catalogs:
shard = catalog['shard']
query = catalog['query']
category_name = catalog['category_name']
if not shard or not query:
logging.warning(f"Пропускаем категорию {category_name} из-за отсутствия shard или query.")
continue
logging.info(f"Обработка категории {category_name} (shard: {shard}).")
all_products = await fetch_all_pages(shard, query)
if all_products:
await save_to_database(all_products)
total_products += len(all_products)
# Печать собранной информации по категории
print(f"Категория: {category_name}")
print(f"Собрано товаров: {len(all_products)}")
print("Примеры товаров:")
for product in all_products[:5]: # Печать первых 5 товаров
print(f" - {product['name']} (Бренд: {product['brand']}, Цена: {product['priceU']}, Скидка: {product['sale']}%)")
print("-" * 50)
end_time = datetime.now()
execution_time = end_time - start_time
# Финальный отчёт
logging.info(f"Сбор данных завершён. Время выполнения: {execution_time}.")
print(f"Сбор данных завершён: {end_time}")
print(f"Общее количество собранных товаров: {total_products}")
print(f"Время выполнения: {execution_time}")
if __name__ == '__main__':
asyncio.run(main())