
I am trying to sync a cloud DB to a local SQLite DB (Room) on a device, using Retrofit. The DB could get large, around 100,000 records or more, so the sync can take some time. The app first sends one Retrofit request to get the record count so it can calculate the total number of pages. It then sends multiple Retrofit requests to fetch all the data from the API, and after each request it saves the data to Room.

Right now I am having trouble combining the two RxJava calls. Also, in the second RxJava chain, each Retrofit call is followed by a Room insert of a list of objects, but when the whole process ends, not all of the records have been inserted. The number of inserted records changes on every run; it is around 80% to 98%, but never 100%, even though all the Retrofit calls are sent.

Please help me with:

  1. How can I do the whole process in one RxJava chain instead of the two I have now?
  2. How can I insert 100% of the records into Room?

The code follows:

Gradle

def room_version = "2.2.5"
//RxJava 2
implementation "io.reactivex.rxjava2:rxjava:2.2.19"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
//Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
//Retrofit2 Adapter for RxJava 2
implementation "com.squareup.retrofit2:adapter-rxjava2:2.8.1"
//okhttp3 Logging Interceptor
implementation "com.squareup.okhttp3:logging-interceptor:4.5.0"
//Room
implementation "androidx.room:room-runtime:$room_version"
annotationProcessor "androidx.room:room-compiler:$room_version"
//RxJava support for Room
implementation "androidx.room:room-rxjava2:$room_version" 

ItemSyncDetails

...
public class ItemSyncDetails {
    @SerializedName("CurrentPage")
    int currentPage;
    @SerializedName("PageCount")
    int pageCount;
    @SerializedName("PageSize")
    int pageSize;
    @SerializedName("RecordCount")
    int recordCount;
    @SerializedName("Results")
    List<Item> mItemList;
...
}

ItemDao

Note: I haven't used Observable/Flowable/Maybe/Single as DAO return types, because I haven't been able to make them work with RxJava.

import io.reactivex.Flowable;

@Dao
public interface ItemDao {

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    long insert(Item item);

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    List<Long> insertAll(List<Item> items);
...

DataApi

import io.reactivex.rxjava3.core.Observable;
...

public interface DataApi {

    @GET("item")
    Observable<ItemSyncDetails> getItemsByPage(
            @Query("pageSize") Integer pageSize,
            @Query("currentPage") Integer currentPage,
            @Query("sortBy") Integer sortBy
    );

ItemRepository

import io.reactivex.Observable;
    ...

    public class ItemRepository {
    ...

        public ItemRepository(Application application) {
            mDataApi = RetrofitClient.getRetrofitInstance("http://192.168.1.100").create(DataApi.class);
            RfidDatabase db = RfidDatabase.getAppDatabase(application);
            itemDao = db.itemDao();
            itemList = itemDao.getAllItems();
            inserts = 0;
        }

        public List<Long> insertAllLocal (List<Item> itemList) {
            List<Long> items = itemDao.insertAll(itemList);
            inserts += items.size();
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + inserts + "*************");
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + items);
            return items;
        }

        public Observable<ItemSyncDetails> getRecordsCount(){
            return mDataApi.getItemsByPage(1,1,1);
        }

        public Observable<ItemSyncDetails> getItemsPerPage(int pageSize,int currentPage){
            return mDataApi.getItemsByPage(pageSize,currentPage,1);
        }
    ...

SyncConfigFragment 

    import io.reactivex.Observable;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.CompositeDisposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedulers;
    ...

    public class SyncConfigFragment extends Fragment {


        private ItemViewModel itemViewModel;
        private ImageView imageSyncItems;
        private ProgressDialog progressDialog;
        private TextView tvSyncDescriptionItems;
        private DataApi service;
        private ItemSyncDetails mItemSyncDetails;
        private List<Item> mItemlist;
        private CompositeDisposable mCompositeDisposable;
        private int mNumPages;
        private int syncProgress;
        ...

        @Override
        public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
            View view =  inflater.inflate(R.layout.fragment_config_sync,container, false);
            progressDialog = new ProgressDialog(getActivity());
            sharedPref = getActivity().getSharedPreferences(
                    getString(R.string.sharepref_filename), Context.MODE_PRIVATE);
            mItemlist = new ArrayList<Item>();
            mCompositeDisposable = new CompositeDisposable();
            itemViewModel = ViewModelProviders.of(this).get(ItemViewModel.class);
            tvSyncDescriptionItems = view.findViewById(R.id.tvDescriptionSyncItems);
            if(sharedPref.contains("last_sync_item")) {
                tvSyncDescriptionItems.setText("Última actualización " + sharedPref.getString("last_sync_item",""));
            } else{
                tvSyncDescriptionItems.setText("No se ha Sincronizado");
            }
            imageSyncItems = view.findViewById(R.id.imageViewSyncItems);
            imageSyncItems.setOnClickListener(clickListener);
            return view;
        }

        private View.OnClickListener clickListener = new View.OnClickListener() {
            public void onClick(View v) {
                    if (v.equals(imageSyncItems)) {
                //If I uncomment the next line it does not work
                        //mCompositeDisposable.add(
                        mNumPages = 0;
                        syncProgress = 0;
                        showProgressDialog("Items");
                        getRecordsCount();
                       //); Closing round bracket for mCompositeDisposable
                }
            }
        };//End View.OnClickListener 

        private void getRecordsCount(){
            itemViewModel.getRecordsCount()
                    .subscribeOn(Schedulers.io())
                    .retry(3)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::HandleResults, this::handleError,this::getNumPagesHandlerComplete );
        }

        private void HandleResults(ItemSyncDetails itemSyncDetails) {
            this.mItemSyncDetails = itemSyncDetails;
            int pageSize = 100;
            int numPages = itemSyncDetails.getRecordCount()/pageSize;
            if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
                numPages++;
            }
            this.mNumPages = numPages;
        }

        private void getNumPagesHandlerComplete() {
            getAllRecords(mNumPages);
        }

        private void handleError(Throwable throwable) {
            tvSyncDescriptionItems.setText("**********Error de conexión...");
            closeProgressDialog();
        }

        private void getAllRecords(int numPages){
            //numPages: total of pages are the number of times to send the request to API
            Observable.range(1, numPages)
                    .flatMap(i -> itemViewModel.getItemsPerPage(100,i))
                    .map(new Function<ItemSyncDetails, Integer>() {
                        @Override
                        public Integer apply(ItemSyncDetails itemSyncDetails) throws Throwable {
                            return itemViewModel.insertAllLocal(itemSyncDetails.getItemList()).size();
                        }
                    })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::getAllHandleResults, this::handleError,this::handleComplete);
        }

        private void getAllHandleResults(Integer i) {
            progressDialog.setProgress(getProgress(i));
        }

        private void handleComplete() {
            //last request finished
            closeProgressDialog();
        }

        private int getProgress(int newItems){
            syncProgress += newItems;
            int progress = 0;
            if (syncProgress == mItemSyncDetails.getRecordCount()){
                progress = 100;
            } else {
                progress = (100 * syncProgress)/mItemSyncDetails.getRecordCount();
            }
            return progress;
        }
    ...
    }

http://192.168.1.10:82/api/v1.0/item?pageSize=1&currentPage=1&sortBy=1

Note: The page size can change; I am using a fixed size of 100 items per page.

{
  Results: [
  {
    epc: "202020202020202030303031",
    barcode: "0001",
    name: "Televisor Samnsung",
    description: "0001",
    creation_date: "2020-02-26T10:55:06",
    last_update: "2020-02-26T10:55:06",
    last_seen: "2020-02-26T10:55:06",
    brand: "Samnsung",
    serial_number: "0001",
    parent: "",
    fk_category: 1,
    responsable: "",
    purchase_date: "2020-02-26T10:55:06",
    cost: 0,
    fk_location: 1008,
    fk_item_state: 1,
    inventory_date: "2020-02-26T10:55:06"
  }
 ],
 CurrentPage: 1,
 PageCount: 65565,
 PageSize: 1,
 RecordCount: 65565
}
MarioV
  • There's a lot of code here. Can you please only post the relevant code and remove the unrelated code? Also, can you please rephrase what you're trying to do? Here's an example: "Download a list with 1000 items, that are paginated, and when each page of items is downloaded, save them in local db, and after all pages are saved, update the view". – denvercoder9 May 12 '20 at 10:51
  • Hi @sonnet, question edited, thanks. – MarioV May 12 '20 at 22:54

1 Answer

You posted a json response here before the edit.

    CurrentPage: 1,
    PageCount: 65566,
    PageSize: 1,
    RecordCount: 65566

If I understand correctly, you have 65k items with 1 item per page. That means 65k pages, and therefore 65k network calls. That's a lot. You could improve this design first.

  1. Divide the records into a few pages (maybe even 10 or 20). A page will still hold thousands of items if there are tens of thousands of records in total.
  2. Then use gzip compression to compress the JSON response for each page and serve that from the server. Or don't paginate at all and send everything in one gzip-compressed response (if it's not that big).
  3. Decompress the response on Android, parse it, and then do whatever you want.

This way you reduce a lot of network calls and possibly reduce the wait time for sync.
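
To get a feel for how well repetitive JSON compresses, here is a minimal Java sketch of a gzip round trip using only `java.util.zip`. The class name `GzipSketch` and the `samplePayload` helper are hypothetical and only stand in for your real response; the actual ratio depends on your data. Also note that, as far as I know, OkHttp (which Retrofit uses) already asks for gzip and decompresses transparently when you don't set `Accept-Encoding` yourself, so on the client you may not need any of this code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipSketch {

    // Compress a UTF-8 string into gzip bytes.
    static byte[] gzip(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed here, so the gzip trailer has been written.
        return buffer.toByteArray();
    }

    // Decompress gzip bytes back into a UTF-8 string.
    static String gunzip(byte[] compressed) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[8192];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    // Hypothetical payload: one small record repeated `records` times.
    static String samplePayload(int records) {
        StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < records; i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"barcode\":\"0001\",\"name\":\"Televisor\",\"fk_category\":1}");
        }
        return json.append(']').toString();
    }

    public static void main(String[] args) {
        String json = samplePayload(1000);
        byte[] compressed = gzip(json);
        System.out.println("raw bytes:  " + json.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("gzip bytes: " + compressed.length);
        System.out.println("round trip ok: " + gunzip(compressed).equals(json));
    }
}
```

The server side would do the equivalent of `gzip` before sending, which in practice is usually a configuration switch in the web framework rather than hand-written code.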

As to your actual rx question:

val pageSize = 100
viewModel.getRecordsCount()
    .map {
        // Logic from the `HandleResults` function:
        // round the page count up so a partial last page is included.
        var numPages: Int = it.recordCount / pageSize
        if (it.recordCount < pageSize || it.recordCount % pageSize != 0) {
            numPages++
        }
        return@map numPages
    }
    // Emit every page number from 1 to numPages.
    .flatMap { pages -> Observable.range(1, pages) }
    // Fetch one page per emitted page number.
    .flatMap { page -> viewModel.getItemsPerPage(pageSize, page) }
    // Insert the page into Room and pass the inserted row count downstream.
    .flatMap { itemSyncDetails ->
        val items = viewModel.insertAllLocal(itemSyncDetails.getItemList())
        return@flatMap Observable.just(items.size)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(....)
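
Since you are working in Java, the page-count step of the chain above can be isolated into a small helper that is easy to test on its own. This is only a sketch: `PageMath`, `pageCount` and `pagesToRequest` are names I made up, and unlike your `HandleResults` it returns 0 pages (not 1) when there are no records, which is an assumption about what you want.

```java
import java.util.ArrayList;
import java.util.List;

class PageMath {

    // Ceiling division: how many pages are needed to hold recordCount records.
    static int pageCount(int recordCount, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        if (recordCount <= 0) {
            return 0; // assumption: nothing to fetch when the server reports no records
        }
        // Use long arithmetic so the addition cannot overflow an int.
        return (int) (((long) recordCount + pageSize - 1) / pageSize);
    }

    // The 1-based page numbers to request, in order.
    static List<Integer> pagesToRequest(int recordCount, int pageSize) {
        int pages = pageCount(recordCount, pageSize);
        List<Integer> result = new ArrayList<>(pages);
        for (int page = 1; page <= pages; page++) {
            result.add(page);
        }
        return result;
    }

    public static void main(String[] args) {
        // RecordCount from the posted JSON, with the page size of 100 used in the question.
        System.out.println(pageCount(65565, 100)); // prints 656
        System.out.println(pagesToRequest(250, 100)); // prints [1, 2, 3]
    }
}
```

With a page size of 100 and the posted record count that is 656 requests, which is the number of HTTP calls and insert log lines you should expect to see.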

"I notice that not 100% of all the records are inserted, every time that I run the process, the number of records inserted change, it is around 80% - 98%, but never 100%, even though all the Retrofit calls are sent."

Log the error in the handleError function and see what the actual problem is.
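
As a rough sketch of what I mean, the helper below records every error with its stack trace and keeps a thread-safe count of inserted rows, so that at the end you can compare it against the expected record count. Everything here (`SyncAudit` and its methods) is hypothetical and uses `java.util.logging` only to stay self-contained; on Android you would more likely call `Log.e(TAG, message, throwable)` from `handleError`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class SyncAudit {
    private static final Logger LOG = Logger.getLogger("SyncAudit");

    private final int expectedRecords;
    // Atomic counter: safe even if pages are inserted from several threads.
    private final AtomicInteger inserted = new AtomicInteger();
    private final List<Throwable> errors = new CopyOnWriteArrayList<>();

    SyncAudit(int expectedRecords) {
        this.expectedRecords = expectedRecords;
    }

    // Call after each page has been written to the local database.
    void onPageInserted(int page, int rowCount) {
        int total = inserted.addAndGet(rowCount);
        LOG.info("page " + page + ": inserted " + rowCount + " rows, total " + total);
    }

    // Call from the error callback; logs the full stack trace instead of hiding it.
    void onError(Throwable throwable) {
        errors.add(throwable);
        LOG.log(Level.SEVERE, "sync failed", throwable);
    }

    // How many records are still missing compared to what the server reported.
    int missing() {
        return expectedRecords - inserted.get();
    }

    // True only if no error was seen and every expected record was inserted.
    boolean isComplete() {
        return errors.isEmpty() && missing() == 0;
    }
}
```

If `missing()` is non-zero at the end while no error was logged, the problem is more likely in the inserts themselves (for example rows replacing each other) than in the network calls.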

denvercoder9
  • Thanks for your reply @sonnet. I edited the question again and added the JSON. If you look at the URL, there is a pageSize parameter that lets one change the page size. What I am doing is this: I first send an RxJava-Retrofit request with pageSize = 1, just to learn the total number of elements and to calculate the total number of pages for pageSize = 100. After that I send RxJava-Retrofit requests using range from 1 to the total number of pages, and after every request I insert the list of items I get from the API into Room. I hope it is clearer now. – MarioV May 13 '20 at 14:46
  • Regarding the DB error: I am using an HTTP logger, but I don't see any errors in the log. The only thing I have noticed is that I don't see an HTTP request for every page, and I don't see all the DB insert logs either, but no errors. I wasn't sure why that is happening; I thought the logger just cannot handle all the logs, so it skips some of them. Any thoughts? – MarioV May 13 '20 at 15:35
  • I did some benchmarks. Assuming your record items look like the one posted in the JSON and you have 65k items, the total JSON size would be 31.85 MB. If you apply gzip compression to that file, it becomes only 139 KB. If the idea is to sync your local DB with the server, I'd suggest you send the entire thing at once in a compressed format, then decompress it, parse the JSON, and insert the list. – denvercoder9 May 13 '20 at 20:56
  • Thanks for your reply. So what you are saying is: before the API replies, the server gzips the JSON response; then the device receives the response, unzips it, and saves it to the DB, right? My next question is, where do I start? Can you recommend a tutorial for that? I am working on Android (Java) with a C#/.NET API on the server. – MarioV May 13 '20 at 21:34
  • You might want to check this out too: https://stackoverflow.com/questions/61741592/android-inserting-large-amount-of-data-inside-room-database Go through the Android Room docs and see if there's any limitation on inserts or batch inserts. That could be a problem on the DB side. As for the server, just google some basic gzip compression using C# and gzip decompression using Kotlin/Java with Retrofit. There are plenty of examples online. – denvercoder9 May 15 '20 at 08:13