3

I have a Firebase subscription in my Angular app which fires multiple times. How can I make sure the tasks are processed as a queue, so that each task runs exactly once, one at a time?

this.tasks.subscribe(async tasks => {
   for (const x of tasks) {
      await dolongtask(x); // has to be sync
      await removetask(x);
   }
});

The problem is that the subscribe callback fires again while the long task is still processing.

daniel

3 Answers

4

IMHO, I would try and leverage the power of rxjs since we're using it here already anyway and avoid implementing a custom queuing concept as suggested by another answer (though you certainly can do that).

If we simplify the given case a bit, we just have some observable and want to perform a long-running procedure for each emission – in sequence. rxjs allows doing this by means of the concatMap operator essentially out of the box:

$data.pipe(concatMap(item => processItem(item))).subscribe();

This only assumes that processItem returns an observable. Since you used await, I assume your function(s) currently return Promises. These can be trivially converted into observables using from.

The only detail left to look at from the OP is that the observable actually emits an array of items and we want to perform the operation on each item of each emission. To do that, we just flatten the observable using mergeMap.
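To make the ordering guarantee concrete, here is a minimal dependency-free sketch of the same idea (flatten each emitted array, then run the items strictly one after another). It is not how RxJS implements concatMap; `createSerialQueue`, `enqueueBatch` and the stub `worker` are hypothetical names used only for illustration.

```javascript
// Hypothetical helper, not an RxJS API: it only mimics the ordering guarantee.
function createSerialQueue(worker) {
  // `tail` always points at the promise of the most recently queued item.
  let tail = Promise.resolve();

  // Accepts one emission (an array of items) and queues each item in order.
  return function enqueueBatch(items) {
    for (const item of items) {
      // An item starts only after everything queued before it has resolved.
      tail = tail.then(() => worker(item));
    }
    return tail; // resolves once the last item queued so far is done
  };
}

// Stub worker (assumption): logs start and end, finishes after 10 ms.
const log = [];
const worker = task =>
  new Promise(resolve => {
    log.push(`start ${task}`);
    setTimeout(() => {
      log.push(`end ${task}`);
      resolve(task);
    }, 10);
  });

// Usage: two "emissions" arrive back to back, as in the question.
const enqueueBatch = createSerialQueue(worker);
enqueueBatch([1, 2]);
const allDone = enqueueBatch([3]);
allDone.then(() => console.log(log.join(", ")));
```

Note that in this sketch a rejected task leaves the chain rejected, so later items are skipped; whether that is the desired behaviour depends on how failures should be handled.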


Let's put it all together. Note that if you take away preparing some stub data and logging, the actual implementation of this is only two lines of code (using mergeMap + concatMap).

const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;

// Stub for the long-running operation
function processTask(task) {
  console.log("Processing task: ", task);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log("Finished task: ", task);
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}

// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));

// Some stubbed data stream
const tasks$ = interval(250).pipe(
  take(9),
  bufferCount(3),
);

tasks$.pipe(
  // Each emission is an array of tasks
  tap(tasks => console.log("Received tasks: ", tasks)),
  // Flatten the tasks array since we want to work in sequence anyway
  mergeMap(tasks => tasks),
  // Process each task, but do so consecutively
  concatMap(task => processTask$(task)),
).subscribe({ complete: () => console.log("Done") });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
Ingo Bürk
  • I don't understand what this does beyond `tasks$.subscribe(tasks => tasks.forEach(task => processTask(task))`. –  Sep 10 '18 at 12:36
  • This looks like an interesting pattern if you want to add additional steps in between the `mergeMap()` and the `concatMap()`, for example `filter(task => isNewTask(task))`. – Richard Matsen Sep 10 '18 at 13:24
  • @Hiram K. Hackenbacker For starters, it answers the question, meaning it processes things sequentially. Just calling the function like you did won't guarantee that. – Ingo Bürk Sep 10 '18 at 13:52
  • The only problem with the queue (effectively) internalized in the `concatMap` operator is the risk of backpressure you mention above. Perhaps it can be handled with a `map(task => delayTaskIfTooMany(task))` but I can't yet see how to add the delay into the pipeline. – Richard Matsen Sep 10 '18 at 21:19
  • BTW, I think it works without `const processTask$ = item => from(processTask(item))` since processTask returns a promise (see [Example 2](https://www.learnrxjs.io/operators/transformation/concatmap.html)). – Richard Matsen Sep 10 '18 at 21:27
  • Backpressure is a problem no matter how you do it, it's a conceptual problem. We have no information from OP how they want to deal with it, so I ignored the problem. – Ingo Bürk Sep 11 '18 at 05:05
  • How can you ignore backpressure, if it is the reason to create the queue in first place? – jayarjo Sep 27 '19 at 04:32
2

I am making a few assumptions based on the code you gave:

  • other applications add tasks to the firebase db (asynchronously), and this code is implementing the task processor.

  • your firebase query returns all unprocessed tasks (in a collection) and it emits the full list every time a new task is added.

  • the query will drop a task only after removeTask() has been run

If this is so, you need a deduping mechanism before the processor.

For the purpose of illustration, I've simulated the firebase query with a subject (renamed it to tasksQuery$) and a sequence of firebase events are simulated at the bottom of the script. I hope it's not too confusing!

console.clear()
const { mergeMap, filter } = rxjs.operators;

// Simulate tasks query  
const tasksQuery$ = new rxjs.Subject();

// Simulate dolongtask and removetask (assume both return promises that can be awaited)
const dolongtask = (task) => {
  console.log( `Processing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log( `Processed: ${task.id}`);
      resolve('done')
    }, 1000);
  });
}
const removeTask = (task) => {
  console.log( `Removing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log( `Removed: ${task.id}`);
      resolve('done')
    }, 200);
  });
}

// Set up queue (this block could be a class in Typescript)
let tasks = [];
const queue$ = new rxjs.Subject();
const addToQueue = (task) => {
  tasks = [...tasks, task];
  queue$.next(task);
}
const removeFromQueue = () => tasks = tasks.slice(1);
const queueContains = (task) => tasks.map(t => t.id).includes(task.id)

// Dedupe and enqueue
tasksQuery$.pipe(
  mergeMap(tasks => tasks), // flatten the incoming task array 
  filter(task => task && !queueContains(task)) // check not in queue
).subscribe(task => addToQueue(task) );

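// Process the queue one task at a time. concatMap waits for the promise
// returned for one task before it starts the next; a plain async
// subscribe callback would start all queued tasks concurrently.
queue$.pipe(
  rxjs.operators.concatMap(async task => {
    await dolongtask(task);
    await removeTask(task); // Assume this sends 'delete' to firebase
    removeFromQueue();
  })
).subscribe();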

// Run simulation
tasksQuery$.next([{id:1},{id:2}]);
// Add after delay to show repeated items in firebase
setTimeout(() => {
  tasksQuery$.next([{id:1},{id:2},{id:3}]); 
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
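The dedupe step can also be sketched on its own, without RxJS. This is only an illustration under the same assumptions as above (each query result is the full list of unprocessed tasks, and every task has an `id`); `createDeduper`, `selectNew` and `forget` are hypothetical names.

```javascript
// Hypothetical helper: remembers which task ids have already been queued.
function createDeduper() {
  const seenIds = new Set();

  return {
    // Returns only the tasks from a snapshot that were not queued before.
    selectNew(snapshot) {
      const fresh = [];
      for (const task of snapshot) {
        if (task && !seenIds.has(task.id)) {
          seenIds.add(task.id);
          fresh.push(task);
        }
      }
      return fresh;
    },
    // Call once the task has been deleted from the backing store.
    forget(task) {
      seenIds.delete(task.id);
    },
  };
}

// Usage: the second snapshot repeats tasks 1 and 2 and adds task 3.
const deduper = createDeduper();
const first = deduper.selectNew([{ id: 1 }, { id: 2 }]);
const second = deduper.selectNew([{ id: 1 }, { id: 2 }, { id: 3 }]);
console.log(first.map(t => t.id), second.map(t => t.id)); // ids 1 and 2, then only id 3
```

A Set lookup avoids rebuilding the id list on every check, which the `queueContains` helper above does; for small queues either approach should be fine.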
Richard Matsen
0

Leaving aside your title 'Rxjs subscription queue', you can actually fix your async/await code.

The problem is that async/await does not play nicely with some loop constructs; see the question "Using async/await with a forEach loop".

For example, you can replace the for loop as per @Bergi's answer,

with Promise.all()

console.clear();
const { interval } = rxjs;
const { take, bufferCount } = rxjs.operators;

function processTask(task) {
  console.log(`Processing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}
function removeTask(task) {
  console.log(`Removing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 50);
  });
}

const tasks$ = interval(250).pipe(
  take(10),
  bufferCount(3),
);

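tasks$.subscribe(async tasks => {
  // Note: Promise.all starts all tasks of a batch at once; only the two
  // steps within a single task (process, then remove) run in order.
  await Promise.all(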
    tasks.map(async task => {
      await processTask(task); // has to be sync
      await removeTask(task);
      console.log(`Finished task ${task}`);
    })
  );
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
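To compare the loop styles side by side, here is a small dependency-free sketch that records the order in which stubbed tasks start and end. `makeTask`, `runSequentially` and `runConcurrently` are hypothetical names, and the 10 ms timer is just a stand-in for the long-running work.

```javascript
// Stub task factory (assumption): each task logs when it starts and ends.
function makeTask(log) {
  return id =>
    new Promise(resolve => {
      log.push(`start ${id}`);
      setTimeout(() => {
        log.push(`end ${id}`);
        resolve(id);
      }, 10);
    });
}

// Sequential: each iteration waits for the previous task to finish.
async function runSequentially(ids) {
  const log = [];
  const task = makeTask(log);
  for (const id of ids) {
    await task(id);
  }
  return log;
}

// Concurrent: every task starts immediately; Promise.all waits for all of them.
async function runConcurrently(ids) {
  const log = [];
  const task = makeTask(log);
  await Promise.all(ids.map(id => task(id)));
  return log;
}

const results = Promise.all([runSequentially([1, 2]), runConcurrently([1, 2])]);
results.then(([sequential, concurrent]) => {
  console.log("for...of:   ", sequential.join(", "));
  console.log("Promise.all:", concurrent.join(", "));
});
```

With for...of the second task starts only after the first has ended, while with Promise.all both tasks start before either ends. Which one fits depends on whether tasks within a batch are allowed to overlap.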

Better yet, you can shape the query to avoid using a for loop,

with mergeMap()

console.clear();
const { interval } = rxjs;
const { mergeMap, take, bufferCount } = rxjs.operators;

function processTask(task) {
  console.log(`Processing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}
function removeTask(task) {
  console.log(`Removing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 50);
  });
}

const tasks$ = interval(250).pipe(
  take(10),
  bufferCount(3),
);

tasks$
.pipe(mergeMap(tasks => tasks))
.subscribe(
  async task => {
    await processTask(task); // has to be sync
    await removeTask(task);
    console.log(`Finished task ${task}`);
  }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>