@sanex3339

ExecutorService — как правильно получить данные из нескольких Callable потоков?

Есть класс, реализующий Callable и возвращающий результаты вычислений. Сами вычисления длятся около 2-3 секунд.

Нужно непрерывно бесконечно получать результат из этого класса в 8 потоках. Данные, возвращаемые каждым потоком ни как не зависят друг от друга. Полученные данные суммируются друг с другом в основном потоке исполнения.

Сейчас сделано так:

public void run () {
        executorService = Executors.newFixedThreadPool(this.threadsCount); // 8 потоков
        threadsPool = new ArrayList<>();

        for (int i = 0; i < threadsCount; i++) {
            threadsPool.add(
                executorService.submit(
                    dataProvider.callback() // возвращает Callable
                )
            );
        }

        while (threadsPool.size() > 0) {
            startThread();
        }

        executorService.shutdown();
    }

private void startThread () {
        try {
            dataHandler.callback(
                threadsPool.get(0) // отправляем полученные данные на суммирование
            );
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }

        threadsPool.remove(0); // удаляем текущий поток
        threadsPool.add(
            this.executorService.submit(
               dataProvider.callback() // добавляем новый заместо старого
            )
        );
    }


Есть ли более правильные способы сделать то же самое? И можно как то ускорить процесс?

UPD:
Я сейчас сделал пару тестов, у меня i7 2600k, при threadsCount = 6 30 итераций вычислений (30 результатов от потоков) выполнялись 1:23 секунды, при threadsCount = 4 - 1:16, при threadsCount = 8 - около 1:50.
Хотя по идее, чем больше потоков - тем быстрее должны быть вычисления.

Т.е. явно что то не то происходит с реализацией пула потоков.
  • Вопрос задан
  • 746 просмотров
Решения вопроса 1
@sanex3339 Автор вопроса
Нашел решение с использованием CompletitionService.

executorService = Executors.newFixedThreadPool(threadsCount);
        completionService = new ExecutorCompletionService<>(executorService);

        for (int i = 0; i < threadsCount; i++) {
            completionService.submit(
                renderDataProvider.callback()
            );
        }

        while (true) {
            try {
                renderDataHandler.callback(
                    completionService.take()
                );
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }

            completionService.submit(
                renderDataProvider.callback()
            );
        }
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы