5

I'm guessing this should be fairly easy to achieve, but I'm having trouble (conceptually, I guess) figuring out how to tackle it.

What I have is an API that returns an array of JSON objects. I need to step through these objects, and, for each object, make another AJAX call. The issue is the system that handles each AJAX call can only handle two active calls at a time (as it's quite a CPU-intensive task that hooks out into a desktop application).

I was wondering how I could achieve this using RxJS (either using version 5 or 4)?

EDIT: In addition, is it possible to have a chain of steps running concurrently? I.e.:

Downloading File: 1
Processing File: 1
Converting File: 1
Uploading File: 1
Downloading File: 2
Processing File: 2
Converting File: 2
Uploading File: 2
Downloading File: 3
Processing File: 3
Converting File: 3
Uploading File: 3

I've tried doing something like:

Rx.Observable.fromPromise(start())
    .concatMap(arr => Rx.Observable.from(arr))
    .concatMap(x => downloadFile(x))
    .concatMap((entry) => processFile(entry))
    .concatMap((entry) => convertFile(entry))
    .concatMap((entry) => UploadFile(entry))
    .subscribe(
        data => console.log('data', new Date().getTime(), data),
        error => logger.warn('err', error),
        complete => logger.info('complete')
    );

However, that doesn't seem to work. downloadFile, for example, doesn't wait for processFile, convertFile and uploadFile to all complete; rather, the next download starts as soon as the previous download completes.

user3743222
NRaf
  • http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/subscribe.html – cport1 Aug 20 '16 at 01:27
  • @cport1 - the linked example runs one after the other without waiting for any form of async operation. That's different to what I'm asking. – NRaf Aug 20 '16 at 01:31

4 Answers

4

Here are 2 approaches. If you want the sequence of requests to be exactly like this:

Downloading File: 1
Processing File: 1
Converting File: 1
Uploading File: 1
Downloading File: 2
Processing File: 2
...

You need to resolve all the promises inside a single concatMap call, like this:

Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .concatMap(function(item) {
    return downloadFile(item)
      .then(processFile)
      .then(convertFile);
  })
  .subscribe(function(data) {
    console.log(data);
  });

See the working plunkr here: https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview

This way, a new AJAX call is sent only when the previous one has finished.

Another approach is to let the files send their requests in parallel, while the operations (downloading, processing, converting, uploading) still run in sequence. You can get this working with:

Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .merge(2)  // in case maximum concurrency required is 2
  .concatMap(function(item) {
    return downloadFile(item);
  })
  .concatMap(function(item) {
    return processFile(item);
  })
  .concatMap(function(item) {
    return convertFile(item)
  })
  .subscribe(function(data) {
    //console.log(data);
  });

see plunkr here: https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview
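
On capping parallel work at two files while keeping each file's steps in order: the idea can be sketched without any library, using plain promises. This is only an illustration under assumptions: `runWithLimit`, `stage` and `handleFile` are hypothetical names, and the four stage functions are timer-based stand-ins for the real calls. In RxJS 5 and later, I believe the same shape is usually expressed by passing a concurrency value to `mergeMap`, e.g. `mergeMap(handleFile, 2)`.

```javascript
// Hypothetical stand-ins for the real stages: each logs, waits briefly,
// and resolves with the file it was given.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const stage = (name) => async (file) => {
  console.log(`${name} File: ${file}`);
  await delay(10);
  return file;
};
const downloadFile = stage('Downloading');
const processFile = stage('Processing');
const convertFile = stage('Converting');
const uploadFile = stage('Uploading');

// All four steps for one file, strictly in order.
const handleFile = (file) =>
  downloadFile(file).then(processFile).then(convertFile).then(uploadFile);

// Run `task` over `items` with at most `limit` tasks in flight.
// Results come back in the same order as `items`.
async function runWithLimit(items, limit, task) {
  const results = new Array(items.length);
  let next = 0; // index of the next item nobody has claimed yet

  // Each worker claims one item at a time until none are left.
  async function worker() {
    while (next < items.length) {
      const index = next++;
      results[index] = await task(items[index]);
    }
  }

  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// At most two files are being handled at any moment.
runWithLimit([1, 2, 3], 2, handleFile).then((files) => {
  console.log('done', files);
});
```

With a limit of 1 this degenerates to the strictly sequential order of the first approach.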

FarazShuja
  • Thanks, the first one is what I've gone with, although the second would be ideal, if I could limit the amount of parallel operations (i.e. max 2 at a time). – NRaf Aug 21 '16 at 22:04
  • By the way, this first solution seems to work only with rxjs5. With version 4, they all run in parallel. – NRaf Aug 21 '16 at 22:30
  • strange! theoretically the behavior should remain the same on rxjs4 and rxjs5. – FarazShuja Aug 22 '16 at 07:56
3

You could use the merge operator with the maxConcurrency overload (RxJS v4), so something like:

Rx.Observable.fromArray(aJSONs)
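  .map (function (JSONObject) {
    // Return a lazy (cold) observable here. A bare promise would start its
    // request right away, before merge(2) can hold it back; wrap a
    // promise-returning call in Rx.Observable.defer if needed.
    return ajaxRequest(JSONObject);
  })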
  .merge(2)

You can have a look to see other examples of use at :

Official documentation :

user3743222
1

How about something like this? You could use from to break the array into bite-sized chunks and process them one by one using concatMap.

function getArr() {
    return Rx.Observable.of([1, 2, 3, 4, 5, 6, 7, 8]);
}


function processElement(element) {
    return Rx.Observable.of(element)
        .delay(500);
}


getArr()
    .concatMap(arr => {
        return Rx.Observable.from(arr);
    })
    .concatMap(element => {
        return processElement(element);
    })
    .subscribe(res => {
        console.log(res);
    });
qfwfq
  • This looks promising, but would it be possible to extend it? i.e. if I wanted to add another step as well? – NRaf Aug 20 '16 at 02:59
  • I believe you could extend it, yes. You could continue to add concatMaps or whatever other operator you needed. There is a lot of good information here http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html – qfwfq Aug 20 '16 at 03:07
0

Old post, but I believe this could work. For console logging we could use tap. Note that the editor may show an IntelliSense error, since from expects an array, but the code should work.

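 import { from } from 'rxjs';
 import { concatMap, map, switchMap } from 'rxjs/operators';

 from(start()).pipe(
    switchMap(files => from(files).pipe(
       // concatMap waits for one file's whole chain before starting the next;
       // a switchMap here would drop a file as soon as the next one is emitted.
       concatMap(file => from(downloadFile(file)).pipe(
         map(_ => ({file: file, downloaded: true})),
         switchMap(attr => from(processFile(attr.file)).pipe(
           map(_ => ({...attr, processed: true}))
         )),
         switchMap(attr => from(convertFile(attr.file)).pipe(
           map(_ => ({...attr, converted: true}))
         )),
         switchMap(attr => from(uploadFile(attr.file)).pipe(
           map(_ => ({...attr, uploaded: true}))
         ))
       ))
    ))
 ).subscribe(_ => {})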