1

I have trouble testing a my RxJava code as it seems to always run through the test without awaiting the execution that is ought to happen when calling subscribe().

This is the Repository to test:

public class Repository<T extends Entity> {

    private final DataSource<T> remoteDataSource;
    private final DataSource<T> localDataSource;

    public Repository(DataSource<T> remoteDataSource, DataSource<T> localDataSource) {
        this.remoteDataSource = remoteDataSource;
        this.localDataSource = localDataSource;
    }

    public Observable<Boolean> save(T entity) {

        return localDataSource
                .save(entity)
                .flatMap(success -> {
                    if (success) {
                        return remoteDataSource.save(entity);
                    }
                    return Observable.just(Boolean.FALSE);
                })
                .doOnNext(success -> {
                    if (success) {
                        if (cache == null) {
                            cache = new LinkedHashMap<>();
                        }
                        cache.put(entity.getId(), entity);
                    }
                });
    }
}

This the DataSource interface:

public interface DataSource<T extends Entity> {
    <T> Observable<T> get();

    Observable<Boolean> save(T entity);

    Observable<Boolean> clear();

    Observable<Boolean> remove(T entity);
}

And this is the Unit-Test including a MockDataSource that is ought to simulate data access with different execution timings:

public class RepositoryTest {

    @Test
    public void testRepository() {

        MockSource remoteSource = new MockSource("RemoteSource", 1000L);
        MockSource localSource = new MockSource("LocalSource", 200L);
        Repository<Poi> poiRepository = new Repository<>(remoteSource, localSource);

        Poi poi1 = newMockPoi();

        Observable<Boolean> obs = poiRepository.save(poi1);
        TestSubscriber<Boolean> testSubscriber = new TestSubscriber<>();
        obs.subscribe(testSubscriber);

        testSubscriber.assertNoErrors();
        testSubscriber.assertReceivedOnNext(Arrays.asList(true));


    }

    private Poi newMockPoi() {
        Poi poi = new Poi();
        poi.name = RandomStringUtils.randomAlphabetic(12);
        poi.description = RandomStringUtils.randomAlphabetic(255);
        poi.latitude = new Random().nextDouble();
        poi.longitude = new Random().nextDouble();
        return poi;
    }


    private class Poi extends Entity {
        String name;
        String description;
        Double latitude;
        Double longitude;
    }

    private class MockSource implements DataSource<Poi> {

        private String name;
        private final long delayInMilliseconds;

        private Map<Long, Poi> pois = new LinkedHashMap<>();

        private MockSource(String name, long delayInMilliseconds) {
            this.delayInMilliseconds = delayInMilliseconds;
            this.name = name;
        }

        @Override
        public Observable<List<Poi>> get() {
            return Observable
                    .zip(
                            Observable
                                    .just(pois)
                                    .map(Map::entrySet)
                                    .flatMapIterable(entries -> entries)
                                    .map(Map.Entry::getValue)
                                    .toList(),
                            Observable
                                    .interval(delayInMilliseconds, TimeUnit.MILLISECONDS), (obs, timer) -> obs)
                    .doOnNext(pois -> System.out.println("Soure " + name + " emitted entity"));
        }

        @Override
        public Observable<Boolean> save(Poi entity) {
            return Observable
                    .zip(
                            Observable.just(true).asObservable(),
                            Observable.interval(delayInMilliseconds, TimeUnit.MILLISECONDS), (obs, timer) -> obs)
                    .doOnNext(value -> pois.put(entity.getId(), entity))
                    .doOnNext(pois -> System.out.println("Soure " + name + " saved entity"));
        }

        @Override
        public Observable<Boolean> clear() {
            return Observable
                    .zip(
                            Observable.just(true).asObservable(),
                            Observable.interval(delayInMilliseconds, TimeUnit.MILLISECONDS), (obs, timer) -> obs)
                    .doOnNext(value -> pois.clear())
                    .doOnNext(pois -> System.out.println("Soure " + name + " cleared all entities"));
        }

        @Override
        public Observable<Boolean> remove(Poi entity) {
            return Observable
                    .zip(
                            Observable.just(true).asObservable(),
                            Observable.interval(delayInMilliseconds, TimeUnit.MILLISECONDS), (obs, timer) -> obs)
                    .doOnNext(value -> pois.remove(entity))
                    .doOnNext(pois -> System.out.println("Soure " + name + " removed entity"));
        }
    }
}

This is the output:

java.lang.AssertionError: Number of items does not match. Provided: 1  
Actual: 0.
Provided values: [true]
Actual values: []
 (0 completions)

at rx.observers.TestSubscriber.assertionError(TestSubscriber.java:667)
at rx.observers.TestSubscriber.assertReceivedOnNext(TestSubscriber.java:320)
at nl.itc.geofara.app.data.source.RepositoryTest.testRepository(RepositoryTest.java:37)

Also, setting breakpoints in the repository's save method and running in debug mode show that the code inside e.g. .flatMap(success -> ...) is never ever invoked.

John Smith
  • 752
  • 9
  • 35
  • See https://stackoverflow.com/a/39828581/4191629, "Troubleshooting", "Race condition" – maciekjanusz Aug 04 '17 at 09:18
  • I have seen the Schedulers.immediate() notion before, the given source however never invokes subscribeOn() anywhere. I thought that all tasks are now being executed on the same thread as the unit test itself. Am I wrong in that assumption? – John Smith Aug 04 '17 at 09:22
  • @maciekjanusz Add solution #1 as an answer and I mark it as the correct. An explanation why this works would be awesome! – John Smith Aug 04 '17 at 09:28
  • Answer added. Happy to help! – maciekjanusz Aug 04 '17 at 09:32

1 Answers1

2

You are experiencing a race condition between the observable execution and the JUnit thread, because Observable.interval that you use in your mock source introduces implicit subscription on computation scheduler:

Scheduler: interval operates by default on the computation Scheduler.

Also, see more explanation in this answer, "Troubleshooting", "Race condition".

maciekjanusz
  • 4,702
  • 25
  • 36