Monday, June 13, 2011

Scala reverse indexing of resources using naive MapReduce

Cross my heart, I will finish the story of deploying the walking skeleton of my little Kanban application. But as I have promised myself to write on a regular basis about things I have learned or done, this week it is Scala.
On the edge of leaving the company I am working for (I am looking for a position in J2EE or Scala in Benelux, Switzerland or the UK ;)), we had to face two or three interesting problems over the last weeks. The first targeted J2EE 6, and we are on our way to exploring it. The second is a story about graphs (I may port it to Scala to reproduce the problem and optimize it as a personal exploration, so watch for it). The third one was more a proof of concept than a real problem.

The company I am working for polls wide ranges of properties on massive sets of physical devices, in order to produce high-value key indicators. A few months ago we opened a web service API in order to both import and export filtered historical data matching identified sets of devices. Let us call them resources. Although this was not the purpose of the original coders, let us say that the resources are identified through a kind of adaptive object modeling approach. New descriptive attributes can be dynamically bound to resources. These attributes are called properties.

The API, among other facilities, allows retrieving resource descriptions using selectors that include the attribute values. Part of the domain model is stored in a cache. But the volume of property information is too big to be stored in a memory cache.
We adopted a text search engine solution that basically creates reverse indexes. All the property values are treated as string instances and used as references to resource ids and descriptions. This idealistic approach aimed to save memory and provide faster access than a database approach. But the behavior of the hardware storage device matters when it comes to implementing the solution, and some clients complain. Big clients.
We started thinking about our own solution for reverse indexing, with multiple cache levels mixing both a reverse-indexed memory cache and a physical cache, everything benefiting from multi-core architectures during the indexing process.
As the management is already reluctant about adopting a rational development process, this idea cannot be adopted unless we provide the start of a solution, some proof of concept, something stating that we know what we are talking about.

For once I was lucky. This week-end I finished reading Philipp Haller's book, Actors in Scala. This is quite a nice book if you want to get into the mechanics of Scala actors; if you want to take full benefit from it, work through the examples with a copy of Programming in Scala at hand as a reference. After this reading, you will know more about Scala and Scala actors. This is a step I wanted to take before discovering Akka. First things first!
At the end of the book, in order to draw a broad picture of parallel computing, Mr Haller proposes a quite expressive example of a reverse indexing approach close to the Google-patented MapReduce implementation, aiming to create a reverse index for a file system. His example is quite complete and I don't want to duplicate it here. But it inspired me, and I created a small proof of concept for the tech lead of the team, hoping he will be able to make use of it. Blame me if I did not do my job correctly and betrayed Mr Haller's approach.

So what's all the fuss about? The idea is that we must be able, in some way, to separate the indexing process itself from the parallel mechanics (the old separation of concerns story).
This way we can distribute the parallelism over computers, multi-core architectures, etc. The process of reverse indexing can be described as a combination of the standard functional programming facilities known as map and reduce (see Lisp, Scheme, ...). Paraphrasing Jeffrey Dean and Sanjay Ghemawat, we are going to
  1. map the input as a logical set of (property value, resource) pairs
  2. reduce the result for all the resources sharing the same key

The result will be some instance of Map[property value, List[resource]]
The two steps are:

map List[(K, List[V])] to List[(V, K)]
reduce List[(V, K)] to Map[V, List[K]]

where K and V respectively target a resource type and a property type (OK, that's grossly oversimplified).
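Without any actors, the two steps can be sketched sequentially with plain Scala collections. This is only an illustration: the resource ids "r1" and "r2" and their property lists are made-up values, not part of the real system.

```scala
// Input: each resource paired with its property values
val input = List(
  "r1" -> List("Server", "Switch"),
  "r2" -> List("Server", "LDAP")
)

// Step 1 (map): normalize to (property value, resource) pairs
val mapped: List[(String, String)] =
  input.flatMap { case (res, props) => props.map(p => (p, res)) }

// Step 2 (reduce): group the pairs by property value
val reduced: Map[String, List[String]] =
  mapped.groupBy(_._1).map { case (p, pairs) => p -> pairs.map(_._2) }
```

Here reduced("Server") yields both resources, while "Switch" and "LDAP" each point to a single one. The actor-based version discussed below parallelizes exactly this transformation.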

In order to start, we need some basic abstraction for a resource. This resource must provide properties. A very rough test is:

import org.junit.Test
import org.junit.Assert._

final class TestResource {

  @Test
  def properties_WithTestInstance_ShouldBeBound() {
    val properties = List("Server", "Switch")
    assertEquals(properties, Resource(properties).properties)
  }

}

Nothing hard here: the resource is identified as both a server and a switch, as marked by its properties. So be it:

final class Resource (val properties: List[String])

object Resource{
  def apply(properties: List[String] = Nil): Resource = {
    new Resource(properties)
  }
}

Still quite simple. I provided a companion object for the resource so I could clarify the code by avoiding the new keyword. This is a typical factory method pattern. Note that I specified a default value for the list parameter: the empty list Nil.
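As a quick sanity check, here is the companion in action; a throwaway sketch repeating the class from above, with made-up property values:

```scala
final class Resource(val properties: List[String])

object Resource {
  def apply(properties: List[String] = Nil): Resource =
    new Resource(properties)
}

// Both call sites go through the companion's apply, no new keyword
val tagged = Resource(List("Server", "Switch"))
val blank  = Resource() // the default Nil kicks in
```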

The two variable operations I will inject into my mechanics are a mapping action and a reducing action.
I used a Scala trait to set the contract for these two operations:

trait ReverseIndexMapper[K,V] {

  def mapping(key: K, list: List[V]): List[(V,K)]

  def selecting(list: List[K]): List[K]

}

The default implementation I will provide is quite simple. In the first step I plan to convert a List[(K, List[V])] to a List[(V, K)] in order to extract a list of normalized (property value, resource) pairs.
In the second step I will merge all the List[(V, K)] lists for all the resources into a Map[V, List[K]], but I will have to remove duplicate entries from each List[K].

This led me to a test describing these cases:

import org.junit.Test
import org.junit.Assert._

final class TestDefaultMapper {

  @Test
  def mapping_WithResourcePropertiesList_ShouldCreateListOfPairPropertyResource() {
    val resource = Resource(List("Server", "Bridge", "LDAP"))
    val values = resource.properties
    val result = DefaultIndexerMapper[Resource, String]().mapping(resource, values)
    assertNotNull(result)
    assertEquals(values.size, result.size)

    for ((property, inResource) <- result){
      assertTrue(values contains property)
      assertEquals(resource, inResource)
    }
  }


  @Test
  def mapping_WithDuplicateResources_ShouldCreateListWithNoDuplicates() {
    val instanceOne = Resource()
    val instanceTwo = Resource()
    val resources = List[Resource](instanceOne, instanceTwo, instanceTwo)
    val result = DefaultIndexerMapper[Resource, String]().selecting(resources)
    assertNotNull(result)
    assertEquals(2, result.size)
    assertTrue(result contains instanceOne)
    assertTrue(result contains instanceTwo)

  }
}

I chose to name my target class DefaultIndexerMapper and to provide it with the mapping and selecting procedures (yes, functional programming vocabulary ;)). The first test submits a pair (resource, list of property values) to the mapping action and checks that all the entries are normalized as (property value, resource) record-like pairs. The second test checks that we have effectively removed duplicate entries from the list. As a result I came to the following implementation:

class DefaultIndexerMapper[K, V] private()
  extends ReverseIndexMapper[K,V]{

  override def selecting(list: List[K]): List[K] = {
    list.distinct
  }

  override def mapping(key: K, list: List[V]): List[(V, K)] = {
    for (value <- list) yield (value, key)
  }

}

object DefaultIndexerMapper {
  def apply[K, V](): DefaultIndexerMapper[K, V] = {
    new DefaultIndexerMapper[K, V]
  }
}

As usual (I like that), we find a companion object. The implementations are self-explanatory. We use the native distinct method of the Scala List class to remove duplicates. Done (OMG, one line!).
For the mapping, we use Scala's yield keyword inside a for loop to convert the input list into a sequence of pairs. Done (OMG, one line again!).
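If you want to convince yourself, both one-liners can be checked with throwaway values (the values here are purely illustrative):

```scala
// selecting: distinct drops duplicates while preserving first occurrences
val unique = List("a", "b", "b", "a").distinct

// mapping: for/yield turns each value into a (value, key) pair
val pairs = for (value <- List("Server", "Switch")) yield (value, "r1")
```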

Nothing more to say or to do. Then comes the test checking the map reduce action. This is how I described the expected result:

import org.junit.Test
import org.junit.Assert._

final class TestMapReduce {
  val resourceOne = Resource(List("Server", "Switch"))
  val resourceTwo = Resource(List("Server", "HTTP", "FTP"))
  val resourceThree = Resource(List("Server", "LDAP"))


  def inputFor(resourceOne: Resource, resourceTwo: Resource, resourceThree: Resource): List[(Resource, List[String])] = {
    List(
      (resourceOne, resourceOne.properties),
      (resourceTwo, resourceTwo.properties),
      (resourceThree, resourceThree.properties)
    )
  }

  @Test
  def index_WithSentences_ShouldMatch() {
    val mapper = DefaultIndexerMapper[Resource, String]()
    val result = ReverseIndex[Resource, String](mapper).proceedWith(
      inputFor(resourceOne, resourceTwo, resourceThree)
    )

    assertEquals(5, result.size)

    assertEquals(3, result("Server").size)
    assertTrue(result("Server").contains(resourceOne))
    assertTrue(result("Server").contains(resourceTwo))
    assertTrue(result("Server").contains(resourceThree))

    assertEquals(1, result("Switch").size)
    assertTrue(result("Switch").contains(resourceOne))

    assertEquals(1, result("HTTP").size)
    assertTrue(result("HTTP").contains(resourceTwo))

    assertEquals(1, result("FTP").size)
    assertTrue(result("FTP").contains(resourceTwo))

    assertEquals(1, result("LDAP").size)
    assertTrue(result("LDAP").contains(resourceThree))
  }

}

You can shoot now. This is more a functional-level test than a unit test. The body of the test is too large and provides too much information. Basically I create three resources:

import org.junit.Test
import org.junit.Assert._

final class TestMapReduce {
  val resourceOne = Resource(List("Server", "Switch"))
  val resourceTwo = Resource(List("Server", "HTTP", "FTP"))
  val resourceThree = Resource(List("Server", "LDAP"))
//...
}

As I expect my reverse indexer to take an instance of List[(Resource, List[String])] as input, I create this structure in the method:

def inputFor(resourceOne: Resource, resourceTwo: Resource, resourceThree: Resource): List[(Resource, List[String])] = {
    List(
      (resourceOne, resourceOne.properties),
      (resourceTwo, resourceTwo.properties),
      (resourceThree, resourceThree.properties)
    )
  }

I then challenge my indexer, checking the output and specifically that each entry is mapped to the owning resource:

@Test
  def index_WithSentences_ShouldMatch() {
    val mapper = DefaultIndexerMapper[Resource, String]()
    val result = ReverseIndex[Resource, String](mapper).proceedWith(
      inputFor(resourceOne, resourceTwo, resourceThree)
    )

    assertEquals(5, result.size)

    assertEquals(3, result("Server").size)
    assertTrue(result("Server").contains(resourceOne))
    assertTrue(result("Server").contains(resourceTwo))
    assertTrue(result("Server").contains(resourceThree))
//...
  }

No excuse; I was in a rush, and as Uncle Bob says, you have no excuse to do a dirty job. Shame on me. The solution I provided to pass the test comes as:

import actors.Actor._
case class Intermediate[K, V](list: List[(V,K)])
case class Optimized[K, V](value: V, forKeys: List[K])

final class ReverseIndex[K,V] private(delegate: ReverseIndexMapper[K,V]){

  def selecting(list: List[K]): List[K] = {
    delegate.selecting(list)
  }

  def mapping(key: K, list: List[V]): List[(V,K)] = {
    delegate.mapping(key, list)
  }

  def mapped(input: List[(K, List[V])]) ={
    val master = self
    for ((key, values) <- input) {
      actor {
        master ! Intermediate(mapping(key, values))
        exit()
      }
    }
    input
  }

  def join(taskSize: Int) = {
    var mappedLists = List[(V, K)]()
    for (_ <- 1 to taskSize) {
      receive {
        case Intermediate(list: List[(V, K)]) =>
          mappedLists :::= list
        case _ => println("Trap")
      }
    }
    mappedLists
  }

  def all(list: List[(K, List[V])]): Int ={
    list.size
  }

  def mapFrom(mappedLists: List[(V, K)]): Map[V, List[K]] = {
    var toReduce = Map[V, List[K]]().withDefault(k => List[K]())
    for ((key, value) <- mappedLists)
      toReduce += (key -> (value :: toReduce(key)))
    toReduce
  }

  def reduced(reducedMap: Map[V, List[K]]) ={
    val master = self
    for ((key, values) <- reducedMap)
      actor {
        master ! Optimized(key, selecting(values))
        exit()
      }
    reducedMap
  }

  def collect(reducedMap: Map[V, List[K]]): Map[V, List[K]] = {
    var result = Map[V, List[K]]()
    for (_ <- 1 to reducedMap.size)
      receive {
        case Optimized(key: V, values: List[K]) => result += (key -> values)
      }
    result
  }

  def proceedWith(input: List[(K, List[V])]): Map[V, List[K]] = {
    val reversedLists = join(all(mapped(input)))
    collect(reduced(mapFrom(reversedLists)))
  }
}

object ReverseIndex {
  def apply[K,V](delegate : ReverseIndexMapper[K,V] ):ReverseIndex[K,V] ={
    new ReverseIndex[K,V](delegate)
  }
}

Don't panic. This is the final version, and I may have exaggerated the use of method calls in the main method. Let's start with the simple elements. First we have a companion object:

object ReverseIndex {
  def apply[K,V](delegate : ReverseIndexMapper[K,V] ):ReverseIndex[K,V] ={
    new ReverseIndex[K,V](delegate)
  }
}

The companion object builds the reverse indexer, taking a ReverseIndexMapper implementation (our DefaultIndexerMapper) as a parameter. This is why the class declaration comes to be:

final class ReverseIndex[K,V] private(delegate: ReverseIndexMapper[K,V])

When needed we will use our delegate implementations:

def selecting(list: List[K]): List[K] = {
    delegate.selecting(list)
  }

  def mapping(key: K, list: List[V]): List[(V,K)] = {
    delegate.mapping(key, list)
  }

Hurrah, we have handled a third of the implementation. The important entry point is the proceedWith method:

def proceedWith(input: List[(K, List[V])]): Map[V, List[K]] = {
    val reversedLists = join(all(mapped(input)))
    collect(reduced(mapFrom(reversedLists)))
  }


The first step expresses, in naive literate programming style, the normalization of all our (K, List[V]) entry pairs into a list of all the (V, K) pairs:

val reversedLists = join(all(mapped(input)))

The mapped method delegates the job of mapping to a set of actors:

def mapped(input: List[(K, List[V])]) ={
    val master = self
    for ((key, values) <- input) {
      actor {
        master ! Intermediate(mapping(key, values))
        exit()
      }
    }
    input
  }

In order to proceed, for each entry pair (K, List[V]), that is (resource, list of property values), I fire an actor using the actor factory method from scala.actors. Each actor sends back to the master process an answer in the form of an Intermediate result instance:

master ! Intermediate(mapping(key, values))

Intermediate is a standard case class whose purpose is both to carry the message content returned to the sender (the master) and to ease pattern matching on the master's side while analyzing the message content.
 
case class Intermediate[K, V](list: List[(V,K)])
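Outside the actor machinery, the pattern matching convenience can be sketched on its own; the pairs below are dummy values, not part of the real message flow:

```scala
// Case classes give us extractors for free
case class Intermediate[K, V](list: List[(V, K)])

val message = Intermediate(List(("Server", "r1"), ("Switch", "r1")))

// The master resolves the payload through pattern matching
val extracted = message match {
  case Intermediate(list) => list
}
```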

Of course I stored the master process reference before firing the actors, so as to provide them with an immutable reference to the master. As each actor is not needed anymore once its message is sent, it quits immediately by invoking exit(). The join method collects the jobs sent back by the actors. The all method is syntactic sugar returning the number of expected List[(V, K)] results to be joined (that is, the number of resources).

def join(taskSize: Int) = {
    var mappedLists = List[(V, K)]()
    for (_ <- 1 to taskSize) {
      receive {
        case Intermediate(list: List[(V, K)]) =>
          mappedLists :::= list
        case _ => println("Trap")
      }
    }
    mappedLists
  }

  def all(list: List[(K, List[V])]): Int ={
    list.size
  }
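A quick aside on the `:::=` shorthand used to grow mappedLists; a throwaway sketch with dummy pairs, asserting only on the contents:

```scala
// :::= concatenates the incoming list into the accumulator variable
var mappedLists = List(("Switch", "r2"))
mappedLists :::= List(("Server", "r1"))

// Both pairs now sit in the accumulator
```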

First I create the list of normalized entries, to be filled up by the messages incoming from all the fired actors. There is no risk of race condition: as we are acting as an actor and invoking the receive method, we read the incoming messages sequentially.

var mappedLists = List[(V, K)]()

So for each fired actor (as many as there are (K, List[V]) pairs) I receive an Intermediate instance and resolve its content using the case class pattern matching facility. I added a trap case in order to log flaws in the execution (this is a prototype). The list is built up using the standard Scala ::: concatenation operator. First step done. What about the second step?

collect(reduced(mapFrom(reversedLists)))

The same story, by the way. The mapFrom method creates a map from the list of (property value, resource) pairs:

def mapFrom(mappedLists: List[(V, K)]): Map[V, List[K]] = {
    var toReduce = Map[V, List[K]]().withDefault(k => List[K]())
    for ((key, value) <- mappedLists)
      toReduce += (key -> (value :: toReduce(key)))
    toReduce
  }
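The withDefault trick is what keeps the loop free of containsKey checks: looking up a missing key simply answers the empty list. A quick sketch with illustrative ids:

```scala
// withDefault affects lookups only; missing keys read as the empty list
var toReduce = Map[String, List[String]]().withDefault(_ => List[String]())

// Prepending to the (possibly default) list, exactly as in mapFrom
toReduce += ("Server" -> ("r1" :: toReduce("Server")))
toReduce += ("Server" -> ("r2" :: toReduce("Server")))
```

After these two steps "Server" has accumulated both ids, while any other key still reads as Nil.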

Really no big deal. Clearly, generic notation in Scala does not entangle the code as much as it does in Java. The reduced operation replays the same part we played previously, firing actors in charge of removing duplicates from the lists of resources, which are now indexed by the property values:

def reduced(reducedMap: Map[V, List[K]]) ={
    val master = self
    for ((key, values) <- reducedMap)
      actor {
        master ! Optimized(key, selecting(values))
        exit()
      }
    reducedMap
  }

This time the result of the execution of each actor is sent back through the use of a new case class:

case class Optimized[K, V](value: V, forKeys: List[K])

The input map is returned in order to chain the last method:

def collect(reducedMap: Map[V, List[K]]): Map[V, List[K]] = {
    var result = Map[V, List[K]]()
    for (_ <- 1 to reducedMap.size)
      receive {
        case Optimized(key: V, values: List[K]) => result += (key -> values)
      }
    result
  }

Again, we collect all the Optimized actor messages and store them into the result map.

Test green!

Nice, but tough. Philipp Haller offers a more complete example explaining how to trap exceptions and build a more fault-tolerant system, using the self.trapExit instance variable and the link method to chain master/slave processes. If you are interested, I can complete the example.

I have to finish reading "Why Functional Programming Matters" by John Hughes and wake up at 4 am, so I will leave you for tonight.

Be seeing you!!!

2 comments:

gridgaintech said...

A lot, and I mean A LOT, easier to do this w/GridGain. Give it a shot.

Regards,
Nikita Ivanov.

Globulon said...

You bet I will =D
