/**
 * A {@link Command} that talks to a BLE device through the BLE manager reachable from
 * its {@link BLEDeviceCommandQue}.
 *
 * <p>Depending on {@code type} the command writes data to, or reads data from,
 * {@code serviceCharacteristic}, or it hands incoming notifications to an optional
 * {@link NotificationProcessor}.
 */
public class BLEDeviceCommand extends Command {

    /** Callback run before the command executes; its result is returned to the queue. */
    public interface OnPreExecute {
        CommandResult onPreExecute(@NonNull List<CommandResult> results, int commandIndex, CommandQue que);
    }

    /** Turns a received notification object into a {@link CommandResult}. */
    public interface NotificationProcessor {
        CommandResult process(BLEDeviceCommandQue que, Object notificationObject, int idx, int onSuccess);
    }

    private final String serviceCharacteristic;
    private final NotificationProcessor notificationProcessor;
    protected OnPreExecute onPreExecute;
    /** Result of the most recent {@link #onPreExecute} call. */
    protected CommandResult preExecuteCommandResult;
    /** Payload captured from the pre-execute result; only assigned for write commands. */
    private byte[] writeDataIO;

    /**
     * @param commandQue            queue that owns this command
     * @param type                  one of the {@code COMMAND_TYPE_*} constants
     * @param executionTimeout      timeout forwarded to {@link Command}; may be {@code null}
     * @param serviceCharacteristic characteristic passed to the BLE manager on write
     * @param onPreExecute          callback invoked from {@link #onPreExecute}
     * @param notificationProcessor optional handler for notifications; may be {@code null}
     * @param onExecutionSuccess    result code reported when the IO call succeeds
     */
    public BLEDeviceCommand(@NonNull BLEDeviceCommandQue commandQue, int type, @Nullable Long executionTimeout,
                            String serviceCharacteristic, @NonNull OnPreExecute onPreExecute,
                            @Nullable NotificationProcessor notificationProcessor, int onExecutionSuccess) {
        super(commandQue, type, executionTimeout, onExecutionSuccess);
        this.serviceCharacteristic = serviceCharacteristic;
        this.onPreExecute = onPreExecute;
        this.notificationProcessor = notificationProcessor;
    }

    /** This implementation never produces a message. */
    @Override
    public @Nullable Object createMessage(Object object) {
        return null;
    }

    /**
     * Delegates to the configured {@link OnPreExecute} callback, remembers its result and,
     * for write commands, keeps the result's {@code data} as the payload to send.
     */
    @Override
    public CommandResult onPreExecute(@NonNull List<CommandResult> results, int commandIndex, CommandQue que) {
        CommandResult outcome = onPreExecute.onPreExecute(results, commandIndex, que);
        preExecuteCommandResult = outcome;
        if (type == COMMAND_TYPE_WRITE) {
            writeDataIO = outcome.data;
        }
        return outcome;
    }

    /**
     * Starts the BLE write or read that matches {@code type}; any other type is a no-op
     * and the callback is not invoked.
     */
    @Override
    public void executeRequest(OnExecutedIO OnExecutedIO) {
        switch (type) {
            case COMMAND_TYPE_WRITE:
                writeToDevice(OnExecutedIO);
                break;
            case COMMAND_TYPE_READ:
                readFromDevice(OnExecutedIO);
                break;
            default:
                break;
        }
    }

    /** Writes {@code writeDataIO} to the characteristic and reports success or IO error. */
    private void writeToDevice(OnExecutedIO callback) {
        BLEDeviceCommandQue bleQue = (BLEDeviceCommandQue) commandQue;
        bleQue.masterDeviceManager.getBLEManager().writeData(
                writeDataIO,
                serviceCharacteristic,
                (sentRequest) -> {
                    Object payload = createMessage(sentRequest);
                    if (payload != null) {
                        message = new CommandMessage(commandIndex, payload);
                    }
                    callback.onExecution(
                            new CommandResultBuilder().setQueId(commandQue.queId)
                                    .setOperationIndex(commandIndex)
                                    .setData(writeDataIO)
                                    .setCode(onExecutionSuccess)
                                    .build());
                },
                (failedRequest, bleErrCode, tr) -> callback.onExecution(
                        new CommandResultBuilder().setQueId(commandQue.queId)
                                .setOperationIndex(commandIndex)
                                .setCode(CommandResult.IO_ERROR)
                                .setIoCode(bleErrCode)
                                .setException(tr)
                                .build()),
                23 // TODO 1 MTU
        );
    }

