
Main POJO:

class VideoResponse{
    List<VideoFiles> videosFiles;
}

I have a case where I combine the results of two database operations and return them as an `Observable<List<VideoResponse>>`.

##Update##

mDbHelper
    // getVideoCategory() returns Observable<List<VideoResponse>>
    .getVideoCategory()
    .flatMapIterable(videoResponses -> videoResponses)
    .doOnNext(videoResponse ->
        Timber.d("Flatmap Iterable Thread == > %s", Thread.currentThread().getName()))
    .concatMap((Function<VideoResponse, ObservableSource<VideoResponse>>) videoResponse -> {
        Integer videoId = videoResponse.getId();
        return Observable.fromCallable(() -> videoResponse)
            .doOnNext(videoResponse1 ->
                Timber.d("Thread before ZIP WITH ===> %s", Thread.currentThread().getName()))
            // getVideoFilesList(int videoId) returns Observable<List<VideoFiles>>
            .zipWith(getVideoFilesList(videoId),
                (videoResponse1, videoFiles) -> {
                    videoResponse1.setFiles(videoFiles);
                    return videoResponse1;
                })
            .doOnNext(vResponse ->
                Timber.d("Thread After ZIP WITH ===> %s", Thread.currentThread().getName()))
            // This gets printed
            .doOnComplete(() ->
                Timber.d(" OnComplete Thread for Video Files ===>  %s ", Thread.currentThread().getName()));
    })
    .toList()
    .toObservable()
    // The statements below are never executed
    .doOnComplete(() -> Timber.d(" Thread doOnComplete"))
    .doOnNext(videoResponses ->
        Timber.d("Thread after loading from the LOCAL DB ====> %s", Thread.currentThread().getName()));

Below are the scheduler threads being executed:

 Flatmap Iterable Thread == >  RxCachedThreadScheduler-1
 Thread before ZIP WITH  ===>  RxCachedThreadScheduler-1
 Flatmap Iterable Thread == >  RxCachedThreadScheduler-1
 Thread After ZIP WITH   ===>  RxCachedThreadScheduler-2
 Thread before ZIP WITH  ===>  RxCachedThreadScheduler-2
 Thread After ZIP WITH   ===>  RxCachedThreadScheduler-2

The final onNext is never executed. I need to return the list in that onNext. I have placed observeOn at different positions, but nothing seems to work. Any suggestions?

##Update## Using SqlBrite,

@Override
public Observable<List<VideoResponse>> getVideoCategory() {
    return mDBHelper
            .createQuery(VideoEntry.TABLE_NAME,
                    DbUtils.getSelectAllQuery(VideoEntry.TABLE_NAME))
            .mapToOne(DbUtils::videosFromCursor);
}

@Override
public Observable<List<VideoFiles>> getVideoFilesList(int videoId) {
    return mDBHelper
            .createQuery(VideoDetailsEntry.TABLE_NAME,
                    DbUtils.getSelectFromId(VideoDetailsEntry.TABLE_NAME, VideoDetailsEntry.COLUMN_VIDEO_ID),
                    String.valueOf(videoId))
            .mapToOne(DbUtils::videoDetailsFromCursor);
}
robin_24
  • The `toList()` operator will never finish if the upstream observer chain never completes. You can insert logging statements in the observer chain using the `doOnComplete()` operator to see when or if any stage completes – Bob Dalgleish Sep 18 '18 at 18:32
  • @BobDalgleish Placing **doOnComplete** after the zipWith operator (inside the getVideoFile(id)) prints the thread name, whereas placing **doOnComplete** before **toList** fails to execute. Is something wrong with how I use the operators? – robin_24 Sep 19 '18 at 04:26
  • @BobDalgleish Is there any other way to combine the observables after passing a value from one observable to another? – robin_24 Sep 19 '18 at 07:46
  • The logic of your example is buried in the details. Either `videoResponse` is an observable that never completes or `getVideoFilesList()` never completes, or both. – Bob Dalgleish Sep 19 '18 at 21:38
  • @BobDalgleish Hi, I have updated the post. **getVideoFilesList(int id)** completes, but **getVideoCategory()** does not seem to complete. Any suggestions? – robin_24 Sep 20 '18 at 06:18
  • If you only ever want the first value from `getVideoCategory()`, you could use the `take(1)` operator -- it would complete after emitting the first value. – Bob Dalgleish Sep 20 '18 at 13:27
  • @BobDalgleish I tried adding **take()** as well. **doOnComplete** is still not executed after **toList**! – robin_24 Sep 20 '18 at 14:13
  • That suggests that `getVideoCategory()` is not returning anything. If you applied `take(1)` immediately downstream of `getVideoCategory()`, then that is the most likely answer. – Bob Dalgleish Sep 20 '18 at 15:06
  • @BobDalgleish While debugging, the flow is: doOnNext (after flatMapIterable) => doOnSubscribe => directly to toList => toObservable, and only after that do the statements inside **concatMap** get printed with the required values. **getVideoCategory** seems to be emitting! – robin_24 Sep 20 '18 at 15:45
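
The point made in the comments, that `toList()` emits only once its upstream completes and that `take(1)` forces that completion, can be illustrated with a small plain-Java toy model. This is not RxJava code: `ToListModel` and `TakeModel` are hypothetical stand-ins written only for this sketch, and the "source" is simulated by calling `onNext` by hand without ever calling `onComplete`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: these classes are hypothetical stand-ins, not RxJava types.
class ToListDemo {

    /** Mimics toList(): buffers every item and hands the list downstream only on completion. */
    static final class ToListModel<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<List<T>> downstream;
        private boolean done;

        ToListModel(Consumer<List<T>> downstream) {
            this.downstream = downstream;
        }

        void onNext(T item) {
            if (!done) {
                buffer.add(item);
            }
        }

        void onComplete() {
            if (!done) {
                done = true;
                downstream.accept(new ArrayList<>(buffer));
            }
        }
    }

    /** Mimics take(n): forwards n items, then signals completion itself. */
    static final class TakeModel<T> {
        private final ToListModel<T> downstream;
        private int remaining;

        TakeModel(int count, ToListModel<T> downstream) {
            this.remaining = count;
            this.downstream = downstream;
        }

        void onNext(T item) {
            if (remaining <= 0) {
                return;
            }
            remaining--;
            downstream.onNext(item);
            if (remaining == 0) {
                downstream.onComplete();
            }
        }
    }

    /** The source emits one item and never completes, so the list is never delivered. */
    static List<List<String>> withoutTake() {
        List<List<String>> received = new ArrayList<>();
        ToListModel<String> toList = new ToListModel<String>(received::add);
        toList.onNext("categories");
        return received;
    }

    /** Same source, but take(1) completes the chain after the first item. */
    static List<List<String>> withTake() {
        List<List<String>> received = new ArrayList<>();
        TakeModel<String> take = new TakeModel<String>(1, new ToListModel<String>(received::add));
        take.onNext("categories");
        return received;
    }

    public static void main(String[] args) {
        System.out.println("without take: " + withoutTake()); // prints "without take: []"
        System.out.println("with take(1): " + withTake());    // prints "with take(1): [[categories]]"
    }
}
```

In the real chain the same reasoning would apply to any source that keeps its subscription open, which is how SqlBrite's `createQuery()` behaves: it re-emits when the table changes and does not complete on its own.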

1 Answer

As hinted by @BobDalgleish, onComplete was being called before the toList instead of after zipWith. I have now made the following changes, and I get the complete list from the DB. I used concatMap to preserve the order and wait for completion.

mLocalDataSource
    .getVideoCategory()
    .compose(RxUtils.applySchedulers())
    .flatMap(new Function<List<VideoResponse>, ObservableSource<List<VideoResponse>>>() {
        @Override
        public ObservableSource<List<VideoResponse>> apply(List<VideoResponse> videoResponses) throws Exception {
            return Observable.just(videoResponses)
                .concatMap(videoResponses1 -> Observable.fromIterable(videoResponses1)
                    .concatMap(videoResponse -> Observable.just(videoResponse)
                        .concatMap(videoResponse1 -> {
                            Integer videoId = videoResponse1.getId();
                            return Observable.just(videoResponse1)
                                .zipWith(getVideoFilesList(videoId), new BiFunction<VideoResponse, List<VideoFiles>, VideoResponse>() {
                                    @Override
                                    public VideoResponse apply(VideoResponse videoResponse1, List<VideoFiles> videoFiles) throws Exception {
                                        videoResponse1.setFiles(videoFiles);
                                        Timber.d("Video Responses == >  %s", videoResponse1);
                                        return videoResponse1;
                                    }
                                });
                        })))
                .toList()
                .toObservable()
                .observeOn(AndroidSchedulers.mainThread());
        }
    });

I know this looks kind of messy! If you have any suggestions or optimizations, kindly post them.
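
One possible simplification, offered only as a sketch under stated assumptions: if a single snapshot of each query is enough, `take(1)` on both sources lets `toList()` complete, and the nested `Observable.just(...).concatMap(...)` layers are no longer needed. The sketch assumes RxJava 2 (`io.reactivex`) on the classpath. The `VideoResponse` and `VideoFiles` classes and both query methods below are hypothetical stand-ins that emit once and never complete, imitating a long-lived database query; schedulers are left out to keep it short.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class VideoChainSketch {

    // Hypothetical stand-ins for the POJOs in the question.
    static final class VideoFiles {
        final String name;
        VideoFiles(String name) { this.name = name; }
    }

    static final class VideoResponse {
        final int id;
        List<VideoFiles> files;
        VideoResponse(int id) { this.id = id; }
    }

    // Stand-in query: emits one list, then stays open forever (assumed behaviour).
    static Observable<List<VideoResponse>> getVideoCategory() {
        return Observable.just(Arrays.asList(new VideoResponse(1), new VideoResponse(2)))
                .concatWith(Observable.<List<VideoResponse>>never());
    }

    // Stand-in query: emits the files for one video, then stays open forever.
    static Observable<List<VideoFiles>> getVideoFilesList(int videoId) {
        return Observable.just(Collections.singletonList(new VideoFiles("file-" + videoId)))
                .concatWith(Observable.<List<VideoFiles>>never());
    }

    static Single<List<VideoResponse>> loadVideos() {
        return getVideoCategory()
                .take(1)                          // complete after the first snapshot
                .flatMapIterable(list -> list)    // one VideoResponse at a time
                .concatMap(video -> getVideoFilesList(video.id)
                        .take(1)                  // complete each inner query as well
                        .map(files -> {
                            video.files = files;
                            return video;
                        }))                       // concatMap keeps the original order
                .toList();                        // emits once everything upstream completed
    }

    public static void main(String[] args) {
        List<VideoResponse> videos = loadVideos().blockingGet();
        for (VideoResponse video : videos) {
            System.out.println(video.id + " -> " + video.files.get(0).name);
        }
    }
}
```

The trade-off is that `take(1)` gives up live updates from the database; if those are needed, the outer query could stay open while only the inner part, from `flatMapIterable` to `toList()`, is built from the already-emitted list.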

robin_24