Wednesday, June 29, 2011

Visiting Hanoï Towers with Scala and Akka

Well, I failed: I had promised myself to publish something, whatever it was, about the stuff I learn, even the smallest piece of information. It is all my own fault; I read a lot and, I hope, learnt a lot. A few months ago I specifically decided to embrace functional programming, which is quite hard in France, because people there tend to think it is a research-only domain practiced by insane, unaware, strange geeks. I don't care, though, because I can't trust people who have never heard about Erlang or Haskell, and I do not consider myself a geek but a professional.
I noticed with pleasure in Uncle Bob's latest book (The Clean Coder) that he recommends that we, as professional developers, take risks outside our domain of knowledge. Blessed by the Master, let's roll. (It's a promise: I will meet this guy some day =D)

I recently started reading the Akka 1.1.2 documentation (OMG I am late, they have already upgraded to 1.1.3!!!), while also reading Scala in Depth by Joshua Suereth, and found myself overwhelmed, a shame for a guy trained by Jeff Sutherland. Moreover, deciding to start with the basics, I have been watching the Abelson and Sussman MIT lectures on video during my (smooth) early morning fitness sessions (you will find them here). LISP rocks, I mean it.
I understood I had reached my limits when I missed the point about the Hanoi Towers during lecture 2 or 3. We have been used for so long to taking for granted everything said about the technical stuff we use that, from time to time, we are not able to understand the most basic algorithms. That was vexing.

It was time to stop, rewrite the problem on a sheet of paper and then talk about it. That was the initial commitment. At this point the satiated readers will stop reading. After all, everybody knows about the Hanoi Towers.

For the others, you will find a first required reading here. As a reminder, this mathematical game consists of moving a pile of ordered discs from one rod (or peg) to a second, free rod, with the help of a third free rod and a set of rules to obey. Repeating the link content, the three rules are:
  • Only one disk may be moved at a time.
  • Each move consists of taking the upper disk from one of the rods and sliding it onto another rod.
  • No disk may be placed on top of a smaller disk.
This is a so-called intractable problem: the minimum number of moves for n discs is equal to 2^n - 1. That means that for a not-so-big number of discs, the age of the universe would not be long enough to perform all the moves (Yeah!).
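To see where the 2^n - 1 comes from: if T(n) denotes the number of moves needed for n discs, the recursive strategy below gives T(n) = 2T(n - 1) + 1 with T(1) = 1, which unfolds to T(n) = 2^n - 1. For example T(3) = 7, T(10) = 1023, and T(64) is about 1.8 * 10^19.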

There are two typical ways to solve the problem, one iterative, one recursive. All things being equal, and Scala being beautiful, I chose the recursive way, the one I did not understand instantaneously during the lecture. Scala being a functional programming language, the solution must be elegant.

In order to proceed, and choosing the same identifiers as in the referenced article, let's name the pegs A, B, C and work with n discs.
I will always start with all the discs stacked on rod A, moving them to peg C. The article is crystal clear: we must start by imagining we have nearly solved the problem.

Manipulating disc n (the largest one), the last moves would be:

  1. move pile with the n - 1 discs above disc n from A to B
  2. move disc n from A to C
  3. move pile with the n - 1 discs from B to C

This opens the way to recursion, no? OK, consider step 1. It can itself be decomposed into:

  1. move pile with the n - 2 discs above disc n - 1 from A to C
  2. move disc n - 1 from A to B
  3. move pile with the n - 2 discs from C to B

And so on... Got it? If not, grab some coffee or go to sleep, whatever, and come back. You will get it, no doubt; I trust you, we are the same.

I have my recursion, so now I must create my project. As usual I start a Maven 3.0 project in my new IntelliJ 107.xxx version. As I am planning to work with Scala and Akka, I set the following configuration in my pom.xml:


        
<repositories>
    <repository>
        <id>scala-tools.org</id>
        <name>Scala-tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <repository>
        <id>Akka</id>
        <name>Akka Maven2 Repository</name>
        <url>http://akka.io/repository/</url>
    </repository>
</repositories>

As a baby step I decided to bring back ScalaTest, in order to test the algorithm, so my dependencies this time are:


        
            
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.9.0-1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.9.0</artifactId>
            <version>1.6.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>se.scalablesolutions.akka</groupId>
            <artifactId>akka-actor</artifactId>
            <version>1.1.2</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest_2.9.0</artifactId>
    </dependency>
    <dependency>
        <groupId>se.scalablesolutions.akka</groupId>
        <artifactId>akka-actor</artifactId>
    </dependency>
</dependencies>

Of course, don't forget to activate the Scala plugin so IntelliJ will recognize the facets. This is done this way:


<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>2.9.0-1</scalaVersion>
            </configuration>
        </plugin>
    </plugins>
</build>

I started by writing tests checking the validity of the move sequences from zero discs up to three discs. I also set up a test checking a bigger pile, asserting only on the expected number of moves.

I came up with the following tests:

final class TestHanoiAlgorithm extends JUnitSuite with ShouldMatchersForJUnit {

  @Test
  def move_WithNoDisc_ShouldProduceNoMove() {
    val discs = List()
    val moves = Game move discs
    moves.should(be('empty))
  }


  @Test
  def move_WithOneDisc_ShouldProduceOneMove() {
    val discs = List(Disc(1))
    val moves = Game move discs
    moves.should(not(be('empty)))
    moves.size.should(be(1))
  }


  @Test
  def move_WithOneDisc_ShouldMoveFromAToC() {
    val discs = List(Disc(1))
    val moves = Game move discs
    val move = moves(0)
    move.disc.should(be(discs(0)))
    move.start.should(be(A()))
    move.end.should(be(C()))
  }

  @Test
  def move_WithTwoDiscs_ShouldProduceThreeMoves() {
    val discs = List(Disc(1), Disc(2))
    val moves = Game move discs
    trace(moves)
    moves.size.should(be(3))
    assertVerified(moves(0), discs(0), A(), B())
    assertVerified(moves(1), discs(1), A(), C())
    assertVerified(moves(2), discs(0), B(), C())
  }


  @Test
  def move_WithThreeDiscs_ShouldProduceSevenMoves() {
    val discs = List(Disc(1), Disc(2), Disc(3))
    val start = System.currentTimeMillis()
    val moves = Game move discs
    println((System.currentTimeMillis() - start) + " ms")
    trace(moves)
    moves.size.should(be(7))

    assertVerified(moves(0), discs(0), A(), C())
    assertVerified(moves(1), discs(1), A(), B())
    assertVerified(moves(2), discs(0), C(), B())
    assertVerified(moves(3), discs(2), A(), C())
    assertVerified(moves(4), discs(0), B(), A())
    assertVerified(moves(5), discs(1), B(), C())
    assertVerified(moves(6), discs(0), A(), C())
  }

  def assertVerified(move: Move, disc: Disc, A: Peg, B: Peg) {
    move.disc.should(be(disc))
    move.start.should(be(A))
    move.end.should(be(B))
  }


  def trace(moves: scala.List[Move]) {
    println("===============================")
    for (move <- moves) println(move)
    println("===============================\n")
  }

  @Test
  def move_WithNDiscs_ShouldProduceMoves() {
    val discs = (1 to 24).map(Disc(_)).toList

    val start = System.currentTimeMillis()
    val moves = Game move discs
    val end = System.currentTimeMillis()

    println("Took: " + (end - start) + "ms")
    moves.size.should(be(16777215))
  }
}

I hope the test names are self-explanatory enough!
As explained on the ScalaTest website, I used the wonderful ShouldMatchers traits, elegantly extending my test domain language. The wrapping classes in this ScalaTest trait provide a fluent DSL naturally extending the testing code vocabulary. Easy to read, easy to understand.
The trace method outputs the moves so the developer can double-check, "functionally speaking", the validity of a move sequence. It produces traces like:

===============================
Move Disc(1) from A(A) to B(B)
Move Disc(2) from A(A) to C(C)
Move Disc(1) from B(B) to C(C)
===============================

Rwaaaaaaaaaaaaght (< pale imitation of Dr Evil) 

We have tests but no code. Reading the tests you easily understand that I expect my pegs (or rods) to match my initial vocabulary. Incapable of producing correct Scala enumerations (please comment on this!!!), I produced the following case classes:

trait Peg {
  val name: String
}


case class A(name: String = "A") extends Peg
case class B(name: String = "B") extends Peg
case class C(name: String = "C") extends Peg
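For the record, here is a minimal sketch of what an Enumeration-based version might look like (untested on my side, which is why I kept the case classes above):

object Pegs extends Enumeration {
  type Peg = Value
  // the three pegs become values of the enumeration
  val A, B, C = Value
}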

Of course I need some move abstraction in order to collect a sequence of moves and maybe reuse it in an animated display (don't dream ;), that one is going to take long):

class Move (val start: Peg, val end: Peg, val disc: Disc) {
  override def toString = "Move " + disc + " from " + start  +  " to " +  end
}

object Move{
  def apply(start: Peg, end: Peg, disc: Disc):Move = new Move(start, end, disc)
}

I used a companion object, allowing me to create easy-to-read code like Move(a, c, disc), doing away with the new keyword. This is a natural consequence of defining an apply method in Scala. OK, then I need some discs to move. Let them be defined by:

case class Disc(id: Int)

The discs are identified by a unique id. The smaller discs, on top of the pile, have the smaller numbers. It remains to code a Game object:

object Game {

  private def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
    discs match {
      case Nil => Nil
      case head :: Nil =>
        List(Move(a, c, head))
      case head :: tail =>
        move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c)
    }
  }

  def move(discs: List[Disc]): List[Move] = {
    move(discs.reverse, A(), B(), C())
  }
}

I based my algorithm on pattern matching. The first two cases solve the edge problems of no disc and one disc:

private def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
  discs match {
    case Nil => Nil
    case head :: Nil =>
      List(Move(a, c, head))
    ...
  }
}

So: no disc, no move; one disc, one move. For more than one disc I just recursively wrote the algorithm I presented a few blocks above:

case head :: tail =>
   move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c) 

I resolve a list with more than one element using the now well-known head :: tail pattern.
The head is the current disc n, and what I return is simply the composition of the previous moves, completed with my current move and, of course, the upcoming moves.
The a, b, c peg references are switched in the next recursion in order to match the n - 1 level problem signature.
You can blame me for the use of the ::: operator, which is quite expensive, but the problem is intractable anyway. I will welcome any optimized solution.
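If the ::: calls really hurt you, here is a sketch of a possible rewrite (mine, untested against the clock): thread an accumulator through the recursion so every move is added with a constant-time cons. The moveAcc name is not part of the code above:

private def moveAcc(discs: List[Disc], a: Peg, b: Peg, c: Peg, acc: List[Move]): List[Move] =
  discs match {
    case Nil => acc
    case head :: tail =>
      // previous moves wrap around the head move; the post moves are already in acc
      moveAcc(tail, a, c, b, Move(a, c, head) :: moveAcc(tail, b, a, c, acc))
  }

Calling moveAcc(discs.reverse, A(), B(), C(), Nil) should produce the same move list.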

Notice I have reversed the input list so my head match is always the largest disc to move from the source rod to the target rod.

Having started practicing Akka at a very modest level, the insane idea of trying to use all my CPUs instead of one occurred to me. This is insane because of the intractable nature of the problem: what is a fistful of CPUs compared to the age of the universe?
But I was in need of a more "concrete" example of the use of Akka actors. I did not know all about Akka, but enough to maybe reuse the hotswap facility too.

Handling this problem with actors is a typical fork-join problem, and a perfectly balanced one indeed: the numbers of moves to recursively process before and after a current move are equal. At each level of recursion we create two new actors to handle the next level, then wait for their results.
Receiving the results of the sub-processes, we join them and send the information back to the invoking actor or the main thread. As with every fork-join problem, you first need to define a threshold, meaning you must find when to stop forking (i.e. creating new actors) in order to balance the benefit of parallelism against the cost of resource creation and management.
Indeed, too many actors would bring no benefit, as you would have to rotate your CPUs' execution time across sub-problems that are themselves intractable, the worst of all situations.
Finding the balance is a matter of trial and error, measurement, etc. I found a half-intuitive, half-experimental rule, which is to derive a forking depth from the number of CPUs, because it would be dumb to sub-divide the problem into more parts than there are CPUs. With two CPUs you fork once; with eight, three times; etc. Given a number of CPUs, a suitable recursive algorithm is:

private def depthFor(numberOfCPUs:Int): Int ={
    if (numberOfCPUs <=1) 0
    else {1 + depthFor(numberOfCPUs >> 1)}
  }
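In other words, depthFor computes the floor of log2(numberOfCPUs). A quick check:

depthFor(1)  // 0: never fork
depthFor(2)  // 1: fork once
depthFor(8)  // 3: fork three times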

So all I need now are Mover actors exchanging identified messages. There will be no finesse in the code I am going to present, because I am still so clumsy when it comes to manipulating type classes in Scala, and I will really welcome your proposals. It is a first shot and I really needed to present it to you.

My actors need to exchange messages concerning the search for PreviousMoves and PostMoves, respectively before and after the move of the largest disc in a pile. Naturally the orders are sent using, respectively, ProcessPreviousMoves and ProcessPostMoves. So here we come:

case class ProcessPreviousMoves(discs: List[Disc], a: Peg, b: Peg, c: Peg)
case class ProcessPostMoves(discs: List[Disc], a: Peg, b: Peg, c: Peg)
case class PreviousMoves(moves: List[Move])
case class PostMoves(moves: List[Move])

I created an abstract Mover class inheriting from the Akka Actor class. This class is extended by two sub-classes handling the previous-moves and post-moves processing. The abstract class holds all the knowledge and state required to:

  • store the previous, post and current moves
  • recursively process the remaining moves beyond the threshold
  • gather the incoming previous and post moves
  • send the result back to the caller

So I produced that class template: 

abstract class Mover(val threshold: Int) extends Actor {
  var move: Move = _
  var previousList: List[Move] = _
  var postList: List[Move] = _
  var source: Channel[Any] = _


  def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
    discs match {
      case Nil => Nil
      case head :: Nil =>
        List(Move(a, c, head))
      case head :: tail =>
        move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c)
    }
  }

  def propagate(head: Disc, tail: List[Disc], a: Peg, b: Peg, c: Peg) {
    source = self.channel
    move = Move(a, c, head)
    PreviousMover(threshold) ! ProcessPreviousMoves(tail, a, c, b)
    PostMover(threshold) ! ProcessPostMoves(tail, b, a, c)
  }

  def complete()

  def previous(list: List[Move]) {
    previousList = list
  }

  def following(list: List[Move]) {
    postList = list
  }

   def expectingSomeMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      become(expectingPostMoves)
    case PostMoves(list) =>
      following(list)
      become(expectingPreviousMoves)
  }

  def expectingPreviousMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      complete()
  }

  def expectingPostMoves: Receive = {
    case PostMoves(list) =>
      following(list)
      complete()
  }
}

The state is stored in the following self-explanatory variables:

var move: Move = _
var previousList: List[Move] = _
var postList: List[Move] = _
var source: Channel[Any] = _

The move, previousList and postList fields will be merged and sent back to the source. Once the pile size falls to the threshold, I process the remaining moves inside the same actor, so I brought back my recursive method:

def move(discs: List[Disc], a: Peg, b: Peg, c: Peg): List[Move] = {
    discs match {
      case Nil => Nil
      case head :: Nil =>
        List(Move(a, c, head))
      case head :: tail =>
        move(tail, a, c, b) ::: Move(a, c, head) :: move(tail, b, a, c)
    }
  }

Of course an actor must know how to fork two new actors:

def propagate(head: Disc, tail: List[Disc], a: Peg, b: Peg, c: Peg) {
    source = self.channel
    move = Move(a, c, head)
    PreviousMover(threshold) ! ProcessPreviousMoves(tail, a, c, b)
    PostMover(threshold) ! ProcessPostMoves(tail, b, a, c)
  }

You have identified the upcoming PreviousMover and PostMover classes that will extend the abstract Mover class.

Each of these classes handles, respectively, the processing of previous and post moves. You will also have recognized the bang "!" operator, identical to the Scala symbol dedicated to sending asynchronous messages to actors (fire-and-forget invocations). This is where I store the necessary state information, like the current move and the emitting source.
Then I used the Akka implementation of message-loop hot swapping, in order to wait for the messages holding the results of the forked sub-processing actors.
Why? Because with each incoming message, the state of my actor becomes more and more predictable and I have to choose between fewer and fewer cases. This way my reactions are simply shorter, so faster. I defined the following implementations:

def expectingSomeMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      become(expectingPostMoves)
    case PostMoves(list) =>
      following(list)
      become(expectingPreviousMoves)
  }

  def expectingPreviousMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      complete()
  }

  def expectingPostMoves: Receive = {
    case PostMoves(list) =>
      following(list)
      complete()
  }

Note the invocation of the become method from within the Actor, an invocation allowing me to change the message loop at runtime. After forking two new actors, I expect some moves, whether they are PreviousMoves or PostMoves. Receiving PreviousMoves, I update my previousList reference and then expect the PostMoves:

def expectingSomeMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      become(expectingPostMoves)
......
  }

  def expectingPostMoves: Receive = {
    case PostMoves(list) =>
      following(list)
      complete()
  }

Once the PostMoves are received, I update my postList reference and complete the process. The only alternate scenario is receiving the PostMoves first, then swapping my receive code in order to complete my operations on the next receive invocation:

def expectingSomeMoves: Receive = {
 ...................
    case PostMoves(list) =>
      following(list)
      become(expectingPreviousMoves)
  }

  def expectingPreviousMoves: Receive = {
    case PreviousMoves(list) =>
      previous(list)
      complete()
  }

Completion is basic and depends on whether I am working as a PreviousMover or a PostMover. Both implementations are:

class PostMover(threshold: Int) extends Mover(threshold) {

  def complete() {
    source ! PostMoves(previousList ::: move :: postList)
  }

  protected def receive = {
    case ProcessPostMoves(discs, a, b, c) =>
      discs match {
        case Nil =>
          self.channel ! PostMoves(Nil)
        case head :: Nil =>
          self.channel ! PostMoves(List(Move(a, c, head)))
        case head :: tail =>
          if (discs.size <= threshold) {
            self.channel ! PostMoves(move(discs, a, b, c))
          } else {
            propagate(head, tail, a, b, c)
            become(expectingSomeMoves)
          }
      }
  }

}

object PostMover {
  def apply(threshold: Int): ActorRef = {
    val mover = Actor.actorOf(new PostMover(threshold))
    mover.start()
    mover
  }
}

class PreviousMover(threshold: Int) extends Mover(threshold) {

  protected def receive = {
    case ProcessPreviousMoves(discs, a, b, c) =>
      discs match {
        case Nil =>
          self.channel ! PreviousMoves(Nil)
        case head :: Nil =>
          self.channel ! PreviousMoves(List(Move(a, c, head)))
        case head :: tail =>
          if (discs.size <= threshold) {
            self.channel ! PreviousMoves(move(discs, a, b, c))
          } else {
            propagate(head, tail, a, b, c)
             become(expectingSomeMoves)
          }
      }
  }

  def complete() {
    source ! PreviousMoves(previousList ::: move :: postList)
  }
}

object PreviousMover {
  def apply(threshold: Int): ActorRef = {
    val mover = Actor.actorOf(new PreviousMover(threshold))
    mover.start()
    mover
  }
}

The bodies of the receive methods are quite symmetric. Considering the PostMover implementation:

protected def receive = {
    case ProcessPostMoves(discs, a, b, c) =>
      discs match {
        case Nil =>
          self.channel ! PostMoves(Nil)
        case head :: Nil =>
          self.channel ! PostMoves(List(Move(a, c, head)))
        case head :: tail =>
          if (discs.size <= threshold) {
            self.channel ! PostMoves(move(discs, a, b, c))
          } else {
            propagate(head, tail, a, b, c)
            become(expectingSomeMoves)
          }
      }
  }

The original pattern matching followed by the recursive invocation is easily recognizable; only the threshold check has been introduced, in order to stop creating actors. I chose to create a new method in my Game object in order to execute the parallel scenario:

private def depthFor(numberOfCPUs:Int): Int ={
    if (numberOfCPUs <=1) 0
    else {1 + depthFor(numberOfCPUs >> 1)}
  }

  def movePar(discs: List[Disc]): List[Move] = {
    val threshold: Int = depthFor(getRuntime.availableProcessors())
    println("threshold: " + threshold)
    val result = PostMover(discs.size - threshold) !! (ProcessPostMoves(discs.reverse, A(), B(), C()), 3600000)
    result.getOrElse(PostMoves(Nil)).asInstanceOf[PostMoves].moves
  }

I start by firing a PostMover actor using the "!!" operator, which allows me to invoke the actor synchronously, waiting for the result.

Not knowing better how to proceed, I set the timeout of the synchronous invocation to one hour.
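To double-check the parallel path against the sequential results, a smoke test in the style of the previous ones might look like this (my sketch, not part of the original test class):

  @Test
  def movePar_WithThreeDiscs_ShouldProduceSevenMoves() {
    val discs = List(Disc(1), Disc(2), Disc(3))
    val moves = Game movePar discs
    moves.size.should(be(7))
  }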

The program has been tested on both a dual-core machine and an 8-core machine. All things being equal, the effort was worthwhile on the 8-core machine, a gain of 40-45% being measured in micro-benchmarks.
Never mind, the exercise was fun, and we learnt how to fork-join and hot swap message loops with Akka actors.

This one was really tooooooooooooooooooo long!!!

Must go to bed. Be seeing you !! =D

Monday, June 13, 2011

Scala reverse indexing of resources using naive MapReduce

Cross my heart, I will end the story of the deployment of the walking skeleton of my little Kanban application. But as I have promised myself to talk on a regular basis about things I have learned or done, here it is in Scala this week.
On the edge of leaving the company I work for (looking for a position in J2EE or Scala in Benelux, Switzerland or the UK ;)), we had to face two or three interesting problems in the last weeks. The first targeted J2EE 6, and we are on our way to explore it. The second is a story about graphs (I will maybe make a port in Scala to reproduce the problem and optimize it for personal exploration; watch for it). The third one was more a proof of concept than a real problem.

The company I work for polls wide ranges of properties on massive sets of physical devices, in order to produce high-value key indicators. A few months ago we opened a web service API in order to be able to both import and export filtered historical data matching identified sets of devices. Let's call them resources. Although this was not the purpose of the original coders, let us say that the resources are identified by a kind of adaptive object modeling approach. New descriptive attributes can be dynamically bound to resources. These attributes are called properties.

The API, among other facilities, allows retrieving resource descriptions using selectors, including the attribute values. Part of the domain model is stored in a cache, but the volume of property information is too big to be stored in a memory cache.
We adopted a text search engine solution that basically creates reverse indexes. All the property values are perceived as string instances and used as references to resource ids and descriptions. This idealistic approach aimed to save memory and provide faster access than a database approach. But the behavior of the hardware storage device matters when it comes to implementing the solution, and some clients complain. Big clients.
We started thinking about our own reverse indexing solution, with multi-level caches mixing both a reverse-indexed memory cache and a physical cache, everything benefiting from multi-core structures during the indexation process.
As the management is already reluctant to adopt a rational development process, this idea cannot be adopted unless we provide the start of a solution, some proof of concept, something showing we know what we are talking about.

For once I was lucky. This week-end I finished reading Philipp Haller's book, Actors in Scala. This is quite a nice book if you want to go into the mechanics of Scala actors; to take real benefit from it, work the examples with a copy of Programming in Scala at hand, in order to have a reference. After this reading, you will know more about Scala and Scala actors. This is a step I wanted to take before discovering Akka. First things first!!!
At the end of the book, in order to draw a broad picture of parallel computing, Mr Haller proposes a quite expressive example of a reverse indexing approach close to Google's patented MapReduce implementation, aiming to create a reverse index for a file system. His example is quite complete and I don't want to duplicate it here. But it inspired me, and I created a small proof of concept for the tech lead of the team, hoping he will be able to build on it. Blame me if I did not do my job correctly and betrayed Mr Haller's approach.

So what's all the fuss about? The idea is that we must be able, in some way, to separate the process of indexation itself from the parallel mechanics (the old separation of concerns stuff).
This way we can distribute parallelism over computers, multi-core structures, etc. The process of reverse indexing can be described as a combination of the standard functional programming facilities known as map and reduce (check Lisp, Scheme, ...). Paraphrasing Jeffrey Dean and Sanjay Ghemawat, we are going to
  1. map the input as a logical set of (property value, resource) pairs
  2. reduce the result for all the resources sharing the same key

The result will be some instance of Map[property value, List[resource]].
The two steps are:

map: List[(K, List[V])] to List[(V, K)]
reduce: List[(V, K)] to Map[V, List[K]]

where K and V respectively target a resource type and a property type (OK, that's grossly oversimplified).
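A tiny worked example, with r1 and r2 standing for two resources:

input:  List((r1, List("Server", "Switch")), (r2, List("Server")))
map:    List(("Server", r1), ("Switch", r1), ("Server", r2))
reduce: Map("Server" -> List(r1, r2), "Switch" -> List(r1))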

In order to start, we need some basic abstraction for a resource. This resource must provide properties. A very rough test is:

import org.junit.Test
import org.junit.Assert._

final class TestResource {

  @Test
  def properties_WithTestInstance_ShouldBeBound() {
    val properties = List("Server", "Switch")
    assertEquals(properties, Resource(properties).properties)
  }

}

Nothing hard here: the resource is identified as a server and also a switch, as marked by the properties. So be it:

final class Resource (val properties: List[String])

object Resource{
  def apply(properties: List[String] = Nil): Resource = {
    new Resource(properties)
  }
}

Still quite simple. I provided a companion object to the resource so I could clarify the code, avoiding the new keyword. This is a typical factory method pattern. Note that I specified a default value for the list parameter: the empty list Nil.

The two variable operations I will inject into my mechanics are a mapping action and a reducing action.
I used a Scala trait to set the contract for these two operations:

trait ReverseIndexMapper[K,V] {

  def mapping(key: K, list: List[V]): List[(V,K)]

  def selecting(list: List[K]): List[K]

}

The default implementation I will provide is quite simple. In the first case I plan to convert each (K, List[V]) pair into a List[(V, K)], extracting a list of normalized (property value, resource) pairs.
In the second step I will have merged all the List[(V, K)] of all the resources into a Map[V, List[K]], but I will have to suppress duplicate entries in each List[K].

This led me to a test describing these cases:

final class TestDefaultMapper {
  @Test
  def mapping_WithResourcePropertiesList_ShouldCreateListOfPairPropertyResource() {
    val resource = Resource(List[String]("Server", "Bridge" , "LDAP"))
    val values = resource properties
    val result = DefaultIndexerMapper[Resource, String]().mapping(resource, values)
    assertNotNull(result)
    assertEquals(values.size, result.size)

    for ((property, inResource) <- result){
      assertTrue(values contains property)
      assertEquals(resource, inResource)
    }
  }


  @Test
  def mapping_WithDuplicateResources_ShouldCreateListWithNoDuplicates() {
    val instanceOne = Resource()
    val instanceTwo = Resource()
    val resources = List[Resource](instanceOne, instanceTwo, instanceTwo)
    val result = DefaultIndexerMapper[Resource, String]().selecting(resources)
    assertNotNull(result)
    assertEquals(2, result.size)
    assertTrue(result contains instanceOne)
    assertTrue(result contains instanceTwo)

  }
}

I chose to name my target class DefaultIndexerMapper and to provide it with the mapping and selecting procedures (yes, functional programming vocabulary ;)). The first test submits a (resource, list of property values) pair to the mapping action and checks that all the entries are normalized as (property value, resource) record-like pairs. The second test checks that we have effectively removed duplicate entries in the list. As a result I came to the following implementation:

class DefaultIndexerMapper[K, V] private()
  extends ReverseIndexMapper[K,V]{

  override def selecting(list: List[K]): List[K] = {
    list.distinct
  }

  override def mapping(key: K, list: List[V]): List[(V, K)] = {
    for (value <- list) yield (value, key)
  }

}

object DefaultIndexerMapper{
  def apply[K,V]()={
    new DefaultIndexerMapper[K,V]
  }
}

As usual (I like that), we find a companion object. The implementations are self-explanatory. We use the native distinct method of the Scala List class to remove the duplicates. Done (OMG, one line!)
For the mapping, in order to produce the list, we use Scala's yield keyword to convert the input list, through a for comprehension, into a sequence of pairs. Done (OMG, one line again!)
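A quick REPL session illustrates both one-liners (the type parameters are mine, just for the demo):

scala> DefaultIndexerMapper[String, Int]().mapping("r1", List(1, 2))
res0: List[(Int, String)] = List((1,r1), (2,r1))

scala> DefaultIndexerMapper[String, Int]().selecting(List("a", "b", "a"))
res1: List[String] = List(a, b)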

Nothing more to say or to do. Then comes the test checking the map-reduce action. This is how I described the expected result:

import org.junit.Test
import org.junit.Assert._

final class TestMapReduce {
  val resourceOne = Resource(List("Server", "Switch"))
  val resourceTwo = Resource(List("Server", "HTTP", "FTP"))
  val resourceThree = Resource(List("Server", "LDAP"))


  def inputFor(resourceOne: Resource, resourceTwo: Resource, resourceThree: Resource): List[(Resource, List[String])] = {
    List(
      (resourceOne, resourceOne.properties),
      (resourceTwo, resourceTwo.properties),
      (resourceThree, resourceThree.properties)
    )
  }

  @Test
  def index_WithSentences_ShouldMatch() {
    val mapper = DefaultIndexerMapper[Resource, String]()
    val result = ReverseIndex[Resource, String](mapper).proceedWith(
      inputFor(resourceOne, resourceTwo, resourceThree)
    )

    assertEquals(5, result size)

    assertEquals(3, result("Server") size)
    assertTrue(result("Server").contains(resourceOne))
    assertTrue(result("Server").contains(resourceTwo))
    assertTrue(result("Server").contains(resourceThree))

    assertEquals(1, result("Switch") size)
    assertTrue(result("Switch").contains(resourceOne))

    assertEquals(1, result("HTTP") size)
    assertTrue(result("HTTP").contains(resourceTwo))

    assertEquals(1, result("FTP") size)
    assertTrue(result("FTP").contains(resourceTwo))

    assertEquals(1, result("LDAP") size)
    assertTrue(result("LDAP").contains(resourceThree))
  }

}

You can shoot now. This is more a functional-level test than a unit test. The body of the test is too large and provides too much information. Basically I create three resources:

import org.junit.Test
import org.junit.Assert._

final class TestMapReduce {
  val resourceOne = Resource(List("Server", "Switch"))
  val resourceTwo = Resource(List("Server", "HTTP", "FTP"))
  val resourceThree = Resource(List("Server", "LDAP"))
//...
}

As I expect my reverse indexer to take an instance of List[(Resource, List[String])] as input, I create this structure in the method:

def inputFor(resourceOne: Resource, resourceTwo: Resource, resourceThree: Resource): List[(Resource, List[String])] = {
    List(
      (resourceOne, resourceOne.properties),
      (resourceTwo, resourceTwo.properties),
      (resourceThree, resourceThree.properties)
    )
  }

I then challenge my indexer, checking the output result and specifically that each entry is mapped to the owning resource:

@Test
  def index_WithSentences_ShouldMatch() {
    val mapper = DefaultIndexerMapper[Resource, String]()
    val result = ReverseIndex[Resource, String](mapper).proceedWith(
      inputFor(resourceOne, resourceTwo, resourceThree)
    )

    assertEquals(5, result size)

    assertEquals(3, result("Server") size)
    assertTrue(result("Server").contains(resourceOne))
    assertTrue(result("Server").contains(resourceTwo))
    assertTrue(result("Server").contains(resourceThree))
//...
  }

No excuse: I was in a rush, and as Uncle Bob says, you have no excuse to do a dirty job. Shame on me. The solution I provided to pass the test comes as:

import actors.Actor._
case class Intermediate[K, V](list: List[(V,K)])
case class Optimized[K, V](value: V, forKeys: List[K])

final class ReverseIndex[K,V] private(delegate: ReverseIndexMapper[K,V]){

  def selecting(list: List[K]): List[K] = {
    delegate.selecting(list)
  }

  def mapping(key: K, list: List[V]): List[(V,K)] = {
    delegate.mapping(key, list)
  }

  def mapped(input: List[(K, List[V])]) ={
    val master = self
    for ((key, values) <- input) {
      actor {
        master ! Intermediate(mapping(key, values))
        exit()
      }
    }
    input
  }

  def join(taskSize: Int) = {
    var mappedLists = List[(V, K)]()
    for (_ <- 1 to taskSize) {
      receive {
        case Intermediate(list: List[(V, K)]) =>
          mappedLists :::= list
        case _ => println("Trap")
      }
    }
    mappedLists
  }

  def all(list: List[(K, List[V])]): Int ={
    list.size
  }

  def mapFrom(mappedLists: List[(V, K)]): Map[V, List[K]] = {
    var toReduce = Map[V, List[K]]().withDefault(k => List[K]())
    for ((key, value) <- mappedLists)
      toReduce += (key -> (value :: toReduce(key)))
    toReduce
  }

  def reduced(reducedMap: Map[V, List[K]]) ={
    val master = self
    for ((key, values) <- reducedMap)
      actor {
        master ! Optimized(key, selecting(values))
        exit()
      }
    reducedMap
  }

  def collect(reducedMap: Map[V, List[K]]): Map[V, List[K]] = {
    var result = Map[V, List[K]]()
    for (_ <- 1 to reducedMap.size)
      receive {
        case Optimized(key: V, values: List[K]) => result += (key -> values)
      }
    result
  }

  def proceedWith(input: List[(K, List[V])]): Map[V, List[K]] = {
    val reversedLists = join(all(mapped(input)))
    collect(reduced(mapFrom(reversedLists)))
  }
}

object ReverseIndex {
  def apply[K,V](delegate : ReverseIndexMapper[K,V] ):ReverseIndex[K,V] ={
    new ReverseIndex[K,V](delegate)
  }
}

Don't panic. This is the final version and I may have exaggerated the use of method calls in the main method. Let's start with the simple elements. First, we have a companion object:

object ReverseIndex {
  def apply[K,V](delegate : ReverseIndexMapper[K,V] ):ReverseIndex[K,V] ={
    new ReverseIndex[K,V](delegate)
  }
}

The companion object builds the reverse indexer, taking a ReverseIndexMapper implementation as a parameter. This is why the class declaration comes to be:

final class ReverseIndex[K,V] private(delegate: ReverseIndexMapper[K,V])

When needed we will use our delegate implementations:

def selecting(list: List[K]): List[K] = {
    delegate.selecting(list)
  }

  def mapping(key: K, list: List[V]): List[(V,K)] = {
    delegate.mapping(key, list)
  }

Hurrah, we have handled a third of the implementation. The important entry method is the proceedWith method:

def proceedWith(input: List[(K, List[V])]): Map[V, List[K]] = {
    val reversedLists = join(all(mapped(input)))
    collect(reduced(mapFrom(reversedLists)))
  }


The first step expresses, in naive literate programming style, the normalization of all our (K, List[V]) entry pairs into a single list of (V, K) pairs:

val reversedLists = join(all(mapped(input)))

The mapped method delegates the job of mapping to a set of actors:

def mapped(input: List[(K, List[V])]) ={
    val master = self
    for ((key, values) <- input) {
      actor {
        master ! Intermediate(mapping(key, values))
        exit()
      }
    }
    input
  }

In order to proceed, for each (K, List[V]) entry pair, i.e. (resource, list of property values), I fire an actor using the actor factory method from scala.actors.Actor. Each actor sends back to the master process an answer as an Intermediate result instance:

master ! Intermediate(mapping(key, values))

Intermediate is a standard case class whose purpose is both to store the message content returned to the sender (the master) and to ease pattern matching on the master side while analyzing the message content.
 
case class Intermediate[K, V](list: List[(V,K)])

Of course I stored the master process reference before firing the actors, so as to provide them the reference as an immutable master reference. As an actor is not needed anymore once it has answered, it quits immediately by invoking the exit() method. The join method collects the work sent back by the actors. The all method is syntactic sugar returning the expected number of (V, K) lists to be joined (i.e. the number of resources).

def join(taskSize: Int) = {
    var mappedLists = List[(V, K)]()
    for (_ <- 1 to taskSize) {
      receive {
        case Intermediate(list: List[(V, K)]) =>
          mappedLists :::= list
        case _ => println("Trap")
      }
    }
    mappedLists
  }

  def all(list: List[(K, List[V])]): Int ={
    list.size
  }

First I create the list of normalized entries, to be filled by the messages coming in from all the fired actors. There is no risk of a race condition: as we are acting as an actor ourselves, invoking the receive method, we read the incoming messages sequentially.

var mappedLists = List[(V, K)]()

So for each fired actor (as many as there are (K, List[V]) pairs) I receive an Intermediate instance and resolve its content using the case class pattern matching facility. I added a trap case in order to log any flaw in the execution (this is a prototype). The list is completed using the standard Scala ::: concatenation operator. First step done. What about the second step?

collect(reduced(mapFrom(reversedLists)))

The same story, by the way. The mapFrom method creates a map from the list of (property value, resource) pairs:

def mapFrom(mappedLists: List[(V, K)]): Map[V, List[K]] = {
    var toReduce = Map[V, List[K]]().withDefault(k => List[K]())
    for ((key, value) <- mappedLists)
      toReduce += (key -> (value :: toReduce(key)))
    toReduce
  }
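Note the withDefault trick: looking up a not-yet-seen property value answers an empty list instead of throwing a NoSuchElementException, so the very first resource can be consed on without a special case. A quick illustration:

val m = Map[String, List[Int]]().withDefault(_ => Nil)
m("unseen")  // List() - the default function kicks in, no exception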

Really no big deal. Clearly, generic notation in Scala does not entangle the code as much as in Java. The reduced operation replays the same part we played previously, firing actors in charge of removing the duplicates from the lists of resources, which are now indexed by the property values:

def reduced(reducedMap: Map[V, List[K]]) ={
    val master = self
    for ((key, values) <- reducedMap)
      actor {
        master ! Optimized(key, selecting(values))
        exit()
      }
    reducedMap
  }

This time the result of the execution of each actor is sent back through the use of a new case class:

case class Optimized[K, V](value: V, forKeys: List[K])

The input map is returned in order to chain the last method:

def collect(reducedMap: Map[V, List[K]]): Map[V, List[K]] = {
    var result = Map[V, List[K]]()
    for (_ <- 1 to reducedMap.size)
      receive {
        case Optimized(key: V, values: List[K]) => result += (key -> values)
      }
    result
  }

Again, we collect all the Optimized actor messages and store them into the result map.

Test green !

Nice, but tough. Philipp Haller offers a more complete example explaining how to trap exceptions and build a more fault-tolerant system, using the self.trapExit instance variable and the link method to chain master/slave processes. If you are interested, I can complete the example.

I have to finish reading "Why functional programming matters" by John Hughes and wake up at 4am, so I will leave you for tonight.

Be seeing you !!!

Tuesday, June 7, 2011

Mixing J2EE 6 and Scala, a first shot

I must confess I admire a lot of people recognized as craftsmen, from different domains, like Martin Fowler, James Coplien, Uncle Bob, Martijn Verburg, Martin Odersky, Jonas Boner. I like to follow Corey Haines' adventures, listen to Venkat Subramaniam's presentations... Apologies to the many others I forget.
In the desert land of French IT services it is hard to get satisfaction in our everyday projects, and we have very few opportunities to cross the path of such masters. Personally my only satisfactions come from my own learning from the masters... alas, from books, videos, articles. A few chats with one of them in a Twitter exchange are worth all the gold on earth.
Enough self-pity: hopefully, all the time outside the daily job can be dedicated to exploration, discovery... and blog buddies =D

Recently we tried to promote J2EE 6, hoping that the current client would finally accept to switch to a universally adopted standard; once more we had to start dealing with a mess, some big ball of muddy, mixed services and strange God objects. Tough play. Fortunately J2EE 6 is lighter (pruning), easier to use (optional XML, more AOP), richer in terms of API, really more business-oriented than the 5.0 level was, and it really looks toward a brighter, cloudy future.
Naturally, I came to acquire a copy of a J2EE 6 book in order to upgrade my knowledge from the 5.0 version to the 6.0 one, in a friendlier way. I chose Antonio Goncalves' Beginning Java EE 6 with GlassFish 3, 2nd Edition. Don't be fooled by the title: although any naive Java developer can start with the content, every astute J2EE developer will find valuable information on almost every page. Reading this gold mine will help you grab good habits and fully adopt the 6.0 standard peacefully. So, this is a good way to escape the everyday routine.

As I promised myself to try to deliver something on a regular basis (shame on me, I am late; too few hours in the day), I did not want to redo a carbon copy of Mr Goncalves' book content. There are already so many presentations exposing J2EE 6 and its benefits.
Like a lot of developers I have some difficulties with self-organization, so I planned to work on a kind of small personal planning board (a very, very light kanban one). A J2EE application would be a good approach. In the meantime it struck me how difficult it is to study so many things in parallel (too many books). I have started practicing Scala, upgrading to J2EE 6, recently started playing with Clojure, and am still gazing at Lisp and Erlang.

Why not combine the J2EE 6 adventure and the Scala one for a while? It may be ambitious. Maybe not. So before even attempting a Reenskaug/Coplien DCI approach, selecting my use cases, identifying the roles, planning some form of architecture, I have to set up an environment, a walking skeleton as defined by Nat Pryce and Steve Freeman in Growing Object-Oriented Software, Guided by Tests. This approach will allow me to define a broad picture of a working environment mixing Scala and J2EE 6.

How can I start? The most important thing to do is to start. After all, incredible people like David Pollak must have started somewhere before becoming experts designing frameworks like Lift.

I want to work in a J2EE 6 environment using the Scala language, so let's set up a Maven 3.0 environment to work with my IntelliJ 10.5. I am going through this experience the hard way: I will set the dependencies as the sum of all the dependencies needed both in a J2EE project and in a Scala project. For a very first baby step I will focus on creating an entity, paired with some managing EJB service. What I want is a working shell, an embedded environment for TDD. The second step (I hope in some upcoming article) will be to make the stuff deployable.

Let the entity be an elementary task, then I will create a task service.

I want to work with J2EE, so I may need the following dependencies:


        
            
<dependencies>
    <dependency>
        <groupId>org.eclipse.persistence</groupId>
        <artifactId>javax.persistence</artifactId>
        <version>2.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.persistence</groupId>
        <artifactId>eclipselink</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.glassfish</groupId>
        <artifactId>javax.ejb</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>org.glassfish.extras</groupId>
        <artifactId>glassfish-embedded-all</artifactId>
        <version>3.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.derby</groupId>
        <artifactId>derby</artifactId>
        <version>10.8.1.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.8.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.8.1</version>
    </dependency>
</dependencies>

The javax.persistence and javax.ejb dependencies reference the J2EE standard definitions. The EclipseLink dependency is the JPA 2.0 implementation I chose, following Antonio's recommendations.

For the test scope only, I set a reference to the GlassFish embedded container. The embedded container appeared in J2EE 6 in order to simplify the development of unit tests on EJB components in a standard J2SE context, providing the same managed environment, so the same facilities (transactions, CDI, AOP, security...), as the one found in application servers.

The Scala library is 2.8.1, as I have to work with the Scala Maven plugin. Speaking of plugin configuration, I defined the following build section:


<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <testResources>
        <testResource>
            <directory>${project.basedir}/src/test/resources</directory>
        </testResource>
    </testResources>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>2.8.1</scalaVersion>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <extensions>true</extensions>
            <configuration>
                <source>1.6</source>
                <target>1.6</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.3.1</version>
        </plugin>
    </plugins>
</build>

Well, this is really a brute-force approach: I defined everything I thought I needed and chose to work in scala directories.

In order to avoid download problems, I selected the set of following repositories:


        
<pluginRepositories>
    <pluginRepository>
        <id>UK-plugin</id>
        <url>http://uk.maven.org/maven2/</url>
    </pluginRepository>
    <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
</pluginRepositories>

<repositories>
    <repository>
        <id>glassfish-maven-repository.dev.java.net</id>
        <name>GlassFish Maven Repository</name>
        <url>http://download.java.net/maven/glassfish</url>
    </repository>
    <repository>
        <id>UK</id>
        <url>http://uk.maven.org/maven2/</url>
    </repository>
    <repository>
        <id>EclipseLink Repo</id>
        <url>http://www.eclipse.org/downloads/download.php?r=1&amp;nf=1&amp;file=/rt/eclipselink/maven.repo</url>
    </repository>
    <repository>
        <id>scala-tools.org</id>
        <name>Scala-tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </repository>
</repositories>

Fine, we are ready to start! IntelliJ starts up and recognizes my Scala facets. So far, so good; a happy camper I am.

I need a task. I want to persist it and reload it. For a start, let's say my task will have a title and, of course, an id. The test cases are defined as:

import org.junit.Test
import org.junit.Assert._

class TestTask {

  @Test
  def id_WithTestValue_ShouldBeBound() {
    val task = new Task
    task.id = 17
    assertEquals(17, task id)
  }
  
  @Test
  def title_WithTestValue_ShouldBeBound() {
    val task = new Task
    task.title = "myTitle"
    assertEquals("myTitle", task title)
  } 
  
}

This leads me to the following Task definition template:

import javax.persistence._

@Entity
@NamedQuery(name = "findAllTasks", query = "SELECT t FROM Task t")
class Task {

  @Id @GeneratedValue var id: Long = 0
  @Column(nullable = false) var title: String = null
}

For clarity, as a first shot, I chose to annotate my Scala fields. As Scala definitions are short, I found it more elegant to leave the annotations on the same lines as the field declarations.
Naturally I flagged the class template with the @Entity annotation. Then I cheated, anticipating a named query dedicated to the selection of all the persisted tasks.
How fine and simple this class template definition is, all the beauty of Scala in an application of the uniform access principle: "It seems that perfection is attained not when there is nothing more to add, but when there is nothing more to remove" (A. de Saint Exupery)

Hurrah, I've got an entity... written in Scala.

Now I want a service to manage this basic skeleton task. How will I query the service in an embedded mode? I need this very nice thing introduced in J2EE 6 and destined to ease integration tests: an embedded EJB container!!! As the reference implementation, GlassFish does provide one (hence the reference in the test scope).
All I have to do then is to:
  1. start the embedded container
  2. find my service through JNDI lookup
  3. persist my task
  4. assert my task id has been set
  5. reload my list of persisted tasks
  6. assert my list is not empty

So be it:

import org.junit.Test
import javax.ejb.embeddable.EJBContainer
import com.promindis.planning.entities.Task

import org.junit.Assert._
import java.util.List

final class TestTaskService {
  @Test
  def createTask_WithTestTask() {
    val ec = EJBContainer.createEJBContainer
    val ctx = ec.getContext

    val task = new Task
    task.title =   "testing task"

    val service: TaskEJB = ctx.lookup("TaskEJB").asInstanceOf[TaskEJB]
    service.createTask(task)

    assertNotNull(task id)

    val foundTasks: List[Task] = service.findAllTasks()

    assertNotNull(foundTasks)
    assertEquals(1, foundTasks size)


    ctx.close()

  }
}

Compilation error! Of course, I must create the service. The solution comes easily, thanks to the J2EE 6 AOP approach:

import javax.ejb.{LocalBean, Stateless}
import javax.persistence.{EntityManager, PersistenceContext}
import com.promindis.planning.entities._
import java.util.List


@Stateless
@LocalBean
class TaskEJB{
  @PersistenceContext(unitName = "planningPU") private var em: EntityManager = null

  def findAllTasks(): List[Task] ={
      val query = em.createNamedQuery("findAllTasks", classOf[Task])
      query.getResultList
  }

  def createTask(task: Task) {
    em.persist(task)
  }
}

The service is flagged as a stateless local bean, hosting an injected entity manager (dependency injection is cool when sticking to a standard).
Because I don't know how to proceed for now, I decided not to wrap the Java List in a Scala one (maybe you can help me on that point).
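One possible answer, assuming the JavaConverters approach is acceptable here (a sketch I have not tried in this project), would be to convert at the call site:

import scala.collection.JavaConverters._

// decorates the returned java.util.List and copies it into an immutable Scala List
val scalaTasks = service.findAllTasks().asScala.toList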

On behalf of the service, the container will manage the transactions during the entity manager invocations.
In upcoming steps I will want to query my service remotely. In a standard Java approach, this would require the definition of an interface flagged with a @Remote annotation. But we know that a 100% abstract Scala trait compiles as an interface.

Let's make a try:

import javax.ejb.Remote
import com.promindis.planning.entities.Task
import java.util.List

@Remote
trait TaskEJBRemote {
  def findAllTasks() : List[Task]
  def createTask(task: Task)
}

Easy, so our service becomes:

import javax.ejb.{LocalBean, Stateless}
import javax.persistence.{EntityManager, PersistenceContext}
import com.promindis.planning.entities._
import java.util.List


@Stateless
@LocalBean
class TaskEJB extends TaskEJBRemote {
  @PersistenceContext(unitName = "planningPU") private var em: EntityManager = null

  def findAllTasks(): List[Task] ={
      val query = em.createNamedQuery("findAllTasks", classOf[Task])
      query.getResultList
  }

  def createTask(task: Task) {
    em.persist(task)
  }
}

One minute, I have named a persistence unit to bind to the entity manager. So I need a persistence.xml file.
Easy said, (not-so)easy done:


    
<persistence xmlns="http://java.sun.com/xml/ns/persistence" version="2.0">
    <persistence-unit name="planningPU" transaction-type="JTA">
        <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
        <class>com.promindis.planning.entities.Task</class>
        <properties>
            <property name="javax.persistence.jdbc.driver" value="org.apache.derby.jdbc.EmbeddedDriver"/>
            <property name="javax.persistence.jdbc.url" value="..."/>
            <property name="javax.persistence.jdbc.user" value="..."/>
            <property name="javax.persistence.jdbc.password" value="..."/>
            <property name="eclipselink.ddl-generation" value="drop-and-create-tables"/>
            <property name="eclipselink.logging.level" value="FINE"/>
        </properties>
    </persistence-unit>
</persistence>

Although it is embedded, we are still working with a container, so the transaction type is JTA. The logging level is fixed to FINE, as I like to understand what happens.
Note that the database connection properties are part of the J2EE 6 standard:

  • javax.persistence.jdbc.driver
  • javax.persistence.jdbc.url
  • javax.persistence.jdbc.user
  • javax.persistence.jdbc.password

I chose the drop-and-create-tables ddl-generation mode so as to get rid of my database tables after the test.

Everything's ready. Engage. I open a command console and hit:

mvn clean test

The test runs automatically... and I have a test failure. Ah, right: some problem with the naming service binding. Since J2EE 6, apparently, an agreement has been reached on a global uniform naming convention in the JNDI namespace:

java:global[/<app-name>]/<module-name>/<bean-name>[!<fully-qualified-interface-name>]

Riiiight. The logs in my console give me a hint I would not have guessed alone:

Portable JNDI names for EJB TaskEJB : [java:global/classes/TaskEJB!com.promindis.planning.services.TaskEJBRemote, java:global/classes/TaskEJB!com.promindis.planning.services.TaskEJB]

I replace it with the second definition:
val service: TaskEJB = ctx.lookup("java:global/classes/TaskEJB!com.promindis.planning.services.TaskEJB").asInstanceOf[TaskEJB]

relaunch... and it works!!!

For the moment I have to trust my environment when it tells me it ran the test, persisted and reloaded my task. But a really complete environment requires the ability to produce a fully deployable application set >_<.
This will be the next step, and I will then be able to run an external client and create records in a more permanent database.

One moment: before going to bed, I want to check that my remote interface (my Scala trait) is really compiled as an interface. After running:

javap -p TaskEJBRemote.class

I get this:

public interface com.promindis.planning.services.TaskEJBRemote {
 public abstract java.util.List<com.promindis.planning.entities.Task> findAllTasks();
 public abstract void createTask(com.promindis.planning.entities.Task);
}

Good!!!

So we have seen that the promise of a more universal JVM is not only a promise. It still requires some work, but we are on our way.

I am tired and need four or five hours of sleep.

Be seeing you !!!!