    /** Issues the read request and reports the response bytes or an IO error. */
    private void readFromDevice(OnExecutedIO callback) {
        BLEDeviceCommandQue bleQue = (BLEDeviceCommandQue) commandQue;
        bleQue.masterDeviceManager.getBLEManager().readData(
                new DeviceIOEvents.READ_REQUEST(...),
                (reply) -> {
                    Object payload = createMessage(reply);
                    if (payload != null) {
                        message = new CommandMessage(commandIndex, payload);
                    }
                    callback.onExecution(
                            new CommandResultBuilder().setQueId(commandQue.queId)
                                    .setOperationIndex(commandIndex)
                                    .setData(reply.toByteArray())
                                    .setCode(onExecutionSuccess)
                                    .build());
                },
                (reply, bleErrCode, tr) -> callback.onExecution(
                        new CommandResultBuilder().setQueId(commandQue.queId)
                                .setOperationIndex(commandIndex)
                                .setCode(CommandResult.IO_ERROR)
                                .setIoCode(bleErrCode)
                                .setException(tr)
                                .build())
        );
    }

    @Override
    public CommandResult onWaitComplete() {
        return new CommandResultBuilder().ююю.build();
    }

    /**
     * Forwards the notification to the {@link NotificationProcessor}.
     *
     * @return the processor's result, or {@code null} when this command is not of type
     *         {@code COMMAND_TYPE_WAIT_FOR_NOTIFICATION} or no processor was supplied
     */
    @Nullable
    @Override
    public CommandResult onNotificationReceived(Object notification) {
        if (type != COMMAND_TYPE_WAIT_FOR_NOTIFICATION || notificationProcessor == null) {
            return null;
        }
        return notificationProcessor.process(
                (BLEDeviceCommandQue) commandQue, notification, commandIndex, onExecutionSuccess);
    }
}
/**
 * Runs a list of {@link Command}s one after another and collects their
 * {@link CommandResult}s.
 *
 * <p>A result whose {@code code} is greater than zero is treated as terminal: the queue is
 * marked finished and {@link OnQueFinish#onQueFinished} is called. Once finished, further
 * results are ignored and {@link #nextCommand()} does nothing.
 *
 * <p>NOTE(review): {@code queId}, {@code quePriority}, {@code operationName},
 * {@code operationType}, {@code operationId}, {@code onQueFinish}, {@code lastCommandIndex},
 * {@code resultCounter}, {@code currentTask}, {@code finished}, {@code getDevice()} and
 * {@code getOperationName()} are used here but their declarations are not part of this
 * excerpt; they are left as they were.
 */
public class CommandQue {
    private final LinkedList<Command> commands = new LinkedList<>();
    protected final ArrayList<CommandResult> results = new ArrayList<>();
    /** Command started by the latest {@link #nextCommand()}; {@code null} until then. */
    private Command currentCommand;
    private final Handler timeoutHandler = new Handler();
    private TimeoutRunnable timeoutRunnable;
    private DeviceOperationListener operationListener;
    private final LinkedList<CommandMessage> messages = new LinkedList<>();
    /** Number of commands, captured on the first {@link #nextCommand()}; -1 before that. */
    int size = -1;

    public CommandQue(long queId, int quePriority, final String operationName, int operationType, int operationId) {
        this.queId = queId;
        this.quePriority = quePriority;
        this.operationName = operationName;
        this.operationType = operationType;
        this.operationId = operationId;
    }

    public final int getOperationId() {
        return operationId;
    }

    /** Sets the listener that is told about every appended result. */
    public void setOperationListener(DeviceOperationListener deviceOperationListener) {
        this.operationListener = deviceOperationListener;
    }

    /** Sets the callback invoked when the queue reaches a terminal result. */
    public void setOnQueFinish(@NonNull final OnQueFinish onQueFinish) {
        this.onQueFinish = onQueFinish;
    }

    /** Adds a command to the end of the queue and assigns it the next command index. */
    public void append(Command command) {
        command.setCommandIndex(lastCommandIndex);
        commands.addLast(command);
        lastCommandIndex++;
    }

    /** Stores a non-null message produced by a command; {@code null} is ignored. */
    void appendCommandMessage(@Nullable CommandMessage message) {
        if (message != null) {
            messages.addLast(message);
        }
    }

    /** @return the most recently stored message, or {@code null} if there is none */
    public @Nullable CommandMessage getLastCommandMessage() {
        // peekLast() returns null on an empty list, which is exactly the old size() check.
        return messages.peekLast();
    }

    /** @return the live internal list of stored messages */
    public LinkedList<CommandMessage> getCommandMessages() {
        return messages;
    }

