I have the following scenario: I have an object called Foo that my application is interested in. This Foo object comes from a remote service, and I would like to cache this instance locally and keep using it until a certain amount of time has passed.

So far, I've tried creating a FooState class that holds the Foo instance along with a timestamp in milliseconds (in the code below, the time at which the cached Foo expires):

public class FooState {

    private Foo foo;
    private long timestamp;

    /* Constructor and getters */

}

This is the code I've come up with, using concat:

public Observable<Foo> foo() {
    return Observable.concat(local(), remote())
               .takeFirst(fooState -> fooState.getTimestamp() >= System.currentTimeMillis())
               .map(fooState -> fooState.getFoo())
               .defaultIfEmpty(new Foo());
}

private Observable<FooState> local() {
    return Observable.just(cache.hasFooState() ? cache.getFooState() : new FooState(null, 0));
}

private Observable<FooState> remote() {
    return api.getFoo()
              .map(foo -> new FooState(foo, System.currentTimeMillis() + ONE_DAY_MILLIS))
              .doOnNext(fooState -> {
                   cache.save(fooState);
              });
}

Basically, if there's a cached value, I want to use it as long as the timestamp isn't expired. If the timestamp is expired or there is no cached value, I want to fetch from the remote service and cache the result.
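To make the rule above concrete, here is a minimal plain-Java sketch of it, deliberately without RxJava. The class name ExpiringCache, the injected clock, and the blocking Supplier standing in for the remote call are all assumptions made for illustration, not part of the code above.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Caches a single value and refetches it once its expiry time has passed. */
final class ExpiringCache<T> {
    private final Supplier<T> remote;  // hypothetical blocking stand-in for the remote service
    private final LongSupplier clock;  // e.g. System::currentTimeMillis; injected so it can be faked
    private final long maxAgeMillis;   // how long a fetched value stays valid

    private T value;
    private long expiresAt;            // absolute expiry time in milliseconds
    private boolean hasValue;

    ExpiringCache(Supplier<T> remote, LongSupplier clock, long maxAgeMillis) {
        this.remote = remote;
        this.clock = clock;
        this.maxAgeMillis = maxAgeMillis;
    }

    synchronized T get() {
        long now = clock.getAsLong();
        // Same validity check as in foo(): the entry is usable while expiresAt >= now.
        if (hasValue && expiresAt >= now) {
            return value;
        }
        // No entry yet, or the entry expired: fetch, then cache with a new expiry time.
        value = remote.get();
        expiresAt = now + maxAgeMillis;
        hasValue = true;
        return value;
    }
}
```

It could be created as new ExpiringCache<>(this::fetchFoo, System::currentTimeMillis, ONE_DAY_MILLIS), where fetchFoo is a hypothetical blocking fetch. Tracking hasValue explicitly avoids the need for a placeholder FooState with a null Foo.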

Is there a cleaner way to implement this use case? I'm kind of new to RxJava and I was wondering if any Rx-gurus knew of a better way to handle this scenario.

2 Answers

You can also use the built-in .timestamp() operator and the Timestamped<T> class.

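    // Seeded with a placeholder stamped at time 0, so the subject always has an item to replay.
    BehaviorSubject<Timestamped<Foo>> subject =
            BehaviorSubject.create(new Timestamped<Foo>(0L, null));

    Observable<Foo> foo() {
        // take(1) completes the stream, so switchIfEmpty can fire when the cached item is filtered out.
        return subject.take(1).filter(new Func1<Timestamped<Foo>, Boolean>() {
            @Override
            public Boolean call(Timestamped<Foo> tsFoo) {
                // expiry is assumed to be the maximum age of a cached Foo, in milliseconds.
                return System.currentTimeMillis() - tsFoo.getTimestampMillis() < expiry;
            }
        }).switchIfEmpty(api.getFoo().timestamp().doOnNext(new Action1<Timestamped<Foo>>() {
            @Override
            public void call(Timestamped<Foo> tsFoo) {
                subject.onNext(tsFoo); // cache the fresh value together with its fetch time
            }
        }))
        .map(new Func1<Timestamped<Foo>, Foo>() {
            @Override
            public Foo call(Timestamped<Foo> tsFoo) {
                return tsFoo.getValue();
            }
        });
    }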
Maxim Volgin

I came up with something similar to your concat approach, since it seems the most appropriate for this use case, but built my solution on Maybe (RxJava 2), so that local() emits nothing when there is no cache entry or the entry has expired.

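import io.reactivex.Maybe;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class RxJavaUnitTestJava {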
    public class Foo extends Object {
        String source = "n/a";
    }

    public class FooState {
        private Foo foo;
        private long timestamp;

        public FooState(Foo foo, long timestamp) {
            this.foo = foo;
            this.timestamp = timestamp;
        }
    }

    private static long EXPIRATION = 0L;
    private FooState cachedFooState = null;

    private Maybe<Foo> local() {
        return Maybe.fromCallable(() -> {
            System.out.println("checking local");

            if (cachedFooState != null && cachedFooState.timestamp + EXPIRATION > System.currentTimeMillis()) {
                System.out.println("taking unexpired local");
                cachedFooState.foo.source = "local"; // mark source of this foo

                return cachedFooState.foo;
            } else {
                System.out.println("not taking local");
                return null; // a null result makes Maybe.fromCallable complete empty
            }
        });
    } 

    private Maybe<Foo> remote() {
        return Maybe.fromCallable(() -> {
            System.out.println("checking remote");

            Foo foo = new Foo();
            foo.source = "remote"; // mark source of this foo
            FooState fooState = new FooState(foo, System.currentTimeMillis());
            cachedFooState = fooState;

            return fooState.foo;
        });
    }

    private Observable<Foo> foo() {
        return Maybe.concat(local(), remote())
                .take(1)
                .toObservable();
    }

    @Test
    public void testNoCachedLocal() {
        cachedFooState = null;

        foo()
                .doOnNext(foo -> System.out.println("doOnNext: foo.source: " + foo.source))
                .doOnComplete(() -> System.out.println("onComplete"))
                .test()
                .assertValueCount(1)
                .assertComplete();
    }

    @Test
    public void testExpiredLocal() {
        cachedFooState = new FooState(new Foo(), System.currentTimeMillis());
        EXPIRATION = 0L;

        foo()
                .doOnNext(foo -> System.out.println("doOnNext: foo.source: " + foo.source))
                .doOnComplete(() -> System.out.println("onComplete"))
                .test()
                .assertValueCount(1)
                .assertComplete();
    }

    @Test
    public void testUnExpiredLocal() {
        cachedFooState = new FooState(new Foo(), System.currentTimeMillis());
        EXPIRATION = TimeUnit.SECONDS.toMillis(30);

        foo()
                .doOnNext(foo -> System.out.println("doOnNext: foo.source: " + foo.source))
                .doOnComplete(() -> System.out.println("onComplete"))
                .test()
                .assertValueCount(1)
                .assertComplete();
    }
}
dkarmazi