I'm creating an RXJS observable to perform a task given an input of an array of URLs:
let scrapeObservable: Observable<IProduct> = Observable.from(urlList)
.mergeMap(url => Observable.forkJoin(
Observable.of(url),
getPageDom(url), // Returns Observable that completes with PageDom
getExtractionRule(parseURL(url)["Host"]) // returns observable with data needed to perform work on the page dom
).delay(timeDelay += intervalTime)) // scrapes one url every set interval - using 0 in this case
.mergeMap(result => getProductInfoFromDOMObservable(result[0], result[1], result[2][0], result[2][1], requiredFields)) // performs work on page dom using data above.
.merge(maxConcurrentRequests) // prevents more than an certain number of processes from taking place simultaneously.
.catch(handleError);
When I subscribe to this observable it appears as though for some inputs (rather large lists of URLs >500) it never calls complete or error. It's like the observable just hangs. I can't figure out why this is happening but it is causing problems with my code. For small inputs it works correctly, and it doesn't always fail for large inputs, just some of the time.
Any ideas on what might be causing this behavior?