I'm trying to implement gRPC with NestJS and use RxJS Observables to handle server streaming.

In the code below, I try to push each value from the observable into a results array. The function findAllRepos returns an empty list, even though the console.log(value) in subscribe prints all the results correctly.

  findAllRepos(): Repository[] {
    const results: Repository[] = [];

    const observable = this.mainService.findAllRepos({});
    observable.subscribe({
      next: (value) => {
        console.log(value);
        results.push(value);
      },
      error: (err) => console.log(err),
      complete: () => console.log(results)
    });

    return results;
  }

I think the problem is that the function returns before the subscription finishes. Is there any way to solve this? Thanks!

fahmiduldul
  • Have a look at this: https://stackoverflow.com/questions/38291783/how-to-return-value-from-function-which-has-observable-subscription-inside – Fatih Ersoy Sep 23 '21 at 08:31
  • You need to return the observable and subscribe where its response is required. You cannot return a value synchronously from it like you're attempting. See here for more info on async data: https://stackoverflow.com/q/14220321/6513921 – ruth Sep 23 '21 at 12:34
  • Does this answer your question? [How to return the response from an asynchronous call](https://stackoverflow.com/questions/14220321/how-to-return-the-response-from-an-asynchronous-call) – ruth Sep 23 '21 at 12:34

2 Answers

The Problem

I think the problem is that the function returns before the subscription finishes.

The functions (next, error, & complete) are all called by the observable (not by you) some time in the future. When you dig into the guts of RxJS you can gain some control over when these functions are called, but to keep things simple it's best to treat this as something the RxJS library handles opaquely.

As such, there's no way to return the finished array synchronously from findAllRepos.
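
To see the ordering in isolation, here is a minimal sketch that uses setTimeout as a stand-in for the stream. fakeStream and collectSync are made-up names for illustration, not part of RxJS or NestJS, and the real gRPC stream will of course take longer than a zero-delay timer:

```typescript
// Hypothetical stand-in for the gRPC stream: delivers three values asynchronously.
function fakeStream(onValue: (v: number) => void, onDone: () => void): void {
  setTimeout(() => {
    [1, 2, 3].forEach((v) => onValue(v));
    onDone();
  }, 0);
}

// Same shape as the findAllRepos in the question.
function collectSync(): number[] {
  const results: number[] = [];
  fakeStream(
    (v) => results.push(v),
    () => console.log("stream done, results now has", results.length, "items"),
  );
  return results; // runs before any of the callbacks above
}

const returned = collectSync();
const lengthAtReturn = returned.length;
console.log("length at return:", lengthAtReturn); // prints "length at return: 0"
```

The array does get filled eventually, but only after the calling code has already received it and moved on.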

It goes deeper than that, though. The problem you encountered exists with any interaction between synchronous and asynchronous code: you cannot run asynchronous code synchronously. In a single-threaded environment (such as JavaScript), making a synchronous piece of code busy-wait for an asynchronous result blocks the entire program.

Consider the following code. You might expect it to output "a equals 0" for 1000ms and then start outputting "a equals 1" forever after. What actually happens, however, is that the callback passed to setTimeout never gets a chance to run, since the thread is stuck in an infinite loop printing "a equals 0".

// Start with a = 0
let a = 0;
// After 1 second, set a = 1
setTimeout(() => a = 1, 1000);

// loop forever and print something based on value of a
while(true){ 
  if(a == 0) console.log("a equals 0");
  else console.log("a equals 1");
}

The Solution

The two most popular ways to manage asynchronous code are promises and observables. If you want a function to return something asynchronously, then have it return a promise or an observable.

In your case:

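import { Observable } from "rxjs";
import { tap, toArray } from "rxjs/operators";

findAllRepos(): Observable<Repository[]> {
  const observable = this.mainService.findAllRepos({});
  return observable.pipe(
    // Log each repository as it arrives from the stream
    tap(value => console.log("Repository value being added to array: ", value)),
    // Collect every emitted value into one array, emitted when the source completes
    toArray(),
    tap({
      next: value => console.log("Result Array (Repository[]) : ", value),
      error: console.log,
      complete: () => console.log("findAllRepos observable complete")
    })
  );
}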

Then to get the actual value elsewhere:

findAllRepos().subscribe(repositories => {
  /* Do something with your array of repositories */
});
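
If you would rather return a promise, here is a rough sketch of that variant. It assumes RxJS 7 or later, where lastValueFrom is available (on RxJS 6 the closest equivalent is the observable's toPromise() method). The Repository interface, streamRepos and findAllReposAsPromise are made-up stand-ins for your own type and for this.mainService.findAllRepos({}):

```typescript
import { from, lastValueFrom, Observable } from "rxjs";
import { toArray } from "rxjs/operators";

// Hypothetical stand-in for the Repository type in the question.
interface Repository {
  name: string;
}

// Hypothetical stand-in for this.mainService.findAllRepos({}):
// a stream that emits repositories one by one and then completes.
function streamRepos(): Observable<Repository> {
  return from([{ name: "repo-a" }, { name: "repo-b" }]);
}

// Promise-returning variant: resolves with the full array once the stream completes,
// and rejects if the stream errors.
function findAllReposAsPromise(): Promise<Repository[]> {
  return lastValueFrom(streamRepos().pipe(toArray()));
}

async function main(): Promise<void> {
  const repositories = await findAllReposAsPromise();
  console.log(repositories.map((r) => r.name));
}

main().catch(console.error);
```

Either way the caller still has to wait asynchronously (await or subscribe); the only thing that changes is which abstraction carries the result.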
Mrk Sef
  • One important advantage of your solution: the caller can manage the subscription. With the original code, a new subscription is created every time findAllRepos is called. If it is never unsubscribed and the observable never completes, that creates a memory leak. – Jonathan Stellwag Sep 23 '21 at 20:49

It's not good practice to subscribe inside methods like this; it makes reactive code no longer reactive. Why not use something like:

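import { Observable } from 'rxjs';
import { catchError, finalize, toArray } from 'rxjs/operators';

findAllRepos(): Observable<Repository[]> {
    return this.mainService.findAllRepos({}).pipe(
        toArray(), // collect the streamed items so the result matches Repository[]
        catchError((e) => {
            console.log('e ', e);
            throw e; // catchError must return an observable or throw; rethrow for the caller
        }),
        finalize(() => console.log('completed')),
    );
}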

and then call findAllRepos().subscribe in the client? You'll have more flexibility this way: you can call the method from multiple places and map the result however you want per use case.

Dariusz