kshnkvn
@kshnkvn
yay ✌️ t.me/kshnkvn

Почему не происходит запись в MongoDB при нескольких потоках?

При изучении MongoDB наткнулся на возможность использования вместо сгенерированного значения _id упорядоченный (1, 2, 3...):
def get_next_sequence(collection, name):
    return collection.find_and_modify({'_id': name}, update={'$inc': {'seq': 1}}, new=True).get('seq')


def insert_in_db():
    client = MongoClient(mongo_url)
    db = client['']
    collection = db['']
    print(collection.insert_one({'_id': get_next_sequence(collection, 'userid'), 'value': f'{random.randint(10000, 2147483647)}'}))
    client.close()


Вариант работает, но меня смутила возможность его использования в несколько потоков. Если я правильно понимаю, то скрипт выполняет 2 запроса в БД:
1. Узнает значение последнего элемента
2. Присваивает новый элемент со значением +1

Если к нему обратятся одновременно несколько запросов? Начал проверять:
with Pool(processes=200) as pool:
    for _ in range(100000):
        pool.apply_async(insert_in_db)
    pool.close()
    pool.join()

Начал наращивать кол-во потоков по чуть-чуть - со значений в 50 и до 1000.
Вплоть до кол-во потоков в 200-250 проблем не было вообще никаких - все записывалось, после - начались пропуски записи. Особенно заметно после 400 потоков. При 1000 потоков из 100000 запросов на запись в базе оказалось только чуть больше 90000, при этом ошибок никаких нет. Сейчас у меня 2 предположения:
1. Не выдерживает сервер с MongoDB (3 ядра, 4гб ОЗУ). При работе скрипта были видны моменты, когда он просто останавливался на несколько секунд, иногда на 10-30 секунд. Видимо не мог подключиться. Соответственно некоторые потоки просто не могли подключиться и запись не происходила.
2. Обращаются несколько потоков, все получают информацию, что на данный момент, к примеру, 1389 записей в базе и пытаются записать запись под номером 1389. Смущает то, что ошибок записи нет в выводе, хотя по идеи должна быть.
  • Вопрос задан
  • 203 просмотра
Решения вопроса 1
@Taus
Вы не получаете результат выполнения в созданных процессах из pool.apply_async. Это плохая практика, потому что при выполнении кода в дочерних процессах могут быть исключения, которые стоит обрабатывать в основном процессе. Почитайте дополнительно документацию. Пример:
import multiprocessing

def f():
    raise ValueError()

with multiprocessing.Pool() as pool:
    for _ in range(10):
        pool.apply_async(f) # no errors

with multiprocessing.Pool() as pool:
    for _ in range(10):
        result = pool.apply_async(f)
        result.get(timeout=1) # raise ValueError

Поскольку у вас такой обработки, то можно предположить следующее. При некотором числе создаваемых процессов создание MongoClient или запросы .insert_one|.find_and_modify бросает исключение, связанное с превышением какого-то таймаута (посмотрите необязательные аргументы mongo_client и исключения)
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы