2

I am attempting to model a system as follows:

Arrivals generate according to a predefined schedule and have known processing times supplied by a dataframe. There is a single server with a capacity equal to min_daemons at the beginning of the simulation. Simple so far, but the nxt part gets tricky: this capacity can vary on the interval [min_daemons , max_daemons] throughout the simulation according to the following algorithm:

If at any time during the simulation, the queue length reaches or exceeds the value of incr_count, and remains at or above this level for incr_delay, then an additional unit of capacity is added to the main server. This can happen at any time, provided that the capacity at no time exceeds max_daemons.

The reverse is also true. If at any time, the queue length is less than decr_count, and remains at or below this level for decr_delay, a unit of capacity is removed, potentially down to a level of min_daemons.

I created a trajectory for all arrivals that branches off when the conditions for changing server capacity above are met. The problem with this is that the changes in capacity are always tied to an arrival event. What I really want is a process independent of the arrival trajectory that monitors the queue length at all times and makes appropriate capacity changes.

I considered alternatively accomplishing this with some sort of dummy arrival process, say at every second of the simulation, but I wasn't sure if I could prevent the dummy arrivals from competing with the true arrivals for server capacity.

#instantiate simulation environment
env <- simmer("queues") %>% add_resource("daemon",1) %>% add_global("incr_start",99999) %>% add_global("decr_start",99999)

run <-  trajectory() %>% 
  branch(option = function() if (get_queue_count(env,"daemon") >=  incr_count) {1}
                             else if (get_queue_count(env,"daemon") <= decr_count) {2}
                             else {3}
         ,
         continue = c(T, T, T)
         ,
         trajectory("increment?")
         %>% branch(option = function() if (now(env) - get_global(env,"incr_start") >= incr_delay
                                            & get_capacity(env,"daemon") < max_daemons) {1}
                                        else if (get_global(env,"incr_start")==99999) {2}
                                        else {3}
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("increment")
                    %>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
                    %>% log_(function () 
                      {paste("Queue has exceeded count for ",now(env)-get_global(env,"incr_start")," seconds.")})
                    %>% set_capacity(resource = "daemon", value = 1, mod="+")
                    ,
                    trajectory("update incr start")
                    %>% set_global("incr_start",now(env))
                    %>% log_("Queue count is now above increment count. Starting increment timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet increment criteria. Doing nothing.")
         )
         ,
         trajectory("decrement?")
         %>% branch(option = function() if (now(env) - get_global(env,"decr_start") >= decr_delay
                                            & get_capacity(env,"daemon") > min_daemons) {1}
                                        else if (get_global(env,"decr_start")==99999) {2}
                                        else {3}
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("decrement")
                    %>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
                    %>% log_(function () 
                      {paste("Queue has been less than count for ",now(env)-get_global(env,"incr_start")," seconds.")})
                    %>% set_capacity(resource = "daemon", value = -1, mod="+")
                    ,
                    trajectory("update decr start")
                    %>% set_global("decr_start",now(env))
                    %>% log_("Queue count is now below decrement count. Starting decrement timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet decrement criteria. Doing nothing.")
         )
         ,
         trajectory("reset_timer")
         %>% log_("Did not meet criteria to increment or decrement. Resetting timers.")
         %>% set_global("decr_start", values = 99999)
         %>% set_global("decr_start", values = 99999)
  ) %>%
  seize("daemon") %>% 
  log_("Now running") %>%
  log_(function () {paste(get_queue_count(env,"daemon")," runs in the queue.")}) %>%
  timeout_from_attribute("service") %>% 
  release("daemon") %>% 
  log_("Run complete")

env %>% 
  add_dataframe("run", run, arr,time="absolute") %>% 
  run(200)

I need to do some more debugging to verify that the simulation is working as I intended, but I fully understand that this model is wrong. I can hope that the design doesn't compromise it's validity too much, but I also want to get feedback on how I could design something that's truer to real life.

