1

I have a Single that makes a REST call. This Single can get called up to 10 times at the same time, which ends up in 10 requests of the same thing. I don't see any way to get a hot Single and I tried using cache() when returning the Single to no avail.

How can I achieve this?

Here is the logic I follow to make the call:

fun getUser(userID: UUID): Single<User> {
        if (userCache.containsUser(userID)) {
            // Just return the value already saved in the cache
            return Single.create {
                it.onSuccess(getUserFromCache(userID))
            }
        } else {
            // Make rest call, add user in cache, and then return that user
            return Single.fromCallable {
                val user = getUserFromRest(userID).blockingGet()

                userCache.addUser(user)

                return@fromCallable user
            }.cache()
        }
    }

I have updated the previous code to implement caching:

     private var ongoingRequests: HashMap<UUID, Single<User>> = HashMap()
     fun getUser(userID: UUID): Single<User> {
        if (userCache.containsUser(userID)) {
            return Single.create {
                it.onSuccess(getUserFromCache(userID))
            }
        } else if (ongoingRequests.containsKey(userID)) {
            return ongoingRequests[userID]!!
        } else {
            val request =  Single.create<User> {
                getUserFromRest(userID).subscribe(
                    { user ->
                        userCache.addUser(user)
                        ongoingRequests.remove(userID)
                        it.onSuccess(user)
                    },
                    {
                    }
                )
            }.cache()

            ongoingRequests[userID] = request

            return request
        }
    }
ThomasFromUganda
  • 380
  • 3
  • 17
  • 2
    I have answred the same question here https://stackoverflow.com/a/59491423/8325761 – bubbles Mar 03 '20 at 07:54
  • @bubbles Thanks for responding! However, it does not seem to work. I checked the server logs and multiple calls have been made to the same resource. I have updated my question with pseudo and real code to show my logic. – ThomasFromUganda Mar 03 '20 at 14:22
  • 1
    if getUser() is called 10 times you have of course 10 requests to the server. You have to cache the result of getUser in a variable like `var cached = getUser(xx).cache()`, after that use `cached` – bubbles Mar 03 '20 at 14:36
  • @bubbles I have added more code to my question which has a list of all ongoing requests. Would that work? Also, if it would work, why is the `cache()` operator necessary? Wouldn't omitting it with the new code snippet I provided achieve the same result? If the code I provided would work, isn't there a more native solution? I feel like what I am doing is a common use case and would involve changing all my rest calls to hold a map of ongoing requests. I would have thought RxJava provided a native way to do this. – ThomasFromUganda Mar 03 '20 at 14:41

2 Answers2

1

There is two problem in your code. First, checking user existence runs when you will call getUser method, and not when you will subscribe.

In similar situations, you can return a deferred instance, with Single.deferred method.

In this case, I guess you do not needed. If you are using Single.cache, you can drop the caching logic too, because Rx does it instead.

private val shared: MutableMap<UUID, Single<User>> = mutableMapOf()

fun getUser(userID: UUID): Single<User> {

    return shared.getOrPut(userID, {
        return Single.fromCallable {
            val user = getUserFromRest(userID).blockingGet()

            userCache.addUser(user)

            return@fromCallable user
        }.cache()
    })
}
bvarga
  • 716
  • 6
  • 12
  • Oh, so you do need an external way to keep track of ongoing requests? What's the point of calling `cache()` when return the single? Wouldn't just omitting it return the same result? – ThomasFromUganda Mar 03 '20 at 15:10
  • 1
    if you return the Single without cache, every future subscriber will trigger a network call. If you call Single.cache() the returned Single instance will emitt previously emitted item for the future subscribers. – bvarga Mar 03 '20 at 15:30
1

I write the code in java to illustrate the idea (i'm not a kotlin developper, sorry)

Cache the result of the long procecessing call

Single<User> getUser(String userId) {
        return Single.<String>create(emitter -> {
            Thread.sleep(1000); // call the rest
            emitter.onSuccess(new User());
        }).cache();
    }

And then of course you can use a map to reuse the result:

Map<String, Single<User>> ongoingRequests = new HashMap<>();
Single<User> call = ongoingRequests.computeIfAbsent(userId, this::getUser);
call.subscribe()

call is executed once and then cached now.

bubbles
  • 2,597
  • 1
  • 15
  • 40
  • Alright, I definitely understand your solution and I guess it's what I am going to end up doing. I am disappointing that RxJava or Retrofit doesn't have a simple way to avoid making the same call at the same time and instead just return the result if there is an ongoing request. – ThomasFromUganda Mar 03 '20 at 15:12
  • 1
    Yes, that's the way we do it in RxJava, caching, is up to the client. If I designed it, I wouldn't put the cache as a default behavior, it is better to let the user decide to cache the result or not – bubbles Mar 03 '20 at 15:19