I have multiple tasks, each being executed on Schedulers.newThread()
. The task is a method, which returns Observable<Long>
.
The overall structure looks like this:
public void createAndPerformOperations(int mDataStructureSize, int operationsAmount) {
disposables.add(Single.fromCallable(() ->
(new OperationsFactory()).getOperations((new DataStructureFactory()).getMaps(mDataStructureSize)))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(operations -> {
for (int i = 0; i < operations.size(); i++) {
performOperation(operations.get(i), i, operationsAmount);
}
}));
}
private void performOperation(Operation operation, int id, int operationsAmount) {
disposables.add(Observable.defer(() -> operation
.executeAndReturnUptime(operationsAmount))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(upTime -> uptimeStream.onNext(new Pair<>(id, upTime))));
}
The uptimeStream
is a PublishSubject, which is observed in a Fragment.
I need to somehow call the PublishSubject's observer onComplete method, but only when all tasks are finished and all Pairs are consumed.
Although the amount of tasks is known, I'm trying to avoid implementing any type of hardcoded counter for consumed items, since the code should be reusable if that amount changes. I've tried calling onComplete method directly from different places, but nothing seems to really work. The main problem is that I don't need just a confirmation that all tasks are finished, but I also need to process all the values in the parent Fragment
I'm fine with removing PublishSubject in general, if other solution works.
UPD: I'm aware of this solution, but I can't find Observable.from
in RXJava 3, and in this solution .zip()
doesn't work for me, since I can't manually list all the observables + need to include ID field besides the actual return value
UPD2: I deleted the HashMap thing because I've got to thinking, and the main point here is to still get the results one by one, and execute only onComplete after the tasks are completed. So the solution with fromIterable
does not really fit there.