I am trying to do the following:
- Take in an observable text$ (in my app this is an input stream), then emit the value from the input up to a parent component. The parent component will then make a call to an API and retrieve some data.
- WAIT until the parent sends back down an observable (<string[]>) as an @Input; the parent component is passing the data it retrieved from the API back down to the child component.
- Once we know we have the new data from the parent component, I want to execute another pipe on the same observable (text$), which is my input stream.
So far this is what I have come up with:
import { of, Observable, ConnectableObservable } from 'rxjs';
import { take, map, publish, tap, toArray, distinctUntilChanged, debounceTime, switchMap, repeat } from 'rxjs/operators';

const tt = of('a','b','c','d')
let text$ = of('nct')

const data = tt.pipe(
  map((result: any) => {
    return result
  })
) as Observable<string[]>

let dataSnap;
data.pipe(toArray()).subscribe(x => dataSnap = x)

let obs1 = text$.pipe(
  distinctUntilChanged(),
  debounceTime(250),
  // tap((term) => this.target = text$),
  tap((term) => console.log("handleTypeahead", term)),
  // tap((term) => this.onTypeahead.emit(term)),
) as ConnectableObservable<string>

let obs2 = obs1.pipe(
  publish(),
  switchMap(term => term === '' ? [] : data)
) as ConnectableObservable<string>

data
  .pipe(
    tap(x => console.log(dataSnap)),
    toArray(),
  ).subscribe(x => {
    if (x == dataSnap){
      repeat(2)
    }
    else {
      obs2.connect()
    }
  })
Or on stackblitz here: https://stackblitz.com/edit/typescript-p3yoqa
Currently my issue seems to be with the call to connect, where I get this error: Property 'connect' does not exist on type 'Observable<string>'. I have tried the solutions suggested but seem to have implemented them wrong. In addition, I am not sure that my logic is sound to begin with, as I am new to RxJS.
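For reference, here is a minimal sketch of how connect() appears to behave, assuming the same RxJS 6 style imports as above. As far as I can tell, only the observable produced directly by publish() carries connect(); anything piped after it (such as switchMap) is a plain Observable again, which would explain the error. The names data$, published$, results$, received and before are made up for this illustration, and text$ and data$ are simple stand-ins for the real input stream and the parent's data.

```typescript
import { of, EMPTY, ConnectableObservable } from 'rxjs';
import { publish, switchMap } from 'rxjs/operators';

// Stand-ins for the streams in the question (assumptions for illustration).
const text$ = of('nct');
const data$ = of('a', 'b', 'c', 'd');

// publish() is the only operator in this pipe, so the runtime object is a
// ConnectableObservable. pipe() is typed as returning Observable, hence the cast.
const published$ = text$.pipe(publish()) as ConnectableObservable<string>;

// Operators applied after publish() give back a plain Observable,
// so results$ has no connect() of its own.
const results$ = published$.pipe(
  switchMap(term => (term === '' ? EMPTY : data$))
);

const received: string[] = [];
results$.subscribe(value => received.push(value));

// Nothing has been emitted yet: the published source is not connected.
const before = received.length;

// connect() is called on the published observable, not on results$.
published$.connect();

console.log(before, received); // logs 0, then the four values from data$
```

If this reading is right, the cast in my code only hides the problem from the compiler: obs2 is the result of switchMap, so connect() would have to be called on the observable that publish() itself returns.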