
In my API I have a service that calls an external API. This external API has a cost per request. Hence I would like to queue up the requests to the external API.

This is how it looks today:

@Component
public class OrderService {

    private final WebClient webClient;

    public OrderService(WebClient webClient) {
        this.webClient = webClient;
    }

    public Mono<List<Order>> getOrders(List<String> orderNumbers)  {
        return webClient
            .get()
            .uri(builder -> builder.queryParam("orderNumbers", String.join(",", orderNumbers)).build())
            .retrieve()
            .bodyToFlux(Order.class)
            .collectList();
    }
}

What I would like is to somehow queue up 10 order numbers, and after reaching 10 order numbers in the queue, send the bulk request, and then return the response to all the original requesters.

Let's say OrderService::getOrders is called three times: first with 4 order numbers, then with 4 more, and finally with another 4. The first two calls queue up 8 order numbers in total without calling the external API. Once the third call arrives, there are 12 order numbers in the queue. At that point we send a request with the first 10 order numbers. The last two remain queued, since the external API accepts at most 10 order numbers per request.
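To make the 4 + 4 + 4 scenario concrete, here is a minimal plain-Java sketch of just the batching arithmetic. `OrderNumberBatcher` is a hypothetical helper name, not something from Spring or Reactor, and the sketch deliberately leaves out the reactive part and the question of routing responses back to callers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical helper (an assumption for illustration): collects order numbers
// and releases them in batches of exactly `batchSize`.
final class OrderNumberBatcher {
    private final int batchSize;
    private final Queue<String> pending = new ArrayDeque<>();

    OrderNumberBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Adds the given numbers and returns every full batch that is now ready.
    // Numbers that do not fill a batch stay queued for a later call.
    synchronized List<List<String>> offer(List<String> orderNumbers) {
        pending.addAll(orderNumbers);
        List<List<String>> ready = new ArrayList<>();
        while (pending.size() >= batchSize) {
            List<String> batch = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize; i++) {
                batch.add(pending.remove());
            }
            ready.add(batch);
        }
        return ready;
    }

    // Number of order numbers still waiting for a full batch.
    synchronized int pendingCount() {
        return pending.size();
    }
}
```

With a batch size of 10, the first two calls to `offer` return no batches, and the third returns one batch of 10 while 2 numbers stay pending.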

My API sees a lot of traffic, so queueing and waiting is not an issue: callers will not have to wait long before the queue holds 10 items.

I have tried to read up on how to achieve this with Spring WebFlux, and my research has pointed me to Sinks and Processor. However, I have a hard time wrapping my head around how it all ties together. For instance, how can I keep track of which request asked for which order numbers?

Could someone more knowledgeable please help me out with how to achieve this?

orderlyfashion
  • What you're trying to accomplish is not a very good idea. It is possible, though you're going to have to make several adjustments to your logic. But think about this, what happens when the first request comes in with a single order, but the subsequent 9 orders don't come in for another 10 minutes? Is the first caller expected to wait for 10 minutes to get a response? – lane.maxwell Aug 09 '23 at 20:13
  • @lane.maxwell Thanks for your concern! Like I said, my API receives plenty of requests, so it's not really an issue. Nonetheless, `Igor Artamonov` managed to provide a solution that solves my original problem and your concern in one. :) – orderlyfashion Aug 10 '23 at 07:57
  • This pattern is called request collapsing. I'm aware of an open source library which implemented this with Reactor, you might take some inspiration from there: https://github.com/ExpediaGroup/molten/blob/master/molten-core/readme.md#request-collapsing – Martin Tarjányi Aug 12 '23 at 07:52

2 Answers


It can be something like the following:

First, create a queue for the incoming requests:

// Each queued item pairs an order number with the sink that will deliver its result
private final Sinks.Many<Tuple2<String, Sinks.One<Order>>> queue = Sinks.many().unicast().onBackpressureBuffer();

And have the following method to accept requests:

public Mono<List<Order>> getOrders(List<String> orderNumbers)  {
    return Flux.fromIterable(orderNumbers)
        .flatMapSequential(this::getOrder)
        .collectList();
}

public Mono<Order> getOrder(String number) {
    Sinks.One<Order> callback = Sinks.one(); 
    // TODO handle FAIL_NON_SERIALIZED here by doing a park-retry
    queue.tryEmitNext(Tuples.of(number, callback));
    return callback.asMono();
}

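Run this once to start processing the queue:

public void run() {
    queue.asFlux()
        // Emit a batch after 10 items or after 100 ms, whichever comes first
        .bufferTimeout(10, Duration.ofMillis(100))
        .flatMap(list -> {
            List<String> numbers = list.stream().map(Tuple2::getT1).collect(Collectors.toList());
            return request(numbers)
                .doOnNext(orders -> {
                    // Assumes the API returns the orders in the same order as the requested numbers
                    for (int i = 0; i < orders.size(); i++) {
                        list.get(i).getT2().tryEmitValue(orders.get(i));
                    }
                })
                .then();
        })
        .subscribe();
}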

public Mono<List<Order>> request(List<String> orderNumbers)  {
    return webClient
        .get()
        .uri(builder -> builder.queryParam("orderNumbers", String.join(",", orderNumbers)).build())
        .retrieve()
        .bodyToFlux(Order.class)
        .collectList();
}
Igor Artamonov
  • The basics work great, thanks! There's a corner case where two requests might contain the same order number though. If we send in 6 order numbers, and then the same 6 order numbers in another request, it will put 12 items in the queue and perform the API call. The first requester gets the expected result back, but the second requester gets stuck forever and is never returned anything. Is it possible to return an existing `Mono` in case the order number is already queued? I tried unsuccessfully to adjust the logic by checking if queue contains the order number already. – orderlyfashion Aug 10 '23 at 07:27
  • I tried adding a new method between `getOrders` and `getOrder` that tries to find queued items or creating a new one: `return Mono.from(queue.asFlux().filter(tuple -> tuple.getT1().equals(orderNumber)).flatMap(tuple -> tuple.getT2().asMono())).switchIfEmpty(getOrder(orderNumber));` I can see in the logs that the external API is called and values returned, but my service never returns a value to the requester. I also tried adding a `ConcurrentMap` where I store the `Mono` and check there. That works, but I guess it should be possible and better with purely reactor. – orderlyfashion Aug 10 '23 at 11:09
  • You could do that with a `ConcurrentMap` for current in-flight requests. That may be tricky but is possible. As for the example you showed, it fails because the `queue` never completes and is never empty, so `switchIfEmpty` is never called, and I guess that's why the order number is also never enqueued. – Igor Artamonov Aug 10 '23 at 19:52
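The `ConcurrentMap` idea from the comments above can be sketched as follows. This is only an illustration under stated assumptions: `InFlightRegistry` is a hypothetical name, and `CompletableFuture` from the JDK stands in for the `Sinks.One` callback so that the sketch has no dependencies. The point is that a second request for an order number that is already waiting gets the same pending result back instead of being queued again.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical registry of order numbers that are queued or in flight.
// CompletableFuture is a stand-in (an assumption) for the Sinks.One callback.
final class InFlightRegistry<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    // Returns the pending future for the key. The key is enqueued only by the
    // caller that created the entry; later callers share the same future.
    CompletableFuture<V> getOrEnqueue(String key, Consumer<String> enqueue) {
        boolean[] created = {false};
        CompletableFuture<V> future = inFlight.computeIfAbsent(key, k -> {
            created[0] = true;
            return new CompletableFuture<>();
        });
        if (created[0]) {
            // Enqueue outside computeIfAbsent to keep the mapping function side-effect free
            enqueue.accept(key);
        }
        return future;
    }

    // Completes every waiter for the key and forgets it, so that a later
    // request for the same key triggers a fresh lookup.
    void complete(String key, V value) {
        CompletableFuture<V> future = inFlight.remove(key);
        if (future != null) {
            future.complete(value);
        }
    }
}
```

In the batching code, `enqueue` would push the order number onto the queue, and `complete` would be called for each order in the batch response. Failed batches would need a matching exceptional completion, which is left out here.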

I would try to use a CountDownLatch. It lets you trigger an action once the count has been reached across all the threads that are using your service.

@Component
public class OrderService {

    private static final int QUEUE_UP_COUNT = 10;
    private final WebClient webClient;
    private volatile CountDownLatch countDownLatch = new CountDownLatch(QUEUE_UP_COUNT);

    public OrderService(WebClient webClient) {
        this.webClient = webClient;
    }

    public Mono<List<Order>> getOrders(List<String> orderNumbers) throws InterruptedException {
        countDownLatch.countDown();
        // await() blocks the calling thread until the count reaches zero
        countDownLatch.await();
        Mono<List<Order>> orders = webClient
            .get()
            .uri(builder -> builder.queryParam("orderNumbers", String.join(",", orderNumbers)).build())
            .retrieve()
            .bodyToFlux(Order.class)
            .collectList();
        // A CountDownLatch cannot be reset, so create a new one for the next round
        countDownLatch = new CountDownLatch(QUEUE_UP_COUNT);
        return orders;
    }
}
Josema