I am having trouble implementing RxJava3/Retrofit2, what I need to accomplish is:
The general idea is to sync a cloud DB to an SqLite DB (Room) on a device. The DB could get large, around 100,000 registers or more, so the sync process can take some time, my first attempt was to do it in one request and get all registers, then save them to SqLite (Room), but this in some cases, depending on the device generated some out of memory exceptions, so after some research I found out that RxJava is the answer, also implementing some API call pagination as well.
- First, I was trying to do a proof of concept, to fire a first Retrofit call and show the response on the activity, but I haven't been able to make it work, I am stuck!!
This is the Error I get: java.lang.ClassCastException: io.reactivex.rxjava3.internal.observers.LambdaObserver cannot be cast to io.reactivex.rxjava3.core.Observable
- What I am trying to do is and I am asking for help here is; a first call where I can get to total number of register and define the number of pages, based on that, send multiple Retrofit request, every time I get the response (List < Item >), save them to room.
Following some code
gradle
//Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
//Retrofit2 Adapter for RxJava 3
implementation "com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0"
//okhttp3 Logging Interceptor
implementation "com.squareup.okhttp3:logging-interceptor:4.5.0"
//RxJava
implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.
class Item
public class Item {
private String epc;
private String barcode;
private String name;
private String description;
private String brand;
@SerializedName("serial_number")
private String serialNumber;
private double cost;
@SerializedName("fk_item_state")
private int status;
@SerializedName("fk_category")
private int category;
@SerializedName("fk_location")
private int location;
private String parent;
@SerializedName("responsable")
private String responsible;
@SerializedName("purchase_date")
private String purchaseDate;
@SerializedName("creation_date")
private String creationDate;
@SerializedName("last_update")
private String lastUpdate;
@SerializedName("inventory_date")
private String inventoryDate;
@SerializedName("last_seen")
private String lastSeen;
...
class ItemSyncDetails
public class ItemSyncDetails {
@SerializedName("CurrentPage")
int currentPage;
@SerializedName("PageCount")
int pageCount;
@SerializedName("PageSize")
int pageSize;
@SerializedName("RecordCount")
int recordCount;
@SerializedName("Results")
List<Item> mItemList;
...
interface FrekuencyApi
import io.reactivex.rxjava3.core.Observable;
import retrofit2.http.GET;
import retrofit2.http.Query;
public interface FrekuencyApi {
@GET("item")
Observable<ItemSyncDetails> getItemsPageDetails(
@Query("pageSize") Integer pageSize,
@Query("currentPage") Integer currentPage,
@Query("sortBy") Integer sortBy
);
...
class MainActivity
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;
...
public class MainActivity extends AppCompatActivity {
private TextView tvResult;
private Button send;
private ProgressDialog mProgressDialog;
private ItemSyncDetails mItemSyncDetails;
private List<Item> mItemsList;
private CompositeDisposable disposables = new CompositeDisposable();
private FrekuencyApi frekuencyApi;
private Retrofit retrofit;
private int mNumPages;
private CompositeDisposable compositeDisposable;
private HttpLoggingInterceptor logging;
private static final String TAG = "MainActivity";
private Object handleResults;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
tvResult = findViewById(R.id.tv_result);
send = findViewById(R.id.btnSend);
mItemSyncDetails = new ItemSyncDetails();
mProgressDialog = new ProgressDialog(this);
mItemsList = new ArrayList<Item>();
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(interceptor)
.addNetworkInterceptor(new Interceptor() {
@Override
public okhttp3.Response intercept(Chain chain) throws IOException {
Request request = chain.request().newBuilder()
.build();
return chain.proceed(request);
}
}).build();
retrofit = new Retrofit.Builder()
.baseUrl("http://192.168.1.10:82/api/v1.0/")
.client(client)
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
send.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
//TODO: EVENTO AL HACER CLICK EN BOTON
getRecordsCount();
}
});
}
private void getRecordsCount(){
FrekuencyApi frekuencyApi = retrofit.create(FrekuencyApi.class);
Observable<ItemSyncDetails> observable = (Observable<ItemSyncDetails>) frekuencyApi.getRecordsCount(1,1,1)
.subscribeOn(Schedulers.io())
.retry(3)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::HandleResults, this::handleError );
}
private void HandleResults(ItemSyncDetails itemSyncDetails) {
this.mItemSyncDetails = itemSyncDetails;
int pageSize = 100;
int numPages = itemSyncDetails.getPageCount()/pageSize;
if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
numPages++;
}
this.mNumPages = numPages;
tvResult.append("Page size: " + pageSize + "\n");
tvResult.append("Page number: " + this.mNumPages + "\n");
tvResult.append("Total items: " + itemSyncDetails.getRecordCount() + "\n");
}
private void handleError(Throwable throwable) {
Log.e("Observer", ""+ throwable.toString());
Toast.makeText(this, "ERROR DE CONEXION",
Toast.LENGTH_LONG).show();
}
...
Error Log
D/AndroidRuntime: Shutting down VM
E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.frekuency.retrofitapptest, PID: 9120
java.lang.ClassCastException: io.reactivex.rxjava3.internal.observers.LambdaObserver cannot be cast to io.reactivex.rxjava3.core.Observable
at com.frekuency.retrofitapptest.views.MainActivity.getRecordsCount(MainActivity.java:110)
at com.frekuency.retrofitapptest.views.MainActivity.access$000(MainActivity.java:39)
at com.frekuency.retrofitapptest.views.MainActivity$2.onClick(MainActivity.java:98)
at android.view.View.performClick(View.java:5273)
at android.view.View$PerformClick.run(View.java:21315)
at android.os.Handler.handleCallback(Handler.java:743)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:150)
at android.app.ActivityThread.main(ActivityThread.java:5659)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:822)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:712)
D/OkHttp: --> GET http://192.168.1.10:82/api/v1.0/item?pageSize=1¤tPage=1&sortBy=1
--> END GET
I/Process: Sending signal. PID: 9120 SIG: 9
I/System: core_booster, getBoosterConfig = false
json for - http://192.168.1.10:82/api/v1.0/item?pageSize=1¤tPage=1&sortBy=1
{
Results: [
{
epc: "202020202020202030303031",
barcode: "0001",
name: "Televisor Samnsung",
description: "0001",
creation_date: "2020-02-26T10:55:06",
last_update: "2020-02-26T10:55:06",
last_seen: "2020-02-26T10:55:06",
brand: "Samnsung",
serial_number: "0001",
parent: "",
fk_category: 1,
responsable: "",
purchase_date: "2020-02-26T10:55:06",
cost: 0,
fk_location: 1008,
fk_item_state: 1,
inventory_date: "2020-02-26T10:55:06"
}
],
CurrentPage: 1,
PageCount: 65565,
PageSize: 1,
RecordCount: 65565
}
Thanks in advance, I really appreciate your help.
I did some changes to class MainActivity
public class MainActivity extends AppCompatActivity {
...
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
tvResult = findViewById(R.id.tv_result);
send = findViewById(R.id.btnSend);
mItemSyncDetails = new ItemSyncDetails();
mProgressDialog = new ProgressDialog(this);
mItemsList = new ArrayList<Item>();
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(interceptor)
.addNetworkInterceptor(new Interceptor() {
@Override
public okhttp3.Response intercept(Chain chain) throws IOException {
Request request = chain.request().newBuilder()
.build();
return chain.proceed(request);
}
}).build();
retrofit = new Retrofit.Builder()
.baseUrl("http://192.168.1.10:82/api/v1.0/")
.client(client)
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
send.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
showProgressDialog("Items");
getRecordsCount();
}
});
}
private void getRecordsCount(){
retrofit.create(FrekuencyApi.class)
.getRecordsCount(1,1,1)
.subscribeOn(Schedulers.io())
.retry(3)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::HandleResults, this::handleError,this::getNumPagesHandlerComplete );
}
private void HandleResults(ItemSyncDetails itemSyncDetails) {
this.mItemSyncDetails = itemSyncDetails;
int pageSize = 100;
int numPages = itemSyncDetails.getRecordCount()/pageSize;
if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
numPages++;
}
this.mNumPages = numPages;
tvResult.append("Tamaño de pagina: " + pageSize + "\n");
tvResult.append("Numero de paginas: " + this.mNumPages + "\n");
tvResult.append("Numero total de registros: " + itemSyncDetails.getRecordCount() + "\n");
}
private void getNumPagesHandlerComplete() {
getAllRecords(mNumPages);
}
private void getAllRecords(int numPages){
frekuencyApi = retrofit.create(FrekuencyApi.class);
//numPages: total of pages are the number of times to send the request to API
Observable.range(1, numPages)
.concatMap(i -> frekuencyApi.getItemsPageDetails(100,i,1))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::getAllHandleResults, this::handleError,this::handleComplete);
}
private void getAllHandleResults(ItemSyncDetails itemSyncDetails) {
//TODO: Should it be the right place to save data to DB?
//Get the progress
int tmp = getProgress(itemSyncDetails.getPageCount(),
itemSyncDetails.getCurrentPage(),itemSyncDetails.getPageSize());
//Update ProgressDialog Progress
mProgressDialog.setProgress(tmp);
tvResult.setText("Progreso: "+ tmp + "%\n");
if (itemSyncDetails.getCurrentPage() == itemSyncDetails.getPageCount()){
//Showing on screen when last request is done
tvResult.append("******************\n");
tvResult.append(itemSyncDetails.getItemList().toString());
tvResult.append("******************\n");
}
}
private void handleComplete() {
//when the last request finished
tvResult.append("\nProceso de Sincronizacion terminado con exito!");
closeProgressDialog();
}
private void handleError(Throwable throwable) {
Log.e("Observer", ""+ throwable.toString());
tvResult.append("\n******************\n");
tvResult.append("ERROR DE CONEXION...\n");
tvResult.append(throwable.toString());
Toast.makeText(this, "ERROR DE CONEXION",
Toast.LENGTH_LONG).show();
closeProgressDialog();
}
...
With the changes to the MainActivity class, basically there are two methods getRecordsCount() and getAllRecords(int numPages), each one fires an RX Java call process, the first one invokes one Retrofit call, and based on the answer of that first call, the second method is called and it will send (n) request to the API, where n is parameter numPages, I use a progressDialog that it is updated and when the hole process finished the progressDialog is close(). On the onComplete method of the first RX java call finished, it called the second RX Java method, at this point what I need is:
- How can I improve this to a more elegant code in one Rx java process, instead of two calls?
- I am thinking to put a Room insert call to the DB for a List on the getAllHandleResults() method, it is the right place to do the insert to the DB?
- Right now I am using concatMap on the getAllHandleResults() method, as far as I understand, this operator execute the Retrofit calls in a sequential order, I am doing it this way, because before trying RX Java + Retrofit, I was using Volley + Room + LiveData + ViewModel + AsyncTask, I did notice that the DB insertion was done one at a time, this operation is slower than the Retrofit Call, and one insertion operation needs to wait to the previous insertion to finished, so now I have the DB part using Room + LiveData + ViewModel + AsyncTask, so when the insert method is invoke from the onNext method (this happens on the Main-thread), the insertion will be execute on an AsyncTask in the background Thread, do you have any recommendation on this matter? can be this DB part done as well using RX Java? do you have some web-page where I can get information about it? Once again, thanks a lot for your help.