I am using RxAndroid, Retrofit and SqlBrite.

POJO classes (e.g. file_path = "....../videos/.mp4"):

public class VideoResponse {
    @SerializedName("files")
    @Expose
    private List<VideoFiles> files = null;
    .....
}

public class VideoFiles {
    @SerializedName("file_path")
    @Expose
    private String remotePath;

    private String localPath;
    .....
}

The list returned by the ApiService is passed to setVideoLocalPath:

    @Inject
    public RemoteDataSource(ApiService service, DownloadUtils downloadUtils) {
        this.service = service;
        this.downloadUtils = downloadUtils;
    }

    @Override
    public Observable<List<VideoResponse>> getVideoResponse() {
        return service.getVideoResponseFromServer()
                .compose(RxUtils.applySchedulers())
                .map(this::setVideoLocalPath) // <== the step in question
                .doOnSubscribe(disposable -> Timber.d("*** Video Sync Started....."))
                .doOnError(throwable -> Timber.d("*** Video Sync Failed ...."))
                .doOnComplete(() -> Timber.d(" *** Video Sync Complete...."));
    }

Every remote path is passed to DownloadUtils, and the modified list of VideoResponse objects is returned:

    private List<VideoResponse> setVideoLocalPath(List<VideoResponse> videoResponses) {
        for (VideoResponse r : videoResponses) {
            for (VideoFiles file : r.getFiles()) {
                downloadUtils.downloadVideoFromInternet(file, service);
            }
        }
        return videoResponses;
    }

Downloading the file and setting the local path:

public class DownloadUtils {

    public void downloadVideoFromInternet(VideoFiles video, ApiService service) {
        service.downloadFileByUrl(video.getRemotePath())
                .flatMap(processResponse("video", video.getFileTitle()))
                .subscribe(handleVideoResult(video));
    }

    private Observer<? super File> handleVideoResult(VideoFiles video) {
        return new Observer<File>() {
            @Override
            public void onSubscribe(Disposable d) {
                Timber.i("*** Download File OnSubscribe ***");
            }

            @Override
            public void onNext(File file) {
                Timber.d(" $$$$ Video File Path $$$ -> %s", file.getAbsolutePath());
                video.setLocalPath(file.getAbsolutePath());
            }

            @Override
            public void onError(Throwable e) {
                Timber.e(e);
            }

            @Override
            public void onComplete() {
                Timber.i("*** Download File Completed ****");
            }
        };
    }

    private Function<Response<ResponseBody>, Observable<File>> processResponse(String folderName, String fileTitle) {
        return response -> saveToDisk(response, folderName, fileTitle);
    }

    // Parameter order matches the call in processResponse: folderName first, then fileTitle.
    private Observable<File> saveToDisk(Response<ResponseBody> response, String folderName, String fileTitle) {
        return Observable.create(subscriber -> {
            File file = new File("/data/aster/" + folderName, fileTitle);
            File parent = file.getParentFile();
            // Create only the parent directory; mkdirs() on the file itself would turn it into a directory.
            if (!parent.exists()) {
                parent.mkdirs();
            }
            // try-with-resources closes the sink even if the write fails.
            try (BufferedSink bufferedSink = Okio.buffer(Okio.sink(file))) {
                bufferedSink.writeAll(response.body().source());
            } catch (IOException e) {
                subscriber.onError(e);
                return;
            }
            // Emit the file only after it has been fully written and closed.
            subscriber.onNext(file);
            subscriber.onComplete();
        });
    }
}

The problem is that the video files are not downloaded: each download stops at onSubscribe. After the values are passed to setVideoLocalPath, the downloads never finish, I get a NetworkOnMainThreadException, and the app crashes. Is there a better way to implement this logic? Kindly help.

robin_24
  • Why does `DownloadUtils` contain `RxJava` code? Your `getVideoResponse` will already run the download on a background thread... – Anatolii Sep 01 '18 at 06:14
  • @Anatolii Hi, even if I remove the RxJava part from **DownloadUtils**, won't Retrofit run the download in the background? – robin_24 Sep 01 '18 at 07:12

2 Answers

If RxUtils.applySchedulers applies the following, then the map operation, and with it the call to service.downloadFileByUrl, is executed on the main thread:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

If you move the observeOn call after the map operation, service.downloadFileByUrl should be executed off the main thread, i.e.:

@Override
public Observable<List<VideoResponse>> getVideoResponse() {
    return service.getVideoResponseFromServer()
            .subscribeOn(Schedulers.io())
            .map(this::setVideoLocalPath)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(disposable -> Timber.d("*** Video Sync Started....."))
            .doOnError(throwable -> Timber.d("*** Video Sync Failed ...."))
            .doOnComplete(() -> Timber.d(" *** Video Sync Complete...."));
}
Chris

The problem is the order in which you apply your operators, specifically here:

....
.compose(RxUtils.applySchedulers())
.map(this::setVideoLocalPath)
...

If you look into the source code for RxUtils.applySchedulers() you'll find that the transformer looks like this:

static Observable.Transformer schedulersTransformer = new Observable.Transformer<Object, Object>() {
    @Override public Observable<Object> call(Observable<Object> observable) {
        return observable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }
};

So observeOn(AndroidSchedulers.mainThread()) moves all downstream work to the main thread, and because your map comes after it, map does its work on the main thread. I don't see any reason why your map operator has to come after the transformer. Simply change the order as follows:

....
.map(this::setVideoLocalPath)
.compose(RxUtils.applySchedulers())
...

With this order the chain is fixed, and your map operator does its work on the io thread. Also, please note that your doOnSubscribe performs its work on the main thread, as it comes after compose(RxUtils.applySchedulers()).
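
The effect of operator order can be reproduced without RxJava. The sketch below is only a toy model in plain JDK Java, not RxJava itself: `OperatorOrderSketch`, `run` and the step names are hypothetical, and two single-thread executors named "io" and "main" stand in for Schedulers.io() and AndroidSchedulers.mainThread(). It assumes that every step placed after the switch runs on the "main" executor, which is the behaviour described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OperatorOrderSketch {

    /** Runs each named step on whichever executor is "current" at that point in the chain. */
    static List<String> run(List<String> chain) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main"));
        List<String> log = new ArrayList<>();
        try {
            ExecutorService current = io; // stands in for subscribeOn(Schedulers.io())
            for (String step : chain) {
                if (step.equals("observeOn(main)")) {
                    current = main; // every later step now runs on "main"
                } else {
                    // Run the step on the current executor and wait, so the log order is deterministic.
                    log.add(current.submit(() -> step + "@" + Thread.currentThread().getName()).get());
                }
            }
            return log;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        // map placed after the switch: prints [fetch@io, map@main]
        System.out.println(run(Arrays.asList("fetch", "observeOn(main)", "map")));
        // map placed before the switch: prints [fetch@io, map@io]
        System.out.println(run(Arrays.asList("fetch", "map", "observeOn(main)")));
    }
}
```

In the first chain the map step lands on "main", which on Android is where a network call triggers NetworkOnMainThreadException; in the second it stays on "io".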

Anatolii
  • @Anatolii Hi, how do I implement a listener to report the progress in **DownloadUtils**? I have implemented a repository pattern (MVP) as well. Any help? – robin_24 Sep 03 '18 at 12:01
  • @rdk1234 I guess this can help you - https://stackoverflow.com/questions/42118924/android-retrofit-download-progress – Anatolii Sep 03 '18 at 14:04
  • @Anatolii Thanks for the post! Should I set those callbacks inside the view contract and call them from **DownloadUtils**, or use a separate listener? Any suggestions? – robin_24 Sep 03 '18 at 14:58
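
On the progress question raised in the comments, one possible shape for a separate listener is sketched below in plain JDK Java. This is not taken from the linked post and does not use Retrofit or OkHttp: `ProgressCopySketch`, `ProgressListener` and `copy` are hypothetical names, and the sketch assumes the total size is known up front (for example from a Content-Length header). The callback fires on whichever thread performs the copy, so a view would still need to hop to the main thread before touching the UI.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class ProgressCopySketch {

    /** Hypothetical callback; not part of Retrofit, OkHttp or RxJava. */
    interface ProgressListener {
        void onProgress(long bytesCopied, long totalBytes);
    }

    /** Copies in to out in 4096-byte chunks and reports the running total after each chunk. */
    static long copy(InputStream in, OutputStream out, long totalBytes, ProgressListener listener)
            throws IOException {
        byte[] buffer = new byte[4096];
        long copied = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            copied += read;
            listener.onProgress(copied, totalBytes);
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[10000]; // stands in for a downloaded response body
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        copy(new ByteArrayInputStream(payload), target, payload.length,
                (done, total) -> System.out.println(done + " / " + total));
    }
}
```

Keeping the listener as a small interface passed into the copy routine leaves DownloadUtils unaware of the view contract; the presenter can supply an implementation that forwards progress to the view.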