Добрый день, коллеги!
Импортирую товары с api, ограничение 9000 запросов в минуту, взял облачный VDS с 12 ядрами, поставил 16 процессов экспериментальным методом и зафигачил. Если верить консоли (я решил засечь время обработки каждого "косяра" каждым процессом в отдельности, получил порядка 3-5 секунд, максимум, на каждый), в консоле очень бодро бегут строчки. Но на деле в базу залезает примерно 2000 - 3000 товаров в минуту, что оооочень медленно (нужно 30000 в минуту). Что я делаю не так? Я уже пожертвовал импортом дополнительных изображений и проверки на наличие товаров в других категориях (тупо дубли делаю с привязкой к другим категориям), что уже очень плохо, но скорости так и нет.
import random
import time
from django.core.management import BaseCommand
from decimal import Decimal
import multiprocessing
from multiprocessing import Pool
from django.db.models import Count
import zeep
from django.db import transaction
from pytils.translit import slugify
from core.models import Product, Category, ProductImage
def import_category_products(category):
print('import category %s' % category.name)
time.sleep(random.uniform(0.1, 3))
lock = multiprocessing.Lock()
from django.db import connection
connection.close()
allegro_api_key = 'key'
prod_web_api = 'https://webapi.allegro.pl/service.php?wsdl'
offset = 0
while True:
client = zeep.Client(wsdl=prod_web_api)
options = {'item': {'filterId': 'category', 'filterValueId': [str(category.catId)]}}
try:
result = client.service.doGetItemsList(
allegro_api_key, 1, filterOptions=options, resultSize=1000, resultOffset=offset, resultScope=2
)
except Exception as e:
print('result exc: %s' % e)
time.sleep(300)
continue
if not result or not result['itemsList']:
print('finish(%s)' % offset)
break
print('products: %s' % len(result['itemsList']['item']))
lock.acquire()
print (multiprocessing.current_process())
print('time loop start:%s' %time.clock() )
lock.release()
for product in result['itemsList']['item']:
#if product['priceInfo']['item'][0]['priceType'] == 'bidding': # priceType, исключить bidding
#continue
try:
images = product['photosInfo']['item']
general_image = None
for p in images:
if p['photoSize'] == 'large' and p['photoIsMain'] is True:
general_image = p['photoUrl'] # большое фото
if not general_image:
continue
except Exception as e:
continue
sku = product['itemId']
category_id = product['categoryId']
try:
product_object = Product.objects.get(sku=sku)
except Product.DoesNotExist:
product_object = None
if product_object:
continue
is_disabled = product['endingTime']
if is_disabled == u'Zako\u0144czona':
continue # Если статус закончился, то не добавляем
condition = product['conditionInfo'] # condition new used
if condition == 'new':
condition = 'Новый'
elif condition == 'used':
condition = 'Б.у.'
# Польская цена в злотых
price_pl = Decimal(product['priceInfo']['item'][0]['priceValue']).quantize(Decimal('.00'))
# умножаем на дефолтный курс, курс каждый день по крону обновляется
price_ru = (price_pl * Decimal(17.00))
# Price with shipping
p_shipping = Decimal(product['priceInfo']['item'][1]['priceValue']).quantize(Decimal('.00'))
shipping_price_ru = (p_shipping - price_pl) * Decimal(17.00) # стоимость доставки в рублях
product_count = product['leftCount'] # Товарный остаток
sales_count = product['bidsCount'] # Количество продаж
buyers_count = product['biddersCount'] # Количество покупателей
name_poland = product['itemTitle'] # Заголовок
slug = slugify(name_poland) + '-' + str(category_id) + '-' + (str(sku))
try:
cat = Category.objects.get(catId=category_id)
except:
continue
with transaction.atomic():
product_object = Product.objects.create(
slug=slug,
sku=sku,
price_pl=price_pl,
price_ru=price_ru,
name_poland=name_poland,
condition=condition,
buyers_count=buyers_count,
general_image=general_image,
product_count=product_count,
sales_count=sales_count,
pl_shipping_ru=shipping_price_ru,
is_disabled=False,
category_id=cat
)
lock.acquire()
print (multiprocessing.current_process())
print('time loop end:%s' %time.clock() )
lock.release()
offset += 1000
class Command(BaseCommand):
def handle(self, *args, **options):
categories = Category.objects.filter(parent=None).exclude(is_main=True).annotate(products_count=Count('categories_products__id')).order_by('products_count').distinct().iterator()
#categories = Category.objects.filter(parent=None).exclude(is_main=True).iterator()
with Pool(processes=16) as pool:
pool.map(import_category_products, categories)