Wednesday, June 29, 2011

Visiting Hanoi Towers with Scala and Akka

Well, I failed. I had to publish something, whatever it was, about something I had learnt, even the smallest piece of information. My one and only fault: I read a lot and, I hope, learn a lot. A few months ago I specifically decided to embrace functional programming, which is quite hard in France, because people there tend to think it is a research domain practiced only by insane, unaware, strange geeks. By the way I don't care, because I can't trust people who have never heard of Erlang or Haskell, and I consider myself not a geek but a professional.
I noticed with pleasure in Uncle Bob's latest book (The Clean Coder) that he recommends that we, as professional developers, take risks outside our domain of knowledge. Being blessed by the Master, let's roll. (It's a promise: I will meet this guy some day =D)

I recently started reading the Akka 1.1.2 documentation (OMG I am late, they have already upgraded to 1.1.3!!!), as well as Scala in Depth by Joshua Suereth, and found myself overwhelmed, a shame for a guy trained by Jeff Sutherland. Moreover, as an everyday practice I decided to start with the basics, watching the Abelson and Sussman MIT lectures on video during my (smooth) early morning fitness session (you will find them here). LISP rocks, I mean it.
I understood I had reached my limits when I missed the point about the Hanoi Towers during lecture 2 or 3. We have been used for so long to taking for granted everything said about the technical stuff we use that, from time to time, we are no longer able to understand the most basic algorithms. That was vexing.

It was time to stop, rewrite the problem on a sheet of paper and then talk about it. That was the initial commitment. At this point the satiated readers will stop reading. After all, everybody knows about the Hanoi Towers.

For the others, you will find a first required reading here. As a reminder, this mathematical game consists in moving a pile of ordered discs from one rod (or peg) to a second, free rod, with a third free rod available and a set of rules to obey. Repeating the linked content, the three rules are:
  • Only one disk may be moved at a time.
  • Each move consists of taking the upper disk from one of the rods and sliding it onto another rod.
  • No disk may be placed on top of a smaller disk.
This is a so-called intractable problem. The minimal number of moves for n discs is equal to 2^n - 1. That means that for a not-so-big number of discs (64, say, at one move per second), the age of the universe would not be long enough to play all the moves (Yeah!).
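
To make the 2^n - 1 figure concrete, here is a minimal sketch (the MoveCount object and its method names are hypothetical, they are not part of the post's code). It counts moves with the recurrence that falls out of the puzzle itself, "move n - 1 discs away, move the largest one, move the n - 1 discs back on top", and compares the count with the closed form.

```scala
object MoveCount {
  // Recurrence: T(0) = 0 and T(n) = 2 * T(n - 1) + 1
  // (n - 1 discs away, the largest disc, n - 1 discs back on top).
  def byRecurrence(n: Int): BigInt =
    if (n == 0) BigInt(0) else byRecurrence(n - 1) * 2 + 1

  // Closed form: 2^n - 1.
  def closedForm(n: Int): BigInt = BigInt(2).pow(n) - 1
}
```

For 24 discs both give 16777215, the value asserted in the big-pile test further down. For 64 discs the count is 18446744073709551615, which at one move per second is several hundred billion years.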

There are two typical ways to solve the problem, one iterative, one recursive. All things being equal, and Scala being beautiful, I chose the recursive way, the one I did not understand instantly during the lecture. Scala being a functional programming language, the solution must be elegant.
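
For the curious, the iterative way can be sketched in a few lines. This is not the approach followed in this post, and the IterativeHanoi object is a hypothetical name. It relies on one well-known bit-trick formulation: with pegs numbered 0, 1 and 2 and all discs starting on peg 0, the m-th move goes from peg (m & (m - 1)) % 3 to peg ((m | (m - 1)) + 1) % 3.

```scala
object IterativeHanoi {
  // Pegs are numbered 0, 1, 2 and every disc starts on peg 0.
  // Returns the (from, to) pegs of moves 1 to 2^n - 1, in order.
  // The pile ends on peg 2 when n is odd and on peg 1 when n is even.
  def moves(n: Int): List[(Int, Int)] =
    (1 until (1 << n)).toList.map { m =>
      ((m & (m - 1)) % 3, ((m | (m - 1)) + 1) % 3)
    }
}
```

Reading 0, 1, 2 as A, B, C, the three-disc sequence is A to C, A to B, C to B, A to C, B to A, B to C, A to C, the same seven moves the recursive solution is expected to produce.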

In order to proceed, and choosing the same identifiers as in the referenced article, let's name the pegs A, B, C, and let's work with n discs.
I will always start with all the discs stacked on rod A and move them to peg C. The article is crystal clear: we must start by imagining we have nearly solved the problem.

When handling disc n, the largest one, the last moves would be:

  1. move pile with the n - 1 discs above disc n from A to B
  2. move disc n from A to C
  3. move pile with the n - 1 discs from B to C

It is an open door to recursion, no? OK, consider step 1. It can itself be decomposed into:

  1. move pile with the n - 2 discs above disc n - 1 from A to C
  2. move disc n - 1 from A to B
  3. move pile with the n - 2 discs from C to B

And so on... Got it? If not, take some coffee, or go to sleep, whatever, and come back. You will get it, no doubt. I trust you, we are the same.

I have my recursion, so now I must create my project. As usual I start a Maven 3.0 project in my new IntelliJ 107.xxx version. As I plan to work with Scala and Akka, I set the following configuration in my pom.xml:

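    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <repository>
            <id>Akka</id>
            <name>Akka Maven2 Repository</name>
            <url>http://akka.io/repository/</url>
        </repository>
    </repositories>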

As a baby step I decided to bring back ScalaTest in order to test the code, so my dependencies are this time:

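    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.9.0-1</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.8.2</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest_2.9.0</artifactId>
                <version>1.6.1</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>se.scalablesolutions.akka</groupId>
                <artifactId>akka-actor</artifactId>
                <version>1.1.2</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.9.0</artifactId>
        </dependency>
        <dependency>
            <groupId>se.scalablesolutions.akka</groupId>
            <artifactId>akka-actor</artifactId>
        </dependency>
    </dependencies>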

Of course, don't forget to activate the Scala plugin so IntelliJ will recognize the facets. This is done this way:

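    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.9.0-1</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>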

I started by writing tests checking the validity of the move sequences for zero to three discs. I also added a test for a bigger pile, asserting only on the expected number of moves.

I came up with the following tests:

import org.junit.Test
import org.scalatest.junit.{JUnitSuite, ShouldMatchersForJUnit}

final class TestHanoiAlgorithm extends JUnitSuite with ShouldMatchersForJUnit {

  @Test
  def move_WithNoDisc_ShouldProduceNoMove() {
    val discs = List()
    val moves = Game move discs
    moves.should(be('empty))
  }


  @Test
  def move_WithOneDisc_ShouldProduceOneMove() {
    val discs = List(Disc(1))
    val moves = Game move discs
    moves.should(not(be('empty)))
    moves.size.should(be(1))
  }


  @Test
  def move_WithOneDisc_ShouldMoveFromAToC() {
    val discs = List(Disc(1))
    val moves = Game move discs
    val move = moves(0)
    move.disc.should(be(discs(0)))
    move.start.should(be(A()))
    move.end.should(be(C()))
  }

  @Test
  def move_WithTwoDiscs_ShouldProduceThreeMoves() {
    val discs = List(Disc(1), Disc(2))
    val moves = Game move discs
    trace(moves)
    moves.size.should(be(3))
    assertVerified(moves(0), discs(0), A(), B())
    assertVerified(moves(1), discs(1), A(), C())
    assertVerified(moves(2), discs(0), B(), C())
  }


  @Test
  def move_WithThreeDiscs_ShouldProduceSevenMoves() {
    val discs = List(Disc(1), Disc(2), Disc(3))
    val start = System.currentTimeMillis()
    val moves = Game move discs
    println((System.currentTimeMillis() - start) + " ms")
    trace(moves)
    moves.size.should(be(7))

    assertVerified(moves(0), discs(0), A(), C())
    assertVerified(moves(1), discs(1), A(), B())
    assertVerified(moves(2), discs(0), C(), B())
    assertVerified(moves(3), discs(2), A(), C())
    assertVerified(moves(4), discs(0), B(), A())
    assertVerified(moves(5), discs(1), B(), C())
    assertVerified(moves(6), discs(0), A(), C())
  }

  def assertVerified(move: Move, disc: Disc, A: Peg, B: Peg) {
    move.disc.should(be(disc))
    move.start.should(be(A))
    move.end.should(be(B))
  }


  def trace(moves: scala.List[Move]) {
    println("===============================")
    for (move <- moves) println(move)
    println("===============================\n")
  }

  @Test
  def move_WithNDiscs_ShouldProduceMoves() {
    val discs = (1 to 24).map(Disc(_)).toList

    val start = System.currentTimeMillis()
    val moves = Game move discs
    val end = System.currentTimeMillis()

    println("Took: " + (end - start) + "ms")
    moves.size.should(be(16777215))
  }
}

I hope the test names are self-explanatory enough!
As explained on the ScalaTest website, I used the wonderful ShouldMatchers trait (here in its JUnit flavour, ShouldMatchersForJUnit), which elegantly extends my test domain language. The wrapper classes in this ScalaTest trait provide a fluent DSL that naturally extends the vocabulary of the testing code. Easy to read, easy to understand.
The trace method prints the moves so the developer can double check, "functionally speaking", the validity of the move sequence. It produces traces like:

===============================
Move Disc(1) from A(A) to B(B)
Move Disc(2) from A(A) to C(C)
Move Disc(1) from B(B) to C(C)
===============================

Rwaaaaaaaaaaaaght (< pale imitation of Dr Evil) 

We have tests but no code. Reading the tests, you easily understand that I expect my pegs (or rods) to match my initial reference vocabulary. Incapable of producing correct Scala enumerations (please comment on this!!!), I produced the following case classes:

trait Peg {
  val name: String
}


case class A(name: String = "A") extends Peg
case class B(name: String = "B") extends Peg
case class C(name: String = "C") extends Peg
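
On the enumeration question, one common alternative to scala.Enumeration is a sealed trait with case objects. The sketch below is only a suggestion, wrapped in a hypothetical Pegs object so that it stands alone. It is not the code used in the rest of this post, which keeps the case classes above.

```scala
object Pegs {
  // sealed: every Peg is declared in this file, so the compiler can
  // warn when a pattern match forgets one of them.
  sealed trait Peg { def name: String }

  case object A extends Peg { val name = "A" }
  case object B extends Peg { val name = "B" }
  case object C extends Peg { val name = "C" }

  // Handy when all the pegs have to be enumerated.
  val all: List[Peg] = List(A, B, C)
}
```

With this variant the tests would write A instead of A(), and a peg would print as A rather than A(A).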

Of course I need some move abstraction in order to collect a sequence of moves and maybe reuse it in an animated display (don't dream ;), this one is going to take long):

class Move (val start: Peg, val end: Peg, val disc: Disc) {
  override def toString = "Move " + disc + " from " + start  +  " to " +  end
}

object Move{
  def apply(start: Peg, end: Peg, disc: Disc):Move = new Move(start, end, disc)
}

I used a companion object allowing me to write easy-to-read code like Move(a, c, disc), dropping the new keyword. This is a natural consequence of defining an apply method in Scala. OK, then I need some discs to move. Let them be defined by:

case class Disc(id: Int)

The discs are identified by a unique id. The smaller discs, on top of the pile, have the smaller numbers. It remains to code a Game as an object:

object Game {

  private def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
    discs match {
      case Nil => Nil
      case head :: Nil =>
        List(Move(a, c, head))
      case head :: tail =>
        move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c)
    }
  }

  def move(discs: List[Disc]): List[Move] = {
    move(discs reverse, A(), B(), C())
  }
}

I based my algorithm on pattern matching. The first two cases solve the edge problems of no disc and one disc:

private def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
    discs match {
      case Nil => Nil
      case head :: Nil =>
        List(Move(a, c, head))
      ...
    }
  }

So: no disc, no move; one disc, one move. For more than one disc I just wrote, recursively, the algorithm I presented a few blocks above:

case head :: tail =>
   move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c) 

I match a list with more than one element with the now well-known pattern head :: tail.
The head is the current disc n, and what I return is simply the composition of the previous moves, followed by my current move and, of course, the upcoming moves.
The a, b, c peg references are switched in the next recursion in order to match the signature of the n - 1 level problem.
You can blame me, the ::: operator being quite expensive (it copies its left-hand list on each call), but the problem is intractable anyway. I will welcome every optimized solution.

Notice I have reversed the input list so that the head I match is always the largest disc to move from the source rod to the target rod.
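
As one possible answer to the call for optimized solutions, here is a sketch that avoids ::: by building the list back to front on an accumulator, using only the constant-time :: operator. Everything in it is hypothetical and simplified so that the block stands alone: the HanoiAcc object, pegs as plain strings, discs as the integers 1 to n.

```scala
object HanoiAcc {
  final case class Move(from: String, to: String, disc: Int)

  // Same shape as the recursion of this post; n stands for "the n smallest discs".
  def naive(n: Int, a: String, b: String, c: String): List[Move] =
    if (n == 0) Nil
    else naive(n - 1, a, c, b) ::: Move(a, c, n) :: naive(n - 1, b, a, c)

  // Same moves, without :::. The moves that come last are computed first
  // and everything else is prepended in front of them.
  def withAcc(n: Int, a: String, b: String, c: String, acc: List[Move] = Nil): List[Move] =
    if (n == 0) acc
    else {
      val after = withAcc(n - 1, b, a, c, acc)        // moves following disc n
      withAcc(n - 1, a, c, b, Move(a, c, n) :: after) // moves preceding disc n
    }
}
```

The recursion depth stays at n, so the stack is not a concern. The list itself still holds 2^n - 1 moves, so this only removes the repeated copying, not the exponential size of the answer.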

Having started practicing Akka at a very modest level, the insane idea of trying to use all my CPUs instead of only one occurred to me. This is insane because of the intractable nature of the problem. What is a fistful of CPUs compared to the age of the universe?
But I was in need of a more "concrete" example of the use of Akka actors. I did not know everything about Akka, but enough to maybe use the HotSwap facility too.

Handling this problem with actors is a typical fork-join problem, and a perfectly balanced one indeed: the numbers of moves to process recursively before and after the current move are equal. At each level of recursion we are going to create two new actors in order to handle the next level, then wait for their results.
Receiving the results of the sub-processes, we join them and send the information back to the invoking actor or the main thread. As with every fork-join problem, you first need to define a threshold, meaning you must decide when to stop forking (that is, creating new actors) in order to balance the benefit of parallelism against the cost of creating and managing resources.
By the way, too many actors would not be a benefit, as you might end up spreading your CPUs' execution time over handling sub intractable problems, the worst of all problems.
Finding the balance is a question of trial and retry, measurements, etc. I found a rule, half intuitive, half experimental: define a depth beyond which it would be dumb to go, because the problem would be subdivided into more parts than there are CPUs. With two CPUs you fork once. With eight, thrice, etc. Given my number of CPUs, a suitable recursive algorithm is:

  private def depthFor(numberOfCPUs: Int): Int = {
    if (numberOfCPUs <= 1) 0
    else 1 + depthFor(numberOfCPUs >> 1)
  }
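
To see what this rule gives in practice, here is the same function in a standalone form (the Depth wrapper object is hypothetical, and the function is made public only for the demonstration). It computes the integer part of the base-2 logarithm of the CPU count, so forking down to that depth yields at most as many leaf tasks as there are CPUs.

```scala
object Depth {
  // Number of times the CPU count can be halved before reaching 1,
  // that is floor(log2(numberOfCPUs)).
  def depthFor(numberOfCPUs: Int): Int =
    if (numberOfCPUs <= 1) 0
    else 1 + depthFor(numberOfCPUs >> 1)

  // Depth for the machine running this code.
  def depthForThisMachine: Int =
    depthFor(Runtime.getRuntime.availableProcessors())
}
```

One, two, four and eight CPUs give depths 0, 1, 2 and 3. A count that is not a power of two is rounded down: six CPUs give depth 2, hence four leaf tasks.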

So all I need now are Mover actors exchanging identified messages. There will be no finesse in the code I am going to present, because I am still so dumb when it comes to manipulating type classes in Scala, and I will really welcome your proposals. It is a first shot and I really needed to present it to you.

My actors need to exchange messages about the search for PreviousMoves and PostMoves, respectively before and after the move of the largest disc in a pile. Naturally the orders are sent using ProcessPreviousMoves and ProcessPostMoves respectively. So here we come:

case class ProcessPreviousMoves(discs: List[Disc], a: Peg, b: Peg, c: Peg)
case class ProcessPostMoves(discs: List[Disc], a: Peg, b: Peg, c: Peg)
case class PreviousMoves(moves: List[Move])
case class PostMoves(moves: List[Move])

I created an abstract Mover class inheriting from the Akka Actor trait. This class is extended by two sub-classes handling the processing of the previous and post moves. The abstract class holds all the knowledge and state required to:


  • store previous, post and current moves
  • recursively process the remaining moves once the threshold is reached
  • gather incoming previous and post moves
  • send the result back to the caller

So I produced this class template:

abstract class Mover(val threshold: Int) extends Actor {
  var move: Move = _
  var previousList: List[Move] = _
  var postList: List[Move] = _
  var source: Channel[Any] = _


  def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
    discs match {
      case Nil => Nil
      case head :: Nil =>
        List(Move(a, c, head))
      case head :: tail =>
        move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c)
    }
  }

  def propagate(head: Disc, tail: List[Disc], a: Peg, b: Peg, c: Peg) {
    source = self.channel
    move = Move(a, c, head)
    PreviousMover(threshold) ! ProcessPreviousMoves(tail, a, c, b)
    PostMover(threshold) ! ProcessPostMoves(tail, b, a, c)
  }

  def complete()

  def previous(list: List[Move]) {
    previousList = list
  }

  def following(list: List[Move]) {
    postList = list
  }

   def expectingSomeMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      become(expectingPostMoves)
    case PostMoves(list) =>
      following(list)
      become(expectingPreviousMoves)
  }

  def expectingPreviousMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      complete()
  }

  def expectingPostMoves: Receive = {
    case PostMoves(list) =>
      following(list)
      complete()
  }
}

The state is stored in the following self-explanatory variables:

  var move: Move = _
  var previousList: List[Move] = _
  var postList: List[Move] = _
  var source: Channel[Any] = _

The fields move, previousList and postList will be merged and sent back to the source. Once the threshold is reached, I have to process the remaining moves in the same actor, so I brought back my recursive method:

def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
    discs match {
      case Nil => Nil
      case head :: Nil =>
        List(Move(a, c, head))
      case head :: tail =>
        move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c)
    }
  }

Of course an actor must know how to fork two new actors:

def propagate(head: Disc, tail: List[Disc], a: Peg, b: Peg, c: Peg) {
    source = self.channel
    move = Move(a, c, head)
    PreviousMover(threshold) ! ProcessPreviousMoves(tail, a, c, b)
    PostMover(threshold) ! ProcessPostMoves(tail, b, a, c)
  }

You have identified the upcoming PreviousMover and PostMover classes that will extend the abstract Mover class.

Each of these classes handles the processing of the previous and post moves respectively. You will also have recognized the bang "!" symbol, identical to the one Scala actors use for sending asynchronous messages (fire-and-forget invocations). The propagate method is also where I store the necessary state information, like the current move and the emitting source.
Then I used the Akka implementation of message loop hot swapping in order to wait for the messages holding the results of the forked actors.
Why? Because with each incoming message the state of my actor becomes more and more predictable, and I have to choose between fewer and fewer cases. This way my reactions are simply shorter, hence faster. I defined the following implementations:

def expectingSomeMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      become(expectingPostMoves)
    case PostMoves(list) =>
      following(list)
      become(expectingPreviousMoves)
  }

  def expectingPreviousMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      complete()
  }

  def expectingPostMoves: Receive = {
    case PostMoves(list) =>
      following(list)
      complete()
  }

Note the invocation of the become method from within the actor, which allows me to change the message loop at runtime. After forking two new actors, I have to expect some moves, whether they are PreviousMoves or PostMoves. Receiving PreviousMoves, I update my previousList reference and then wait for the PostMoves:

  def expectingSomeMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      become(expectingPostMoves)
    ...
  }

  def expectingPostMoves: Receive = {
    case PostMoves(list) =>
      following(list)
      complete()
  }

Once the PostMoves are received, I update my postList reference and complete the process. There is only one alternate scenario, namely receiving the PostMoves first, then swapping my receive code in order to complete my operations on the next receive invocation:

  def expectingSomeMoves: Receive = {
    ...
    case PostMoves(list) =>
      following(list)
      become(expectingPreviousMoves)
  }

  def expectingPreviousMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      complete()
  }
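
To play with the two scenarios without starting any actor, the swapping can be modelled in plain Scala. This is NOT Akka code, only a hypothetical stand-in: the Joiner class, its receive and become methods and the string-based moves are all invented for the sketch, and the handler is an ordinary PartialFunction stored in a variable.

```scala
object BecomeSketch {
  type Receive = PartialFunction[Any, Unit]

  // Simplified stand-ins for the messages of this post: moves are plain strings.
  case class PreviousMoves(moves: List[String])
  case class PostMoves(moves: List[String])

  // A plain object with a swappable message handler, not an actor.
  class Joiner(current: String) {
    private var previousList: List[String] = Nil
    private var postList: List[String] = Nil
    private var behaviour: Receive = expectingSomeMoves
    var result: Option[List[String]] = None

    // Messages the current behaviour does not handle are silently dropped.
    def receive(message: Any): Unit =
      if (behaviour.isDefinedAt(message)) behaviour(message)

    private def become(next: Receive): Unit = { behaviour = next }

    private def complete(): Unit = {
      result = Some(previousList ::: current :: postList)
    }

    private def expectingSomeMoves: Receive = {
      case PreviousMoves(list) =>
        previousList = list
        become(expectingPostMoves)
      case PostMoves(list) =>
        postList = list
        become(expectingPreviousMoves)
    }

    private def expectingPreviousMoves: Receive = {
      case PreviousMoves(list) =>
        previousList = list
        complete()
    }

    private def expectingPostMoves: Receive = {
      case PostMoves(list) =>
        postList = list
        complete()
    }
  }
}
```

Whatever the arrival order of the two messages, the joined list comes out as previous moves, current move, post moves. After the first message, each handler has a single case left to consider.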

Completion is basic and depends on whether I am working as a PreviousMover or a PostMover. Both implementations are:

class PostMover(threshold: Int) extends Mover(threshold) {

  def complete() {
    source ! PostMoves(previousList ::: move :: postList)
  }

  protected def receive = {
    case ProcessPostMoves(discs, a, b, c) =>
      discs match {
        case Nil =>
          self.channel ! PostMoves(Nil)
        case head :: Nil =>
          self.channel ! PostMoves(List(Move(a, c, head)))
        case head :: tail =>
          if (discs.size <= threshold) {
            self.channel ! PostMoves(move(discs, a, b, c))
          } else {
            propagate(head, tail, a, b, c)
            become(expectingSomeMoves)
          }
      }
  }

}

object PostMover {
  def apply(threshold: Int): ActorRef = {
    val mover = Actor.actorOf(new PostMover(threshold))
    mover.start()
    mover
  }
}

class PreviousMover(threshold: Int) extends Mover(threshold) {

  protected def receive = {
    case ProcessPreviousMoves(discs, a, b, c) =>
      discs match {
        case Nil =>
          self.channel ! PreviousMoves(Nil)
        case head :: Nil =>
          self.channel ! PreviousMoves(List(Move(a, c, head)))
        case head :: tail =>
          if (discs.size <= threshold) {
            self.channel ! PreviousMoves(move(discs, a, b, c))
          } else {
            propagate(head, tail, a, b, c)
            become(expectingSomeMoves)
          }
      }
  }

  def complete() {
    source ! PreviousMoves(previousList ::: move :: postList)
  }
}

object PreviousMover {
  def apply(threshold: Int): ActorRef = {
    val mover = Actor.actorOf(new PreviousMover(threshold))
    mover.start()
    mover
  }
}

The bodies of the receive methods are quite symmetric. Considering the PostMover implementation:

protected def receive = {
    case ProcessPostMoves(discs, a, b, c) =>
      discs match {
        case Nil =>
          self.channel ! PostMoves(Nil)
        case head :: Nil =>
          self.channel ! PostMoves(List(Move(a, c, head)))
        case head :: tail =>
          if (discs.size <= threshold) {
            self.channel ! PostMoves(move(discs, a, b, c))
          } else {
            propagate(head, tail, a, b, c)
            become(expectingSomeMoves)
          }
      }
  }

The original pattern matching followed by the recursive invocation is easily recognizable. Only the threshold check has been introduced in order to stop creating actors. I chose to create a new method in my Game object in order to execute the parallel scenario:

private def depthFor(numberOfCPUs:Int): Int ={
    if (numberOfCPUs <=1) 0
    else {1 + depthFor(numberOfCPUs >> 1)}
  }

  def movePar(discs: List[Disc]): List[Move] = {
    val threshold: Int = depthFor(Runtime.getRuntime.availableProcessors())
    println("threshold: " + threshold)
    // Synchronous send (!!) with a one hour timeout, expressed in milliseconds
    val result = PostMover(discs.size - threshold) !! (ProcessPostMoves(discs.reverse, A(), B(), C()), 3600000)
    result.getOrElse(PostMoves(Nil)).asInstanceOf[PostMoves].moves
  }
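
For comparison only, the same fork-join shape can be sketched without actors, using scala.concurrent.Future. Beware of the assumptions: Future belongs to the standard library of later Scala versions (2.10 and up), not to the Scala 2.9 / Akka 1.1 setup of this post, and the ForkJoinSketch object, its simplified Move type and its method names are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForkJoinSketch {
  final case class Move(from: Char, to: Char)

  // Plain recursion, used once the forking depth is exhausted.
  def sequential(n: Int, a: Char, b: Char, c: Char): List[Move] =
    if (n == 0) Nil
    else sequential(n - 1, a, c, b) ::: Move(a, c) :: sequential(n - 1, b, a, c)

  // Fork two sub-problems per level while depth > 0, then join their results.
  def parallel(n: Int, depth: Int, a: Char, b: Char, c: Char): Future[List[Move]] =
    if (depth <= 0 || n <= 1) Future(sequential(n, a, b, c))
    else {
      val before = parallel(n - 1, depth - 1, a, c, b) // forked first
      val after = parallel(n - 1, depth - 1, b, a, c)  // runs concurrently
      for (pre <- before; post <- after) yield pre ::: Move(a, c) :: post
    }

  // Blocks the calling thread until the joined result is available.
  def run(n: Int, depth: Int): List[Move] =
    Await.result(parallel(n, depth, 'A', 'B', 'C'), 1.minute)
}
```

Here depth plays the role of the value returned by depthFor, and the blocking Await in run is the counterpart of the synchronous "!!" call. The futures are composed rather than awaited inside one another, so no worker thread sits idle waiting for its children.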

I start by firing a PostMover actor using the "!!" symbol, which allows me to invoke the actor synchronously and so wait for the result.

Not knowing how else to proceed, I set a timeout of one hour on the synchronous invocation.

The program has been tested on both a dual-core machine and an 8-core machine. All things being equal, the effort was worthwhile on the 8-core machine, where a gain of 40 to 45% was measured during micro-benchmarks.
Never mind, the exercise was fun by the way, and we learnt how to fork-join and hot swap message loops with Akka actors.

This one was really tooooooooooooooooooo long!!!

Must go to bed. Be seeing you !! =D
