
I've been trying to use forkJoin with an array of Observables from Firebase, but they don't seem to complete.

I've tried applying .first() or .take(1), since either is supposed to complete the observable, but it doesn't seem to change anything.

getChat(userTwoId: string)
{
  return this.auth.getAuthenticatedUser()
    .map(auth => 
    {
      console.log(auth.uid);
      return auth.uid;
    })
    .mergeMap(uid => 
    {
      console.log(`/user-messages/${uid}/${userTwoId}/`); 
      return this.database.list(`/user-messages/${uid}/${userTwoId}/`).snapshotChanges();
    })
    .mergeMap(chats => 
    {
      const oChats = chats.map(chat =>
      {
        console.log(`/messages/${chat.key}`);
        return this.database.object(`/messages/${chat.key}`).valueChanges().first();
      });

      console.log('ObsevableMap');
      console.log(oChats);

      return Observable.forkJoin(oChats);
    });
}

My Observables (from the array) emit values when I subscribe to them individually, but the forkJoin doesn't.

I've read somewhere that Firebase Observables never complete. If that is true, what would be the best alternative? And if it isn't, what would complete them? Isn't there a way to force them to complete?

More detail: here is the code that I'm trying to test:

this.auth.getAuthenticatedUser()
      .map(auth => 
      {
        return auth.uid;
      })
      .mergeMap(uid => 
      {
        console.log('path to list of keys:')
        console.log(`/user-messages/${uid}/${userTwoId}/`); 
        return this.database.list(`/user-messages/${uid}/${userTwoId}/`).snapshotChanges();
      })
      .mergeMap(chats => 
      {
        console.log('Log each path to object:');
        const oChats = chats.map(chat=>
        {
          console.log(`/messages/${chat.key}`);
          return this.database.object(`/messages/${chat.key}`).valueChanges().take(1);
        });

        //See if its log 
        //console.log('Log each Obs subscription:')
        //oChats.forEach(a=>{a.subscribe(b=>{console.log(b)})});

        return Observable.forkJoin(oChats);

      }).subscribe(vals=>
      {
        console.log('Fork Subscription:');
        console.log(vals)
      });

this.auth.getAuthenticatedUser() returns an observable of the current Firebase user, which I then map to get the uid.

In my database there is a node keyed by the user's uid, then by a second user's uid, and under that a list of the keys of the messages they've sent each other.

So I return these keys and finally map each key to an observable that returns its message.
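A rough sketch of that lookup, using only the two paths from the code above. The uids and keys (`uidA`, `uidB`, `-msg1`, `-msg2`) and the helper names `userMessagesPath` and `messagePath` are made-up placeholders, not values or functions from the real project:

```typescript
// Hypothetical helpers that only build the two database paths used in the question.
type MessageKey = string;

const userMessagesPath = (uid: string, userTwoId: string): string =>
  `/user-messages/${uid}/${userTwoId}/`;

const messagePath = (key: MessageKey): string => `/messages/${key}`;

// Stand-in for the list of keys stored under /user-messages/<uid>/<userTwoId>/.
const keys: MessageKey[] = ["-msg1", "-msg2"];

// Each key is turned into the path of the message object it points to.
const paths: string[] = keys.map(messagePath);

console.log(userMessagesPath("uidA", "uidB"));
console.log(paths);
```

In the real code each of those paths becomes one observable, and the resulting array is what gets handed to forkJoin.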

After that there is a commented-out subscription to test that they return something.

Then I try to fork or merge them, but when I subscribe nothing happens.

I've tried with first() instead of take(1).

I've tried my best to be clear, but I'm sorry if I'm not ^^

here are the messages

here are the lists of keys

and here is my log when the subscription is uncommented

Another update

I tried to make a demo on StackBlitz here:

https://stackblitz.com/edit/angular-template-firebase-yvbkho

I copied and pasted most of my code and made another firebase database with the same structure here:

And my demo works.

WisePlatypus
  • Isn't it because the observable on the messages never 'complete'? The stream stays open. I think you'd have to use [`merge`](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-merge) instead – Raven Mar 05 '18 at 13:32
  • I'll give it a try – WisePlatypus Mar 05 '18 at 14:15
  • I tried replacing it with `return Observable.merge(oChats);` but I can't get it to work. I also tried applying the spread operator: `merge(...oChat);` – WisePlatypus Mar 05 '18 at 14:42
  • Have you tried both `.merge` and `.take(1)`? – Raven Mar 05 '18 at 15:05
  • Yeah, but still nothing... Does `merge` take the same parameters as `forkJoin`? I really don't get what's wrong with my code: everything seems to work except `merge` and `forkJoin`. – WisePlatypus Mar 05 '18 at 18:30

2 Answers

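valueChanges() on its own never completes, so forkJoin would wait on it forever. Both .first() and .take(1) complete the stream after the first emission; they differ only when the source completes without emitting anything, in which case first() raises an error and take(1) simply completes (see Angular 2 using RxJS - take(1) vs first()).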

forkJoin requires every source Observable to emit at least one item and to complete. take(1) should guarantee the completion, but only if the source emits at least one item.

So take(1) should work, assuming valueChanges() emits at least one item, because it completes immediately after re-emitting that value.
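To make the completion rule concrete, here is a small dependency-free model. It is not RxJS code: `Source`, `neverCompleting`, `takeOne`, `forkJoinModel`, and `collect` are hypothetical stand-ins written only to mimic the behaviour described above, synchronously and without unsubscription or error handling.

```typescript
// Toy, synchronous model of an observable. This is NOT the RxJS implementation.
interface Observer<T> {
  next: (value: T) => void;
  complete: () => void;
}
type Source<T> = (observer: Observer<T>) => void;

// Emits one value and then stays open, like valueChanges() as described above.
function neverCompleting<T>(value: T): Source<T> {
  return observer => { observer.next(value); };
}

// Models take(1): re-emits the first value, then completes immediately.
function takeOne<T>(source: Source<T>): Source<T> {
  return observer => {
    let done = false;
    source({
      next: value => {
        if (done) { return; }
        done = true;
        observer.next(value);
        observer.complete();
      },
      complete: () => {
        if (!done) { done = true; observer.complete(); }
      },
    });
  };
}

// Models forkJoin: emits the last values as one array, but only once every
// source has emitted at least once and completed.
function forkJoinModel<T>(sources: Source<T>[]): Source<T[]> {
  return observer => {
    if (sources.length === 0) { observer.complete(); return; }
    const last: T[] = new Array(sources.length);
    const hasValue: boolean[] = sources.map(() => false);
    let completed = 0;
    let finished = false;
    sources.forEach((source, i) => {
      source({
        next: value => { last[i] = value; hasValue[i] = true; },
        complete: () => {
          if (finished) { return; }
          // A source that completes empty ends the join without an emission.
          if (!hasValue[i]) { finished = true; observer.complete(); return; }
          completed++;
          if (completed === sources.length) {
            finished = true;
            observer.next(last.slice());
            observer.complete();
          }
        },
      });
    });
  };
}

// Subscribes and records what was emitted and whether the source completed.
function collect<T>(source: Source<T>): { values: T[]; completed: boolean } {
  const result = { values: [] as T[], completed: false };
  source({
    next: value => { result.values.push(value); },
    complete: () => { result.completed = true; },
  });
  return result;
}

const open: Source<string>[] = [neverCompleting("a"), neverCompleting("b")];
const stuck = collect(forkJoinModel(open));               // sources never complete
const joined = collect(forkJoinModel(open.map(takeOne))); // each completes after one value

console.log(stuck, joined);
```

In this model the first join stays silent because no source ever completes, while wrapping each source in `takeOne` lets the join emit a single array. That matches the expectation that take(1) is enough as long as every source emits at least once.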

martin

I finally got it working... I feel so stupid, and yet there was no way to notice it.

It seems I imported my Observable from:

import { Observable } from 'rxjs/observable';

instead of

import { Observable } from 'rxjs/Observable';

(if you didn't notice, the capital O is missing)

There was no warning from my IDE, and every other function on the Observable worked...

At least it is now documented for other people.

Thanks to those who tried to help me ^^.

The thing is that there's no warning or error, and it just does nothing.

WisePlatypus