4

I have some experience with LMAX Disruptor and I'd really like to implement a custom actor mailbox using disruptor.

Are there any guidelines? Is it even possible? What are the limitations of Akka's actor mailboxes?

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
herburos
  • 118
  • 11

1 Answers1

3

As it said here you just have to implement a few methods - of course you should write/read messages directly using your pointer to the ring buffer. You should also keep in mind:

  • disruptor usually preallocates big amount of memory, so using one disruptor per actor is bad idea, you may use one router actor (with disruptor inside) in combination with BalancingPool.

  • if you want to have different message type consumption, separate consumers for journaling, repair etc. - you should pass different RingBufferPointer (smthng-like) instance as parameter to your mailbox (with same start-value for journaling, different start-value for different message-types), but still use one Disruptor. So different mailboxes will refer to one disruptor.

  • you will loose low-level control on message creating, extracting, etc, so no batched allocation by default.

  • you could also use the history from ring to restore failed actor's state (in preRestart or in supervisor).

What LMAX says:

It works in a different way to more conventional approaches, so you use it a little differently than you might be used to. For example, applying the pattern to your system is not as simple as replacing all your queues with the magic ring buffer. We've got code samples to guide you, a growing number of blogs and articles giving an overview of how it works, the technical paper goes into some detail as you'd expect, and the performance tests give examples of how to use the Disruptor http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

And here is a short Queues/Disruptors/Actors comparison

In pseudo-scala-code it will be something like:

object MyUnboundedMailbox {
  val buffer = new RingBuffer()

  class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then  

    }
    def dequeue(): Envelope = readerPointer.poll()
    def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized
    def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { }
  }

  trait MyUnboundedMessageQueueSemantics 

}

class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
  with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._
  final override def create(owner: Option[ActorRef],
                            system: Option[ActorSystem]): MessageQueue = {

    val pointer = ring.newPointer
    val read = pointer.copy
    val write = pointer.copy
    new MyMessageQueue(pointer, read, write) 
  }
    // you may use another strategy here based on owner (you can access name and path here), 
    // so for example may allocate same pointers for same prefixes in the name or path 
}

You can use unchanged MyMessageQueue.startPointer to access message log during failure recovery (you may also look at akka's Event Sourcing for analogy).

Using UnboundedQueue approach doesn't guarantee message delivery here as very old undelivered message may be overwritten with new version if the ring "ends", so you may want BoundedQueue, like here.

Community
  • 1
  • 1
dk14
  • 22,206
  • 4
  • 51
  • 88