I'm new to RxJava and RxAndroid, and I want to replace my AsyncTask and callback hell with observables and subscribers. But I have a problem: I need to make two requests to my database.

The response from the first request is the input for the second request. I tried to solve this with flatMap. The first request returns a value and everything is fine, but the second request throws NetworkOnMainThreadException.

I know the request executes on the main thread, but why? I tried adding subscribeOn(Schedulers.io()) before the flatMap, but the result is the same. Could you help me with this and explain what I am doing wrong? Thanks in advance. My code:

private void getFavouriteList(){
    Observable.create((Observable.OnSubscribe<PaginatedScanList<UserDO>>) subscriber -> {
        final Map<String, AttributeValue> filterExpressionAttributeValues = new HashMap<>();
        filterExpressionAttributeValues
                .put(":val1", new AttributeValue().withS(sharedPreferences.getString("socialId", "")));
        final DynamoDBScanExpression scanExpression = new DynamoDBScanExpression()
                .withFilterExpression("socialId = :val1")
                .withExpressionAttributeValues(filterExpressionAttributeValues);
        PaginatedScanList<UserDO> result = dynamoDBMapper.scan(UserDO.class, scanExpression);
        Log.d(TAG, "first result size " + result.size());
        subscriber.onNext(result);
        subscriber.onCompleted();
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .filter(result -> {
                if(result.isEmpty()) {
                    Toast.makeText(context, "Can not find user", Toast.LENGTH_SHORT).show();
                    return false;
                }
                return true;
            })
            .flatMap(user -> Observable.from(user.get(0).getFavourites()))
            .subscribeOn(Schedulers.io())
            .flatMap(result -> {
                final Map<String, AttributeValue> filterExpressionAttributeValues = new HashMap<>();
                filterExpressionAttributeValues
                        .put(":val1", new AttributeValue().withS(result));
                filterExpressionAttributeValues
                        .put(":val2", new AttributeValue().withN("1"));
                final DynamoDBScanExpression scanExpression = new DynamoDBScanExpression()
                        .withFilterExpression("productId = :val1 and selling = :val2")
                        .withExpressionAttributeValues(filterExpressionAttributeValues);
                PaginatedScanList<ProductDO> res = dynamoDBMapper.scan(ProductDO.class, scanExpression);
                Log.d(TAG, "second result size " + res.size());
                return Observable.from(res);
            })
            .subscribe(new Subscriber<ProductDO>() {
                @Override
                public void onCompleted() {
                    favouriteProgressBar.setVisibility(View.INVISIBLE);
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    favouriteProgressBar.setVisibility(View.INVISIBLE);
                }

                @Override
                public void onNext(ProductDO productDO) {
                    Log.d(TAG, "productId " + productDO.getProductId());
                    product.add(productDO);
                    adapter.notifyDataSetChanged();
                }
            });
}
rafsanahmad007
Alex Khotiun

1 Answer

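Move your .observeOn(AndroidSchedulers.mainThread()) to just before your subscribe. observeOn switches the thread for every operator downstream of it, so in your code the second flatMap (and the scan inside it) runs on the main thread.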
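To make the threading rule concrete without an Android device, here is a minimal sketch that uses only the JDK's CompletableFuture, not RxJava. It is an analogy under stated assumptions: the executors named "io" and "ui" are hypothetical stand-ins for Schedulers.io() and the Android main thread, and the string values are placeholders for the two scans. Both blocking stages stay on the background executor, and only the final consumer hops to the "ui" thread, which is the same shape as placing observeOn(AndroidSchedulers.mainThread()) last.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {

    // Runs a three-stage chain and records which thread executed each stage.
    static List<String> run() throws Exception {
        // Hypothetical stand-ins for Schedulers.io() and the Android main thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                    // Stage 1: placeholder for the first (blocking) scan.
                    .supplyAsync(() -> {
                        log.add("first query on " + Thread.currentThread().getName());
                        return "favouriteId";
                    }, io)
                    // Stage 2: placeholder for the second (blocking) scan, still off the UI thread.
                    .thenApplyAsync(id -> {
                        log.add("second query on " + Thread.currentThread().getName());
                        return id + ":product";
                    }, io)
                    // Stage 3: only the final consumer runs on the UI stand-in.
                    .thenAcceptAsync(product ->
                            log.add("UI update on " + Thread.currentThread().getName()), ui)
                    .get(); // Block until the whole chain has finished.
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

Running main prints the three log lines in order, showing both queries on "io" and the update on "ui". If the second stage were handed the ui executor instead, its blocking work would run on the UI stand-in, which mirrors the NetworkOnMainThreadException in the question.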

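PS: You should not be using Observable.create() unless there's no other alternative. For a single blocking call like this scan, a factory such as Observable.fromCallable() is usually enough.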

Edited to fix the Toast in the filter issue.

private void getFavouriteList(){
    Observable.create((Observable.OnSubscribe<PaginatedScanList<UserDO>>) subscriber -> {
        final Map<String, AttributeValue> filterExpressionAttributeValues = new HashMap<>();
        filterExpressionAttributeValues
                .put(":val1", new AttributeValue().withS(sharedPreferences.getString("socialId", "")));
        final DynamoDBScanExpression scanExpression = new DynamoDBScanExpression()
                .withFilterExpression("socialId = :val1")
                .withExpressionAttributeValues(filterExpressionAttributeValues);
        PaginatedScanList<UserDO> result = dynamoDBMapper.scan(UserDO.class, scanExpression);
        Log.d(TAG, "first result size " + result.size());
        subscriber.onNext(result);
        subscriber.onCompleted();
    })
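            // Run the first scan on an I/O thread.
            .subscribeOn(Schedulers.io())
            // Hop to the main thread only so the filter can show the Toast.
            .observeOn(AndroidSchedulers.mainThread())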
            .filter(result -> {
                if(result.isEmpty()) {
                    Toast.makeText(context, "Can not find user", Toast.LENGTH_SHORT).show();
                    return false;
                }
                return true;
            })
            // Hop back to an I/O thread so the second scan runs off the main thread.
            .observeOn(Schedulers.io())
            .flatMap(user -> Observable.from(user.get(0).getFavourites()))
            .flatMap(result -> {
                final Map<String, AttributeValue> filterExpressionAttributeValues = new HashMap<>();
                filterExpressionAttributeValues
                        .put(":val1", new AttributeValue().withS(result));
                filterExpressionAttributeValues
                        .put(":val2", new AttributeValue().withN("1"));
                final DynamoDBScanExpression scanExpression = new DynamoDBScanExpression()
                        .withFilterExpression("productId = :val1 and selling = :val2")
                        .withExpressionAttributeValues(filterExpressionAttributeValues);
                PaginatedScanList<ProductDO> res = dynamoDBMapper.scan(ProductDO.class, scanExpression);
                Log.d(TAG, "second result size " + res.size());
                return Observable.from(res);
            })
            // Deliver results on the main thread for the UI updates in the subscriber.
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<ProductDO>() {
                @Override
                public void onCompleted() {
                    favouriteProgressBar.setVisibility(View.INVISIBLE);
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    favouriteProgressBar.setVisibility(View.INVISIBLE);
                }

                @Override
                public void onNext(ProductDO productDO) {
                    Log.d(TAG, "productId " + productDO.getProductId());
                    product.add(productDO);
                    adapter.notifyDataSetChanged();
                }
            });
}
JohnWowUs
  • Thanks, that really helps. What is the problem with `Observable.create()`? – Alex Khotiun Jan 13 '17 at 11:27
  • It's really easy to make a mistake using `Observable.create()`. See [here](http://stackoverflow.com/documentation/rx-java/1418/observable/4633/create-an-observable#t=201701131129135582063) for documentation on alternatives. – JohnWowUs Jan 13 '17 at 11:33
  • There is another problem: when I place `.observeOn(AndroidSchedulers.mainThread())` before subscribe, the `Toast` in the filter doesn't work. It gives me `android.view.ViewRootImpl$CalledFromWrongThreadException`. – Alex Khotiun Jan 13 '17 at 12:17
  • The second `.subscribeOn(Schedulers.io())` is unnecessary and has no practical effects other than wasting a thread. – akarnokd Jan 13 '17 at 14:08