    /**
     * Records a result, notifies the operation listener and finishes the queue when the
     * result is terminal ({@code code > 0}). Any pending timeout is cancelled first.
     * Results arriving after the queue has finished are dropped.
     */
    void appendExecutionResult(CommandResult result) {
        cancelPendingTimeout();
        if (finished) {
            return;
        }
        // NOTE(review): inserting at index resultCounter only works if the counter starts
        // at -1; its declaration is not visible here - confirm.
        resultCounter++;
        results.add(resultCounter, result);
        if (operationListener != null) {
            DeviceOperationBuilder builder = new DeviceOperationBuilder();
            builder.setCurrentOperation(result.operationIndex)
                    .setDevice(getDevice())
                    .setOperationName(getOperationName())
                    .setOverallOperationsCount(commands.size())
                    .setResult(result)
                    .setQueId(queId)
                    .setOperationType(operationType)
                    .setPriorityGroup(quePriority);
            operationListener.listen(builder.build());
        }
        if (result.code > 0) {
            finished = true;
            onQueFinish.onQueFinished(results);
        }
    }

    /** Ends the queue by appending a {@code QUE_FINISHED_BY_REQUEST} result. */
    public void cancelQueue() {
        appendExecutionResult(new CommandResultBuilder().setQueId(queId)
                .setOperationIndex(-1)
                .setCode(QUE_FINISHED_BY_REQUEST)
                .setErrMessage("Que finished manually")
                .build());
    }

    /**
     * Advances to the next command: runs its pre-execute step, starts it and arms its
     * timeout. When no commands are left a {@code QUE_FINISHED_SUCCESSFULLY} result is
     * appended instead.
     *
     * <p>Does nothing once the queue has finished. {@link Command#execute()} calls this
     * after every IO result, including terminal ones, so without the guard a cancelled or
     * failed queue would keep running its remaining commands.
     */
    public void nextCommand() {
        cancelPendingTimeout();
        if (finished) {
            return;
        }
        if (size == -1) {
            size = commands.size();
        }
        currentTask++;
        if (currentTask == size) {
            appendExecutionResult(new CommandResultBuilder().setQueId(queId)
                    .setOperationIndex(0)
                    .setCode(CommandResult.QUE_FINISHED_SUCCESSFULLY)
                    .setErrMessage("No commands left in que")
                    .build());
            return;
        }

        final Command command = commands.get(currentTask);
        currentCommand = command;

        CommandResult validationResult = command.onPreExecute(results, command.commandIndex, this);
        if (validationResult.code > 0) {
            // Validation failed: report the end of the queue and stop here. Previously the
            // command was still executed after the finish callback had fired.
            finished = true;
            onQueFinish.onQueFinished(results);
            return;
        }

        switch (command.type) {
            case COMMAND_TYPE_READ:
            case COMMAND_TYPE_WRITE:
                command.execute();
                break;
            case COMMAND_TYPE_WAIT_FOR_NOTIFICATION:
            case COMMAND_TYPE_WAIT:
                // Nothing to start; these complete via a notification or the timeout below.
                break;
            default:
                appendExecutionResult(
                        new CommandResultBuilder()
                                .setQueId(queId)
                                .setOperationIndex(command.commandIndex)
                                .setCode(INTERNAL_ERROR)
                                .setErrMessage("Unknown command type")
                                .build()
                );
                break;
        }

        // If the command completed synchronously the queue may already be finished or have
        // moved on; arming a timeout for it now would overwrite the newer command's timeout
        // and later report a bogus timeout.
        if (finished || currentCommand != command) {
            return;
        }
        if (command.executionTimeout != null) {
            timeoutRunnable = new TimeoutRunnable(command);
            timeoutHandler.postDelayed(timeoutRunnable, command.executionTimeout);
        }
    }

    /**
     * Offers a notification to the current command. A {@code null} result means the
     * command skipped it; an {@code OK_READY_FOR_NEXT} result advances the queue.
     * Notifications that arrive before any command has started are ignored.
     */
    public void notificationChannelIO(Object notificationObject) {
        if (currentCommand == null) {
            return; // No command started yet
        }
        CommandResult result = currentCommand.onNotificationReceived(notificationObject);
        if (result == null) {
            return; // Skipping notification
        }
        appendExecutionResult(result);
        if (result.code == OK_READY_FOR_NEXT) {
            nextCommand();
        }
    }

    /** Removes the currently armed timeout callback, if there is one. */
    private void cancelPendingTimeout() {
        if (timeoutRunnable != null) {
            timeoutHandler.removeCallbacks(timeoutRunnable);
        }
    }

    private static CommandResult generateTimeoutResult(long queId, int commandId) {
        ...
    }

    /** Receives the collected results when the queue reaches a terminal result. */
    public interface OnQueFinish {
        void onQueFinished(ArrayList<CommandResult> results);
    }

    /**
     * Fires when a command's {@code executionTimeout} elapses. For
     * {@code COMMAND_TYPE_WAIT} commands the elapsed delay is the expected completion;
     * for every other type it is reported as a timeout.
     */
    class TimeoutRunnable implements Runnable {
        final Command command;

