@stasmarkin
Server side java engineer

Как в джаве управлять потоками при работе с цепочками асинхронных задач?

Последние полгода на работе делаю сервис, в котором пытаюсь достичь наибольшей пропусной способности на связке java + netty + mysql + redis.
У меня он строится по асинхронной модели и в целом делает примерно следующее:

получает по http запрос (или берет из AMQP),
потом опрашивает редис,
потом забирает данные из БД,
на основе этих данных принимает решение, в какой сервис отдавать обработку
(а сервисы сторонние, нужно рассчитывать на худшие сценарии их работы),
потом еще раз может залесть в БД,
потом какие-то еще штуки по дороге
и отдает http-ответ (или закладывает в другую очередь AMQP).

Нагрузка на CPU небольшая. Время одного запроса состоит из 1% времени CPU (промежуточная обработка), 4% -- ответ от базы и 95% время ответа стороннего сервиса.
Очень упрощенно код можно написать так:

void onRequest(Context ctx) {
    // здесь тред netty-worker
    Request request = parseRequest(ctx);

    sqlExecutor.submit(request)
        .thenCompose(model -> {
            // здесь тред sqlExecutor'а, т.к. именно в том тредпуле происходит комплит фьючи
            val something = doSomething(model, ctx);
            switch(pickService(something)) {
                case SERVICE_1: return service1.handleSomething(model);
                ..
                case SERVICE_N: return serviceN.handleSomething(model);
            }
        })

        .thenApply(serviceResponse -> {
            // здесь тред из serviceX
            val somethingElse = doSomethingElse(serviceResponse);
            return toHttpResponse(somethingElse);
        })

        .thenAccept(httpResponse -> {
            // здесь тред из serviceX
            ctx.getResponse().send(httpResponse);
        })

        .exceptionally(e -> {
            // здесь может оказаться netty-worker, sqlExecutor или тред из serviceX
            log.warn(e);
            ctx.getResponse().send(500, "failed");
        });
}


Есть некоторые допущения, но в целом происходит примерно следующее:
В метод onRequest() заходит тред netty-worker. В этом треде выполняется какая-то первичная обработка (parseRequest),
а потом сабмитится реквест в sqlExecutor и строится последующая цепочка вызовов.
Потом через какое-то время будет вызвана функция model -> {}, переданная в thenCompose.
Эту работу делать будет уже тред из sqlExecutor'а, т.к. именно в тредпуле sqlExecutor'а происходит комплит фьючи.
Иными словами, тред, который обрабатывает sql-запросы, потом еще вызовет методы doSomething(model, ctx), pickService(something) и заложит какую-то задачу в serviceX.handleSomething(model).
После этого тред из serviceX везмет эту задачу, обсчитает, вызовет методы doSomethingElse(serviceResponse) и toHttpResponse(somethingElse), и заложит ответ в обратно в исходный контекст запроса.
А через какое-то время это ответ возьмет один из воркеров нетти и запишет в канал.

Я столкнулся со следующими проблемами, которые хотел бы решить:

1) Треды в тредпулах занимаются не той работой, для которой они создавались.
В моем примере тред из sqlExecutor помимо обработки запроса вызовет еще несколько методов.
Если вдруг в этих методах окажется много работы или возникнет какая-то блокировка, то приводит к резкой деградации SQLExecutor-а.
Казалось бы, где sqlExecutor, а где какой-нибудь serviceN. Но кривая имлементация serviceN (блокировки или лишняя работа) может привести в блокировке всего sqlExecutor-а.

2) Допустим, у serviceN пропускная скособность на порядок ниже, чем у sqlExecutor-а.
Тогда на их стыке постоянно будут происходить какие-то странные ситуации, когда sqlExecutor уже подготовил модель для дальнейщей обработки, а заложить эту задачу в serviceN не может.
Дальше я вижу только 4 варианта развития событий:
-- блокироваться, что приводит к деградации sqlExecutor-а
-- кидать исключение, что приводит к потере запроса
-- обойти очередь сервиса и в текущем треде самому делать нужную работу. Что добавляет неуместной работы в sql-тредпуле, но не приводит к резкой деградации
-- иметь какую-то fallback очередь, в которую с бОльшей надежностью можно заложить задачу.
Это не решает проблему, но дает возможность прожить еще какое-то время, пока прочности новой очереди достаточно.

3) Если мы допускаем деградацию для sqlExecutor (т.е. не кидаем исключение, при неудачно попытке заложить задачу в сервис), то разумнее было бы как-то перераспределить нагрузку.
Чтобы SQLExecutor не зацикливался на полуумершем сервисе, а продолжал бы с той же эффективностью нагружать остальные сервисы.
Я понимаю, как можно закостылить такое, но хочется простое и элегантное решение.

4) Для каждой CompletableFuture, которая создается в .then...() мне нужно понимать, в каком экзикуторе будет выполняться функция.
Если вся комозиция происходит в одном методе, то в этом проблемы особо нет. Но если создание всей это цепочки разнесено по нескольким классам,
то приходится к каждому публичному методу добавлять описание в джавадоке, какой тред из какаго трудпула колмплитит возвращаемую фьючу.
Это какая-то сложность, которая появилась на ровном месте, и я не понимаю как от нее можно избавиться
  • Вопрос задан
  • 361 просмотр
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы
22 нояб. 2024, в 02:56
10000 руб./за проект
22 нояб. 2024, в 00:55
500 руб./за проект
21 нояб. 2024, в 23:30
300000 руб./за проект