Saturday, October 29, 2011

One ring to rule my Akka actors

Hello again.

On vacation with my hands full of books to read, I found myself committed to a serious SICP reading that is taking longer than predicted (especially since I am doing the exercises in Clojure at the same time).
I had also promised myself to start learning Erlang, while starting to learn me a Haskell for great good (fans will understand :)). On the verge of opening the Haskell book this weekend, I had already watched the Francesco Cesarini and Simon Thompson videos on Erlang, which gave me some meat for Akka.

A very short project indeed, but food for the mind, since I still have no pet project, nor any Scala or Clojure master for guidance. One of the exercises proposed by Cesarini and Thompson consists of creating a ring of actors, each actor creating the next one and then sending an acknowledgement message. The last created actor sends a message back to the source actor, notifying it of the end of the ring process. The implementation can be summarized by the following diagram:



The sbt build.scala file used for the project is the following:

import sbt._
import sbt.classpath._
import Keys._
import Process._
import System._

object BuildSettings {
  val buildSettings = Defaults.defaultSettings ++ Seq (
    fork in run         := true,
    javaOptions in run += "-server",
    javaOptions in run += "-Xms384m",
    javaOptions in run += "-Xmx512m",
    organization        := "com.promindis",
    version             := "0.1-SNAPSHOT",
    scalaVersion        := "2.9.1",
    scalacOptions       := Seq("-unchecked", "-deprecation")
  )
}


object Resolvers {
  val typesafeReleases = "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"
  val scalaToolsReleases = "Scala-Tools Maven2 Releases Repository" at "http://scala-tools.org/repo-releases"
  val scalaToolsSnapshots = "Scala-Tools Maven2 Snapshots Repository" at "http://scala-tools.org/repo-snapshots"
}

object TestDependencies {
  val specs2Version = "1.6.1"
  val testDependencies = "org.specs2" %% "specs2" % specs2Version % "test"
}

object AKKADependencies {
  val akkaVersion = "1.2"
  val actorDependencies = "se.scalablesolutions.akka" % "akka-actor" % akkaVersion
}

object MainBuild extends Build {
  import Resolvers._
  import TestDependencies._
  import AKKADependencies._
  import BuildSettings._

  lazy val algorithms = Project(
    "Ring",
    file("."),
    settings = buildSettings ++ Seq(resolvers += typesafeReleases) ++  
              Seq (libraryDependencies ++= Seq(testDependencies, actorDependencies))
  )

}

The BuildSettings object makes sbt run the Scala main methods in a forked JVM (fork in run := true), so that the sbt process does not interfere with the execution of the ring. I added heap sizing options so the run is not constrained by undersized defaults, and I forced the -server flag because I am running an old 32-bit dual-core laptop (yes, shame on me, but a good laptop can cost well over 2000 euros).

Adding the -server flag turned out to be a good idea: it divided the execution time by two.

While working through this small kata, I fell into a few traps. One I was not expecting concerned the chained creation of the Nodes of the ring, one after the other. I could not chain them at construction time without causing a (predictable) big stack overflow. So, on a first try, I overrode the preStart method, naively expecting it to be invoked asynchronously. The stack overflow that followed took me by surprise: apparently preStart is invoked synchronously by start(). I then cheated and triggered the creation of the next actor asynchronously, by sending a message. The idea was to reproduce the following Erlang BIF invocation:
NPid = spawn(ring, start_proc, [Num - 1, Pid])
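To make the trap concrete, here is a minimal sketch in plain Scala, with no Akka involved; the object and function names are hypothetical. Creating each node directly inside the creation of the previous one nests one call per node, while queuing a creation request, the way a 'start message waits in a mailbox, keeps the stack depth constant.

```scala
import scala.collection.mutable

object ChainedCreation {
  // Synchronous chaining: node n is only "created" once node n - 1 has been,
  // so the call depth grows linearly with the size of the ring.
  def createEagerly(n: Int): Int =
    if (n == 1) 1 else 1 + createEagerly(n - 1)

  // Message-style chaining: each step only enqueues the request for the next
  // node and returns, so the stack depth stays constant whatever n is.
  def createViaMailbox(n: Int): Int = {
    val mailbox = mutable.Queue(n)
    var created = 0
    while (mailbox.nonEmpty) {
      val remaining = mailbox.dequeue()
      created += 1
      if (remaining > 1) mailbox.enqueue(remaining - 1)
    }
    created
  }
}
```

With a default thread stack size, ChainedCreation.createEagerly(10000000) should end in a StackOverflowError, while ChainedCreation.createViaMailbox(10000000) simply returns 10000000.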

The Node class is a classic:

 
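  class Node(source: ActorRef, number: Int) extends Actor {

    protected def receive = {
      case 'start =>
        source ! 'ping                    // notify the source that this node exists
        if (number == 1) {
          source ! 'ok                    // last node: the ring is complete
        } else {
          Node(source, number - 1) ! 'ok  // create the follower; it stops once it gets this 'ok
        }
      case 'ok =>
        self ! PoisonPill                 // every node, the last one included, stops on its creator's 'ok
    }
  }

  object Node {

    // Creates and starts a Node, then sends it 'start so that it builds its follower
    def apply(source: ActorRef, number: Int) = {
      val actor: ActorRef = Actor.actorOf(new Node(source, number)).start()
      actor ! 'start
      actor
    }
  }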


As you can see, I tried to keep the exchanged messages small, using only symbol literals. The 'start and 'ok symbols reproduce the Start and OK messages of the ring diagram.

The reference to the source and the number of Nodes still to be created are passed as constructor parameters. On receiving a 'start message, a Node actor first notifies the source of all nodes, with a 'ping, that it has been created (this let me check that all my actors were actually created). It then creates its follower, decreasing the number of Nodes still to be created, and sends it an 'ok message; the last Node, matching the number 1, sends the 'ok message to the source instead.
On receiving an 'ok message, the actor poisons itself so as to free its resources.


The Node companion object takes charge of creating and starting new actors, and sends each new actor its 'start message.
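To check the protocol described above, here is a hypothetical single-threaded model in plain Scala (no Akka; the names RingModel, Start, Ping and Ok are mine). One queue stands for all the mailboxes, the source has address 0 and each node is addressed by its number. The model only counts the messages of each kind and leaves the PoisonPill messages out.

```scala
import scala.collection.mutable

object RingModel {
  sealed trait Msg
  case object Start extends Msg
  case object Ping extends Msg
  case object Ok extends Msg

  // Runs one ring of n nodes and returns how many messages of each kind were sent.
  def run(n: Int): Map[Msg, Int] = {
    val mailboxes = mutable.Queue.empty[(Int, Msg)]
    val counts = mutable.Map.empty[Msg, Int].withDefaultValue(0)

    def send(to: Int, msg: Msg): Unit = {
      counts(msg) += 1
      mailboxes.enqueue((to, msg))
    }

    // Mirrors Node.apply: create the node, send it Start, return its address.
    def spawn(number: Int): Int = { send(number, Start); number }

    send(spawn(n), Ok) // what Source.preStart does
    while (mailboxes.nonEmpty) {
      mailboxes.dequeue() match {
        case (number, Start) if number > 0 =>
          send(0, Ping)                        // "I have been created"
          if (number == 1) send(0, Ok)         // the last node closes the ring
          else send(spawn(number - 1), Ok)     // create the follower, then send it Ok
        case _ => ()                           // source bookkeeping, or a node stopping on Ok
      }
    }
    counts.toMap
  }
}
```

For n nodes the model sends n Start, n Ping and n + 1 Ok messages, that is 3n + 1, which is where the figure of about three messages per node comes from.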


The Source actor is slightly different: it collects the 'ping notifications from the ring, measures the complete execution of the ring and relaunches a new ring execution. The ability to relaunch a ring was important, as the JVM warm-up can take one or two ring executions before the execution time stabilizes.


  class Source(number: Int, maximum: Int) extends Actor {
    var start: Long = _
    var total: Int = _          // number of 'ping received, i.e. of created nodes

    var counter = maximum       // ring executions still to run

    // Launches the first ring as soon as the Source is started
    override def preStart() {
      start = currentTimeMillis()
      Node(self, number) ! 'ok
    }

    override def postStop() {
    }

    def decreaseCounter() {
      counter = counter - 1
    }

    protected def receive = {
      case 'ping => total = total + 1
      case 'ok =>
        // End of a ring execution: print the elapsed time, then relaunch or stop
        println(currentTimeMillis() - start + "ms for " + total)
        decreaseCounter()
        if (counter != 0) {
          println("Retrying... " + counter + " times")
          start = currentTimeMillis()
          total = 0
          Node(self, number) ! 'ok
        } else {
          self ! PoisonPill
        }
    }
  }

  object Source {
    def apply(number: Int, maximum: Int) = Actor.actorOf(new Source(number, maximum)).start()
  }

On receiving a 'ping message, the (misnamed) total counter of created actors is incremented.
On receiving an 'ok message, which flags the end of a ring execution, the Source prints the elapsed time and starts a new ring, until the expected number of ring executions is reached. Here is the complete code:

import akka.actor.{PoisonPill, ActorRef, Actor}
import System._
import Integer._

object Ring {

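  class Node(source: ActorRef, number: Int) extends Actor {

    protected def receive = {
      case 'start =>
        source ! 'ping                    // notify the source that this node exists
        if (number == 1) {
          source ! 'ok                    // last node: the ring is complete
        } else {
          Node(source, number - 1) ! 'ok  // create the follower; it stops once it gets this 'ok
        }
      case 'ok =>
        self ! PoisonPill                 // every node, the last one included, stops on its creator's 'ok
    }
  }

  object Node {

    // Creates and starts a Node, then sends it 'start so that it builds its follower
    def apply(source: ActorRef, number: Int) = {
      val actor: ActorRef = Actor.actorOf(new Node(source, number)).start()
      actor ! 'start
      actor
    }
  }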

  class Source(number: Int, maximum: Int) extends Actor {
    var start: Long = _
    var total: Int = _          // number of 'ping received, i.e. of created nodes

    var counter = maximum       // ring executions still to run

    // Launches the first ring as soon as the Source is started
    override def preStart() {
      start = currentTimeMillis()
      Node(self, number) ! 'ok
    }

    override def postStop() {
    }

    def decreaseCounter() {
      counter = counter - 1
    }

    protected def receive = {
      case 'ping => total = total + 1
      case 'ok =>
        // End of a ring execution: print the elapsed time, then relaunch or stop
        println(currentTimeMillis() - start + "ms for " + total)
        decreaseCounter()
        if (counter != 0) {
          println("Retrying... " + counter + " times")
          start = currentTimeMillis()
          total = 0
          Node(self, number) ! 'ok
        } else {
          self ! PoisonPill
        }
    }
  }

  object Source {
    def apply(number: Int, maximum: Int) = Actor.actorOf(new Source(number, maximum)).start()
  }

  // arguments(0): number of nodes in a ring, arguments(1): number of ring executions
  def main(arguments: Array[String]) {
    Source(parseInt(arguments(0)), parseInt(arguments(1)))
  }

}

The Ring object's main method takes two input parameters: respectively, the number of nodes in a ring and the number of ring executions. Running on JDK 7, I typically got samples like these:

> run 1000 5
[info] 201ms for 1000
[info] Retrying... 4 times
[info] 127ms for 1000
[info] Retrying... 3 times
[info] 203ms for 1000
[info] Retrying... 2 times
[info] 90ms for 1000
[info] Retrying... 1 times
[info] 74ms for 1000
[success] Total time: 3 s, completed 29 oct. 2011 15:05:16
> run 10000 5
[info] 1079ms for 10000
[info] Retrying... 4 times
[info] 511ms for 10000
[info] Retrying... 3 times
[info] 94ms for 10000
[info] Retrying... 2 times
[info] 85ms for 10000
[info] Retrying... 1 times
[info] 108ms for 10000
[success] Total time: 5 s, completed 29 oct. 2011 15:05:26
> run 100000 5
[info] 2289ms for 100000
[info] Retrying... 4 times
[info] 967ms for 100000
[info] Retrying... 3 times
[info] 754ms for 100000
[info] Retrying... 2 times
[info] 739ms for 100000
[info] Retrying... 1 times
[info] 752ms for 100000
[success] Total time: 8 s, completed 29 oct. 2011 15:05:45
> run 1000000 5
[info] 9184ms for 1000000
[info] Retrying... 4 times
[info] 7834ms for 1000000
[info] Retrying... 3 times
[info] 8163ms for 1000000
[info] Retrying... 2 times
[info] 7470ms for 1000000
[info] Retrying... 1 times
[info] 7585ms for 1000000
[success] Total time: 43 s, completed 29 oct. 2011 15:06:34

These figures look weaker than the Erlang ones:


2> timer:tc(ring, start, [1000]).
{5000,ok}
3> timer:tc(ring, start, [10000]).
{52000,ok}
4> timer:tc(ring, start, [100000]).
{246000,ok}
5> timer:tc(ring, start, [1000000]).
{1535000,ok}

The time returned by Erlang's timer:tc is in microseconds.
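Since the two sets of figures use different units, here is a small conversion sketch in plain Scala. The numbers are copied from the two sessions above, taking for each ring size the last (warmed-up) Akka run; the object and function names are mine.

```scala
object Comparison {
  // Ring size -> Erlang timer:tc result, in microseconds
  val erlangMicros = Map(1000 -> 5000L, 10000 -> 52000L, 100000 -> 246000L, 1000000 -> 1535000L)
  // Ring size -> last Akka run, in milliseconds
  val akkaMillis = Map(1000 -> 74L, 10000 -> 108L, 100000 -> 752L, 1000000 -> 7585L)

  // Erlang time converted to milliseconds
  def erlangMillis(size: Int): Double = erlangMicros(size) / 1000.0

  // How many times longer the Akka run took for a given ring size
  def ratio(size: Int): Double = akkaMillis(size) / erlangMillis(size)
}
```

For the 1,000,000-node ring this gives 1535 ms for Erlang against 7585 ms for Akka, a factor of roughly 5. The small-ring figures should be read with care, as the 1,000-node Akka runs vary between 74 and 203 ms.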

As I do not (yet!!) know enough about either the Akka or the Erlang internals, I don't want to draw any conclusion in favor of one experiment or the other.

I would rather welcome both explanations and criticism of the code sample, in order to improve my knowledge of the framework and better understand why performance should favor one or the other.

In addition, one should remember that:

- the machine is undersized for this kind of experiment;
- the number of messages exchanged during the Akka ring experiment (around 3,000,000) is greater than the number of messages exchanged during the Erlang test (1,000,000);
- after all, creating 1,000,000 actors and exchanging around 3,000,000 messages in about 7.5 to 8 s could be considered a good performance on a three-year-old laptop.
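The second point can be checked with a little arithmetic. A sketch, assuming about 3 messages per node for Akka and 1 per node for Erlang, and reusing the 1,000,000-node timings above (7585 ms for the last Akka run, 1535000 µs for Erlang); the object and function names are mine.

```scala
object Throughput {
  // Messages per second, given a message count and an elapsed time in milliseconds
  def perSecond(messages: Long, millis: Double): Double = messages * 1000.0 / millis

  val nodes = 1000000L
  val akka = perSecond(3 * nodes, 7585.0)           // about 3 messages per node
  val erlang = perSecond(nodes, 1535000 / 1000.0)   // 1 message per node, timer:tc is in microseconds
}
```

This gives roughly 395,000 messages per second for Akka against roughly 651,000 for Erlang, so per message the gap shrinks to a factor of about 1.6, keeping in mind that these are single runs on a single machine.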

So, what are the results on your machines? Certainly better than mine?

Be seeing you !!! :)

5 comments:

Andriy Plokhotnyuk said...

Output on my desktop (2 cores, 3GHz) with jdk 7:

201ms for 1000
Retrying... 9 times
126ms for 1000
Retrying... 8 times
183ms for 1000
Retrying... 7 times
250ms for 1000
Retrying... 6 times
76ms for 1000
Retrying... 5 times
99ms for 1000
Retrying... 4 times
[ERROR] [14/11/11 18:55] [akka:event-driven:dispatcher:global-11] [LocalActorRef] Actor has not been started, you need to invoke 'actor.start()' before using it
akka.actor.ActorInitializationException: Actor has not been started, you need to invoke 'actor.start()' before using it
[PC2_6eaa71ca-0ee1-11e1-a597-001a4d5595d4]
at akka.actor.ScalaActorRef$class.$bang(ActorRef.scala:1399)
at akka.actor.LocalActorRef.$bang(ActorRef.scala:605)
at Ring$Node$$anonfun$receive$1.apply(Ring.scala:16)
at Ring$Node$$anonfun$receive$1.apply(Ring.scala:9)
at akka.actor.Actor$class.apply(Actor.scala:545)
at Ring$Node.apply(Ring.scala:7)
at akka.actor.LocalActorRef.invoke(ActorRef.scala:905)
at akka.dispatch.MessageInvocation.invoke(MessageHandling.scala:25)
at akka.dispatch.ExecutableMailbox$class.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:216)
at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:122)
at akka.dispatch.ExecutableMailbox$class.run(ExecutorBasedEventDrivenDispatcher.scala:188)
at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.run(ExecutorBasedEventDrivenDispatcher.scala:122)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
at akka.dispatch.MonitorableThread.run(ThreadPoolBuilder.scala:184)

112ms for 1000
Retrying... 3 times
47ms for 1000
Retrying... 2 times
67ms for 1000
Retrying... 1 times
205ms for 1000

Globulon said...

Whoa, nice snapshot. Give me a few days to reproduce it on my "Ford T" computer and investigate. It will be a change from my boring day job :)

Globulon said...

I am going to post the problem on the akka-user list today, while starting to analyze it.

Globulon said...

I am trying to reproduce your problem with JDK 7 build 1.7.0_02-ea-b02. I have not succeeded so far, and I am upgrading to the u1 version.
A first answer from the Akka user group suggested simply replacing "self ! PoisonPill" with self.stop().
Good advice: as you might imagine, the performance got better (20% to 30% on my machine). I am going to correct the code.
Keep me informed if you reproduce it with this change.
Would you be so kind as to tell me more about your JDK 7 version?

Andriy Plokhotnyuk said...

It was build 1.7.0_01-b08 for win32, but for now I cannot reproduce that exception.
Let's assume it was some phantom bug.

I also tried playing ping-pong with Akka actors, and found that they perform close to Erlang actors when they are pre-created:

18,155,070,440 ns
1,815 ns/op
550,810 ops/s

import akka.actor.Actor._
import akka.actor.{Actor, ActorRef}

val n = 10000000
val t = System.nanoTime

def printResult() {
  val d = System.nanoTime - t
  printf("%,d ns\n", d)
  printf("%,d ns/op\n", d / n)
  printf("%,d ops/s\n", (n * 1000000000L) / d)
}

abstract class Player extends Actor {

  def adversary: ActorRef

  def receive = {
    case 0 => printResult(); self.stop()
    case 1 => adversary ! 0; self.stop()
    case i: Int => adversary ! i - 1
  }
}

object Pong extends Player {
  def adversary = Ping.self
}

object Ping extends Player {
  def adversary = Pong.self
}

actorOf(Pong).start()
actorOf(Ping).start() ! n
