1

I have a function that does an http request based on a parameter. And I want to add some kind of "debounce" functionality. So if the function gets called multiple times in a set time window, I want to combine the parameters into one request instead of making multiple requests.

I want to achieve this with Observables and in Angular. This does not sound that complicated, however I'm not able to get it running, maybe I'm missing something.

For now let's just skip the combining in a single request as this can be done with an aggregated debounce or a Oberservable.buffer. I have trouble combining the single Observables.

Here's what I've tried so far.

I tried using a Subject, as this seemed to be the proper object for this case (https://stackblitz.com/edit/angular-hcn41v?file=src%2Fapp%2Fapp.component.ts).

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

I expect to have one 'getUrl Call' log for each function call and one result log. However I only get results if I add more than 1 calls to this.makeRequest() and the result is also weird. All previous values are always returned as well. I think I don't fully understand how Subject works in this case.

Another approach (taken from here RXJS: Aggregated debounce) was to create some sort of aggregate debounce (https://stackblitz.com/edit/angular-mx232d?file=src/app/app.component.ts)

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

In this scenario I have the problem I'm also getting all previous values as well.

In theory (at least to me) both variants sounded plausible, however it seems like I'm missing something. Any wink in the right direction is highly appreciated.

Edit:

As requested I added the final real-world goal.

Imagine a service that requests information from an API. Within 50-75ms you call the service with a certain id. I want to group those ids together to a single request instead of doing 3. And if 100ms later another call to the service is made, a new request will be done

Nicolas Gehlert
  • 2,626
  • 18
  • 39

2 Answers2

1
this.makeRequest(1).subscribe();

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(

You emit the value before you subscribe -> The value gets lost.

private values: Subject = new Subject();
private idObservable = this.values.pipe(

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(    

Every call creates a new observable based on the subject. Whenever you emit a value, all subscribers receive the value.

A possible solution could look something like this (I'm using the new rxjs syntax here):

subject: Subject<String> = null;
observable = null;
window = 100;

constructor() {
  this.subject = null;
  this.window = 100;

  this.makeRequest('1').subscribe(console.log)
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  if (!this.subject) {
    this.subject = new ReplaySubject()
    this.observable = this.subject.pipe(
      takeUntil(timer(this.window).pipe(take(1))),
      reduce((url, id, index) => this.combine(url, id), baseUrl),
      flatMap(url => this.request(url)),
      tap(() => this.subject = null),
      share()
    )
  }      

  this.subject.next(id);
  return this.observable;
}  

Where combine creates the url and request makes the actual request.

a better oliver
  • 26,330
  • 2
  • 58
  • 66
  • thanks for your answer. why does every call create a new subject? the subject itself ist just created once in the beginning? but if i add one stream per id how do I actually combine them in the end? I need some sort of "single stream" to be able to do this or am I wrong? – Nicolas Gehlert Jun 07 '18 at 08:54
  • @NicolasGehlert wrote _"a new observable based on the subject"_, not _" a new subject"_. I assumed you wanted to group requests with the same id. If that's not the case then you want one observable per time slot, but that's still one observable per "unit" as opposed to one subject only. – a better oliver Jun 07 '18 at 09:09
  • ah ok. yeah that makes more sense. yeah not the same id. but different ids into the same request. and how do I create "an observable per time slot" ? (without creating a timer) – Nicolas Gehlert Jun 07 '18 at 09:10
  • @NicolasGehlert Good question. It's generally possible with the `windowTime` operator, but I'm not sure it's a good fit here. If you don't want a timer you can store the last time a new observable has been created and create a new one if necessary. But a timer is actually a good idea as it's the only way to reliably complete a stream. – a better oliver Jun 07 '18 at 09:24
  • ok yeah a timer probably makes sense. but still the question is how to I achieve this. First function call, start timer, second call check if timer is running, combine observables, start new timer... and I don't want to have an interval that is running the entire time in the background and pools every x seconds. This is a waste of memory I guess – Nicolas Gehlert Jun 07 '18 at 11:56
  • @NicolasGehlert An interval is much more efficient than anything else. I'll try to update the answer with a possible solution later this day. – a better oliver Jun 07 '18 at 12:02
  • highly appreciate your answer. but isn't it weird to have a interval running the entire time, even if you just really need it once every 10 minutes? if you add a couple of those they'll block unneeded memory. – Nicolas Gehlert Jun 07 '18 at 13:56
  • @NicolasGehlert Rest assured that it consumes less memory than a `Subject` ^^ – a better oliver Jun 07 '18 at 16:46
1

Rxjs is quite good at handling this kind of case. You'll need two different Subjects:

  1. One will be used to collect and combine all requests
  2. The second will be used for subscribing to results

When a request is made, the value will be pushed onto the first subject but the second will be returned, abstracting away the internal logic of combining requests.

private values: Subject = new Subject();
private results: Subject = new Subject();

private makeRequest(number: number) {
  this.values.next(number);
  return this.results;
}

The pipeline for merging the requests could be a buffer and debounceTime as indicated in the question or other logic, as required. When a response is recieved, it just needs to be pushed onto the results Subject:

constructor(private http: HttpClient) {
  this.values
    .pipe(
      buffer(this.values.pipe(debounceTime(1000))),
      switchMap(values => this.getUrl(values)),
      map(response => this.results.next(response)))
    .subscribe();
}

I've used a switchMap to simulate an asynchronous request before pushing the response onto the results.

Full example here: https://angular-8yyvku.stackblitz.io

DNJohnson
  • 776
  • 5
  • 11