Проблема с обращением к Google Drive API
Синхронный вызов не вызывает никаких ошибок:
def backup_everything(*bases):
tasks = [
i.backup()
for i in bases
]
Результат:
Асинхронщина же выдаёт ошибку сертификата SSL (как и многопоточность через unsync, кстати):
async def backup_coro(*bases):
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(None, i.backup)
for i in bases
]
return await asyncio.gather(*tasks)
asyncio.run(backup_coro(*bases))
Результат:
Методы backup() различных классов обращаются к одноимённой функции:
def backup(f_name: str, f_path: str) -> None:
# удалить старые версии файлов
last_backup = drive.search({'name': f_name})
if last_backup:
del_item(last_backup[0]['id'], "Previous base deleted")
try:
# ID папки, куда заливать файл
fold_id = folder(BACKUP_FOLDER_NAME)
f_mime_type = mimeTypes[file_ext(f_name)]
drive.upload_file(f_name, f_path, f_mime_type, fold_id)
except Exception as trouble:
print(trouble, f_path, 'Terminating...', sep='\n')
else:
print(f"File: '{f_name}' uploaded")
Которая в свою очередь обращается к методам класса по работе с Google Drive API.
Метод, в котором происходит ошибка:
def del_item(self, _id: str) -> None:
self.drive_service.files().delete(fileId=_id).execute()
Где drive_service – googleapiclient.discovery.build('drive', 'v3', credentials). Ничего необычного, иными словами.
Вопрос(ы): почему это происходит? Как это исправить? Есть у гугловских библиотек более элегантные способы работы асинхронно?