I am writing a service where people can paste in URLs from Spotify playlists and then export the playlist to a different service. For each track URL that is pasted in, a request needs to be made to the Spotify API.
This code:
    Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
        .pluck<string>( "id" )
        .distinct()
        .flatMap(
            ( trackId ) => this.spotifyService.lookupTrack( trackId )
                .catch( ( error ) => this.handleError( error ) ) )
        .subscribe(
            ( result ) => this.handleTrackLookupResult( result ),
            ( error ) => this.handleError( error ),
            () => this.handleComplete()
        );
- Creates an observable from a list of ITrackIdentifiers
- Takes the id of each track identifier to create an observable of strings (ids)
- Removes any duplicate ids from the list
- Creates an observable for each HTTP call made to Spotify (and catches errors)
- Merges the results from all these observables into one stream with flatMap
This works fine except when a large number of tracks are added. One of my sample playlists has over 500 tracks, so 500 calls are made immediately. The browser has to deal with them all (or return items from the cache), so it becomes slow and locks up, and Spotify returns loads of errors because I exceed the API call limit.
I want to have only, say, 10 calls running at the same time. merge with maxConcurrent set seems like the perfect solution, as discussed on Stack Overflow.
This would look like this:
    Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
        .pluck<string>( "id" )
        .distinct()
        .map(
            ( trackId ) => this.spotifyService.lookupTrack( trackId )
                .catch( ( error ) => this.handleError( error ) ) )
        .merge(10)
        .subscribe(
            ( result ) => this.handleTrackLookupResult( result ),
            ( error ) => this.handleError( error ),
            () => this.handleComplete()
        );
But it just doesn't work. In the Chrome network debugger you can see all the calls being made at the same time, with most of them queueing for ages until they fail.
Why isn't this working? How else can I get round this issue?
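To make the behaviour I am after concrete, here is a minimal sketch of it without Rx. It is not the Rx `merge` operator, just plain TypeScript illustrating "at most N calls in flight". The names `Task` and `runLimited` are hypothetical and do not exist in my project or in Rx. The sketch assumes each task only starts its work when it is invoked and reports back through a `done` callback.

```typescript
// A task begins its work only when called, and signals completion via `done`.
// (Hypothetical type for illustration.)
type Task<T> = (done: (result: T) => void) => void;

// Hypothetical helper: runs all tasks, keeping at most `maxConcurrent` in flight.
function runLimited<T>(
    tasks: Task<T>[],
    maxConcurrent: number,
    onResult: (result: T) => void,
    onComplete: () => void
): void {
    let next = 0;      // index of the next task to start
    let active = 0;    // tasks currently in flight
    let finished = 0;  // tasks that have reported a result

    const startNext = (): void => {
        // Start tasks until the limit is reached or none are left.
        while (active < maxConcurrent && next < tasks.length) {
            const task = tasks[next++];
            active++;
            let called = false; // guard against `done` being called twice
            task((result) => {
                if (called) return;
                called = true;
                active--;
                finished++;
                onResult(result);
                if (finished === tasks.length) {
                    onComplete();
                } else {
                    startNext(); // a slot freed up, so start another task
                }
            });
        }
    };

    if (tasks.length === 0) {
        onComplete();
    } else {
        startNext();
    }
}
```

In my case each task would wrap one call to `this.spotifyService.lookupTrack( trackId )` and the limit would be 10, so a new request would only be created once an earlier one has finished.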
Here is the GitHub commit with the project at this stage: