I'm using Monix Task for async control.
scenario
- tasks are executed in parallel
- if failures occur more than X times
- stop all tasks that have not yet completed (the sooner the better)
my solution
I came up with the idea of racing (1) the result against (2) an error counter and cancelling the loser, via Task.race: if the error counter reaches the threshold first, the tasks are cancelled by Task.race.
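As a reminder of the Task.race semantics the idea relies on, here is a minimal sketch (the names fast, slow and winner are made up for illustration, and it assumes a fresh Ammonite session with Monix 3.1.0): the result is a Left if the first task wins and a Right if the second wins, and the loser is cancelled.

```scala
import $ivy.`io.monix::monix:3.1.0`
import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.duration._

implicit val s: Scheduler = Scheduler.global

// Two illustrative tasks: one finishes after 10 ms, the other after 1 s.
val fast = Task.sleep(10.millis).map(_ => "fast")
val slow = Task.sleep(1.second).map(_ => "slow")

// The first argument wins here, so the result is a Left; `slow` is cancelled.
val winner: Either[String, String] = Task.race(fast, slow).runSyncUnsafe(5.seconds)
println(winner) // Left(fast)
```

This is why the outputs in the experiment are wrapped in Left (the guard won) or Right (the gathered tasks won).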
experiment
on Ammonite REPL
{
import $ivy.`io.monix::monix:3.1.0`
import monix.eval.Task
import monix.execution.atomic.Atomic
import scala.concurrent.duration._
import monix.execution.Scheduler
//import monix.execution.Scheduler.Implicits.global
implicit val s = Scheduler.fixedPool("race", 2) // pool size
val taskSize = 100
val errCounter = Atomic(0)
val threshold = 3
val tasks = (1 to taskSize).map(_ => Task.sleep(100.millis).map(_ => errCounter.increment()))
val guard = Task(f"stop because too many error: ${errCounter.get()}")
  .restartUntil(_ => errCounter.get() >= threshold)
val race = Task
  .race(guard, Task.gather(tasks))
  .runToFuture
  .onComplete { case x => println(x); println(f"completed task: ${errCounter.get()}") }
}
issue
The outcome depends on the thread pool size!?
For pool size 1
the outcome is almost always task success, i.e. nothing is stopped.
Success(Right(.........))
completed task: 100 // all tasks succeeded!
For pool size 2
the outcome is very non-deterministic, flipping between success and failure, and the cancellation is not accurate.
For example:
Success(Left(stop because too many error: 1))
completed task: 98
The cancellation happens as late as after 98 tasks have completed.
The reported error count (1) is oddly small compared with the threshold (3).
The default global scheduler shows the same behavior.
For pool size 200
the outcome is more deterministic and the stop happens earlier, so it is more accurate in the sense that fewer tasks were completed.
Success(Left(stop because too many error: 2))
completed task: 8
The larger the pool size, the better.
If I change Task.gather to Task.sequence, all of these issues disappear!
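For reference, the Task.sequence variant can be sketched as follows. It is the same experiment with only the combinator swapped, and it assumes a fresh Ammonite session; the pool name "race-seq", the 30 second timeout and the blocking runSyncUnsafe call are choices made for this sketch, not part of the original run. Task.sequence runs the tasks one after another, so the counter grows by roughly one every 100 ms instead of all 100 sleeps finishing at about the same moment.

```scala
import $ivy.`io.monix::monix:3.1.0`
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.atomic.Atomic
import scala.concurrent.duration._

implicit val s: Scheduler = Scheduler.fixedPool("race-seq", 2) // hypothetical pool name

val taskSize = 100
val errCounter = Atomic(0)
val threshold = 3

// Same simulated "failing" tasks as in the experiment: each one bumps the counter after 100 ms.
val tasks = (1 to taskSize).map(_ => Task.sleep(100.millis).map(_ => errCounter.increment()))

// Same guard: keeps restarting until the counter reaches the threshold.
val guard = Task(s"stop because too many errors: ${errCounter.get()}").restartUntil(_ => errCounter.get() >= threshold)

// Only change: Task.sequence (one task at a time) instead of Task.gather (all in parallel).
val result = Task.race(guard, Task.sequence(tasks)).runSyncUnsafe(30.seconds)
println(result)
println(s"completed tasks: ${errCounter.get()}")
```

With this variant the sequenced tasks would need about 10 seconds in total, so the guard has plenty of time to win the race shortly after the threshold is reached.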
What causes this dependency on the pool size? How can I improve it, or is there a better alternative for stopping tasks once too many errors occur?