        TimeoutRunnable(Command command) {
            this.command = command;
        }

        @Override
        public void run() {
            if (command.type != COMMAND_TYPE_WAIT) {
                appendExecutionResult(generateTimeoutResult(queId, command.commandIndex));
                return;
            }
            CommandResult result = command.onWaitComplete();
            if (result == null) {
                return;
            }
            // Append the result that was just checked; calling onWaitComplete() a second
            // time could yield a different (or null) result and repeats its side effects.
            appendExecutionResult(result);
            if (result.code == OK_READY_FOR_NEXT) {
                nextCommand();
            }
        }
    }
}
/**
 * Base class for a single step of a {@link CommandQue}.
 *
 * <p>Subclasses supply the pre-execute step, the IO request itself and the handlers for
 * notifications and completed waits; {@link #execute()} wires the IO result back into
 * the owning queue.
 */
public abstract class Command {
    public static final int COMMAND_TYPE_READ = 1;
    public static final int COMMAND_TYPE_WRITE = 2;
    public static final int COMMAND_TYPE_WAIT_FOR_NOTIFICATION = 3;
    public static final int COMMAND_TYPE_WAIT = 4;

    /** Callback through which {@link #executeRequest} reports its outcome. */
    public interface OnExecutedIO {
        void onExecution(CommandResult result);
    }

    protected final CommandQue commandQue;
    protected final int type;
    final Long executionTimeout;
    protected final int onExecutionSuccess;
    /** Position assigned by the queue via {@link #setCommandIndex}; -1 until then. */
    protected int commandIndex = -1;
    /** Message handed to the queue after execution; {@code null} unless a subclass sets it. */
    protected CommandMessage message;

    /**
     * @param commandQue         queue that owns this command
     * @param type               one of the {@code COMMAND_TYPE_*} constants
     * @param executionTimeout   timeout value kept for the queue; may be {@code null}
     * @param onExecutionSuccess code kept for subclasses to report on success
     */
    public Command(@NonNull CommandQue commandQue, int type, @Nullable Long executionTimeout, int onExecutionSuccess) {
        this.commandQue = commandQue;
        this.type = type;
        this.executionTimeout = executionTimeout;
        this.onExecutionSuccess = onExecutionSuccess;
    }

    void setCommandIndex(int index) {
        this.commandIndex = index;
    }

    /** Runs {@link #executeRequest} and feeds its result back into the queue. */
    void execute() {
        executeRequest(this::forwardToQueue);
    }

    /** Appends the result and current message to the queue, then advances it. */
    private void forwardToQueue(CommandResult outcome) {
        commandQue.appendExecutionResult(outcome);
        commandQue.appendCommandMessage(message);
        commandQue.nextCommand();
    }

    public abstract @Nullable Object createMessage(Object object);

    public abstract CommandResult onPreExecute(@NonNull List<CommandResult> results, int commandIndex, CommandQue que);

    public abstract void executeRequest(OnExecutedIO OnExecutedIO);

    public abstract @Nullable CommandResult onNotificationReceived(Object notification);

    public abstract @Nullable CommandResult onWaitComplete();
}
// Illustrative fragment: builds an Observable over six numeric strings and maps each
// element through stringToInteger (defined outside this excerpt). The resulting
// Observable is discarded and nothing subscribes to it in this statement.
// NOTE(review): Observable.from(array) looks like the RxJava 1.x API - confirm the version.
Observable
.from(new String[]{"1", "2", "3", "4", "5", "6"})
.map(stringToInteger);
/**
 * Sketch of a single-value source that either acts immediately or waits for a
 * notification, depending on the value of {@code previous}.
 *
 * <p>NOTE(review): {@code emitter}, {@code caseOne}, {@code someValueWaitForNotification},
 * {@code someGoodResult} and {@code timeoutCode} are not declared in this excerpt.
 */
class NotificationSingleOnSubscribe implements SingleOnSubscribe, SomeListener {
    int previous;

    NotificationSingleOnSubscribe(int previous) {
        this.previous = previous;
    }

    /** Emits the success value when an OK notification arrives while waiting for one. */
    void onNotificationReceived(Notif notification) {
        if (!notification.isOk() || previous != someValueWaitForNotification) {
            return;
        }
        emitter.onSuccess(someGoodResult);
    }

    /**
     * Starts the work for the current {@code previous} value. In the waiting case it only
     * schedules an emission of {@code timeoutCode} after one second.
     */
    void subscribe() {
        if (previous == caseOne) {
            // Do something for case one
            return;
        }
        if (previous == someValueWaitForNotification) {
            // Wait for notification received by onNotificationReceived (do nothing)
            new Handler().postDelayed(() -> emitter.onSuccess(timeoutCode), 1000L);
        }
    }
}
значит в contentMain совсем другое