Как организовать синхронную последовательность команд на колбеках с RxJava2?
Понемногу ковыряю RxJava2, но не могу собрать набор разрозненных обзорных статеек в одну картину. В массе своей примеры касаются поступающих однотипных событий от различных эммитеров. Да, для этих задач RxJava2 - приятная вещь, но как её применить для более сложных задач - пока понять не могу.
Собственно, вопрос следующий:
Возможно ли с помощью RxJava2 удобно решить следующую задачу, и если да, собственно, как?
Есть набор задач, которые должны исполнять цепочкой, генерация каждой следующей задачи осуществляется методом, который на основе результата исполнения задач может либо сгенерировать следующее звено, либо завершить цепочку.
Задача исполняется следующим образом: либо запускается метод, который возвращает (или не возвращает) колбек в заданный таймаут, либо получает (или не получает) уведомление от другого компонента (listener, eventbus). Вся генерируемая цепочка исполняется в отдельном потоке.
Собственно, как создать емиттер или disposable source, который будет генерировать новые события (новую команду) на основе результатов выполнения старой (результат исполнения передается колбеком, к примеру, в тело замыкания).
Я так понимаю, для этого подходят Subject'ы, но с ними нужны какие-то хитрые манипуляции с потоками и в любом случае мне не очевидно, как построить всю эту цепочку.
Лучшее, что я придумал - это просто поток, внутри которого что-то похожее на Iterable, при старте каждой новой задачи запускается таймаут handler и имеется PublishSubject, на который подписан сам менеджер этих команд, а команда или хендлер таймаута вызывает onNext субъекта с результатом исполнения. Но, это явно не reactive решение.
Конкретная задача: коммуникация с ble устройством с помощью NS Ble. Протокольные последовательности команд, к примеру, такая последовательность:
1) Прочитать характеристику, ответ приходит в колбеке.
2) В зависимости от прочитанного - отправить на другую характеристику пакет с тем или иным содержанием.
3) Дождаться от библиотеки по характеристике (уведомления), что ответ пришлет устройство.
Как связать API RxJava и эту задачу (абстрактную)?
Если честно, не совсем понятно, как в теле эмиттера дождаться колбека от листенера, к примеру, уведомлений от характеристики (или любого другого листенера). Допустим, для таймаута можно использовать каким-то образом Maybe и таймер. Ну и потом, протокольные ветвления достаточно большие, и часто приходится использовать цепочку от 2 до 10-30 последовательных команд, которые должны генерироваться на лету. Мне все-таки кажется, что надо отталкиваться от PublishSubject, каким-то образом обработать задачу, как ивент, по результатам которой заставить PublishSubject выдать ту или иную задачу в зависимости от результатов исполнения предыдущих.
Денис, подписчик (subscribe) будет ждать emitter.onSuccess(result), соответственно например функция getCharact1 не завершится пока не будет сделан этот вызов... я все асинхронные вызовы так оборачиваю, даже если библиотеки не поддерживают синхронщину
class NotificationSingleOnSubscribe implements SingleOnSubscribe, SomeListener {
int previous;
NotificationSingleOnSubscribe(int previous) {
this.previous = previous;
}
void onNotificationReceived(Notif notification) {
if(notification.isOk() && previous == someValueWaitForNotification)
emitter.onSuccess(someGoodResult);
}
void subscribe() {
if(previous == caseOne) {
// Do something for case one
} else if (previous == someValueWaitForNotification) {
// Wait for notification received by onNotificationReceived (do nothing)
Handler h = new Handler();
h.postDelayed(()->emitter.onSuccess(timeoutCode), 1000L);
}
}
}
А как сгенерировать такую последовательность синглов на лету? Пример: приходит уведомление от характеристики с типом данных на устройстве, в зависимости от типа надо запросить n раз выдать такой-то кусок дампа и получить его по уведомлению. То есть размерность цепочек непостоянна, также, как типы запросов к устройству. И не хотелось бы задавать для каждого типа данных на ключе свои заранее определенные последовательности из синглов, это нерационально и не является хорошим решением в стиле Rx, по идее.
Денис, сингл на то и сингл чтобы выполняться один раз, есть еще Completable, Observable, вот последний имеет метод onNext, т.е. можно внутри rx метода обрабатывать последовательность событий... вы почитайте мат часть по rx, посмотрите примеры кода, может меньше вопросов будет )) но идея вообще такова что внутри rx метода создаете например листенер какой либо бибилотеки, внутри колбэка вызываете onSuccess, или onNext в зависимости от задачи
tiroman, смотрел, да, матчасти у меня нет, это даже не скрываю. А можете подсказать что-то развернутое и с примерами по RxJava2? Просто очень многое, что есть - слишком обрывочно и не уходит дальше
а ковырять, к примеру, RxBle и другой Open Source нет желания, пока не разберусь более менее, что из чего растет.
На vogella неплохой стартовый туториал, как по мне, на медиуме есть подборка статей. Но примеры из разряда helloworld. Как по мне, все-равно надо передавать именно команды емиттером, потом их выполнять и на основе выполнения передавать в исходный Observable с эмиттером новые команды. Но, на самом деле, пока смутно понимаю, что и как делается.
Уффф, ну ок
Сейчас работаю с очередями запросов в отдельном потоке, сначала ручками генерирую IO очередь
Сама очередь:
spoiler
public class CommandQue {
private final LinkedList<Command> commands = new LinkedList<>();
protected final ArrayList<CommandResult> results = new ArrayList<>();
private Command currentCommand;
private final Handler timeoutHandler = new Handler();
private TimeoutRunnable timeoutRunnable;
private DeviceOperationListener operationListener;
private final LinkedList<CommandMessage> messages = new LinkedList<>();
public CommandQue(long queId, int quePriority, final String operationName, int operationType, int operationId) {
this.queId = queId;
this.quePriority = quePriority;
this.operationName = operationName;
this.operationType = operationType;
this.operationId = operationId;
}
public final int getOperationId() {
return operationId;
}
public void setOperationListener(DeviceOperationListener deviceOperationListener) {
this.operationListener = deviceOperationListener;
}
public void setOnQueFinish(@NonNull final OnQueFinish onQueFinish) {
this.onQueFinish = onQueFinish;
}
public void append(Command command) {
command.setCommandIndex(lastCommandIndex);
commands.addLast(command);
lastCommandIndex++;
}
void appendCommandMessage(@Nullable CommandMessage message) {
if(message != null)
messages.addLast(message);
}
public @Nullable CommandMessage getLastCommandMessage() {
if(messages.size() > 0)
return messages.getLast();
else return null;
}
public LinkedList<CommandMessage> getCommandMessages() {
return messages;
}
void appendExecutionResult(CommandResult result) {
if(timeoutRunnable != null)
timeoutHandler.removeCallbacks(timeoutRunnable);
if(finished) return;
resultCounter++;
results.add(resultCounter, result);
if(operationListener != null) {
DeviceOperationBuilder builder = new DeviceOperationBuilder();
builder.setCurrentOperation(result.operationIndex)
.setDevice(getDevice())
.setOperationName(getOperationName())
.setOverallOperationsCount(commands.size())
.setResult(result)
.setQueId(queId)
.setOperationType(operationType)
.setPriorityGroup(quePriority);
operationListener.listen(builder.build());
}
if(result.code > 0) {
finished = true;
onQueFinish.onQueFinished(results);
}
}
public void cancelQueue() {
appendExecutionResult(new CommandResultBuilder().setQueId(queId)
.setOperationIndex(-1)
.setCode(QUE_FINISHED_BY_REQUEST)
.setErrMessage("Que finished manually")
.build());
}
int size = -1;
public void nextCommand() {
if(size == -1) size = commands.size();
if(timeoutRunnable != null)
timeoutHandler.removeCallbacks(timeoutRunnable);
currentTask++;
if(currentTask == size) {
appendExecutionResult(new CommandResultBuilder().setQueId(queId)
.setOperationIndex(0)
.setCode(CommandResult.QUE_FINISHED_SUCCESSFULLY)
.setErrMessage("No commands left in que")
.build());
return;
}
currentCommand = commands.get(currentTask);
CommandResult validationResult = currentCommand.onPreExecute(results, currentCommand.commandIndex, this);
if(validationResult.code > 0)
onQueFinish.onQueFinished(results);
switch (currentCommand.type) {
case COMMAND_TYPE_READ:
currentCommand.execute();
break;
case COMMAND_TYPE_WRITE:
currentCommand.execute();
break;
case COMMAND_TYPE_WAIT_FOR_NOTIFICATION:
break;
case COMMAND_TYPE_WAIT:
break;
default:
appendExecutionResult(
new CommandResultBuilder()
.setQueId(queId)
.setOperationIndex(currentCommand.commandIndex)
.setCode(INTERNAL_ERROR)
.setErrMessage("Unknown command type")
.build()
);
}
if(currentCommand.executionTimeout != null) {
timeoutRunnable = new TimeoutRunnable(currentCommand);
timeoutHandler.postDelayed(timeoutRunnable, currentCommand.executionTimeout);
}
}
public void notificationChannelIO(Object notificationObject) {
CommandResult result = currentCommand.onNotificationReceived(notificationObject);
if(result == null) return; // Skipping notification
appendExecutionResult(result);
if(result.code == OK_READY_FOR_NEXT)
nextCommand();
}
private static CommandResult generateTimeoutResult(long queId, int commandId) {
...
}
public interface OnQueFinish {
void onQueFinished(ArrayList<CommandResult> results);
}
class TimeoutRunnable implements Runnable {
final Command command;
TimeoutRunnable(Command command) {
this.command = command;
}
@Override
public void run() {
if(command.type == COMMAND_TYPE_WAIT) {
CommandResult result = command.onWaitComplete();
if(result == null) return;
appendExecutionResult(command.onWaitComplete());
if(result.code == OK_READY_FOR_NEXT) {
nextCommand();
}
} else {
appendExecutionResult(generateTimeoutResult(queId, command.commandIndex));
}
}
}
}
Команда в очереди
spoiler
public abstract class Command {
protected final CommandQue commandQue;
protected final int type;
final Long executionTimeout;
protected int commandIndex = -1;
protected final int onExecutionSuccess;
public Command(@NonNull CommandQue commandQue, int type, @Nullable Long executionTimeout, int onExecutionSuccess) {
this.commandQue = commandQue;
this.type = type;
this.executionTimeout = executionTimeout;
this.onExecutionSuccess = onExecutionSuccess;
}
public final static int COMMAND_TYPE_READ = 1;
public final static int COMMAND_TYPE_WRITE = 2;
public final static int COMMAND_TYPE_WAIT_FOR_NOTIFICATION = 3;
public final static int COMMAND_TYPE_WAIT = 4;
void setCommandIndex(int index) {
this.commandIndex = index;
}
public interface OnExecutedIO {
void onExecution(CommandResult result);
}
protected CommandMessage message;
public abstract @Nullable Object createMessage(Object object);
abstract public CommandResult onPreExecute(@NonNull List<CommandResult> results, int commandIndex, CommandQue que);
void execute() {
executeRequest((CommandResult result) -> {
commandQue.appendExecutionResult(result);
commandQue.appendCommandMessage(message);
commandQue.nextCommand();
}
);
}
abstract public void executeRequest(OnExecutedIO OnExecutedIO);
abstract public @Nullable CommandResult onNotificationReceived(Object notification);
abstract public @Nullable CommandResult onWaitComplete();
}
public class BLEDeviceCommand extends Command {
private final String serviceCharacteristic;
protected OnPreExecute onPreExecute;
private byte[] writeDataIO;
private final NotificationProcessor notificationProcessor;
protected CommandResult preExecuteCommandResult;
public interface OnPreExecute {
CommandResult onPreExecute(@NonNull List<CommandResult> results, int commandIndex, CommandQue que);
}
public interface NotificationProcessor {
CommandResult process(BLEDeviceCommandQue que, Object notificationObject, int idx, int onSuccess);
}
public BLEDeviceCommand(@NonNull BLEDeviceCommandQue commandQue, int type, @Nullable Long executionTimeout,
String serviceCharacteristic, @NonNull OnPreExecute onPreExecute,
@Nullable NotificationProcessor notificationProcessor, int onExecutionSuccess) {
super(commandQue, type, executionTimeout, onExecutionSuccess);
this.serviceCharacteristic = serviceCharacteristic;
this.onPreExecute = onPreExecute;
this.notificationProcessor = notificationProcessor;
}
@Override
public @Nullable Object createMessage(Object object) {
return null;
}
@Override
public CommandResult onPreExecute(@NonNull List<CommandResult> results, int commandIndex, CommandQue que) {
preExecuteCommandResult = onPreExecute.onPreExecute(results, commandIndex, que);
if(type == COMMAND_TYPE_WRITE)
writeDataIO = preExecuteCommandResult.data;
return preExecuteCommandResult;
}
@Override
public void executeRequest(OnExecutedIO OnExecutedIO) {
if(type == COMMAND_TYPE_WRITE) {
((BLEDeviceCommandQue) commandQue).masterDeviceManager.getBLEManager().writeData(
writeDataIO,
serviceCharacteristic,
(request) -> {
Object obj = createMessage(request);
if(obj != null)
message = new CommandMessage(commandIndex, obj);
OnExecutedIO.onExecution(
new CommandResultBuilder().setQueId(commandQue.queId)
.setOperationIndex(commandIndex)
.setData(writeDataIO)
.setCode(onExecutionSuccess)
.build());
}
,
(request, bleErrCode, tr) -> OnExecutedIO.onExecution(
new CommandResultBuilder().setQueId(commandQue.queId)
.setOperationIndex(commandIndex)
.setCode(CommandResult.IO_ERROR)
.setIoCode(bleErrCode)
.setException(tr)
.build()),
23 // TODO 1 MTU
);
return;
}
if(type == COMMAND_TYPE_READ) {
((BLEDeviceCommandQue) commandQue).masterDeviceManager.getBLEManager().readData(
new DeviceIOEvents.READ_REQUEST(...),
(response) -> {
Object obj = createMessage(response);
if(obj != null)
message = new CommandMessage(commandIndex, obj);
OnExecutedIO.onExecution(
new CommandResultBuilder().setQueId(commandQue.queId)
.setOperationIndex(commandIndex)
.setData(response.toByteArray())
.setCode(onExecutionSuccess)
.build());
},
(response, bleErrCode, tr) -> OnExecutedIO.onExecution(
new CommandResultBuilder().setQueId(commandQue.queId)
.setOperationIndex(commandIndex)
.setCode(CommandResult.IO_ERROR)
.setIoCode(bleErrCode)
.setException(tr)
.build())
);
}
}
@Override
public CommandResult onWaitComplete() {
return new CommandResultBuilder().ююю.build();
}
@Nullable
@Override
public CommandResult onNotificationReceived(Object notification) {
if(type != COMMAND_TYPE_WAIT_FOR_NOTIFICATION)
return null;
if(notificationProcessor != null)
return notificationProcessor.process((BLEDeviceCommandQue) commandQue, notification, commandIndex, onExecutionSuccess);
return null;
}
}
Очередь запускается в отдельном потоке, обрабатывает весь этот список команд, собирает по списку результатов выполнения один объект и потом EventBus'ом отправляет его заинтересованным листенерам. Вообще, работает ок, но переусложнено и не допускает создание очереди налету. Хочу из куска легаси переработать в что-то удобоваримое и читаемое.
Хотя, если честно, даже не копайтесь, это уже мои проблемы :) Просто интересно, как сделать очередь с генерацией команд для исполнения налету. Вы уже вполне себе помогли, спасибо.
Советы на то, что я понял из вопроса)
1. разделить callback'и и rx. Т.е сделать обвертки которые возвращают Observable/Single/... т.е. типа (назовем) RxB
2. на более абстрактном уровне работа с RxB . стандартные merge, flatMap и тд (стандартный код Rx)
и) Лучше вместо PublishSubject использовать PublishRelay
пункты стандартные. Можно почитать почти во всех книгах. Главное разделить на уровни и тогда все станет ясно
1) callback -> rxjava
2) rxjava код
3) rxjava подписка в ui