3

I am writing a service where people can paste in urls from Spotify playlists and then export the playlist in a different service. For each track url that is pasted in a request needs to be made to the Spotify api.

This code:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .flatMap(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                    catch( ( error ) => this.handleError( error ) ))
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );
  1. Creates an observable from a list of ITrackIdentifiers
  2. takes the id of the track identifier to create an observable of strings (ids)
  3. Removes any duplicate id's in the list
  4. creates an observable for each http call made to spotify (and catches errors)
  5. merges results from all these observables into one stream with flatmap

This actually works fine except for when a large number of tracks are added. One of my sample playlists has over 500 tracks so immediately 500 calls are made and the browser needs to deal with them / return items from the cache so the browser is slow and locks up AND spotify returns loads of errors as I exceed the api call limit.

I want to be able to only have say 10 calls running at the same time. Merge with maxConcurrent set seems like the perfect solution as discussed on Stackoverflow.

This would look like this:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .map(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                    catch( ( error ) => this.handleError( error ) ))
            .merge(10)
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );

But it just doesn't work. In the chrome network debugger you can see all the calls made at the same time and most queueing for ages until they fail.

Why isn't this working? How else can I get round this issue?

Here is the Github checkin with the project at this stage:

Community
  • 1
  • 1
Roaders
  • 4,373
  • 8
  • 50
  • 71
  • Merge works as it should. It limits the number of subscriptions. But before the 'merge', you have a 'map', which actually makes all requests, and only then 'merge' kicks in. – psx Jan 05 '18 at 10:42

2 Answers2

3

The problem with your code using merge is that the spotifyService.lookupTrack doesn't return an Observable but a Promise. Some of Observables functions like flatMap will handle Promises as well, but the difference between an Observable and a Promise is that the Observable is lazy, while the Promise is not. You can make a lazy observable from a promise factory function using Observable.defer, as user3743222 suggests. This little example is in JavaScript instead of TypeScript so it can be run here.

console.log = x => {var d = document,b=d.body,p=d.createElement('pre'); p.style.margin = "0"; p.appendChild(d.createTextNode(''+x)); b.appendChild(p);  window.scrollTo(0, b.scrollHeight); };

function log_delay(timeout, value) {
  return new Promise(resolve => {
    console.log('Start: ' + value);
    setTimeout(() => {
      console.log('End: ' + value);
      resolve(value);
    }, timeout);
  });
}

Rx.Observable.range(0, 6)
.map(x => Rx.Observable.defer(
  () => log_delay(1000, x)
  .catch(e => console.log('Inner catch'))
))
.merge(2)
.subscribe(
  s => console.log('Result: ' + s),
  s => console.log('Error: ' + s),
  s => console.log('Complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
Tamas Hegedus
  • 28,755
  • 12
  • 63
  • 97
  • Many thanks. Works perfectly now. Git hub with this fix in: https://github.com/Roaders/SpotifyExportTool/tree/be9e91b208e1378c5f4ed677d73e05f06a6c16f9 – Roaders Jan 23 '16 at 11:40
  • Note merge() is now mergeAll() in RXJS 5. – kayjtea Feb 04 '17 at 00:25
0

I managed to get it working somewhat how I wanted but I am still curious as to why merge didn't work. Here the list of unique ids is built and then we use concatMap to create an Observable for each id and then wait for a delay before moving onto the next item:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .concatMap( ( id, index ) => Rx.Observable.interval( 50 ).take( 1 ).map( () => { return id } ) )
            .flatMap(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                catch( ( error ) => this.handleError( error ) ))
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );

In this example I wait 50ms between each call. This reduces the errors a lot.

Here is the Github checkin with the project at this stage.

Roaders
  • 4,373
  • 8
  • 50
  • 71