Vollirik_M
@Vollirik_M
Данные анализирую, немного питонирую

Причины рандомных падений кода с использование multiprocessing?

Друзья, привет!
По работе от коллеги достался скрипт на Python, который использует библиотеку multiprocessing. Кратко, этот скрипт делает следующее: 1) обращается к файлу .xlsx, путь к которому указан в коде, и читает его; 2) логинится в корпоративный Sharepoint; 3) Загружает батчами по n строк данные из файла в Sharepoint.

При выполнении данного скрипта рандомным образом появляется ошибка вида:

Exception in thread Thread-11 (_handle_results):
Traceback (most recent call last):
File "D:\Python\Lib\threading.py", line 1045, in _bootstrap_inner
self.run()
File "D:\Python\Lib\threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "D:\Python\Lib\multiprocessing\pool.py", line 579, in _handle_results
task = get()
^^^^^
File "D:\Python\Lib\multiprocessing\connection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:\Python\Lib\site-packages\office365\runtime\client_request_exception.py", line 7, in __init__
content_type = self.response.headers.get('Content-Type', '').lower().split(';')[0]
^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'headers'

Она может появиться как сразу после запуска кода, так и не реализоваться вовсе, либо произойти в любой момент в процессе. Закономерности возникновения нет, происходит как на моем корпоративном ноутбуке, так и на персональном ПК. У коллеги с такой конфигурацией железа и версиями Python и всех библиотек такая ошибка не реализуется.

Сам код:
import numpy as np
import pandas as pd
import time
from tqdm import tqdm
import multiprocessing
from multiprocessing import Pool
from connection import Sharepoint

site = Sharepoint()
processes = multiprocessing.cpu_count()

data_file = 'C:/Users/micha/OneDrive/Текущая работа/routeList update/ShopList_Data_2023-11.xlsx'

df = pd.read_excel(data_file, usecols=[
                   'Route', 'ID SAP', 'Address', 'Name', 'Business', 'Chain'])

df_list = df.to_dict(orient='records')
df_splits = np.array_split(df_list, 1 + len(df_list) // 2000)


def load_new_items(iters):

    sharepoint_list = site.ctx.web.lists.get_by_title('routeList')

    for record in iters:
        sharepoint_list.add_item({'Title': record['Route'],
                                  'field_1': str(record['ID SAP']),
                                  'field_2': record['Address'],
                                  'field_3': str(record['Name']),
                                  'Business': record['Business'],
                                  'Chain': record['Chain'],
                                  })
    site.ctx.execute_batch(items_per_batch=100)


if __name__ == '__main__':

    start = time.perf_counter()

    rows = 0
    for part in df_splits:

        start_part = time.perf_counter()

        splits = np.array_split(part, processes)
        iterator = [list(i) for i in splits]

        with Pool(processes=processes) as pool:
            pool.map(load_new_items, iterator)

        rows += len(part)
        time_part = round(time.perf_counter() - start_part, 0)
        total_time = int(round(time.perf_counter() - start, 0))
        timeout = 15.0 if time_part < 40.0 else 15.0 + time_part - 40.0

        print(f'\nЗагружено {rows} строк из {len(df_list)}')
        print(f'Time (part): {time_part} sec')
        print(
            f'Time (total): {total_time//60} min {round(total_time % 60, 0)} sec')
        print(f'Timeout: {round(timeout, 0)} sec\n')

        if part is not df_splits[-1]:
            time.sleep(timeout)


Модуль connection используется логично для подключения к Sharepoint:

from office365.runtime.auth.user_credential import UserCredential
from office365.sharepoint.client_context import ClientContext


class Sharepoint():

   def __init__(self):
      self.USERNAME = "***"
      self.PASSWORD = "***"
      self.SHAREPOINT_URL = "https://***.sharepoint.com"
      self.SHAREPOINT_SITE = "https://***.sharepoint.com/teams/***"

      self.ctx = ClientContext(self.SHAREPOINT_SITE).with_credentials(UserCredential(self.USERNAME, self.PASSWORD))


   def get_list(self, listname: str):
      return self.ctx.web.lists.get_by_title(listname)


Вопрос - что я делаю не так?
  • Вопрос задан
  • 226 просмотров
Решения вопроса 1
sergey-gornostaev
@sergey-gornostaev Куратор тега Python
Седой и строгий
Выглядит так, будто Sharepoint иногда не отвечает.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы