
I have multiple tasks, each executed on Schedulers.newThread(). Each task is a method that returns an Observable<Long>.

The overall structure looks like this:

public void createAndPerformOperations(int mDataStructureSize, int operationsAmount) {
    disposables.add(Single.fromCallable(() ->
                    (new OperationsFactory()).getOperations((new DataStructureFactory()).getMaps(mDataStructureSize)))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(operations -> {
                for (int i = 0; i < operations.size(); i++) {
                    performOperation(operations.get(i), i, operationsAmount);
                }
            }));
}

private void performOperation(Operation operation, int id, int operationsAmount) {
    disposables.add(Observable.defer(() -> operation
                    .executeAndReturnUptime(operationsAmount))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(upTime -> uptimeStream.onNext(new Pair<>(id, upTime))));
}

The uptimeStream is a PublishSubject that is observed in a Fragment. I need to somehow trigger the onComplete method of the PublishSubject's observer, but only when all tasks are finished and all Pairs have been consumed.

Although the number of tasks is known, I'm trying to avoid implementing any kind of hardcoded counter for consumed items, since the code should remain reusable if that number changes. I've tried calling onComplete directly from different places, but nothing really works. The main problem is that I don't just need confirmation that all tasks are finished; I also need to process all the values in the parent Fragment.

I'm fine with removing the PublishSubject altogether if another solution works.

UPD: I'm aware of this solution, but I can't find Observable.from in RxJava 3. Also, .zip() from that solution doesn't work for me, since I can't manually list all the observables and I need to include an ID field alongside the actual return value.

UPD2: I deleted the HashMap thing because, after thinking it over, the main point is still to get the results one by one and to call onComplete only after all tasks are completed. So the solution with fromIterable doesn't really fit here.

  • It's `fromIterable` in RxJava 3. – akarnokd Nov 23 '22 at 12:13
  • Yeah, okay, but it doesn't really work because it defeats the purpose of the async execution. If I'm getting results after all the tasks are finished, I might as well just run it on the main thread. The question is more about how to keep track of the process so I could execute onComplete after all the tasks are finished – Andrei Aleksandrov Nov 23 '22 at 14:23
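
For what it's worth, here is a minimal sketch of one way to get the results one by one and still receive a single onComplete once every task is done, without a counter and without the PublishSubject. Each task is wrapped in its own Observable subscribed on Schedulers.newThread(), and the tasks are merged with flatMap. The merged stream emits each value as soon as its task produces it and completes only after all inner Observables have completed. The Operation interface below is a hypothetical stand-in for the one in the question, Map.Entry stands in for Android's Pair so the sketch runs on a plain JVM, and the names MergedUptimes, runOne, and uptimes are made up for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;

final class MergedUptimes {

    // Hypothetical stand-in for the question's Operation type.
    interface Operation {
        Observable<Long> executeAndReturnUptime(int operationsAmount);
    }

    // Runs a single task on its own thread and tags each result with the task id.
    static Observable<Map.Entry<Integer, Long>> runOne(Operation operation, int id, int operationsAmount) {
        return Observable.defer(() -> operation.executeAndReturnUptime(operationsAmount))
                .subscribeOn(Schedulers.newThread())
                .map(upTime -> new SimpleEntry<>(id, upTime));
    }

    // Merges all tasks into one stream: values arrive as each task emits them,
    // and onComplete fires only after every task's Observable has completed.
    static Observable<Map.Entry<Integer, Long>> uptimes(List<Operation> operations, int operationsAmount) {
        return Observable.range(0, operations.size())
                .flatMap(id -> runOne(operations.get(id), id, operationsAmount));
    }

    public static void main(String[] args) {
        // Assumed dummy tasks; real ones would do the measured work.
        List<Operation> operations = List.of(
                n -> Observable.just(10L * n),
                n -> Observable.just(20L * n),
                n -> Observable.just(30L * n));

        // blockingSubscribe is used here only so the demo waits for the worker threads.
        uptimes(operations, 2).blockingSubscribe(
                entry -> System.out.println(entry.getKey() + " -> " + entry.getValue()),
                Throwable::printStackTrace,
                () -> System.out.println("all tasks completed"));
    }
}
```

In the Fragment, this would presumably be consumed with observeOn(AndroidSchedulers.mainThread()) and the three-argument subscribe(onNext, onError, onComplete). Note that if any task signals an error, flatMap terminates the merged stream with that error by default.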
