I am trying to do the following:

  1. Take in an observable text$ (in my app this is an input stream), then emit each value from the input up to a parent component. The parent component then calls an API and retrieves some data.
  2. WAIT until the parent passes the data it retrieved from the API back down to this component as an observable (<string[]>) via an @Input.
  3. Once we know the new data from the parent component has arrived, execute another pipe on the same observable (text$), which is my input stream.

So far this is what I have come up with:

import { of, Observable, ConnectableObservable } from 'rxjs';
import { take, map, publish, tap, toArray, distinctUntilChanged, debounceTime, switchMap, repeat } from 'rxjs/operators';


const tt = of('a','b','c','d')
let text$ = of('nct')
const data = tt.pipe(
  map((result: any) => {
    return result
  })
) as Observable<string[]>

let dataSnap;
data.pipe(toArray()).subscribe(x => dataSnap = x)

let obs1 = text$.pipe(
  distinctUntilChanged(),
  debounceTime(250),
  // tap((term) => this.target = text$),
  tap((term) => console.log("handleTypeahead", term)),
  // tap((term) => this.onTypeahead.emit(term)),
) as ConnectableObservable<string>

let obs2 = obs1.pipe(
  publish(),
  switchMap(term => term === '' ? [] : data)
) as ConnectableObservable<string>

data
  .pipe(
    tap(x => console.log(dataSnap)),
    toArray(),
  ).subscribe(x => {
    if (x == dataSnap) {
      repeat(2)
    } else {
      obs2.connect()
    }
  })

Or on StackBlitz here: https://stackblitz.com/edit/typescript-p3yoqa

Currently my issue seems to be with connect(), where I get this error: Property 'connect' does not exist on type 'Observable<string>'. I have tried the suggested solutions but seem to have implemented them incorrectly. I am also not sure that my logic is sound to begin with, as I am new to RxJS.

Michael
  • Can you please clarify your question a bit more? From the code alone it is not clear what you want to achieve. When you say parent, what is the parent? Can you describe your requirement from the end user's perspective? You are just setting up various observables without assigning the results to anything that could be combined with other observables. BTW, there is no `connect` method on Observable. Please detail your requirement, then we will be able to provide a correct solution. Thanks! – user2216584 Jun 11 '19 at 14:49
  • @user2216584 tried to clarify, hopefully it is now clearer. – Michael Jun 11 '19 at 14:58
  • Calling `pipe()` returns a new observable; it doesn't modify the original, so you have to assign the result to something. – Avin Kavish Jun 11 '19 at 15:15
  • Can you take a look here? I have assigned variables, but the code will not run: https://stackblitz.com/edit/typescript-p3yoqa – Michael Jun 11 '19 at 15:22
  • @Michael What confuses you? An Observable is not the ConnectableObservable that you are forcibly casting it to. End of story. – Antoniossss Jun 11 '19 at 17:38
  • @Antoniosss this is not the case, see here: https://stackoverflow.com/questions/54265143/property-connect-does-not-exist-on-type-observableany-rxjs-multicast – Michael Jun 11 '19 at 17:54
  • Of course it is the case. If the actual implementation were a ConnectableObservable, then even casting it to Observable would yield proper results at runtime, despite the compiler error saying there is no such property/method. Casting is just for compile-time type safety assurance, not a runtime type check. – Antoniossss Jun 11 '19 at 18:29
  • Operator order does matter: https://stackblitz.com/edit/typescript-p3yoqa – Antoniossss Jun 11 '19 at 20:35

1 Answer

An Observable is not a ConnectableObservable; that is one thing. Another is that if whatever pipe returns actually were a ConnectableObservable, the code would work at runtime even without the type cast. The compiler would obviously complain, but that is easy to silence (as you did anyway). Casting has no influence at runtime, and definitely NOT on the type of the object.

Conclusion:

  1. Casting is irrelevant at runtime
  2. Whatever pipe is returning, it is NOT a ConnectableObservable
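
To illustrate both conclusions, here is a minimal sketch of how a TypeScript type assertion behaves. `PlainStream`, `ConnectableStream` and `hasConnect` are hypothetical stand-ins invented for this example, not the real RxJS classes; the point is only that `as` changes what the compiler believes, not what the object is.

```typescript
// Hypothetical stand-ins for illustration only; these are NOT RxJS types.
class PlainStream {
  constructor(public readonly label: string) {}
}

class ConnectableStream extends PlainStream {
  connect(): string {
    return `connected:${this.label}`;
  }
}

// Runtime check that ignores the static type entirely.
function hasConnect(value: unknown): boolean {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { connect?: unknown }).connect === 'function'
  );
}

const plain = new PlainStream('plain');

// The assertion only silences the compiler; the object is still a PlainStream,
// so calling forced.connect() would fail at runtime.
const forced = plain as ConnectableStream;

// The reverse case: a real ConnectableStream keeps its method
// even when the variable is typed as the base class.
const widened: PlainStream = new ConnectableStream('real');

const forcedHasConnect = hasConnect(forced);   // false
const widenedHasConnect = hasConnect(widened); // true

console.log({ forcedHasConnect, widenedHasConnect });
```

The same reasoning applies to the cast in the question: if the object returned by `pipe` has no `connect` method, asserting a different type will not add one.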

I took a look at the RxJS sources on GitHub and found, for example:

https://github.com/ReactiveX/rxjs/blob/master/src/internal/util/pipe.ts

EDIT

My original claim was: the pipe operator returns UnaryFunction, which extends Observable but is not ConnectableObservable.

It is not true that UnaryFunction extends Observable. I have no idea how I came up with that (browsing on GitHub...).

Antoniossss
  • If I understand correctly, `publish`/`connect` cannot be used in a `pipe`? Is this example (the only one I could find) incorrect? https://www.learnrxjs.io/operators/multicasting/publish.html Furthermore, it would be helpful if you could suggest how to write this correctly. Thanks – Michael Jun 11 '19 at 19:27
  • Notice that the order of operators does matter. In the given example, publish is the last operator (or the only one), while in your case it is not. – Antoniossss Jun 11 '19 at 20:10
  • @Michael what he is trying to say is that if you have an operator that returns an `Observable` instead of a `ConnectableObservable` (CO), typecasting it will not help, because the object doesn't have that method. If you want a CO object, put the operator that returns it as the last operator in the pipe. – KiraAG Jun 12 '19 at 03:34
  • Also, UnaryFunction doesn't extend Observable. It's of type `<T, R>`. If your operator returns `Observable` or `ConnectableObservable`, then that would be your `R` – KiraAG Jun 12 '19 at 03:39
  • That's not a type, those are generic parameters. But you are right, that extension claim is false. – Antoniossss Jun 12 '19 at 07:07
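
The point about operator order in the comments above can be sketched with a toy model. Everything here (`Stream`, `Connectable`, `pipeAll`, `mapLike`, `publishLike`) is a hypothetical stand-in written for this example and is not RxJS code; it assumes only that a pipe feeds the source through each function in turn, so the result is whatever the last function returns.

```typescript
// Hypothetical stand-ins for illustration only; these are NOT RxJS types.
interface Stream {
  readonly steps: readonly string[];
}

interface Connectable extends Stream {
  connect(): string;
}

type Op = (source: Stream) => Stream;

// Toy pipe: apply each operator to the previous result.
// The final value is whatever the LAST operator returns.
function pipeAll(source: Stream, ...ops: Op[]): Stream {
  return ops.reduce((prev, op) => op(prev), source);
}

// Returns a plain stream, dropping any connect method from its input.
const mapLike: Op = (s) => ({ steps: [...s.steps, 'map'] });

// Returns a stream that additionally carries a connect method.
const publishLike: Op = (s): Connectable => {
  const steps = [...s.steps, 'publish'];
  return { steps, connect: () => `connected after ${steps.join(' > ')}` };
};

// Runtime type guard: does this stream actually have connect()?
function isConnectable(s: Stream): s is Connectable {
  return typeof (s as Partial<Connectable>).connect === 'function';
}

const source: Stream = { steps: [] };

// publish-like step in the middle: the later step returns a plain stream.
const publishInMiddle = pipeAll(source, publishLike, mapLike);

// publish-like step last: the result keeps its connect method.
const publishLast = pipeAll(source, mapLike, publishLike);

console.log(isConnectable(publishInMiddle)); // false
console.log(isConnectable(publishLast));     // true
```

In the question's code, `publish()` is followed by `switchMap(...)`, which corresponds to the `publishInMiddle` case in this model.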