Jack Rossi
  • 21
  • 1
  • 1
    What is the question? Where is your model going wrong? How do you know it is wrong? – Corey Levinson Mar 26 '19 at 19:13
  • Paragraphs 5 and 6 are the question. My code executes without error, but the model is not correct, and I can't figure out how to implement it correctly with the standard simmer tools. The true system is constantly checking for times when the queue exceeds a range of lengths for a certain amount of time, at which point it adjusts the capacity of the server. My model is only capable of making this adjustment when there is a new arrival. – Jack Rossi Mar 26 '19 at 19:55

1 Answers1

2

Checking the status at regular intervals defeats the entire purpose of having discrete events. We have here an asynchronous process, so the proper way to model this is by using signals.

We need to ask ourselves: when could the queue...

  1. increase? When an arrival hits the seize() activity. So before trying to seize the resource, we need to check the number of enqueued arrivals and act accordingly:

    • If it's equal to decr_count, a signal must be sent to cancel any attempt to decrease the server's capacity.
    • If it's equal to incr_count - 1, a signal must be sent to request an increase of the server's capacity.
    • Do nothing otherwise.
  2. decrease? When an arrival is served (i.e., continues to the next activity following seize(). So after seizing the resource, we also need to check the number of enqueued arrivals:

    • If it's equal to incr_count - 1, a signal must be sent to cancel any attempt to increase the server's capacity.
    • If it's equal to decr_count, a signal must be sent to request a decrease of the server's capacity.
    • Do nothing otherwise.

The procedure for checking the number of enqueued arrivals and deciding what kind of signal is needed, if any, can be bundled in a reusable function (let's call it check_queue) as follows:

library(simmer)

env <- simmer()

check_queue <- function(.trj, resource, mod, lim_queue, lim_server) {
  .trj %>% branch(
    function() {
      if (get_queue_count(env, resource) == lim_queue[1])
        return(1)
      if (get_queue_count(env, resource) == lim_queue[2] &&
          get_capacity(env, resource)    != lim_server)
        return(2)
      0 # pass
    },
    continue = c(TRUE, TRUE),
    trajectory() %>% send(paste("cancel", mod[1])),
    trajectory() %>% send(mod[2])
  )
}

main <- trajectory() %>%
  check_queue("resource", c("-", "+"), c(decr_count, incr_count-1), max_daemons) %>%
  seize("resource") %>%
  check_queue("resource", c("+", "-"), c(incr_count-1, decr_count), min_daemons) %>%
  timeout_from_attribute("service") %>%
  release("resource")

In this way, the main trajectory is quite simple. Then, we need a couple of processes to receive such signals and increase/decrease the capacity after some delay:

change_capacity <- function(resource, mod, delay, limit) {
  trajectory() %>%
    untrap(paste("cancel", mod)) %>%
    trap(mod) %>%
    wait() %>%
    # signal received
    untrap(mod) %>%
    trap(paste("cancel", mod),
         handler = trajectory() %>%
           # cancelled! start from the beginning
           rollback(Inf)) %>%
    timeout(delay) %>%
    set_capacity(resource, as.numeric(paste0(mod, 1)), mod="+") %>%
    # do we need to keep changing the capacity?
    rollback(2, check=function() get_capacity(env, resource) != limit) %>%
    # start from the beginning
    rollback(Inf)
}

incr_capacity <- change_capacity("resource", "+", incr_delay, max_daemons)
decr_capacity <- change_capacity("resource", "-", decr_delay, min_daemons)

Finally, we add the resource, our processes and the data to the simulation environment:

env %>%
  add_resource("resource", min_daemons) %>%
  add_generator("incr", incr_capacity, at(0)) %>%
  add_generator("decr", decr_capacity, at(0)) %>%
  add_dataframe("arrival", main, data)

Please note that I didn't check this code. It may require some adjustments, but the general idea is there.

Iñaki Úcar
  • 935
  • 1
  • 5
  • 11
  • Thanks so much for this! I found your explanation to be extremely helpful, and you code to be remarkably functional. The only thing I had to change was the check function in the rollback, so that it again checks that the queue is greater than increment count or less than decrement count. – Jack Rossi May 09 '19 at 15:45