Sunday, August 28, 2011

firing and forgetting... Akka continuations...

Strange title. I have had my hands full for a long time and, dumbly, had nothing to talk about. Reading Scala in Action and Clojure in Action, plus the Haskell books on my table, will take a lot of time. Don't forget I am getting crazier and crazier about Lisp. So what? I found myself worried because I had dropped what I had started reading about Akka.
Like many people, if I do not practice regularly I have to roll back to the very beginning of the study and start again. Unfortunate. This time I chose to go back to basics and bought Gul Agha's book about the Actor Model paradigm. In parallel, it will help me acquire some deeper knowledge of the paradigm while I work toward the last chapter of Scala in Action, about fault-tolerant systems and all the rest. That is my goal with Akka actors. Today's goal is a "back-to-basics" reminder.

If by any chance Jonas Bonér reads this article, may he forgive me for publishing such a basic example; Akka deserves the best. Starting again on Akka, I downloaded version 1.2-RC3 and, to find my way again, decided to implement one of the examples from chapter 3 of the book: the famous recursive factorial. Although this is not exactly the subject of my article, as a reminder, here is how a recursive factorial can be processed from an actor's point of view:



Actor 1 receives a message containing the value of the number n to be processed.
On each message reception, it:
  • creates (dashed arrow) a new actor that will accumulate a number k
  • decrements the received number and sends a message to itself if the new number is not 1
  • otherwise sends a processing message to the last created accumulating actor, the one storing 1

Accumulating actors wait for a processing message containing a number to be multiplied by the number they are currently storing. They propagate the result to the accumulating actor holding the next number, or send back the final result. So if you wanted to process factorialOf(3), accumulating actor 2 would store 3, actor 3 would store 2 and actor 4 would store 1.

When actor 1 fires a message to actor 4 to complete the operation, the whole chain is processed and the result is sent to actor 6.
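Agha's chain of accumulating actors is essentially continuation-passing style. As a rough single-threaded analogy (no actors, only plain Scala closures), each closure below plays the part of one accumulating actor: it captures its number and, when it receives a partial product, multiplies it and forwards it down the chain to the final customer. The names `ChainedFactorial` and `factorialCps` are hypothetical, not taken from the book or from Akka, and the nested closures make this sketch unsuitable for large n.

```scala
object ChainedFactorial {
  // Each continuation stands in for one accumulating actor: it "stores" n and,
  // when it receives a partial product, multiplies it and forwards it to `customer`.
  def factorialCps(n: BigInt, customer: BigInt => BigInt): BigInt =
    if (n <= 1) customer(1) // the innermost accumulator fires the chain with 1
    else factorialCps(n - 1, partial => customer(n * partial))

  def main(args: Array[String]): Unit = {
    // factorialOf(3): the chain multiplies by 2, then by 3, before replying
    println(factorialCps(3, identity)) // prints 6
  }
}
```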

I deliberately implemented a fork solution that splits the task.

I loved the idea, although the performance was very poor. I loved it because this model of chaining actors reflects Alan Kay's original idea of object-oriented message passing. Thanks to Akka, you really create object instances, each owning a single responsibility. That is object-oriented programming à la Smalltalk, not class-oriented programming à la "whatever you want" (please don't misunderstand me, I love Java too).

I did not like the performance being so poor (five times slower than a tail-recursive version), so I decided to reduce the message-passing overhead by reducing the number of accumulating actors. The bigger the number to process, the more accumulating actors you create: every factor costs one actor and a few messages, so the messaging overhead grows linearly with the input value. I created a fork of the tasks to process:


The input actor 1 forks the looping actors 2, 4 and 6. The looping actors create their storing actors and, at the end of the process, all the results are sent to the continuing actor 8. Since its purpose is to synchronize all the jobs, and also to serve as a means of control afterwards, this actor is, for me, nothing more than a continuation. This observation led me to a 1977 article by Carl Hewitt in which this taxonomy is used. I am still reading this fascinating article.
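The forked design can be approximated with the standard library's Futures instead of Akka actors (a swapped-in technique, not the author's code, and it needs Scala 2.10 or later). Each range is multiplied in its own Future, playing the looping actors, and a final step, playing the continuing actor, multiplies the partial products once all of them have arrived. `ForkedFactorial`, `rangeProduct`, `forkedFactorial` and the `ranges` parameter are hypothetical names; the split into chunks of `n / ranges` numbers is only an approximation of the actor version's boundaries.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ForkedFactorial {
  // Product of all integers in the inclusive range [lo, hi] (1 if the range is empty).
  def rangeProduct(lo: Int, hi: Int): BigInt =
    (lo to hi).foldLeft(BigInt(1))(_ * _)

  // Splits [1, n] into chunks of about n / ranges numbers, multiplies each chunk in
  // its own Future ("looping actors"), then joins the partial products ("continuation").
  def forkedFactorial(n: Int, ranges: Int): BigInt = {
    val bunch = math.max(1, n / ranges)
    val bounds = (1 to n by bunch).map(lo => (lo, math.min(n, lo + bunch - 1)))
    val partials = bounds.map { case (lo, hi) => Future(rangeProduct(lo, hi)) }
    val joined = Future.sequence(partials).map(_.product)
    Await.result(joined, 30.seconds)
  }
}
```

The workers never reply to whoever forked them; only the joining step sees the results, which mirrors the role of the continuing actor.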

To challenge the system I created a first set of tests and, for comparison purposes, implemented both a tail-recursive solution and the actor solution. Here are the tests:

import org.specs2._
import MathBox._
import akka.actor.Actors
import org.perf4j.log4j.Log4JStopWatch

final class FactorialSpecification extends Specification {
  def is =
    "Factorial Specification" ^
      p ^
      "Tail recursive factorial should" ^
      "provide the expected result !!! " ! e1 ^
      p ^
      "Acting Factorial should" ^
      "be processed equal to tail recursive one " ! AssertOnActors(2048).e1


  def e1 = factorialOf(5).should(beEqualTo(120))

  case class AssertOnActors(number: Int) extends specification.After {
    def e1 = {
      val stopWatch = new Log4JStopWatch()
      // 1000 rounds: each lap() logs the elapsed time under its tag and restarts the watch
      (0 until 1000).foreach { _ =>
        actingFactorialOf(number)
        stopWatch.lap("parallel")
        factorialOf(number)
        stopWatch.lap("serial")
      }
      // the actor result is validated against the tail-recursive one
      actingFactorialOf(number).should(beEqualTo(factorialOf(number)))
    }

    // stops every actor still registered, e.g. the FactorialActor created on each call
    def after {
      Actors.registry().shutdownAll()
    }
  }

}


And yes, although I do not like frameworks because they make you dumb, I used perf4j to measure performance (so I am a little dumb). The assertions first check the tail-recursive factorial, then validate the actor solution against the tail-recursive one.

The entry points of the processing are located in the MathBox object:

import annotation.tailrec

object MathBox {
  // Sends FactorialOf(number) to a fresh FactorialActor and blocks on the
  // future returned by '?' until the reply comes back.
  def actingFactorialOf(number: Int): BigInt = {
    val factorial = FactorialActor()
    (factorial ? FactorialOf(number)).get match {
      case FactorialResult(result) => result
      case _ => -1
    }
  }


  def factorialOf(number: BigInt): BigInt = {
    // accumulated carries the partial product, so the recursive call is in tail position
    @tailrec
    def recFactorialOf(accumulated: BigInt, counter: BigInt): BigInt = {
      if (counter <= 1) accumulated
      else recFactorialOf(accumulated * counter, counter - 1)
    }

    recFactorialOf(1, number)
  }
}

The @tailrec annotation from the Scala standard library asks the compiler to verify that the method is compiled with tail-call optimization; if it cannot be, compilation fails with an error. As I did not want to clutter the tests with timing, I used the new '?' operator provided in 1.2-RC3, which returns a future after sending a message. What I send is a FactorialOf(number) message, and I expect a FactorialResult(result) back. Nothing fancy.
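The '?' operator pairs the outgoing message with a future that the reply completes, and `.get` blocks until it does. Outside Akka, the same request/reply shape can be sketched with a standard library `Promise`. This is only an analogy under that assumption, not how Akka implements '?', and `AskSketch`, `factorialService` and `ask` are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AskSketch {
  final case class FactorialOf(number: Int)
  final case class FactorialResult(result: BigInt)

  // Hypothetical receiver: computes the factorial on another thread and
  // completes the reply promise, much as an actor would reply to its sender.
  def factorialService(message: FactorialOf, reply: Promise[Any]): Unit =
    Future((1 to message.number).foldLeft(BigInt(1))(_ * _))
      .foreach(result => reply.success(FactorialResult(result)))

  // Hypothetical "ask": send the message with a fresh promise, return its future.
  def ask(message: FactorialOf): Future[Any] = {
    val reply = Promise[Any]()
    factorialService(message, reply)
    reply.future
  }

  // Mirrors MathBox.actingFactorialOf: block on the future, then match the reply.
  def actingFactorialOf(number: Int): BigInt =
    Await.result(ask(FactorialOf(number)), 5.seconds) match {
      case FactorialResult(result) => result
      case _                       => -1
    }
}
```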
I told you this article was easy, but I had to write something, blah blah blah... :)
So here comes the solution:

import akka.actor.Actor._
import akka.actor.{PoisonPill, ActorRef, UntypedChannel, Actor}

case class FactorialOf(number: Int)

case class FactorialResult(result: BigInt)

case class StoreResult(number: Int, channel: Option[UntypedChannel])

case class Cumulate(number: Int)

case class WorkOn(min: Int, max: Int, next: UntypedChannel)


class AccumulatingActor(val target: UntypedChannel) extends Actor {
  var stored: BigInt = 1

  protected def receive = {
    // multiply the stored product by each number of the range
    case Cumulate(operand) => stored = stored * operand
    // end of the range: forward the partial product, then stop this actor
    case FactorialResult(operand) =>
      target ! FactorialResult(stored * operand)
      self ! PoisonPill
    case _ =>
  }
}
object AccumulatingActor {
  def apply(target: UntypedChannel): ActorRef = {
    actorOf(new AccumulatingActor(target)).start()
  }
}

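class LoopingActor extends Actor {
  var min: Int = _
  var intermediate: UntypedChannel = _

  // initial behavior: remember the lower boundary, create the accumulator,
  // swap to the decrementer behavior and start looping from the upper boundary
  protected def receive = {
    case WorkOn(lower, max, target) =>
      min = lower
      intermediate = AccumulatingActor(target)
      become(decrementer)
      self ! FactorialOf(max)
  }

  // looping behavior: send each number to the accumulator, down to min included
  protected def decrementer: Receive = {
    case FactorialOf(number) =>
      intermediate ! Cumulate(number)
      if (number > min) {
        self ! FactorialOf(number - 1)
      } else {
        intermediate ! FactorialResult(1) // flags the end of the range
        self ! PoisonPill // the range is done: stop this looping actor
      }
  }
}

object LoopingActor {
  def apply() = actorOf[LoopingActor].start()
}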

class ContinuingActor(val expectedMessages: Int, val target: UntypedChannel) extends Actor {
  var counter = expectedMessages
  var result: BigInt = 1

  protected def receive = {
    // multiply each partial product in; reply to the client once all ranges are done
    case FactorialResult(value) =>
      counter = counter - 1
      result = result * value
      if (counter == 0) {
        target ! FactorialResult(result)
        self ! PoisonPill
      }
  }
}

object ContinuingActor {
  def apply(expectedMessages: Int, target: UntypedChannel) = {
    actorOf(new ContinuingActor(expectedMessages, target)).start()
  }
}

class FactorialActor extends Actor {
  // crude heuristic: eight ranges per available processor
  def coreNumber = Runtime.getRuntime.availableProcessors() * 8

  def dispatch(number: Int, bunch: Int, channel: UntypedChannel) {
    bunch match {
      // fewer numbers than ranges: a single looping actor handles [1, number]
      case 0 =>
        LoopingActor() ! WorkOn(1, number, ContinuingActor(1, channel))
      // otherwise coreNumber - 1 ranges of `bunch` numbers, plus a last range up to number
      case _ =>
        val continuation = ContinuingActor(coreNumber, channel)
        val lastBoundary = Range(0, coreNumber - 1).foldLeft(0) { (acc, _) =>
          LoopingActor() ! WorkOn(acc + 1, acc + bunch, continuation)
          acc + bunch
        } + 1

        LoopingActor() ! WorkOn(lastBoundary, number, continuation)
    }
  }

  def asBunch(forMax: Int): Int = forMax / coreNumber

  protected def receive = {
    case FactorialOf(number) => dispatch(number, asBunch(number), self.getChannel)
  }
}

object FactorialActor {
  def apply() = {
    actorOf[FactorialActor].start()
  }
}

Waoooooooh!! Too long? Don't panic, there is nothing difficult in it. Let us start with the FactorialActor!

class FactorialActor extends Actor {
  // crude heuristic: eight ranges per available processor
  def coreNumber = Runtime.getRuntime.availableProcessors() * 8

  def dispatch(number: Int, bunch: Int, channel: UntypedChannel) {
    bunch match {
      // fewer numbers than ranges: a single looping actor handles [1, number]
      case 0 =>
        LoopingActor() ! WorkOn(1, number, ContinuingActor(1, channel))
      // otherwise coreNumber - 1 ranges of `bunch` numbers, plus a last range up to number
      case _ =>
        val continuation = ContinuingActor(coreNumber, channel)
        val lastBoundary = Range(0, coreNumber - 1).foldLeft(0) { (acc, _) =>
          LoopingActor() ! WorkOn(acc + 1, acc + bunch, continuation)
          acc + bunch
        } + 1

        LoopingActor() ! WorkOn(lastBoundary, number, continuation)
    }
  }

  def asBunch(forMax: Int): Int = forMax / coreNumber

  protected def receive = {
    case FactorialOf(number) => dispatch(number, asBunch(number), self.getChannel)
  }
}

object FactorialActor {
  def apply() = {
    actorOf[FactorialActor].start()
  }
}

On reception of a FactorialOf(number) message:

protected def receive = {
  case FactorialOf(number) => dispatch(number, asBunch(number), self.getChannel)
}

the FactorialActor:
  • creates the continuation
  • forks the looping actors
It roughly cuts the number into bunches, or ranges of numbers to be processed, using a crude heuristic of eight ranges per available processor:

def coreNumber = Runtime.getRuntime.availableProcessors() * 8
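To see what this heuristic produces, here is a pure-function restatement of the boundary arithmetic in dispatch, with no actors involved. `Bunches` and `ranges` are hypothetical names, and `coreNumber` is passed in explicitly instead of being read from the runtime so that the output is deterministic.

```scala
object Bunches {
  // Reproduces the (start, end) pairs that FactorialActor.dispatch sends in its
  // WorkOn messages: coreNumber - 1 ranges of `bunch` numbers, then a last range up to `number`.
  def ranges(number: Int, coreNumber: Int): Seq[(Int, Int)] = {
    val bunch = number / coreNumber
    if (bunch == 0) Seq((1, number)) // small input: a single looping actor
    else {
      val head = (0 until coreNumber - 1).map(step => (step * bunch + 1, (step + 1) * bunch))
      val lastBoundary = (coreNumber - 1) * bunch + 1
      head :+ ((lastBoundary, number))
    }
  }

  def main(args: Array[String]): Unit =
    ranges(20, 4).foreach(println) // prints (1,5), (6,10), (11,15) and (16,20), one per line
}
```

The last looping actor absorbs the remainder of the division: `ranges(22, 4)` ends with `(16,22)`.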

Like all the actors in this example, the FactorialActor instance is created through its companion object. Each looping actor it creates is started with a WorkOn message:

LoopingActor() ! WorkOn(start, end, continuation)

holding the inclusive boundaries of the range to work on and the continuation the results must be passed to. So what about the continuation?

class ContinuingActor(val expectedMessages: Int, val target: UntypedChannel) extends Actor {
  var counter = expectedMessages
  var result: BigInt = 1

  protected def receive = {
    // multiply each partial product in; reply to the client once all ranges are done
    case FactorialResult(value) =>
      counter = counter - 1
      result = result * value
      if (counter == 0) {
        target ! FactorialResult(result)
        self ! PoisonPill
      }
  }
}

object ContinuingActor {
  def apply(expectedMessages: Int, target: UntypedChannel) = {
    actorOf(new ContinuingActor(expectedMessages, target)).start()
  }
}

As a good kamikaze actor, the continuing actor stores the incoming partial results and completes the multiplication, sends the result to its target, then commits suicide with a poison pill.
The continuing actor therefore also plays the part of a forwarding actor: it sends the result back to the client while hiding the client's reference from the other actors.
We never have to send results back to the previous actor, as we do in standard fork/join frameworks, because control is held by the continuation.
So we avoid cluttering the pattern matching of the emitting actors with code that handles a returned result.
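The counting logic of the continuing actor can be restated without Akka. In this sketch a lock stands in for the actor's one-message-at-a-time guarantee, and worker threads stand in for the looping actors; `Continuation`, `receive` and `ContinuationDemo` are hypothetical names, not Akka API.

```scala
// Hypothetical stand-in for ContinuingActor: it counts partial results down and
// hands the final product to `target` exactly once, when the last one arrives.
final class Continuation(expected: Int, target: BigInt => Unit) {
  private var counter = expected
  private var result = BigInt(1)

  def receive(partial: BigInt): Unit = {
    // the lock replaces the actor's one-message-at-a-time processing
    val finished: Option[BigInt] = synchronized {
      counter -= 1
      result *= partial
      if (counter == 0) Some(result) else None
    }
    finished.foreach(target) // plays "target ! FactorialResult(result)"
  }
}

object ContinuationDemo {
  def main(args: Array[String]): Unit = {
    val continuation = new Continuation(3, product => println(s"result: $product"))
    // three "looping" workers fire their partial products and never wait for an answer
    val workers = Seq(BigInt(6), BigInt(20), BigInt(42)).map { partial =>
      new Thread(new Runnable { def run(): Unit = continuation.receive(partial) })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    // prints "result: 5040" once, whatever the arrival order
  }
}
```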

Two actors remain. First, the looping actor, in charge of looping over the range boundaries:

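class LoopingActor extends Actor {
  var min: Int = _
  var intermediate: UntypedChannel = _

  // initial behavior: remember the lower boundary, create the accumulator,
  // swap to the decrementer behavior and start looping from the upper boundary
  protected def receive = {
    case WorkOn(lower, max, target) =>
      min = lower
      intermediate = AccumulatingActor(target)
      become(decrementer)
      self ! FactorialOf(max)
  }

  // looping behavior: send each number to the accumulator, down to min included
  protected def decrementer: Receive = {
    case FactorialOf(number) =>
      intermediate ! Cumulate(number)
      if (number > min) {
        self ! FactorialOf(number - 1)
      } else {
        intermediate ! FactorialResult(1) // flags the end of the range
        self ! PoisonPill // the range is done: stop this looping actor
      }
  }
}

object LoopingActor {
  def apply() = actorOf[LoopingActor].start()
}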
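The looping actor swaps its receive function with become. Stripped of threads and mailboxes, that is just a mutable reference to a partial function. The single-threaded sketch below mimics the looping actor's two behaviors, processing its self-sent messages from a local queue and logging what the accumulator would receive; `BecomeSketch`, `LoopingLike`, `deliver` and `log` are hypothetical names, and this is not how Akka implements become.

```scala
import scala.collection.mutable

object BecomeSketch {
  final case class WorkOn(min: Int, max: Int)
  final case class FactorialOf(number: Int)

  // A toy single-threaded "actor": a mailbox plus a swappable behavior.
  final class LoopingLike {
    private val mailbox = mutable.Queue.empty[Any]
    val log = mutable.ListBuffer.empty[String] // what the accumulator would receive
    private var min = 0
    private var behavior: PartialFunction[Any, Unit] = initial

    private def become(next: PartialFunction[Any, Unit]): Unit = behavior = next

    private def initial: PartialFunction[Any, Unit] = {
      case WorkOn(lower, max) =>
        min = lower
        become(decrementer)
        mailbox.enqueue(FactorialOf(max)) // plays "self ! FactorialOf(max)"
    }

    private def decrementer: PartialFunction[Any, Unit] = {
      case FactorialOf(number) =>
        log += s"Cumulate($number)"
        if (number > min) mailbox.enqueue(FactorialOf(number - 1))
        else log += "FactorialResult(1)"
    }

    // Delivers a message, then drains the messages the actor sent to itself.
    // Messages the current behavior does not handle are dropped.
    def deliver(message: Any): Unit = {
      mailbox.enqueue(message)
      while (mailbox.nonEmpty) {
        val current = mailbox.dequeue()
        if (behavior.isDefinedAt(current)) behavior(current)
      }
    }
  }
}
```

After `deliver(WorkOn(3, 5))` the log holds Cumulate(5), Cumulate(4), Cumulate(3) and FactorialResult(1); a second WorkOn is then ignored, because only the decrementer behavior remains.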

I chose to apply a behavior hot swap, separating the behavior of being initialized from the behavior of looping on itself with decremented FactorialOf(number) messages. The hot swap is handled by the nice become method, which switches to the decrementer behavior.
During initialization, a freshly created AccumulatingActor instance is given the continuation and the swap is performed. The decrementer behavior then loops on itself until the lower boundary is reached, after which the looping actor signals the end of the calculation with a FactorialResult(1) message.
A FactorialResult(1) message flags the end of the processing of a range. Note that in his 1986 book, Gul Agha already describes this change of behavior, presenting a prototype language whose syntax is very close to Erlang's or Akka's. On the accumulating actor side:

class AccumulatingActor(val target: UntypedChannel) extends Actor {
  var stored: BigInt = 1

  protected def receive = {
    // multiply the stored product by each number of the range
    case Cumulate(operand) => stored = stored * operand
    // end of the range: forward the partial product, then stop this actor
    case FactorialResult(operand) =>
      target ! FactorialResult(stored * operand)
      self ! PoisonPill
    case _ =>
  }
}
object AccumulatingActor {
  def apply(target: UntypedChannel): ActorRef = {
    actorOf(new AccumulatingActor(target)).start()
  }
}

The accumulating actor simply performs the multiplications and forwards the result to the continuation actor.
We are done. Execution: tests green (just kidding, it took me several attempts ;)). Here is a sample of the average statistics:

Performance Statistics
Tag        Avg(ms)     Min     Max   Std Dev   Count
parallel       3.6       3      19       1.0     485
serial        12.1      11      73       2.9     485

Performance Statistics
Tag        Avg(ms)     Min     Max   Std Dev   Count
parallel      10.1       3    1082      77.8     191
serial        12.2      12      18       0.7     191

Performance Statistics
Tag        Avg(ms)     Min     Max   Std Dev   Count
parallel       3.7       3       6       0.8     588
serial        12.2      11      66       2.3     587


These samples exhibit warm-up phases (note the 1082 ms maximum in the second one); then, in nominal conditions, the actor version averages about 3.6 ms against about 12.1 ms for the tail-recursive one, roughly 3.3 times faster. Note that the tail-recursion performance (serial tag) is very close to the loop performance I measured with Java.

I also ran experiments with the ForkJoin framework, since I was using JDK 7 to run Scala (2.9.1-RC4). The performance is really close, slightly in favor of the ForkJoin framework, but note that I did not try to benchmark larger values and did not follow a very rigorous protocol.
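The ForkJoin code used for this comparison is not shown. Purely to illustrate the style being compared, here is a minimal sketch, assuming a `RecursiveTask` that halves the range until it is shorter than a hypothetical `threshold`. `RangeProduct` and `ForkJoinFactorial` are hypothetical names. Unlike the actor version, each parent task waits on `join()` for its children's results.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Multiplies every integer in [lo, hi]; splits the range in two until it is
// shorter than `threshold` (a hypothetical tuning parameter).
final class RangeProduct(lo: Int, hi: Int, threshold: Int) extends RecursiveTask[BigInt] {
  require(threshold > 0, "threshold must be positive")

  override protected def compute(): BigInt =
    if (hi - lo < threshold) (lo to hi).foldLeft(BigInt(1))(_ * _)
    else {
      val mid = (lo + hi) >>> 1
      val left = new RangeProduct(lo, mid, threshold)
      left.fork() // run the lower half asynchronously
      val right = new RangeProduct(mid + 1, hi, threshold).invoke() // upper half on this thread
      left.join() * right // the parent joins: results flow back up the task tree
    }
}

object ForkJoinFactorial {
  def factorialOf(n: Int, threshold: Int = 64): BigInt = {
    val pool = new ForkJoinPool()
    try pool.invoke(new RangeProduct(1, n, threshold))
    finally pool.shutdown()
  }
}
```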

The nice thing, as far as I am concerned, is the ability to adopt a perfectly understandable object-oriented approach with the actor paradigm. We used continuation-passing style (CPS) as in functional programming languages, we hid the client channel, we separated concerns, etc.

I don't know what the end of the book will reveal, but I can't wait to read it. I hope the next time we talk about actors it will be about fault tolerance. Have a nice day.

Be seeing you :)!!!!




2 comments:

Chris Kelly said...

Great article. Very informative. I'm also reading Scala in Action as well as another MEAP book, Scala in Depth. It pays to have a different perspective sometimes :).

Globulon said...

Thanks for your comment :). These two books are really nice. I enjoyed all the chapters of Scala in Depth written so far; I found the one on actors tremendous. MEAP is a very good idea indeed, since you can read the book while it is being written. The Pragmatic Bookshelf does that too, and that is where I bought and read Programming Concurrency on the JVM by Venkat Subramaniam. Recommended too :)
