Saturday, October 29, 2011

One ring to rule my Akka actors

Hello again.

On vacation with my hands full of books to read, I found myself committed to a serious reading of SICP that is taking longer than predicted (especially since I am doing the exercises in Clojure along the way).
I had also promised myself to start learning Erlang, while starting to learn me a Haskell for my great good (fans will understand :)). On the verge of opening the Haskell book this weekend, I had already watched the Francesco Cesarini and Simon Thompson videos on Erlang, which provided me with some meat for Akka.

A very short project indeed, but food for the mind, as I still have no pet project, nor any Scala or Clojure Master for guidance. One of the exercises proposed by Cesarini and Thompson consists in creating a ring of actors, each one creating the next, then sending it an acknowledgement message. The last actor created sends a message to the source actor, notifying the end of the ring process. The implementation can be summarized in the following diagram:



The sbt build.scala file used for the project follows this template:

import sbt._
import sbt.classpath._
import Keys._
import Process._
import System._

object BuildSettings {
  val buildSettings = Defaults.defaultSettings ++ Seq (
    fork in run         := true,
    javaOptions in run += "-server",
    javaOptions in run += "-Xms384m",
    javaOptions in run += "-Xmx512m",
    organization        := "com.promindis",
    version             := "0.1-SNAPSHOT",
    scalaVersion        := "2.9.1",
    scalacOptions       := Seq("-unchecked", "-deprecation")
  )
}


object Resolvers {
  val typesafeReleases = "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"
  val scalaToolsReleases = "Scala-Tools Maven2 Releases Repository" at "http://scala-tools.org/repo-releases"
  val scalaToolsSnapshots = "Scala-Tools Maven2 Snapshots Repository" at "http://scala-tools.org/repo-snapshots"
}

object TestDependencies {
  val specs2Version = "1.6.1"
  val testDependencies = "org.specs2" %% "specs2" % specs2Version % "test"
}

object AKKADependencies {
  val akkaVersion = "1.2"
  val actorDependencies = "se.scalablesolutions.akka" % "akka-actor" % akkaVersion
}

object MainBuild extends Build {
  import Resolvers._
  import TestDependencies._
  import AKKADependencies._
  import BuildSettings._

  lazy val algorithms = Project(
    "Ring",
    file("."),
    settings = buildSettings ++ Seq(resolvers += typesafeReleases) ++  
              Seq (libraryDependencies ++= Seq(testDependencies, actorDependencies))
  )

}

The BuildSettings object flags the execution of the Scala main methods as forked processes, so there is no interference between the sbt process and the execution of the ring. I added memory sizing options so as not to be constrained by undersized defaults, and imposed the -server flag as I am running an old 32-bit dual core laptop (yes, shame on me, but a good laptop can cost far more than 2000 euros).

Adding the -server flag turned out to be a nice idea, dividing the execution time by two.

While achieving this small kata, I fell into a few traps. One of them, which I was not expecting, was the chained creation of the Nodes in the ring, one after the other. I could not chain them on construction without generating a predictably big stack overflow. So, on a first try, I overrode the preStart method, naively expecting some asynchronous invocation of the method. The stack overflow took me by surprise. I then cheated and ordered the creation of the next actor asynchronously, via message sending. The idea was to reproduce the following Erlang BIF invocation:
NPid = spawn(ring, start_proc, [Num - 1, Pid])
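
Why does message sending avoid the overflow? Here is a minimal sketch in plain Scala, without Akka, under the assumption that a nested creation keeps one stack frame per node while a mailbox only holds the next request. MailboxSketch, Start, createNested and createByMessages are hypothetical names of mine, and the queue is only a stand-in for an actor mailbox:

```scala
import scala.collection.mutable

object MailboxSketch {
  // Hypothetical message: a request to create the node carrying this number.
  final case class Start(number: Int)

  // Nested creation: each node creates its follower before returning,
  // so the call depth grows with the size of the ring.
  def createNested(number: Int): Int =
    if (number == 0) 0 else 1 + createNested(number - 1)

  // Message-driven creation: each step only enqueues the next request,
  // so the call depth stays constant whatever the size of the ring.
  def createByMessages(number: Int): Int = {
    val mailbox = mutable.Queue[Start](Start(number))
    var created = 0
    while (mailbox.nonEmpty) {
      val n = mailbox.dequeue().number
      created += 1
      if (n > 1) mailbox.enqueue(Start(n - 1))
    }
    created
  }
}
```

With a large enough ring, createNested is the version likely to blow the stack, whereas createByMessages only ever holds one pending request in its queue.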

The Node class template is a classic:

 
  class Node(source: ActorRef, number: Int) extends Actor{

    protected def receive = {
      case 'start =>
        source ! 'ping
        if (number == 1)  {
          source ! 'ok
          self ! PoisonPill
        } else {
          Node(source, number - 1)  ! 'ok
        }
      case 'ok =>
        self ! PoisonPill
    }
  }

  object Node {

    def apply (source: ActorRef, number: Int) = {
      val actor: ActorRef = Actor.actorOf(new Node(source, number)).start()
      actor ! 'start
      actor
    }
  }


As one can see, I tried to reduce the volume of the exchanged messages by using only symbol literals. The 'start and 'ok symbols reproduce the Start and OK labels of the ring diagram.

On receiving a 'start message, a Node actor first notifies the source that it has been created, which allowed me to check that all my actors were created. It then creates her follower, decreasing the number of Nodes still to be created; the last Node, matching the number 1, sends the 'ok message to the source instead.
On receiving an 'ok message, the actor poisons herself so as to free the resources. The reference to the source and the number of expected actors are passed as constructor parameters.


The Node companion object takes charge of the creation and start of new actors.


The Source actor is slightly different, as it collects the ping notifications from the ring, traces the complete execution of the ring and relaunches a ring execution. The ability to relaunch a ring process was important, as the JVM warm-up can take one or two ring processes before exposing a stable execution time.


 class Source(number: Int, maximum: Int) extends Actor{
    var start: Long = _
    var total: Int = _

    var counter = maximum

    override def preStart() {
      start = currentTimeMillis()
      Node(self, number) ! 'ok
    }

    override def postStop() {
    }

    def decreaseCounter () {
      counter = counter - 1
    }

    protected def receive = {
      case 'ping => total = total + 1
      case 'ok =>
        println(currentTimeMillis() - start + "ms for " + total)
        decreaseCounter()
        if (counter != 0) {
          println("Retrying... " + counter + " times")
          start = currentTimeMillis()
          total = 0
          Node(self, number) ! 'ok
        } else {
          self ! PoisonPill
        }
    }
  }

  object Source {
    def apply (number: Int, maximum: Int) = Actor.actorOf(new Source(number, maximum)).start()
  }

On receiving a 'ping message, the (misnamed) total number of created actors is incremented.
On receiving an 'ok message, which flags the end of the ring execution, a new ring process is initiated, until the expected number of ring executions is reached. Here is the complete code:

import akka.actor.{PoisonPill, ActorRef, Actor}
import System._
import Integer._

object Ring {

  class Node(source: ActorRef, number: Int) extends Actor{

    protected def receive = {
      case 'start =>
        source ! 'ping
        if (number == 1)  {
          source ! 'ok
          self ! PoisonPill
        } else {
          Node(source, number - 1)  ! 'ok
        }
      case 'ok =>
        self ! PoisonPill
    }
  }

  object Node {

    def apply (source: ActorRef, number: Int) = {
      val actor: ActorRef = Actor.actorOf(new Node(source, number)).start()
      actor ! 'start
      actor
    }
  }

  class Source(number: Int, maximum: Int) extends Actor{
    var start: Long = _
    var total: Int = _

    var counter = maximum

    override def preStart() {
      start = currentTimeMillis()
      Node(self, number) ! 'ok
    }

    override def postStop() {
    }

    def decreaseCounter () {
      counter = counter - 1
    }

    protected def receive = {
      case 'ping => total = total + 1
      case 'ok =>
        println(currentTimeMillis() - start + "ms for " + total)
        decreaseCounter()
        if (counter != 0) {
          println("Retrying... " + counter + " times")
          start = currentTimeMillis()
          total = 0
          Node(self, number) ! 'ok
        } else {
          self ! PoisonPill
        }
    }
  }

  object Source {
    def apply (number: Int, maximum: Int) = Actor.actorOf(new Source(number, maximum)).start()
  }

  def  main(arguments: Array[String]) {
    Source(parseInt(arguments(0)), parseInt(arguments(1)))
 }

}

where the main method of the Ring object takes as input parameters, respectively, the number of nodes in a ring and the number of ring processes to execute. With an underlying JDK 7, the kind of samples I got are typically:

> run 1000 5
[info] 201ms for 1000
[info] Retrying... 4 times
[info] 127ms for 1000
[info] Retrying... 3 times
[info] 203ms for 1000
[info] Retrying... 2 times
[info] 90ms for 1000
[info] Retrying... 1 times
[info] 74ms for 1000
[success] Total time: 3 s, completed 29 oct. 2011 15:05:16
> run 10000 5
[info] 1079ms for 10000
[info] Retrying... 4 times
[info] 511ms for 10000
[info] Retrying... 3 times
[info] 94ms for 10000
[info] Retrying... 2 times
[info] 85ms for 10000
[info] Retrying... 1 times
[info] 108ms for 10000
[success] Total time: 5 s, completed 29 oct. 2011 15:05:26
> run 100000 5
[info] 2289ms for 100000
[info] Retrying... 4 times
[info] 967ms for 100000
[info] Retrying... 3 times
[info] 754ms for 100000
[info] Retrying... 2 times
[info] 739ms for 100000
[info] Retrying... 1 times
[info] 752ms for 100000
[success] Total time: 8 s, completed 29 oct. 2011 15:05:45
> run 1000000 5
[info] 9184ms for 1000000
[info] Retrying... 4 times
[info] 7834ms for 1000000
[info] Retrying... 3 times
[info] 8163ms for 1000000
[info] Retrying... 2 times
[info] 7470ms for 1000000
[info] Retrying... 1 times
[info] 7585ms for 1000000
[success] Total time: 43 s, completed 29 oct. 2011 15:06:34

The performance seems weaker than the Erlang one:


2> timer:tc(ring, start, [1000]).
{5000,ok}
3> timer:tc(ring, start, [10000]).
{52000,ok}
4> timer:tc(ring, start, [100000]).
{246000,ok}
5> timer:tc(ring, start, [1000000]).
{1535000,ok}

The execution time unit in Erlang is microseconds.
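
To put both series in the same unit, the Erlang figures can be converted to milliseconds. A small sketch, where ErlangTimings and its field names are mine and the values are simply copied from the shell session above:

```scala
object ErlangTimings {
  // Values returned by timer:tc above, in microseconds, keyed by ring size.
  val microseconds: Seq[(Int, Long)] = Seq(
    1000    -> 5000L,
    10000   -> 52000L,
    100000  -> 246000L,
    1000000 -> 1535000L
  )

  // Same measures in milliseconds, the unit printed by the Akka version.
  val milliseconds: Seq[(Int, Long)] =
    microseconds.map { case (nodes, micros) => (nodes, micros / 1000) }
}
```

For a ring of 1000000 processes this gives 1535 ms on the Erlang side, to be compared with the 7 to 9 seconds printed by the Akka runs.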

As I do not have enough knowledge (yet !!) of the Akka internals, nor of the Erlang ones, I do not want to draw any conclusion in favor of one experiment or the other.

I would rather welcome both explanations and criticism of the code sample, in order to increase my knowledge of the framework and to understand better why the performance should be better for one or the other.

In addition, one should remember that:

  • the machine is undersized for this kind of experiment
  • the number of messages exchanged during the Akka ring experiment (3000000) is greater than the number of messages exchanged during the Erlang test (1000000)
  • after all, creating 1000000 actors and exchanging around 3000000 messages in about 7s could be considered a good performance on a three year old laptop
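
The 3000000 figure can be checked by counting the messages in the Node and Source code above. A small sketch, where MessageCount and akkaMessages are names of mine and the PoisonPill messages are deliberately left out of the count:

```scala
object MessageCount {
  // Messages exchanged for one ring of `nodes` actors,
  // following the Node and Source listings (PoisonPill not counted).
  def akkaMessages(nodes: Long): Long = {
    val starts = nodes      // 'start sent by the Node companion object to each new node
    val pings  = nodes      // 'ping sent by each node to the source
    val oks    = nodes + 1  // 'ok sent to each node by its creator, plus the final 'ok to the source
    starts + pings + oks
  }
}
```

For a ring of 1000000 nodes this counts 3000001 messages, hence the "around 3000000" above.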

So what are the results on your machines ? Certainly better than mine ?

Be seeing you !!! :)

Sunday, October 23, 2011

A "snaky" tribute to Stuart in Scala with Akka

Hello again.

Today, a little bit of Scala. My current Clojure algorithm not being complete, I found myself facing the same recurring dilemma: what am I going to talk about? This blog is a commitment to try to understand, then expose, a small subject, whatever it is, so as to learn how to convey ideas.

The subject came to me while reading Stuart Halloway's book, Programming Clojure. A nice book, full of interesting exercises. One of them aims to picture a very amusing use of STM (aka Software Transactional Memory) in the form of a simple Snake game.
A very entertaining part and potentially a nice code kata. One must steer a moving snake in a rectangular area. One or more apples are placed in the game area while the snake is rambling. The purpose is to catch apples in order to make the snake grow. One can control the snake using the arrow keys. Once the snake has grown enough, you win. But beware: the head of the snake must never hit the snake's body. Simple.
Although I do not play games, programming Stuart Halloway's example was quite fun, and the real game may be in implementing the solution. Stuart Halloway presents a nice, simple architecture separating the purely functional aspects from the mutable state and the GUI. The mutable part of the game can evolve in three ways:
  • A game can be reset to its initial state.
  • Every turn, the snake updates its position. If it eats an apple, a new apple is placed.
  • A snake can turn.
My (small !!) Scala brain started yelling: "God, you can do that in Scala" with actors. The game state can be managed by an actor that enforces the serialisation of the changes. My Scala mind tortures me enough, so I do not want to upset her... but let's spice up the idea with some Akka.

Having not yet found a pet project in Scala, and with no Scala Master for guidance, this is the only opportunity I have to bring Akka into the game (may I say). Naturally, I would not recommend using Akka to program a little game, the genuine Scala actors being sufficient to manage this kind of implementation.
In the large set of Akka modules we can also find an STM module, and I will not resist using it. Before starting, I provide here the content of the sbt build.scala project file, sbt being a nice tool but not always very easy to start with:
import sbt._
import sbt.classpath._
import Keys._
import Process._
import System._

object BuildSettings {
  val buildSettings = Defaults.defaultSettings ++ Seq (
    organization        := "com.promindis",
    version             := "0.1-SNAPSHOT",
    scalaVersion        := "2.9.1",
    scalacOptions       := Seq("-unchecked", "-deprecation")
  )
}


object Resolvers {
  val typesafeReleases = "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"
  val scalaToolsReleases = "Scala-Tools Maven2 Releases Repository" at "http://scala-tools.org/repo-releases"
  val scalaToolsSnapshots = "Scala-Tools Maven2 Snapshots Repository" at "http://scala-tools.org/repo-snapshots"
}

object TestDependencies {
  val specs2Version = "1.6.1"
  val testDependencies = "org.specs2" %% "specs2" % specs2Version % "test"
}

object AKKADependencies {
  val akkaVersion = "1.2"
  val actorDependencies = "se.scalablesolutions.akka" % "akka-actor" % akkaVersion
  val stmDependencies = "se.scalablesolutions.akka" % "akka-stm" % akkaVersion
}

object SwingDependencies {
  val swingDependencies = "org.scala-lang" % "scala-swing" % "2.9.1"
}


object MainBuild extends Build {
  import Resolvers._
  import TestDependencies._
  import AKKADependencies._
  import BuildSettings._
  import SwingDependencies._

  lazy val algorithms = Project(
    "Snake",
    file("."),
    settings = buildSettings ++ Seq(resolvers += typesafeReleases) ++  
              Seq (libraryDependencies ++= Seq(testDependencies, actorDependencies, stmDependencies, swingDependencies))
  )

}

Why not follow Stuart Halloway's path and reuse idioms from the language? Scala being a multi-paradigm language, we can of course use actors and an object oriented approach, colored by some functional treatment in the pattern matching implemented in the actors' bodies.

I wanted the stuff to be simple. The approach can be summarized by the following diagram:

 

The global state lies in an Entities actor in charge of handling all the events coming from the graphical interface. I deliberately separated the game board (a Swing panel) from the entities actor using a second actor called the board driver. The board driver plays the part of a single access point to the game board, both sending and receiving events in coordination with the entities actor.
The entities actor sends notifications about model updates to the board driver, while the board driver sends direction update notifications and refresh notifications.
Basically, a timer issues the periodic update to the board driver, which converts it into an actor message. No big stuff.

Like Stuart Halloway, we need useful tools to help us swap from the model world to the screen world. First of all, the game world must be described by abstractions like positions, dimensions, etc. For the sake of simplicity, I introduced the following tool class:
import scala.util.Random._

case class WorldLocation(x: Int, y:Int) {
  def + (location: WorldLocation) : WorldLocation ={
    WorldLocation(x + location.x, y + location.y)
  }
}

object World {
  def winLength = 5

  object Direction {
    val Left = WorldLocation(-1, 0)
    val Right = WorldLocation(1, 0)
    val Down = WorldLocation(0, 1)
    val Up = WorldLocation(0, -1)
  }

  def origin: WorldLocation = WorldLocation(0,0)

  def randomLocation(): WorldLocation = WorldLocation(nextInt(width), nextInt(heigth))

  def heigth: Int = 75
  def width: Int = 100

}

where a position in our small universe is pointed out by a WorldLocation. The randomLocation method provides us with a simple means to pop an apple up at a different location when needed. The + operation has been implemented in order to provide more fluent expressions when manipulating WorldLocation instances. winLength defines the size a snake must grow to for the player to win the game.
Describing a small universe serves no purpose until we convert our description into a screen world language. Here comes the GraphicConverters object, very helpful when dealing with transformations from model to screen:
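
As an illustration of the + operation, here is a small self-contained sketch. LocationSketch and its fields are hypothetical names of mine, and the case class is repeated only to keep the example standalone:

```scala
object LocationSketch {
  final case class WorldLocation(x: Int, y: Int) {
    def +(other: WorldLocation): WorldLocation =
      WorldLocation(x + other.x, y + other.y)
  }

  // Same conventions as World.Direction: y grows downwards.
  val Right = WorldLocation(1, 0)
  val Down  = WorldLocation(0, 1)

  // One step to the right, then one step down, starting from the origin.
  val moved: WorldLocation = WorldLocation(0, 0) + Right + Down
}
```

Here moved ends up at WorldLocation(1, 1), and a direction is nothing more than a location used as an offset.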

 
case class ScreenLocation(x: Int, y:Int, width: Int, height: Int)

object GraphicConverters {
  val factor = 10

  def converted(length: Int): Int = length * factor

  def converted(location: WorldLocation): ScreenLocation = diplayPointFrom(location)

  def converted(segment: List[WorldLocation]): List[ScreenLocation] = segment.map(diplayPointFrom(_))


  def diplayPointFrom(location: WorldLocation): ScreenLocation = {
    ScreenLocation(location.x * factor, location.y * factor, factor, factor)
  }

}


A scale factor of 10 provides a visible graphical panel surface delimited by a 1000 * 750 rectangle (width * height). The diplayPointFrom method takes charge of the basic conversion from the game world to the screen world, so it is reused by the converted methods that handle single coordinates or lists of coordinates.
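
To make the conversion concrete, here is a small standalone sketch. ConversionSketch, toScreen, cell and panelSize are names of mine; toScreen does the same arithmetic as diplayPointFrom, and 100 and 75 are the world width and height defined earlier:

```scala
object ConversionSketch {
  final case class WorldLocation(x: Int, y: Int)
  final case class ScreenLocation(x: Int, y: Int, width: Int, height: Int)

  val factor = 10

  // Same arithmetic as diplayPointFrom: scale the position, use the factor as cell size.
  def toScreen(location: WorldLocation): ScreenLocation =
    ScreenLocation(location.x * factor, location.y * factor, factor, factor)

  // World cell (2, 3) becomes a 10 x 10 square whose top left corner is at pixel (20, 30).
  val cell: ScreenLocation = toScreen(WorldLocation(2, 3))

  // Panel size in pixels (width, height) for the 100 x 75 world.
  val panelSize: (Int, Int) = (100 * factor, 75 * factor)
}
```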

Nice. The game has started. We need an entities actor managing our entities. The entities are classified in two subclass definitions: a snake and an apple. So two object abstractions:

 
 sealed trait Entity

  case class Snake(body: List[WorldLocation], direction: WorldLocation) extends Entity {
    def go(toDirection: WorldLocation): Snake = Snake(body, toDirection)

    def moved: Snake = Snake(ahead::body.take(body.size - 1), direction)

    def grown: Snake = Snake(ahead::body, direction)

    def ahead: WorldLocation = head + direction

    def head: WorldLocation = body.head
  }

case class Apple(location: WorldLocation) extends Entity

I found case classes to be natural implementations of board entities. Some of you, dear pals, are going to yell at me. Yes, the Snake entity is a real object, taking charge of its own growth like a real adult snake, being capable of changing its direction on demand, etc... Or is it? As a matter of fact, snake instances are immutable, so they are real "value objects". Every method invocation is either a pure read access, like the head position, or leads to the creation of a new snake, like moved, grown or go. Can it be more functional? The location of the methods in the snake definition serves a modular purpose only. Precisely, the purpose of the methods is:

 
method    purpose
go        change direction
moved     change snake position
grown     grow snake
ahead     provides next location of head
head      location of head

We have our two objects.

Good, we need an actor behavior to manage the transformations induced by the incoming events. An Akka actor receives its event notifications via the... receive method:
protected def receive =  {
    case Refresh() => updatePositions(snake,  apple)
    case UpdateDirection(to) => updateDirectionOf(snake, to)
  }

where we recognize the Refresh and UpdateDirection messages set earlier in the model diagram. Starting with the easiest of all, the snake direction update:
 var snake: Snake = _

 def updateDirectionOf(withSnake: Snake, to : WorldLocation) {
    snake = withSnake.go(to)
  }


The actor's internal mutable reference to the snake is simply updated. The updatePositions invocation involves a little bit more trickery:

 
var snake: Snake = _
  var apple: Apple = _

 def reset() {
    snake = Snake(List(origin), Direction.Right)
    apple = Apple(randomLocation())
  }


 def updatePositions(fromSnake: Snake, fromApple: Apple) {
    fromSnake.body match {
      case head::tail if head == fromApple.location =>
        apple = Apple(randomLocation())
        snake = fromSnake.grown
      case head::tail if tail.contains(head) =>
        Game.displayMessage("You lose")
        reset()
      case head::tail if tail.size == World.winLength  =>
        Game.displayMessage("You win")
        reset()
      case _ => snake = fromSnake.moved
    }

    display ! Updated(snake.body, apple.location)
  }


If the snake eats the apple, in essence if the snake head meets the apple location, then a new apple is created and the snake grows.
If the snake head location happens to meet the snake body, the game is lost, so it is restarted after notifying the main game board (Game.displayMessage).
If, hopefully, the snake is long enough, the game is won, so it is restarted after notifying the main game board (Game.displayMessage).
...else we move the snake in its current direction.
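
The decision itself can be isolated from the side effects. Below is a hedged sketch, not the code of the actor: UpdateSketch, Loc, Outcome and outcome are hypothetical names, and the function only reproduces the order of the guards used in updatePositions:

```scala
object UpdateSketch {
  final case class Loc(x: Int, y: Int)

  sealed trait Outcome
  case object Eaten extends Outcome
  case object Lost  extends Outcome
  case object Won   extends Outcome
  case object Moved extends Outcome

  // Same case order as updatePositions: the first matching guard wins.
  def outcome(body: List[Loc], apple: Loc, winLength: Int): Outcome = body match {
    case head :: _ if head == apple          => Eaten
    case head :: tail if tail.contains(head) => Lost
    case _ :: tail if tail.size == winLength => Won
    case _                                   => Moved
  }
}
```

Because the guards are tried in order, a head sitting on the apple is always reported as Eaten, whatever the length of the snake at that moment.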

Pattern matching is so suitable for dealing with our update !!

The Updated message wraps only locations, not the entities, which belong to the model world. And that's all. Our model is safe, thanks to a containing actor:

package com.promindis.games.snake

import com.promindis.games.snake.World._
import akka.actor.{ActorRef, Actor}

sealed trait StateMessage
case class Refresh() extends StateMessage
case class UpdateDirection(to: WorldLocation) extends StateMessage
case class Updated(snake: List[WorldLocation], apple: WorldLocation) extends StateMessage


class Entities(display: ActorRef) extends Actor {

  sealed trait Entity

  case class Snake(body: List[WorldLocation], direction: WorldLocation) extends Entity {
    def go(toDirection: WorldLocation): Snake = Snake(body, toDirection)

    def moved: Snake = Snake(ahead::body.take(body.size - 1), direction)

    def grown: Snake = Snake(ahead::body, direction)

    def ahead: WorldLocation = head + direction

    def head: WorldLocation = body.head
  }

  case class Apple(location: WorldLocation) extends Entity


  var snake: Snake = _
  var apple: Apple = _
  reset()

  def reset() {
    snake = Snake(List(origin), Direction.Right)
    apple = Apple(randomLocation())
  }

  def updatePositions(fromSnake: Snake, fromApple: Apple) {
    fromSnake.body match {
      case head::tail if head == fromApple.location =>
        apple = Apple(randomLocation())
        snake = fromSnake.grown
      case head::tail if tail.contains(head) =>
        Game.displayMessage("You lose")
        reset()
      case head::tail if tail.size == World.winLength  =>
        Game.displayMessage("You win")
        reset()
      case _ => snake = fromSnake.moved
    }

    display ! Updated(snake.body, apple.location)
  }

  def updateDirectionOf(withSnake: Snake, to : WorldLocation) {
    snake = withSnake.go(to)
  }

  protected def receive =  {
    case Refresh() => updatePositions(snake,  apple)
    case UpdateDirection(to) => updateDirectionOf(snake, to)
  }
}

object Entities {
 def apply(display: ActorRef): ActorRef = Actor.actorOf(new Entities(display)).start()
}


We provided a companion object to the actor in order to simplify both the creation and the start of the actor. A reset action is fired while instantiating the actor. The second part concerns the board and its driver.
There, we fall into Scala Swing. The Game extends the standard SimpleSwingApplication while gathering the three elements:

val driver = BoardDriver()
val board = new Board(handleFor(driver))
val state = Entities(driver)

where the handle notifying the board driver of key press events looks like:

def handleFor(boardDriver: ActorRef) : (Value) => Unit = {
    (key: Value) => boardDriver ! ReceivedPressed(key)
  }

Call me a nit-picker if you want, but I did not want the display to know about the driver, although they do lie in the same Game class (making my own bakery of doom ?). So the notification of a key press is blindly fired through a function closing over the driver.
The board itself (not my favorite part) embeds some DSLs from the Scala Swing layers:

 
class Board(handle: => (Value) => Unit ) extends Panel {
    var doPaint: ((Graphics2D) => Unit) = (onGraphics) => {}
    preferredSize = new Dimension(GraphicConverters.converted(World.width), GraphicConverters.converted(World.heigth))
    focusable = true

    override def paintComponent(onGraphic: Graphics2D) {
      super.paintComponent(onGraphic)
      doPaint(onGraphic)
    }

    listenTo(keys)

    reactions += {
      case KeyPressed(source, key, modifiers, location) =>
        handle(key)
    }

    def apply(snake: List[ScreenLocation], apple: ScreenLocation) {
      def paintPoint(screenLocation: ScreenLocation, color: Color, onGraphics: Graphics2D) {
        onGraphics.setColor(color)
        onGraphics.fillRect(screenLocation.x, screenLocation.y, screenLocation.width, screenLocation.height)
      }

      doPaint = (onGraphics: Graphics2D) => {
        paintPoint(apple, new Color(210, 50, 90), onGraphics)
        snake.foreach {
          paintPoint(_, new Color(15, 160, 70), onGraphics)
        }
      }
      repaint()
    }
  }

In Scala Swing, the listenTo(keys) and reactions += {...} expressions elegantly set the panel up as a key event listener. We just added the handle function received as constructor parameter to the list of reactions to be fired on reception of a key press event.
The apply method provides us with an idiomatic way to drive the panel repaint from the board driver. The apply method receives ScreenLocation information, immediately captured in a closure that will be invoked during the next paint action, triggered by the repaint invocation. As a matter of fact, I do not like the idea of storing a reference to the closure in a variable, although the board driver is an actor and so serialises the processing of its incoming events.

The board driver implementation remains very simple, as a single entry point. Its only purpose is the translation of timer/user inputs into actor messages, and of actor messages into actions on the main board:

class BoardDriver() extends Actor {
    import GraphicConverters._
    import World._

    val directions = Map[Value, WorldLocation](
      Left -> Direction.Left,
      Right -> Direction.Right,
      Up -> Direction.Up,
      Down -> Direction.Down
    )

    protected def receive = {
      case Updated(snake, apple) =>
        board(converted(snake), converted(apple))
      case ReceivedPressed(key) =>
        state ! UpdateDirection(directions(key))
       case ShowMessage(text) => showMessage(parent = board, message = text)

    }
  }

As in all actors, the main purpose is implemented in the receive method. An incoming Updated message implies the invocation of the board after converting the WorldLocation abstractions into ScreenLocation abstractions. The reception of a ReceivedPressed message fires an update order to the entities model, and finally a ShowMessage message displays the dialog box.
The periodic timer is started in the Game definition. Here it is, at the bottom of the whole Game class definition:

 
package com.promindis.games.snake
import java.awt.{Dimension, Graphics2D}
import swing.event.KeyPressed
import swing._
import java.awt.event.{ActionEvent, ActionListener}
import event.Key._
import akka.actor.{ActorRef, Actor}
import swing.Dialog._

object Game extends SimpleSwingApplication {
  val driver = BoardDriver()
  val board = new Board(handleFor(driver))
  val state = Entities(driver)


  def handleFor(boardDriver: ActorRef) : (Value) => Unit = {
    (key: Value) => boardDriver ! ReceivedPressed(key)
  }

  class Board(handle: => (Value) => Unit ) extends Panel {
    var doPaint: ((Graphics2D) => Unit) = (onGraphics) => {}
    preferredSize = new Dimension(GraphicConverters.converted(World.width), GraphicConverters.converted(World.heigth))
    focusable = true

    override def paintComponent(onGraphic: Graphics2D) {
      super.paintComponent(onGraphic)
      doPaint(onGraphic)
    }

    listenTo(keys)

    reactions += {
      case KeyPressed(source, key, modifiers, location) =>
        handle(key)
    }

    def apply(snake: List[ScreenLocation], apple: ScreenLocation) {
      def paintPoint(screenLocation: ScreenLocation, color: Color, onGraphics: Graphics2D) {
        onGraphics.setColor(color)
        onGraphics.fillRect(screenLocation.x, screenLocation.y, screenLocation.width, screenLocation.height)
      }

      doPaint = (onGraphics: Graphics2D) => {
        paintPoint(apple, new Color(210, 50, 90), onGraphics)
        snake.foreach {
          paintPoint(_, new Color(15, 160, 70), onGraphics)
        }
      }
      repaint()
    }
  }


  case class ShowMessage(text: String)
  case class ReceivedPressed(keyCode: Value)

  class BoardDriver() extends Actor {
    import GraphicConverters._
    import World._

    val directions = Map[Value, WorldLocation](
      Left -> Direction.Left,
      Right -> Direction.Right,
      Up -> Direction.Up,
      Down -> Direction.Down
    )

    protected def receive = {
      case Updated(snake, apple) =>
        board(converted(snake), converted(apple))
      case ReceivedPressed(key) =>
        state ! UpdateDirection(directions(key))
       case ShowMessage(text) => showMessage(parent = board, message = text)

    }
  }

  object BoardDriver {
    def apply() = Actor.actorOf(new BoardDriver()).start()
  }

  def displayMessage(text: String) {
    driver ! ShowMessage(text)
  }

  def top = new MainFrame {
    title = "Snake"
    contents = new FlowPanel() {
      val timer = new javax.swing.Timer(100, new ActionListener() {
        def actionPerformed(e: ActionEvent) {
          state ! Refresh()
        }
      }).start();
      contents += board
    }
    pack()
  }
}

One minute, pals. Didn't I promise an STM implementation? Well, the Game, its board and its board driver remaining almost unchanged, what really changes is the Entities class. No more actor involved in this template.

package com.promindis.game.snake.stm
import com.promindis.game.snake.stm.World._
import akka.stm._


object Entities {
  sealed trait Entity

  case class Snake(body: List[WorldLocation], direction: WorldLocation) extends Entity {
    def go(toDirection: WorldLocation): Snake = Snake(body, toDirection)

    def moved: Snake = Snake(ahead::body.take(body.size - 1), direction)

    def grown: Snake = Snake(ahead::body, direction)

    def ahead: WorldLocation = head + direction

    def head: WorldLocation = body.head
  }

  case class Apple(location: WorldLocation) extends Entity

  val snake: Ref[Snake] = Ref(Snake(List(origin), Direction.Right))
  val apple: Ref[Apple] = Ref(Apple(randomLocation()))

  private def reset() {
    snake.set(Snake(List(origin), Direction.Right))
    apple.set(Apple(randomLocation()))
  }

  def updatePositions() {
     atomic{
       val fromSnake: Snake = snake.get()
       val fromApple: Apple = apple.get()
       fromSnake.body match {
          case head::tail if head == fromApple.location =>
              apple.set(Apple(randomLocation()))
              snake.set(fromSnake.grown)
          case head::tail if tail.contains(head) =>
            Game.displayMessage("You lose")
            reset()
          case head::tail if tail.size == World.winLength  =>
            Game.displayMessage("You Win")
            reset()
          case _ => snake.set(fromSnake.moved)
        }
       Game.update(snake.get().body, apple.get().location)
     }
  }

  def updateSnakeDirection(to : WorldLocation) {
    atomic {
      snake.alter(fromPrevious => fromPrevious.go(to))
    }
  }
}

The global state of the game, consisting of a snake and an apple, is updated atomically in one transaction. The code will look familiar to Clojure developers, and for good reason: quoting Jonas Bonér, "Refs (transactional references) are mutable references to values and through the STM allow the safe sharing of mutable data. Refs separate identity from value."
A transaction is delimited by an atomic block. The snake and apple definitions remain the same, and the snake and apple "values" are identified by two immutable reference fields:

val snake: Ref[Snake] = Ref(Snake(List(origin), Direction.Right))
val apple: Ref[Apple] = Ref(Apple(randomLocation()))
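For readers coming from Clojure, here is a rough sketch of how the same two references and the two update operations might look with Clojure refs. It is only an illustration: the map-based snake and apple, the right direction vector and the next-apple-location parameter are assumptions of mine, and the win/lose handling is left out.

```clojure
;; Hypothetical Clojure counterpart of the Scala Entities object.
(def origin [0 0])
(def right [1 0])

;; Two refs: the identities stay fixed, the values they point to change.
(def snake (ref {:body [origin] :direction right}))
(def apple (ref {:location [3 0]}))

(defn ahead [{:keys [body direction]}]
  ;; Location just in front of the head.
  (vec (map + (first body) direction)))

(defn moved [s]
  ;; New head in front, last segment dropped.
  (assoc s :body (vec (cons (ahead s) (butlast (:body s))))))

(defn grown [s]
  ;; New head in front, nothing dropped.
  (assoc s :body (vec (cons (ahead s) (:body s)))))

(defn update-positions! [next-apple-location]
  ;; Snake and apple are read and updated in one transaction.
  (dosync
    (if (= (first (:body @snake)) (:location @apple))
      (do (ref-set apple {:location next-apple-location})
          (alter snake grown))
      (alter snake moved))))

(defn update-snake-direction! [to]
  ;; alter applies a function to the old value, like alter in the Scala code.
  (dosync
    (alter snake assoc :direction to)))
```

Here dosync plays the role of atomic, ref-set the role of set, and alter keeps its name.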

The updatePositions method opens an atomic block, then uses the get/set methods of the references to read and replace the referenced values. Invoked from the Game, the updateSnakeDirection method uses the alternative alter method, which accepts a function taking the old value and producing a new value of the same type, still within the scope of a transaction.
We used a closure for that purpose in our implementation. The reset method is private because it is invoked exclusively from within an atomic scope. Coming back to the Game class, we provide a version very close to the previous one:

 
package com.promindis.game.snake.stm
import java.awt.{Dimension, Graphics2D}
import swing.event.KeyPressed
import swing._
import java.awt.event.{ActionEvent, ActionListener}
import event.Key._
import akka.actor.{ActorRef, Actor}
import swing.Dialog._

object Game extends SimpleSwingApplication {
  val driver = BoardDriver()
  val board = new Board(handleFor(driver))


  class Board(handle: => (Value) => Unit ) extends Panel {
    var doPaint: ((Graphics2D) => Unit) = (onGraphics) => {}
    preferredSize = new Dimension(GraphicConverters.converted(World.width), GraphicConverters.converted(World.heigth))
    focusable = true

    override def paintComponent(onGraphic: Graphics2D) {
      super.paintComponent(onGraphic)
      doPaint(onGraphic)
    }

    listenTo(keys)

    reactions += {
      case KeyPressed(source, key, modifiers, location) =>
        handle(key)
    }

    def apply(snake: List[ScreenLocation], apple: ScreenLocation) {
      def paintPoint(screenLocation: ScreenLocation, color: Color, onGraphics: Graphics2D) {
        onGraphics.setColor(color)
        onGraphics.fillRect(screenLocation.x, screenLocation.y, screenLocation.width, screenLocation.height)
      }

      doPaint = (onGraphics: Graphics2D) => {
        paintPoint(apple, new Color(210, 50, 90), onGraphics)
        snake.foreach {
          paintPoint(_, new Color(15, 160, 70), onGraphics)
        }
      }
      repaint()
    }
  }

  def displayMessage(text: String) {
    driver ! ShowMessage(text)
  }


  def handleFor(boardDriver: ActorRef) : (Value) => Unit = {
                          (key: Value) => boardDriver ! ReceivedPressed(key)
  }

  case class ShowMessage(text: String)

  case class ReceivedPressed(keyCode: Value)

  case class Updated(snake: List[WorldLocation], apple: WorldLocation)
  class BoardDriver() extends Actor {
    import GraphicConverters._
    import World._

    val directions = Map[Value, WorldLocation](
      Left -> Direction.Left,
      Right -> Direction.Right,
      Up -> Direction.Up,
      Down -> Direction.Down
    )

    protected def receive = {
      case Updated(snake, apple) =>
        board(converted(snake), converted(apple))
      case ReceivedPressed(key) =>
        Entities.updateSnakeDirection(directions(key))
      case ShowMessage(text) => showMessage(parent = board, message = text)
    }
  }

  object BoardDriver {
    def apply() = Actor.actorOf(new BoardDriver()).start()
  }

  def update(list: List[WorldLocation], location: WorldLocation) {
    driver ! Updated(list, location)
  }

  def top = new MainFrame {
    title = "Snake"
    contents = new FlowPanel() {
      val timer = new javax.swing.Timer(100, new ActionListener() {
        def actionPerformed(e: ActionEvent) {
          Entities.updatePositions()
        }
      }).start();
      contents += board
    }
    pack()
  }
}


Hurray, we did it!!!

Okay, the game is very simple and the boundary positions are not checked, but the kata is worthwhile. Thank you, Stuart Halloway. I must go and fix the code for my next blog post... in Clojure this time. Very strange bug indeed.

 Be seeing you !!! :)

Sunday, October 9, 2011

One pmap for the road before partying with an agent

Hi again. And thank you for the positive feedback from the people who read the previous posts.

During a recent discussion with a bunch of interesting guys, one of them asked me why I always let so much time pass between two consecutive posts. While I explained how hard it was to find both a subject that might interest people and the time to write about it, he suggested that from time to time I should pick one of my small Clojure/Scala/Java experiments and simply publish it as it is.
It might not be perfect, but it should bring some interesting feedback. In addition to his comment, I noted that, from a personal point of view, I also like reading other people's short program stories, because these small programs always present new keywords, other ways of analyzing a problem and so on...

As my current discovery of Akka fault tolerance is taking longer than expected, I randomly picked one of my latest Project Euler implementations. I found the problem interesting as it provided an opportunity to experiment, on a (very) modest scale, with elements of parallelism in Clojure.

The problem was to find the greatest product of four adjacent numbers in any direction (up, down, left, right, or diagonally) in a 20x20 grid. I immediately started writing something based on my feeling that the problem was well suited to a functional programming kata.
The matrix can be divided into sub-matrices, each in charge of identifying its own maximum product; the maximum maximorum over all sub-matrices is then the result we are looking for. A maximum search is idempotent, the products in the sub-matrices can easily be computed using higher-order functions, etc.
Having loaded the most recent version of Clojure (1.3), I configured the Leiningen project with the following content:

(defproject sorting "1.0.0-SNAPSHOT"
  :description "Algorithms"
  :dependencies [[org.clojure/clojure "1.3.0"]
                 [org.clojure/clojure-contrib "1.2.0"]])

where the contrib dependency has been introduced in order to parse the file hosting the matrix definition, making use of the io API. This light use is captured in the following file header:

(ns algorithms.matrix
  (:use [clojure.contrib.io :only [read-lines]])
  (:use [clojure.string :only [split trim-newline trim]]))

I chose to read the lines and trim them where necessary, as data copied and pasted from the net can sometimes be strange. My first kata step was to load a file from the filesystem, a task that, believe it or not, I had never carried out in Clojure before!
The test written to challenge that huge effort remained very simple, as I only checked that the loaded file was a real square matrix:

(def from-file "/dev/projects/clojure/leinengen/algorithms/matrix")

(deftest loaded-matrix-from-file-should-have-right-size
  (is (= 20 (count  (matrix-definition from-file))))
  (is (= 20 (count  (first (matrix-definition from-file))))))

My test looks like an acceptance test, as I did not go into detail. I took the opportunity to apply all the necessary conversions from strings to numbers in Clojure. This drove me to the solution:

(defn string-matrix [lines]
  (map
    (fn [line] (split (trim (trim-newline line)) #" "))
    lines))

(defn to-int [line]
    (vec (map #(Integer/parseInt %) line)))

(defn converted [s-matrix]
    (vec (map #(to-int %) s-matrix)))

(defn matrix-definition [in-file]
  (converted (string-matrix (read-lines in-file))))

The matrix-definition function is the entry point, accepting a file path as a string. The read-lines function from the Clojure contrib library delivers a lazy sequence of strings, which the string-matrix function converts into a sequence of vectors of strings, one vector per line. The converted function, with the help of the to-int function, then transforms the matrix of strings into a matrix of integers.
No big deal.
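To try the conversion without a file on disk, the same pipeline can be sketched on in-memory lines. The names parse-line and parse-matrix are mine, not part of the project, and I split on any run of whitespace rather than on a single space, which is an assumption about how forgiving the parser should be.

```clojure
(require '[clojure.string :as str])

(defn parse-line [line]
  ;; "08 02 22 97 " -> [8 2 22 97]; Integer/parseInt accepts leading zeros.
  (vec (map #(Integer/parseInt %) (str/split (str/trim line) #"\s+"))))

(defn parse-matrix [lines]
  ;; One vector of integers per input line.
  (vec (map parse-line lines)))

(parse-matrix ["08 02 22 97" "49  49 99 40 "])
;; => [[8 2 22 97] [49 49 99 40]]
```

Note that the leading zeros only parse because the numbers arrive as strings; written as literals in Clojure source, 08 would be rejected as an invalid octal number.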
Nice, I have this big matrix now. In order to proceed, an interesting next step would be to be able to grab 4x4 square sub-matrices:

(def matrix-origin [  [ 8  2 22 97]
                      [49 49 99 40]
                      [81 49 31 73]
                      [52 70 95 23] ])

(def matrix-bottom-right [  [40 62 76 36]
                            [74 04 36 16]
                            [23 57 05 54]
                            [89 19 67 48]])

(deftest four-matrix-at-origin-should-match-file-content
  (is 
     (= matrix-origin 
        (square-at 0 0 4 (matrix-definition from-file )))))

(deftest four-matrix-at-bottom-right-should-match-file-content
  (is 
     (= matrix-bottom-right 
       (square-at 16 16 4 (matrix-definition from-file )))))

The purpose of the square-at function is to bring back this small sub-matrix from our (not so) big definition. Here the for comprehension comes to the rescue:

(defn square-at [x y size matrix]
    (vec (for [posy (range y (+ y size))]
      (vec (for [posx (range x (+ x size))]
        (get-in matrix [posy posx]))))))
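A quick REPL check on a tiny matrix helps to see which argument is the column and which is the row. The small matrix below is made up for the illustration; square-at is repeated from the listing above so that the snippet stands on its own.

```clojure
(defn square-at [x y size matrix]
  (vec (for [posy (range y (+ y size))]
    (vec (for [posx (range x (+ x size))]
      (get-in matrix [posy posx]))))))

(def small [[1 2 3]
            [4 5 6]
            [7 8 9]])

;; x is the column offset, y is the row offset.
(square-at 1 0 2 small)
;; => [[2 3] [5 6]]

(square-at 1 1 2 small)
;; => [[5 6] [8 9]]
```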

Basically, quoting Stuart Halloway, a list comprehension creates a list based on an existing list. Working with a matrix, I naturally turn the result into a vector of vectors, since we need indexed access in our case. With a matrix in hand, we can work on extracting the maximum products among rows, columns, and diagonals:

(deftest max-product-in-matrix-with-origin-should-match
  (is (= 24468444 
     (max-product-in (square-at 0 0 4 (matrix-definition from-file ))))))

Applying wishful thinking, I expect to work from a main function:

(defn max-product-in [matrix]
  (apply max
    (concat
      (line-products-in matrix)
      (col-products-in matrix)
      (diag-products-in matrix))))

which extracts the maximum from the values produced by helper functions.
So far so good. I comment out my current macroscopic test and define finer-grained tests to challenge the input/output of the helper functions:

(deftest line-products-in-origin-matrix-should-match
  (is (= [34144 9507960 8981847 7953400] 
     (line-products-in (square-at 0 0 4 (matrix-definition from-file ))))))

(deftest col-products-in-origin-matrix-should-match
  (is (= [1651104 336140 6414210 6514520] 
     (col-products-in (square-at 0 0 4 (matrix-definition from-file ))))))

(deftest diag-products-in-origin-matrix-should-match
  (is (= [279496 24468444] 
     (diag-products-in (square-at 0 0 4 (matrix-definition from-file ))))))

The helper functions also combine comprehensions with reduce operations, a whole assembly of higher-order functions, as predicted:

(defn line-products-in [matrix]
  (vec (map #(reduce * %) matrix)))

(defn col-products-in [matrix]
    (vec (for [x (range 0 (count matrix))]
      (reduce * (map #(get % x) matrix)))))

(defn diag-products-in [matrix]
  (let [size (count matrix)]
    [(reduce * (for [x (range 0 size)]
      (get-in matrix  [x x])))
     (reduce * (for [x (range 0 size)]
      (get-in matrix  [ x (- (dec size) x)])))]))
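As a side note, the column products can also be obtained without explicit indexing, by letting map walk all the rows in parallel. This is only an alternative sketch, not the version used in the project; the name col-products-alt is mine.

```clojure
(defn col-products-alt [matrix]
  ;; (apply map * rows) multiplies the first elements of every row together,
  ;; then the second elements, and so on: one product per column.
  (vec (apply map * matrix)))

(def matrix-origin [[ 8  2 22 97]
                    [49 49 99 40]
                    [81 49 31 73]
                    [52 70 95 23]])

(col-products-alt matrix-origin)
;; => [1651104 336140 6414210 6514520]
```

The result matches the values expected by the col-products-in test above.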

Good, let's uncomment the previous test: max-product-in. Green (a figure of speech: in the REPL, you call the (run-tests) function and read the report). Good. My attempt to get the maximum maximorum can be summarized in:

(deftest find-maximum-from-file
  (is 
     (= 70600674 
       (find-maximum (matrix-definition from-file) 20))))

And my first attempt leads me to a simple solution:

(defn find-maximum [matrix in-range]
  (let [size (- in-range 3 ) 
        coordinates (for [x (range 0 size) y (range 0 size)] [x y])]
    (apply max 
      (map 
        #(max-product-in (square-at (first %) (second %) 4 matrix)) 
          coordinates))))

where in-range defines the width of my square matrix, as I want to be able to extend its size. Note that I could also have parameterized the number of operands in the multiplication.
Works fine!
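To make the coordinate generation explicit, and to hint at how the number of operands could be turned into a parameter, here is a small sketch. The window-origins function and its window argument are hypothetical names introduced for the illustration.

```clojure
(defn window-origins [in-range window]
  ;; Every [x y] at which a window-by-window square still fits
  ;; inside an in-range-by-in-range matrix.
  (let [size (- in-range (dec window))]
    (for [x (range 0 size) y (range 0 size)] [x y])))

(window-origins 5 4)
;; => ([0 0] [0 1] [1 0] [1 1])

(count (window-origins 20 4))
;; => 289
```

With window set to 4 this yields the same coordinates as the (- in-range 3) computation in find-maximum: 17 x 17 = 289 sub-matrices for the 20x20 grid.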

A first try at making my algorithm a little bit parallel is to use pmap. The magic lies in the change from map to pmap in the algorithm:

(defn par-find-maximum [matrix in-range]
  (let [
        size (- in-range 3 ) 
        coordinates (for [x (range 0 size) y (range 0 size)] [x y])]
    (apply max 
      (pmap 
        #(max-product-in (square-at (first %) (second %) 4 matrix)) 
          coordinates))))

The performance improvements are not obvious; they become more visible when the size of the matrix is dramatically extended, for example by unfolding it, duplicating it onto itself. Working on bigger multiplications, and therefore bigger sub-matrices, would also make a difference.
As stated in the pmap docstring, pmap is only useful for computationally intensive functions where the time of the applied function dominates the coordination overhead. On my small (dual-core) computer processing small operations, no obvious improvement is to be expected.
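A rough way to see the effect is to compare map and pmap on a function that is deliberately slow. In the sketch below, slow-square and elapsed-ms are made-up helpers, and the 50 ms sleep merely stands in for an expensive computation; the timings will vary from machine to machine, so none are quoted here.

```clojure
(defn slow-square [n]
  (Thread/sleep 50)          ; stands in for a costly computation
  (* n n))

(defn elapsed-ms [f]
  ;; Runs the zero-argument function f, returns [result milliseconds].
  (let [start  (System/nanoTime)
        result (f)]
    [result (/ (- (System/nanoTime) start) 1e6)]))

(let [[seq-result seq-ms] (elapsed-ms #(doall (map slow-square (range 16))))
      [par-result par-ms] (elapsed-ms #(doall (pmap slow-square (range 16))))]
  (println "map:" seq-ms "ms, pmap:" par-ms "ms")
  ;; Both versions must produce the same values, in the same order.
  (= seq-result par-result))
```

The doall calls matter: map and pmap are lazy, so without them only the construction of the sequence would be timed.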

Just for fun, because it does not provide any improvement in this situation, let's try with agents. Ideally we could spare the main thread the work of computing the maximum, deferring it to an abstraction that handles the operation asynchronously.
Clojure agents can take charge of this kind of operation. An agent binds a value, like a reference in Clojure, so it can be dereferenced at will. As with actors in Scala (and naturally Akka), an agent works on a queue of messages, each message corresponding to a new incoming action aiming to change the bound value.
Here the similarities between actors and agents stop. The agent only changes its internal bound value in response to asynchronous, queued incoming actions. So it remains value-centric, unlike actors, which can swap behaviors, transfer continuations, etc.
Michael Fogus and Chris Houser take the time to explain the differences in The Joy of Clojure, with a far more technical description than mine :) As far as our little problem is concerned, I'd like to apply a little producer/consumer mechanism where my main thread of execution pushes values to an agent, which switches its bound value to the new value when the incoming value is greater than the one already bound. Here is the solution:

(defn other-par-find-maximum [matrix in-range]
  (let [
        size (- in-range 3 ) 
        sorter (agent 1 :validator number?)]
    (dorun
      (for [x (range 0 size) y (range 0 size)]
        (send sorter max (max-product-in (square-at x y 4 matrix)))))
    (await sorter)
    @sorter))

The agent plays the role of a sorter, initialized with the value 1 and given a validator function in charge of checking that the bound value is effectively a number:

sorter (agent 1 :validator number?)

For each processed maximum in the matrix we send its result value to the agent:

(send sorter max (max-product-in (square-at x y 4 matrix)))

following the send function syntax. The send function takes as input the agent, a function, and any additional arguments; the function is applied to the currently bound value followed by those arguments. The resulting bound value will be the output of the applied function.
 Good, we are done, tests green !!
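Stripped of the matrix, the mechanism boils down to a few lines. The agent name highest and the candidate values below are invented for the illustration.

```clojure
;; The agent starts at 1 and only accepts numbers as new state.
(def highest (agent 1 :validator number?))

;; Each send queues the action (max current-state candidate) on the agent.
(doseq [candidate [12 7 42 3]]
  (send highest max candidate))

;; Block until every action sent so far from this thread has been applied.
(await highest)

@highest
;; => 42
```

The sends return immediately; it is the await call that makes the dereference safe to read as the final maximum.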

 So we have seen how to apply a low-cost change to our processing, migrating from map to pmap while preserving our algorithm. We have also used a Clojure agent in a producer/consumer pattern.

A short one, while waiting for deeper subjects to come. Being on holiday soon, I will try to push more of these little experiments and take the time to develop the bigger subjects.
Before I forget, the book of the week: An Introduction to Functional Programming Through Lambda Calculus by G.J. Michaelson, to (re)discover... the PostScript edition being available online here.

 Got to switch to Scala, be seeing you !!! :):):)