Приложение обрабатывает некие события, поступающие извне по AMQP (RabbitMQ).
Внутри приложения хочется сделать так, чтобы обработчики можно было комбинировать и выстраивать в произвольном порядке, но чтобы при этом обработчики не зависели друг от друга. В простейшем варианте это просто набор классов, реализующих Function<T, R> - что-то поступает на вход и что-то выходит с другой стороны.
Граф обработки должен задаваться извне (например, в специальном отдельном классе).
Пример: есть class Transformer implements Function<OriginalEvent, TransformedEvent>, есть класс Calculate<TransformedEvent, Double>, UpdateCounter<Double,Void> и есть класс WriteDoDatabase<TransformedEvent, Void>
Хочется написать как в Java 8:
Stream<OriginalEvent,TransformedEvent> s = SomeFramework.map(new Transformer());
s.map(new Calculate()).onEvent(new UpdateCounter());
s.onEvent(new WriteDoDatabase());
И затем при поступлении нового события: s.process(originalEvent) - и этот метод не может вернуться пока не выполнятся все обработчики (при этом некоторые могут выполняться параллельно, например, WriteDoDatabase и Calculate+UpdateCounter
Что я уже думал/пробовал?
- Java 8 Streams - не подходит т.к. нужен push
- rxjava, reactor - не ясно, как обеспечить ожидание всех обработчиков т.к. фреймворки изначально асинхронные
- Guava EventBus - все синхронно, но не понравилось отсутствие возможности явно увязывать обработчики, там только неявно по классу события
Задача выглядит как довольно стандартная, но вот что-то не могу придумать красивый способ. Подскажите!