Wednesday, January 25, 2012

A Star Trek firing Disruptors from Scala

Hello again.

Today I want to share some of my ramblings while exploring the Disruptor framework through a Scala lens. It is never too late. This little inspection was encouraged by David Greco, and Martin Krasser kindly suggested that I also have a look at a Scala port implemented by Jamie Allen.

Before dissecting this last reference I decided to read the classic material on the subject, from the seminal technical article up to Trisha Gee's blog, and to read again the very interesting articles written by Martin Thompson here. In order to get a better understanding I also suggest this reading and that one. Naturally, on top of that pile of documentation, do not forget Martin Fowler's must-read production over here.

Everything has been said about the Disruptor and I really encourage you to read about the subject before the incoming article, if interested. My summary in a few lines is going to be too harsh to be considered a serious presentation, and I hope that some of the experts will one day read this article and correct my misconceptions.

Roughly cloning Martin Fowler's article introduction, let us say that, like all financial trading platforms, the LMAX platform relies on business logic core components that must interact with the outside world as fast as possible, supporting a throughput of dozens of millions of messages per second.

The chosen approach of the implementation finds its name in the disruptive way of thinking that one must adopt to understand the internals of the Disruptor. Working with disruptors involves first thinking "asynchronous", then accepting that simple and elegant solutions to exchange messages can be better than messaging based on queues. The articles cited above explain in detail the why and the how behind the astounding level of performance of Disruptors compared to standard queue systems.

The core business logic is surrounded by Disruptors. At the heart of the Disruptor, we find a ring buffer as the cornerstone of the architecture, fed by one or more producers, and feeding one or more readers. And guess what, the one producer/many readers configuration is the optimum combination.
Readers and producers run in their own thread of execution. Communication between readers/producers and the ring buffer is achieved through barriers created by the ring buffer, although on the producer side the barrier is merged with the ring buffer itself.
The fixed-size ring buffer creates all its indexed slots during instantiation, the memory being allocated once and for all, avoiding annoying GC peaks.
The producer claims the next available slot indexes, gets the matching slots, updates their content, and commits them.
A reader waits through its sequence barrier for the available slots.
On the producer side, events are created by an event factory.
On the consumer side, event handlers process the dispatched events.
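
In terms of the ring buffer API used throughout this article, the producer protocol above boils down to a handful of calls. A minimal fragment, assuming an event type exposing a setValue method like the ValueEvent defined later:

val sequence = ringBuffer.next()      // claim the next slot index
val event = ringBuffer.get(sequence)  // fetch the preallocated slot
event.setValue(1234L)                 // update its content in place
ringBuffer.publish(sequence)          // commit: make the slot visible to readers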

 The framework provides a lot of support in order to ease the creation of typical consumer configurations, like the pipeline configuration:
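
Schematically, a producer P feeding chained consumers: P -> C1 -> C2 -> C3, each consumer handling an event only after its predecessor did, as in the pipeline built later in this article.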


or the diamond configuration, for example:
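
Schematically: P -> C1 and C2 in parallel, both feeding C3, which joins on their sequences before handling an event, matching the fork/join construction shown at the end of this article.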


Notice that the performance tests in the Disruptor source code implement these configurations.

Before diving into the Scala port, I wanted to reproduce some configurations in Scala, the code exploration allowing me to get a better understanding of how things work. This is how I am: I need a client API point of view before understanding it all.

I decided not to adopt the Disruptor wizard approach and to dive down to the layer just under the wizard, creating by myself the barriers, handlers and so on. BatchEventProcessor remains the only generic class definition I kept in my explorations. Roughly speaking, a BatchEventProcessor instance, executed in its own thread, takes charge of accessing the sequence barrier, getting the events, and passing them to an EventHandler instance. So a BatchEventProcessor basically impersonates a consumer.
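
To fix ideas, here is a simplified sketch of that consumer loop; the real com.lmax.disruptor.BatchEventProcessor also deals with alerts, exceptions and shutdown, so take this as an illustration only:

import com.lmax.disruptor.{EventHandler, RingBuffer, Sequence, SequenceBarrier}

def consumerLoop[T](rb: RingBuffer[T], barrier: SequenceBarrier,
                    handler: EventHandler[T], sequence: Sequence) {
  var nextSequence = sequence.get() + 1L
  while (true) {
    // block on the barrier until at least nextSequence has been published
    val availableSequence = barrier.waitFor(nextSequence)
    while (nextSequence <= availableSequence) {
      // hand every ready event to the handler, flagging the last one of the batch
      handler.onEvent(rb.get(nextSequence), nextSequence, nextSequence == availableSequence)
      nextSequence += 1L
    }
    // expose our progress so the ring buffer and downstream consumers can see it
    sequence.set(availableSequence)
  }
}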

The (little) code project I produced is located here (yes, a Github project finally :)). What I want to run are little scenarios reproducing some of the configurations. Compared to the original performance tests I oversimplified the design. The (very basic) event management is hosted in the body of the EventModule object declaration:


package com.promindis.disruptor.adapters
import com.lmax.disruptor._
import java.util.concurrent.CountDownLatch
import util.PaddedLong

object EventModule {
  // Mutable event wrapping a long value, padded to avoid false sharing
  class ValueEvent() {
    val value = new PaddedLong()

    def setValue(newValue: Long) {
      value.set(newValue)
    }

    def getValue = value.get()
  }

  object ValueEvent {
    def apply() = new ValueEvent()
  }

  // Used by the ring buffer to preallocate every slot at creation time
  object ValueEventFactory extends EventFactory[ValueEvent] {
    def newInstance(): ValueEvent = ValueEvent()
  }

  // Counts received events; releases the optional latch once the expected number of shots arrived
  case class Handler(name: String, expectedShoot: Long = 0, latch: Option[CountDownLatch] = None) extends EventHandler[ValueEvent] {
    var counter = 0L

    def onEvent(event: ValueEvent, sequence: Long, endOfBatch: Boolean) {
      counter += 1L
      for (l <- latch if (counter == expectedShoot) ) {
        l.countDown()
      }
    }

    override def toString = "[" + name   + ": counter => " + counter  + "]"
  }

  def fillEvent(event: ValueEvent): ValueEvent = {
    event.setValue(1234)
    event
  }

}

Our events wrap a long value using the LMAX PaddedLong class definition. Handlers count the number of received instances. I used the counter for manual testing, checking that all the events were passed to the handlers. I introduced an optional countdown latch, very useful to track the moment when the last consumer in a configuration has achieved its duty. What we basically need is to measure execution times, with something like this:

package com.promindis.disruptor.adapters

import System._

object TimeMeasurement {

  // Enriches Long: value.throughput(millis) yields operations per second
  implicit def toFrequency(value: Long) = new {
    def throughput(during: Long): Long = {
      (value * 1000L) / during
    }
  }

  // Measures the elapsed time of a block and hands the milliseconds to a continuation
  def sampling(block: => Unit) = {
    val start = currentTimeMillis
    block
    val end = currentTimeMillis
    new {
      def provided[T](f: Long => T) = {
        f(end - start)
      }
    }
  }
}

The usage being:

 sampling {
   doYourStuff()
 } provided { elapsedTime =>
   doSomethingWith(elapsedTime)
 }

The BatchEventProcessor instances must be run in their own thread of execution. The implicit throughput method allows applying a small average calculation to any Long in the scope of the definition.
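
For instance, with the 25 million iterations of the default Configuration defined below, a scenario measured at 5000 milliseconds would yield (a simple worked instance of the formula above):

25000000L.throughput(5000)  // (25000000 * 1000) / 5000 = 5000000 op/s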

Therefore, it would be nice if we could run our tests without having to take into account all that blurry management of:
  • creating a thread executor service
  • running the batch processors
  • halting the processors
  • halting the executor
I propose something like:

package com.promindis.disruptor.adapters

import com.lmax.disruptor.BatchEventProcessor
import java.util.concurrent.Executors._

object ProcessorLifeCycle {

  // Runs each processor in its own thread, executes the block, then halts everything
  def executing[T](processors: BatchEventProcessor[T]*)(block: => Unit) {
    val executor = newFixedThreadPool(processors.length)
    processors.foreach { executor.execute(_) }
    try {
      block
    } finally {
      processors.foreach{_.halt()}
      executor.shutdown()
    }
  }

}

a typical usage being:

executing(processors: _*) {
   scenario
}

So far so good. I have all the tools I need to run a Scenario. The * character at the end of the type of the first parameter expresses a variable number of processor arguments. Why not create a trait to gather all the basics:

package com.promindis.disruptor.configurations
import com.lmax.disruptor.BatchEventProcessor
import com.promindis.disruptor.adapters.TimeMeasurement._
import com.promindis.disruptor.adapters.ProcessorLifeCycle._


final case class Configuration(
  ringBufferSize: Int = 1024 * 1024,
  iterations: Long = 1000L * 1000L * 25L,
  runs: Int  = 5
)

trait Scenario {

  final def playWith[T](processors: Seq[BatchEventProcessor[T]])(bench: => Unit)(implicit config: Configuration) = {
    sampling {
      executing(processors:_*) {
        bench
      }
    } provided {
      config.iterations.throughput(_)
    }
  }

  def challenge(implicit configuration: Configuration): Long

  def run(implicit config: Configuration): Seq[Long] =
    for (_ <- 1 to config.runs) yield challenge(config)

  def main(args: Array[String]) {
    run(Configuration())
      .foreach{value => println("Nb Op/s: " + value)}
  }
}

A Scenario allows playing a (micro!) bench execution with a set of processors. A Configuration case class instance groups everything needed to play a fixed number of iterations multiple times. Notice that we apply our implicit throughput method to the result of the time measurement, so that a play returns the measured number of messages per second.
The run method is the entry point of the little bench execution, returning a list of throughput values, their number also being set by the input configuration instance.

No better start than implementing the simplest possible example: a producer fills a ring buffer while a consumer handles the events. This code sample will provide us with an overview of the main classes involved in the process:

package com.promindis.disruptor.configurations.unicast

import com.promindis.disruptor.adapters.RingBufferFactory._
import com.lmax.disruptor.BatchEventProcessor
import com.promindis.disruptor.adapters.EventModule.{Handler, ValueEvent, ValueEventFactory}
import java.util.concurrent.CountDownLatch
import com.promindis.disruptor.adapters.{EventModule, Shooter}
import com.promindis.disruptor.configurations.{Configuration, Scenario}


object UnicastWithShooter extends Scenario{

  override def challenge(implicit config: Configuration) = {
    val rb = ringBuffer(ValueEventFactory, size = config.ringBufferSize)

    val barrier =  rb.newBarrier()
    val countDownLatch = new CountDownLatch(1)
    val handler = Handler("P1", latch = Some(countDownLatch), expectedShoot = config.iterations)
    val processor = new BatchEventProcessor[ValueEvent](rb, barrier, handler)
    rb.setGatingSequences(processor.getSequence)

    val shooter = Shooter(config.iterations, rb, EventModule.fillEvent)

    playWith(List(processor)){
      shooter ! 'fire
      countDownLatch.await()
    }
  }
}

From the creation of the barrier to the setting of the ring buffer gating sequence, the trained eye can recognize a typical Disruptor code pattern. But what is a Shooter? I defined a class able to feed the ring buffer from its own thread of execution, taking a minimum number of parameters, namely the number of iterations, a ring buffer, and a way to set up a created event (the fillEvent method).

Here comes the Scala Actor "native" implementation:

package com.promindis.disruptor.adapters

import com.lmax.disruptor.RingBuffer
import actors.Actor

class Shooter[T](numberOfShoot: Long, val ringBuffer: RingBuffer[T], val eventStrategy: T => T) extends Actor {
  self =>

  // "Ad-hoc" extension of the ring buffer API, scoped to this actor
  implicit def adapted(ringBuffer: RingBuffer[T]) = new {

    def shoot(update: T => T) {
      // claim, update, then commit one slot
      val (sequence, event) = nextEventStructure(ringBuffer)
      update(event)
      ringBuffer.publish(sequence)
    }

    def nextEventStructure(rb: RingBuffer[T]): (Long, T) = {
      val sequence = rb.next()
      (sequence, rb.get(sequence))
    }
  }

  def act() {
    react {
      case 'fire =>
        for (i <- 1L to numberOfShoot) {
          ringBuffer.shoot(eventStrategy)
        }
        self.exit()
    }
  }
}

object Shooter {
  def apply[T](numberOfShoot: Long, ringBuffer: RingBuffer[T], fillEvent: T => T) =
    new Shooter(numberOfShoot, ringBuffer, fillEvent).start()
}

If you have not read Philipp Haller and Frank Sommers' book on actors, Actors in Scala... you should :). In order to shorten the react method implementation, I used an implicit method declaration allowing for an "ad-hoc" extension of the ring buffer API, so that the buffer can shoot an event update strategy by itself.
After all, who better than a ring buffer knows how to get a sequence and publish it?
Although not always comfortable with implicits, I do not find it disturbing to use them very close to their context of application, specifically when they help in solving a kind of expression problem.

When it comes to reusing the same patterns of code I can become very lazy (like a lot of people). Having a little time at home, I decided I could simplify a little bit the creation of other configurations. What if I could create a pipeline configuration like this:

val chain = for {
  _ <- pipe[ValueEvent](Handler("C1"), rb)
  _ <- pipe[ValueEvent](Handler("C2"), rb)
  _ <- pipe[ValueEvent](Handler("C3", latch = Some(countDownLatch), expectedShoot = config.iterations), rb)
} yield ()

in order to pipe three consumers, getting back the BatchEventProcessors paired with their handlers for testing purposes.
This is when I switched to panic mode. Of course, we are building a state configuration for the ring buffer, so we have to meet again with the state monad we studied last time. Implementing it was hard the first time, as I was not sure I had understood everything about it. I still have difficulties with the state monad, so here comes an opportunity to learn how to use the pattern... again.
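
Before reusing it, let me sketch what such a state monad can look like. The real StateMonad trait lives in com.promindis.disruptor.support in the repository, so consider this shape, reconstructed from the way the Builder object uses it below, as an assumption:

package com.promindis.disruptor.support

// Minimal state monad sketch: a computation threading a state S while producing a value A
trait StateMonad[A, S] {
  def apply(state: S): (A, S)

  def flatMap[B](f: A => StateMonad[B, S]): StateMonad[B, S] = {
    val outer = this
    new StateMonad[B, S] {
      def apply(state: S) = {
        val (value, newState) = outer(state)
        f(value)(newState)
      }
    }
  }

  def map[B](f: A => B): StateMonad[B, S] = {
    val outer = this
    new StateMonad[B, S] {
      def apply(state: S) = {
        val (value, newState) = outer(state)
        (f(value), newState)
      }
    }
  }
}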

Everything I need can be gathered in the pipe method, located in the Builder object:

package com.promindis.disruptor.adapters
import com.promindis.disruptor.support.StateMonad
import com.lmax.disruptor.{SequenceBarrier, BatchEventProcessor, RingBuffer, EventHandler}


object Builder {
  type Context[X] = (BatchEventProcessor[X], EventHandler[X])

  // Chains a new consumer after the previous one: its barrier tracks the
  // sequence of the processor at the head of the piled-up contexts
  def pipe[T](handler: EventHandler[T], rb: RingBuffer[T]) = new StateMonad[SequenceBarrier, List[Context[T]]] {
    def apply(list: List[Context[T]]) = {
      list match {
        case p :: ps =>
          val newBarrier = rb.newBarrier(p._1.getSequence)
          val newProcessor = new BatchEventProcessor[T](rb, newBarrier, handler)
          (newBarrier, (newProcessor, handler) :: p :: ps)
        case _ =>
          // first consumer in the chain: wait directly on the ring buffer
          val newBarrier = rb.newBarrier()
          val newProcessor = new BatchEventProcessor[T](rb, newBarrier, handler)
          (newBarrier, (newProcessor, handler) :: Nil)
      }
    }
  }
}

Aliasing a pair (BatchEventProcessor, EventHandler) as a Context for a given event type, I use a list of such contexts as the state threaded through the monad.
We pile up each step, because we need all the created processors at the end of the configuration building, and maybe the handlers too, so as to check their internal counters. The value returned by the application of a state monad instance is a sequence barrier, which can be useful when attaching multiple consumers to the same barrier.
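
To see the state threading in action, here is what a single application of pipe does, as a fragment (assuming rb is a RingBuffer[ValueEvent] as in the previous examples):

val step = pipe[ValueEvent](Handler("C1"), rb)
val (barrier, contexts) = step(List())
// barrier  -> the new SequenceBarrier the consumer waits on
// contexts -> List((processor, handler)), piled up for later retrieval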

Ideally we would only need the processors, to submit them to some executor service. Piping the consumers consists in creating the barrier of one consumer from the sequence of the previous one, so that the former can handle an event only after the latter did. I encourage you once more to study the articles linked above before digging into the implementation of the tool methods used for configuration in the Builder object. As a result, the implementation of a pipeline becomes:

package com.promindis.disruptor.configurations.pipeline

import com.promindis.disruptor.adapters.RingBufferFactory._
import java.util.concurrent.CountDownLatch

import com.promindis.disruptor.adapters.EventModule.{Handler, ValueEvent, ValueEventFactory}
import com.promindis.disruptor.configurations.{Configuration, Scenario}
import com.promindis.disruptor.adapters.{Builder, Shooter, EventModule}
import Builder._


object Pipe_Line extends Scenario{

  def challenge(implicit config: Configuration) = {
    val rb = ringBuffer(ValueEventFactory, size = config.ringBufferSize)
    val countDownLatch = new CountDownLatch(1)

    val chain = for {
      _ <- pipe[ValueEvent](Handler("C1"), rb)
      _ <- pipe[ValueEvent](Handler("C2"), rb)
      _ <- pipe[ValueEvent](Handler("C3", latch = Some(countDownLatch), expectedShoot = config.iterations), rb)
    } yield ()

    val consumers = chain(List())._2
    // contexts were prepended, so the head processor is the last consumer in the chain
    val processors = consumers.unzip._1
    rb.setGatingSequences(processors.head.getSequence)
    val shooter = Shooter(config.iterations, rb, EventModule.fillEvent)

    playWith(processors){
      shooter ! 'fire
      countDownLatch.await()
    }

  }
}

which is far shorter than the first implementation, believe me :).

Returning a list of tuples (BatchEventProcessor, EventHandler) requires a little trickery with the unzip method, but that clutter can easily be removed, because except for testing we do not need to get back the handlers.
The Github repository provides two more methods used to build diamonds and, I am certain, other possible variations on configurations.

The declarative part of the configuration of a diamond looks like:

val diamond = for {
  barrier <- fork(Handler("C1"), rb, rb.newBarrier())
  _ <- fork(Handler("C2"), rb, barrier)
  _ <- join(Handler("C3", latch = Some(countDownLatch), expectedShoot = config.iterations), rb)
} yield ()

also far shorter than the very first implementation.
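
I will not paste fork and join here; to give an idea of their shapes, the following sketch, written in the style of pipe, is my reconstruction from the diamond example above. The names and exact signatures are assumptions; the authoritative definitions live in the Builder object on Github:

// Hypothetical sketches only; see the repository for the real code.
def fork[T](handler: EventHandler[T], rb: RingBuffer[T], barrier: SequenceBarrier) =
  new StateMonad[SequenceBarrier, List[Context[T]]] {
    def apply(list: List[Context[T]]) = {
      // attach a new processor to the provided barrier and hand the same barrier
      // back, so that several consumers can fork from it in parallel
      val processor = new BatchEventProcessor[T](rb, barrier, handler)
      (barrier, (processor, handler) :: list)
    }
  }

def join[T](handler: EventHandler[T], rb: RingBuffer[T]) =
  new StateMonad[SequenceBarrier, List[Context[T]]] {
    def apply(list: List[Context[T]]) = {
      // the joining consumer waits on the sequences of all piled-up processors
      val barrier = rb.newBarrier(list.map(_._1.getSequence): _*)
      val processor = new BatchEventProcessor[T](rb, barrier, handler)
      (barrier, (processor, handler) :: list)
    }
  }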

Thank you for following me at this point. 

Running my tests provides the same throughput values as running the tests located in the LMAX source code.
Having reduced the number of iterations to 25 million and generously extended the ring buffer size, depending on the tested configuration, I get from 3.5 million to 5 million messages per second.
My old machine not being sized for this kind of stress (only two cores), the result values are smaller than the benchmark values presented by the LMAX team.
But my old laptop still runs the Disruptor tests five to ten times faster than the message queue tests. I find the result quite interesting, being sure that I do not host any parallel desktop application on this laptop that can handle five million messages per second, or even one million messages per second.

Somehow, adopting a disruptive mindset could be a smart move, and not only in large-scale applications.
That's all folks. 

 Be seeing you ! :):):)
