akaish
@akaish
Стек Java\Android

Как организовать синхронную последовательность команд на колбеках с RxJava2?

Понемногу ковыряю RxJava2, но не могу собрать набор разрозненных обзорных статеек в одну картину. В массе своей примеры касаются поступающих однотипных событий от различных эммитеров. Да, для этих задач RxJava2 - приятная вещь, но как её применить для более сложных задач - пока понять не могу.
Собственно, вопрос следующий:
Возможно ли с помощью RxJava2 удобно решить следующую задачу, и если да, собственно, как?
Есть набор задач, которые должны исполнять цепочкой, генерация каждой следующей задачи осуществляется методом, который на основе результата исполнения задач может либо сгенерировать следующее звено, либо завершить цепочку.
Задача исполняется следующим образом: либо запускается метод, который возвращает (или не возвращает) колбек в заданный таймаут, либо получает (или не получает) уведомление от другого компонента (listener, eventbus). Вся генерируемая цепочка исполняется в отдельном потоке.
Собственно, как создать емиттер или disposable source, который будет генерировать новые события (новую команду) на основе результатов выполнения старой (результат исполнения передается колбеком, к примеру, в тело замыкания).
Я так понимаю, для этого подходят Subject'ы, но с ними нужны какие-то хитрые манипуляции с потоками и в любом случае мне не очевидно, как построить всю эту цепочку.
Лучшее, что я придумал - это просто поток, внутри которого что-то похожее на Iterable, при старте каждой новой задачи запускается таймаут handler и имеется PublishSubject, на который подписан сам менеджер этих команд, а команда или хендлер таймаута вызывает onNext субъекта с результатом исполнения. Но, это явно не reactive решение.
Конкретная задача: коммуникация с ble устройством с помощью NS Ble. Протокольные последовательности команд, к примеру, такая последовательность:
1) Прочитать характеристику, ответ приходит в колбеке.
2) В зависимости от прочитанного - отправить на другую характеристику пакет с тем или иным содержанием.
3) Дождаться от библиотеки по характеристике (уведомления), что ответ пришлет устройство.
Как связать API RxJava и эту задачу (абстрактную)?
  • Вопрос задан
  • 113 просмотров
Пригласить эксперта
Ответы на вопрос 2
@tiroman
class Scratch {
    public static void main(String[] args) {
        Disposable disposable = getCharact1()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(Scratch::getCharact2)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(s -> {
                    //обрабатываем результат выполнения второй функции
                }, throwable -> {
                    //показываем сообщение об ошибке
                });
    }

    private static Single<Integer> getCharact1() {
        return Single.create(emitter -> {
            int result = 0;

            //выполняем какие-то действия

            if (!emitter.isDisposed()) {
                emitter.onSuccess(result);
            }
        });
    }

    private static Single<String> getCharact2(int result1) {
        return Single.create(emitter -> {
            String result = "";

            //выполняем какие-то действия

            if (!emitter.isDisposed()) {
                emitter.onSuccess(result);
            }
        });
    }
}
Ответ написан
@red-barbarian
Советы на то, что я понял из вопроса)
1. разделить callback'и и rx. Т.е сделать обвертки которые возвращают Observable/Single/... т.е. типа (назовем) RxB
2. на более абстрактном уровне работа с RxB . стандартные merge, flatMap и тд (стандартный код Rx)

и) Лучше вместо PublishSubject использовать PublishRelay

пункты стандартные. Можно почитать почти во всех книгах. Главное разделить на уровни и тогда все станет ясно
1) callback -> rxjava
2) rxjava код
3) rxjava подписка в ui
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы