
I am having trouble implementing RxJava 3 with Retrofit 2. What I need to accomplish is:

The general idea is to sync a cloud DB to a SQLite DB (Room) on a device. The DB could get large, around 100,000 records or more, so the sync process can take some time. My first attempt was to fetch all records in one request and then save them to SQLite (Room), but on some devices this caused out-of-memory exceptions. After some research I concluded that RxJava, combined with pagination of the API calls, is the way to go.

  1. First, I was trying to build a proof of concept: fire a first Retrofit call and show the response in the activity. I haven't been able to make it work, and I am stuck.

This is the error I get: java.lang.ClassCastException: io.reactivex.rxjava3.internal.observers.LambdaObserver cannot be cast to io.reactivex.rxjava3.core.Observable

  2. What I am trying to do, and what I am asking for help with, is this: make a first call to get the total number of records and derive the number of pages; based on that, send multiple Retrofit requests and, every time a response (List<Item>) arrives, save it to Room.
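
To make the "define the number of pages" step concrete, here is a minimal sketch of the arithmetic in plain Java. The class and method names (PageMath, pagesFor) are hypothetical and not part of my project; the only assumption is that the first response reports the total record count, as RecordCount does in the JSON further down.

```java
/** Hypothetical helper for illustration only; not part of the original project. */
final class PageMath {
    private PageMath() {}

    /**
     * Number of pages needed to cover recordCount records,
     * i.e. recordCount / pageSize rounded up.
     */
    static int pagesFor(int recordCount, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        if (recordCount <= 0) {
            return 0; // nothing to fetch
        }
        // Full pages, plus one more page if there is a remainder.
        return recordCount / pageSize + (recordCount % pageSize == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        // 65565 is the RecordCount from the sample JSON response.
        System.out.println(pagesFor(65565, 100)); // prints 656
    }
}
```

Note that this sketch returns 0 pages for an empty table, whereas the HandleResults() method shown below would still count one page in that case.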

Some code follows.

gradle

//Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
//Retrofit2 Adapter for RxJava 3
implementation "com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0"
//okhttp3 Logging Interceptor
implementation "com.squareup.okhttp3:logging-interceptor:4.5.0"
//RxJava
implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.

class Item

public class Item {
    private String epc;
    private String barcode;
    private String name;
    private String description;
    private String brand;
    @SerializedName("serial_number")
    private String serialNumber;
    private double cost;
    @SerializedName("fk_item_state")
    private int status;
    @SerializedName("fk_category")
    private int category;
    @SerializedName("fk_location")
    private int location;
    private String parent;
    @SerializedName("responsable")
    private String responsible;
    @SerializedName("purchase_date")
    private String purchaseDate;
    @SerializedName("creation_date")
    private String creationDate;
    @SerializedName("last_update")
    private String lastUpdate;
    @SerializedName("inventory_date")
    private String inventoryDate;
    @SerializedName("last_seen")
    private String lastSeen;
...

class ItemSyncDetails

public class ItemSyncDetails {
    @SerializedName("CurrentPage")
    int currentPage;
    @SerializedName("PageCount")
    int pageCount;
    @SerializedName("PageSize")
    int pageSize;
    @SerializedName("RecordCount")
    int recordCount;
    @SerializedName("Results")
    List<Item> mItemList;
...

interface FrekuencyApi

import io.reactivex.rxjava3.core.Observable;
import retrofit2.http.GET;
import retrofit2.http.Query;

public interface FrekuencyApi {

    @GET("item")
    Observable<ItemSyncDetails> getItemsPageDetails(
            @Query("pageSize") Integer pageSize,
            @Query("currentPage") Integer currentPage,
            @Query("sortBy") Integer sortBy
    );
...

class MainActivity

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;
...

public class MainActivity extends AppCompatActivity {

    private TextView tvResult;
    private Button send;
    private ProgressDialog mProgressDialog;
    private ItemSyncDetails mItemSyncDetails;
    private List<Item> mItemsList;
    private CompositeDisposable disposables = new CompositeDisposable();
    private FrekuencyApi frekuencyApi;
    private Retrofit retrofit;
    private int mNumPages;
    private CompositeDisposable compositeDisposable;
    private HttpLoggingInterceptor logging;

    private static final String TAG = "MainActivity";
    private Object handleResults;


    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        tvResult = findViewById(R.id.tv_result);
        send = findViewById(R.id.btnSend);
        mItemSyncDetails = new ItemSyncDetails();
        mProgressDialog = new ProgressDialog(this);

        mItemsList = new ArrayList<Item>();

        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(interceptor)
                .addNetworkInterceptor(new Interceptor() {
                    @Override
                    public okhttp3.Response intercept(Chain chain) throws IOException {
                        Request request = chain.request().newBuilder()
                                .build();
                        return chain.proceed(request);
                    }
                }).build();

        retrofit = new Retrofit.Builder()
                .baseUrl("http://192.168.1.10:82/api/v1.0/")
                .client(client)
                .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        send.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                //TODO: button click event
                getRecordsCount();
            }
        });
    }

    private void getRecordsCount(){
        FrekuencyApi frekuencyApi = retrofit.create(FrekuencyApi.class);
        Observable<ItemSyncDetails> observable = (Observable<ItemSyncDetails>) frekuencyApi.getRecordsCount(1,1,1)
                .subscribeOn(Schedulers.io())
                .retry(3)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::HandleResults, this::handleError );
    }

    private void HandleResults(ItemSyncDetails itemSyncDetails) {
        this.mItemSyncDetails = itemSyncDetails;
        int pageSize = 100;
        int numPages = itemSyncDetails.getPageCount()/pageSize;
        if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
            numPages++;
        }
        this.mNumPages = numPages;
        tvResult.append("Page size: " + pageSize + "\n");
        tvResult.append("Page number: " + this.mNumPages + "\n");
        tvResult.append("Total items: " + itemSyncDetails.getRecordCount() + "\n");
    }

    private void handleError(Throwable throwable) {
        Log.e("Observer", ""+ throwable.toString());
        Toast.makeText(this, "CONNECTION ERROR",
                Toast.LENGTH_LONG).show();
    }
...

Error Log

D/AndroidRuntime: Shutting down VM
E/AndroidRuntime: FATAL EXCEPTION: main
    Process: com.frekuency.retrofitapptest, PID: 9120
    java.lang.ClassCastException: io.reactivex.rxjava3.internal.observers.LambdaObserver cannot be cast to io.reactivex.rxjava3.core.Observable
        at com.frekuency.retrofitapptest.views.MainActivity.getRecordsCount(MainActivity.java:110)
        at com.frekuency.retrofitapptest.views.MainActivity.access$000(MainActivity.java:39)
        at com.frekuency.retrofitapptest.views.MainActivity$2.onClick(MainActivity.java:98)
        at android.view.View.performClick(View.java:5273)
        at android.view.View$PerformClick.run(View.java:21315)
        at android.os.Handler.handleCallback(Handler.java:743)
        at android.os.Handler.dispatchMessage(Handler.java:95)
        at android.os.Looper.loop(Looper.java:150)
        at android.app.ActivityThread.main(ActivityThread.java:5659)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:822)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:712)
D/OkHttp: --> GET http://192.168.1.10:82/api/v1.0/item?pageSize=1&currentPage=1&sortBy=1
    --> END GET
I/Process: Sending signal. PID: 9120 SIG: 9
I/System: core_booster, getBoosterConfig = false

json for - http://192.168.1.10:82/api/v1.0/item?pageSize=1&currentPage=1&sortBy=1

{
  Results: [
  {
    epc: "202020202020202030303031",
    barcode: "0001",
    name: "Televisor Samnsung",
    description: "0001",
    creation_date: "2020-02-26T10:55:06",
    last_update: "2020-02-26T10:55:06",
    last_seen: "2020-02-26T10:55:06",
    brand: "Samnsung",
    serial_number: "0001",
    parent: "",
    fk_category: 1,
    responsable: "",
    purchase_date: "2020-02-26T10:55:06",
    cost: 0,
    fk_location: 1008,
    fk_item_state: 1,
    inventory_date: "2020-02-26T10:55:06"
  }
 ],
 CurrentPage: 1,
 PageCount: 65565,
 PageSize: 1,
 RecordCount: 65565
}

Thanks in advance, I really appreciate your help.

I did some changes to class MainActivity

public class MainActivity extends AppCompatActivity {
...
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        tvResult = findViewById(R.id.tv_result);
        send = findViewById(R.id.btnSend);
        mItemSyncDetails = new ItemSyncDetails();
        mProgressDialog = new ProgressDialog(this);

        mItemsList = new ArrayList<Item>();

        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(interceptor)
                .addNetworkInterceptor(new Interceptor() {
                    @Override
                    public okhttp3.Response intercept(Chain chain) throws IOException {
                        Request request = chain.request().newBuilder()
                                .build();
                        return chain.proceed(request);
                    }
                }).build();

        retrofit = new Retrofit.Builder()
                .baseUrl("http://192.168.1.10:82/api/v1.0/")
                .client(client)
                .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        send.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
              showProgressDialog("Items");
              getRecordsCount();
            }
        });
    }

private void getRecordsCount(){
    retrofit.create(FrekuencyApi.class)
    .getRecordsCount(1,1,1)
            .subscribeOn(Schedulers.io())
            .retry(3)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::HandleResults, this::handleError,this::getNumPagesHandlerComplete );
}

private void HandleResults(ItemSyncDetails itemSyncDetails) {
    this.mItemSyncDetails = itemSyncDetails;
    int pageSize = 100;
    int numPages = itemSyncDetails.getRecordCount()/pageSize;
    if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
        numPages++;
    }
    this.mNumPages = numPages;
    tvResult.append("Page size: " + pageSize + "\n");
    tvResult.append("Number of pages: " + this.mNumPages + "\n");
    tvResult.append("Total number of records: " + itemSyncDetails.getRecordCount() + "\n");
}

private void getNumPagesHandlerComplete() {
    getAllRecords(mNumPages);
}

private void getAllRecords(int numPages){
    frekuencyApi = retrofit.create(FrekuencyApi.class);
    //numPages: total of pages are the number of times to send the request to API
    Observable.range(1, numPages)
    .concatMap(i -> frekuencyApi.getItemsPageDetails(100,i,1))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::getAllHandleResults, this::handleError,this::handleComplete);
}

private void getAllHandleResults(ItemSyncDetails itemSyncDetails) {
    //TODO: Should it be the right place to save data to DB?
    //Get the progress
    int tmp = getProgress(itemSyncDetails.getPageCount(),
            itemSyncDetails.getCurrentPage(),itemSyncDetails.getPageSize());
    //Update ProgressDialog Progress
    mProgressDialog.setProgress(tmp);
    tvResult.setText("Progress: "+ tmp + "%\n");
    if (itemSyncDetails.getCurrentPage() == itemSyncDetails.getPageCount()){
        //Showing on screen when last request is done
        tvResult.append("******************\n");
        tvResult.append(itemSyncDetails.getItemList().toString());
        tvResult.append("******************\n");
    }
}

private void handleComplete() {
    //when the last request finished
    tvResult.append("\nSynchronization process finished successfully!");
    closeProgressDialog();
}

private void handleError(Throwable throwable) {
    Log.e("Observer", ""+ throwable.toString());
    tvResult.append("\n******************\n");
    tvResult.append("CONNECTION ERROR...\n");
    tvResult.append(throwable.toString());
    Toast.makeText(this, "CONNECTION ERROR",
            Toast.LENGTH_LONG).show();
    closeProgressDialog();
}
...
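
As I understand it, the paging that getAllRecords() does with Observable.range and concatMap boils down to "request page i, hand it over, then request page i + 1". Here is a minimal plain-Java sketch of that idea, without RxJava or Retrofit. Everything in it (PagedSync, PageSource, fakeSource, the String records) is hypothetical and only stands in for the real API and for Room.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of sequential paging; not the real Retrofit/Room code. */
final class PagedSync {
    private PagedSync() {}

    /** Stand-in for the Retrofit call: returns the records of one page (1-based). */
    interface PageSource {
        List<String> fetchPage(int pageSize, int page);
    }

    /**
     * Fetches pages 1..numPages in order and hands each page to the sink
     * before requesting the next one, so only one page is held at a time.
     * Returns the total number of records handed to the sink.
     */
    static int syncAll(PageSource source, int recordCount, int pageSize,
                       Consumer<List<String>> sink) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        int numPages = recordCount / pageSize + (recordCount % pageSize == 0 ? 0 : 1);
        int saved = 0;
        for (int page = 1; page <= numPages; page++) {
            List<String> items = source.fetchPage(pageSize, page);
            sink.accept(items); // in the real app this would be the insert of the list
            saved += items.size();
        }
        return saved;
    }

    /** In-memory fake source holding records item-1 .. item-total. */
    static PageSource fakeSource(int total) {
        return (pageSize, page) -> {
            List<String> out = new ArrayList<>();
            int start = (page - 1) * pageSize + 1;
            int end = Math.min(page * pageSize, total);
            for (int i = start; i <= end; i++) {
                out.add("item-" + i);
            }
            return out;
        };
    }

    public static void main(String[] args) {
        List<Integer> pageSizes = new ArrayList<>();
        int saved = syncAll(fakeSource(250), 250, 100, p -> pageSizes.add(p.size()));
        System.out.println(saved + " records in pages of " + pageSizes);
    }
}
```

The point of the loop is memory: each page can be released after it is handed over, which is what I was missing when I fetched everything in one request.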

With the changes to the MainActivity class, there are basically two methods, getRecordsCount() and getAllRecords(int numPages), and each one starts an RxJava chain. The first makes a single Retrofit call; based on its response, the second method is called and sends n requests to the API, where n is the numPages parameter. I use a ProgressDialog that is updated along the way and closed when the whole process finishes. The second method is invoked from the onComplete callback of the first RxJava chain. At this point, my questions are:

  1. How can I make this more elegant by using a single RxJava chain instead of two?
  2. I am thinking of putting a Room insert call for a List in the getAllHandleResults() method. Is that the right place to do the insert into the DB?
  3. Right now I am using concatMap in the getAllRecords() method. As far as I understand, this operator executes the Retrofit calls sequentially. I am doing it this way because, before trying RxJava + Retrofit, I was using Volley + Room + LiveData + ViewModel + AsyncTask, and I noticed that the DB insertions were done one at a time: an insertion is slower than the Retrofit call, and each insertion has to wait for the previous one to finish. The DB part still uses Room + LiveData + ViewModel + AsyncTask, so when the insert method is invoked from onNext (which runs on the main thread), the insertion is executed by an AsyncTask on a background thread. Do you have any recommendation on this matter? Can this DB part be done with RxJava as well? Do you know of a web page where I can read more about it? Once again, thanks a lot for your help.
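
To illustrate what I mean in point 3 by insertions that run off the main thread but still one after another, here is a rough sketch using a single-thread executor from java.util.concurrent in place of AsyncTask. This is only an assumption about one possible approach, not Room's API: SerialWriter, insertAll and closeAndGet are hypothetical names, and the in-memory list stands in for the DB table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: serialize "inserts" on one background thread. */
final class SerialWriter {
    // One worker thread, so queued inserts run strictly in submission order.
    private final ExecutorService dbExecutor = Executors.newSingleThreadExecutor();
    // Stand-in for the database table (a real app would call the DAO here).
    private final List<String> table = Collections.synchronizedList(new ArrayList<>());

    /** Queues one page for insertion and returns immediately to the caller. */
    void insertAll(List<String> page) {
        List<String> copy = new ArrayList<>(page); // defensive copy of the page
        dbExecutor.execute(() -> table.addAll(copy));
    }

    /** Waits for all queued inserts to finish and returns a snapshot of the table. */
    List<String> closeAndGet() {
        dbExecutor.shutdown();
        try {
            if (!dbExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("inserts did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for inserts", e);
        }
        return new ArrayList<>(table);
    }

    public static void main(String[] args) {
        SerialWriter writer = new SerialWriter();
        writer.insertAll(Arrays.asList("item-1", "item-2"));
        writer.insertAll(Arrays.asList("item-3"));
        System.out.println(writer.closeAndGet());
    }
}
```

With a single worker thread, the second page is never inserted before the first one has finished, which matches the one-at-a-time behaviour I observed, while the caller is never blocked by the write.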
MarioV
  • 1
    Delete this part exactly `Observable observable = (Observable)` – denvercoder9 Apr 28 '20 at 22:36
  • @sonnet thanks, I can't believe it was that easy. It works; I changed my code to `retrofit.create(FrekuencyApi.class).getRecordsCount(1,1,1).subscribeOn(Schedulers.io()).retry(3).observeOn(AndroidSchedulers.mainThread()).subscribe(this::HandleResults, this::handleError );`. That is the first part. Now, how can I do the multiple-call requests? – MarioV Apr 28 '20 at 23:02
  • 1
    https://stackoverflow.com/questions/36785090/chaining-requests-in-retrofit-rxjava – denvercoder9 Apr 30 '20 at 09:22
  • @sonnet thanks, I reviewed that post, but I was already working on a solution. I have updated my post with some changes to the MainActivity class and the new questions that I have. – MarioV May 01 '20 at 02:53
  • Does anyone know if Room supports RxJava 3, or if RxJava 3 can be used with the latest version of Room, 2.2.5? What I found on the Android Developers page for the Room Persistence Library is that RxJava support for Room is `implementation "androidx.room:room-rxjava2:2.2.5"` – MarioV May 01 '20 at 20:03
  • 1
    Please create separate posts for your questions. The main question was "chaining retrofit calls with rx", which has many answers on Stack Overflow. As for rx3 with Room, I'd suggest staying with rx2 until all your dependencies have moved to rx3. – denvercoder9 May 02 '20 at 23:06
  • @sonnet thanks for your reply. I have already created a new post for the new questions: [Rxjava 3 + Retrofit2 - multiple inserts to DB problem](https://stackoverflow.com/questions/61704933/rxjava-3-retrofit2-multiple-inserts-to-db-problem). By the way, do you know where I can find the latest Rx2 versions that will work with Room and Retrofit? – MarioV May 11 '20 at 15:05
