@vikholodov

Почему импорт происходит так медленно?

Добрый день, коллеги!
Импортирую товары с api, ограничение 9000 запросов в минуту, взял облачный VDS с 12 ядрами, поставил 16 процессов экспериментальным методом и зафигачил. Если верить консоли (я решил засечь время обработки каждого "косяра" каждым процессом в отдельности, получил порядка 3-5 секунд, максимум, на каждый), в консоле очень бодро бегут строчки. Но на деле в базу залезает примерно 2000 - 3000 товаров в минуту, что оооочень медленно (нужно 30000 в минуту). Что я делаю не так? Я уже пожертвовал импортом дополнительных изображений и проверки на наличие товаров в других категориях (тупо дубли делаю с привязкой к другим категориям), что уже очень плохо, но скорости так и нет.
import random

import time
from django.core.management import BaseCommand
from decimal import Decimal
import multiprocessing
from multiprocessing import Pool
from django.db.models import Count
import zeep
from django.db import transaction
from pytils.translit import slugify

from core.models import Product, Category, ProductImage


def import_category_products(category):
	print('import category %s' % category.name)
	time.sleep(random.uniform(0.1, 3))
	lock = multiprocessing.Lock()
	from django.db import connection
	connection.close()

	allegro_api_key = 'key'

	prod_web_api = 'https://webapi.allegro.pl/service.php?wsdl'
	offset = 0
	while True:
		client = zeep.Client(wsdl=prod_web_api)
		options = {'item': {'filterId': 'category', 'filterValueId': [str(category.catId)]}}

		try:
			result = client.service.doGetItemsList(
				allegro_api_key, 1, filterOptions=options, resultSize=1000, resultOffset=offset, resultScope=2
			)
		except Exception as e:
			print('result exc: %s' % e)
			time.sleep(300)
			continue

		if not result or not result['itemsList']:
			print('finish(%s)' % offset)
			break

		print('products: %s' % len(result['itemsList']['item']))
		lock.acquire()
		print (multiprocessing.current_process())
		print('time loop start:%s' %time.clock() )
		lock.release()
		for product in result['itemsList']['item']:
			
			#if product['priceInfo']['item'][0]['priceType'] == 'bidding':  # priceType, исключить bidding
				#continue

			try:
				images = product['photosInfo']['item']
	  
				general_image = None
				for p in images:
					if p['photoSize'] == 'large' and p['photoIsMain'] is True:
						general_image = p['photoUrl']  # большое фото
				if not general_image:
					continue
			except Exception as e:
				continue

			sku = product['itemId']
			category_id = product['categoryId']

			try:
				product_object = Product.objects.get(sku=sku)

			except Product.DoesNotExist:
				product_object = None

			if product_object:
				continue            

			is_disabled = product['endingTime'] 
			if is_disabled == u'Zako\u0144czona':
				continue  # Если статус закончился, то не добавляем

			condition = product['conditionInfo']  # condition new used
			if condition == 'new':
				condition = 'Новый'
			elif condition == 'used':
				condition = 'Б.у.'

			# Польская цена в злотых
			price_pl = Decimal(product['priceInfo']['item'][0]['priceValue']).quantize(Decimal('.00'))

			# умножаем на дефолтный курс, курс каждый день по крону обновляется
			price_ru = (price_pl * Decimal(17.00))

			# Price with shipping
			p_shipping = Decimal(product['priceInfo']['item'][1]['priceValue']).quantize(Decimal('.00'))

			shipping_price_ru = (p_shipping - price_pl) * Decimal(17.00)  # стоимость доставки в рублях
			product_count = product['leftCount']  # Товарный остаток
			sales_count = product['bidsCount']  # Количество продаж
			buyers_count = product['biddersCount']  # Количество покупателей
			name_poland = product['itemTitle']  # Заголовок
			slug = slugify(name_poland) + '-' + str(category_id) + '-' + (str(sku))
			try:
				cat = Category.objects.get(catId=category_id)
			except:
				continue		
			with transaction.atomic():
				product_object = Product.objects.create(
					slug=slug,
					sku=sku,
					price_pl=price_pl,
					price_ru=price_ru,
					name_poland=name_poland,
					condition=condition,
					buyers_count=buyers_count,
					general_image=general_image,
					product_count=product_count,
					sales_count=sales_count,
					pl_shipping_ru=shipping_price_ru,
					is_disabled=False,
					category_id=cat
				)
		lock.acquire()	
		print (multiprocessing.current_process())	
		print('time loop end:%s' %time.clock() )
		lock.release()

		offset += 1000
		

class Command(BaseCommand):
	def handle(self, *args, **options):
		categories = Category.objects.filter(parent=None).exclude(is_main=True).annotate(products_count=Count('categories_products__id')).order_by('products_count').distinct().iterator()
		#categories = Category.objects.filter(parent=None).exclude(is_main=True).iterator()
		with Pool(processes=16) as pool:
			pool.map(import_category_products, categories)
  • Вопрос задан
  • 118 просмотров
Пригласить эксперта
Ответы на вопрос 1
Hivemaster
@Hivemaster
Админ, который хочет программировать
При вставке в базу большого количества записей, первое, что нужно попробовать для ускорения - это bulk-create.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы