Sunday, July 17, 2011

Finding a short path using Scala

Aouch, two weeks without blogging about things I am learning and I have this addiction problem with the blog :). Having committed to write something, when time's up (nearly two weeks due), I feel to start not at ease specifically when thinking about all the people encouraging me.

I changed of position too soon while missing two nice opportunities, one in England and another in Ireland, two tremendous stuffs. Well a contract is a contract, and as professional (please read Uncle Bob last writings about that), I have signed, so I have to go, although J2EE seems less attractive to me as it used to be. Meeting the Actor model or functional programming seems to please me more. It's a curse, stuck in France for the next five months, but next time I will wait for the nice opportunity, six months if necessary, practicing what I like.

I took the time to read and start practicing stuff like Akka, kept going on reading Scala In Depth from Joshua Suereth. This one's rocks specifically when it comes to implicits and typing formalism. But the domain is hard, and it is a must-read-many-times book, although Joshua Suereth provides clear explanations and examples. I also began practicing Clojure thanks to this amazing book: the joy of clojure. I felt the same as I felt while I started looking for the Abelson and Sussman lectures. Some example are quite hard for the Java eye and I bumped into a wall last week.

I spent a tremendous time at the end of chapter 7, on a shortest path search algorithm aiming to help you to get out of a small maze. Interesting. I noticed once more how some of us (me included) are so bound to our client will to integrate open sources - in order not to reinvent the wheel - that we may loose our capability to reason about mandatory stuff to know like how to find a short path into a graph. By the way, I hate the expression "do not reinvent the wheel", it is a proof of laziness.

The proposed algorithm, is the so called A* search algorithm extending the Dijkstra's algorithm. I was not sure I have understood all that clearly. So I decided to lose(?) one day more and try to implement the same problem in Scala using Dijkstra's approach to search of shortest paths. Grossly explained, the A* search adds an estimate function for the sorting of remaining paths to be explored.

I took the time to re-read my introduction to algorithms about the Dijkstra's algorithm. Indeed, the maze problem is typically a shortest path problem. It is all about a question of vocabulary. What are we talking about ?

Typically we are evolving into a world - the maze - where we have to progress from one point - a spot - to the other. The two points can respectively be the entrance and the exit of the maze. Each step we are going to make is going to cost us some price. Walls, high obstacles, whatever difficulty, is going to cost us a maximum of "weight" while the cost of a step on a flat floor cost us the minimum, let us say 1 on an integer scale. The topology and the shape of the maze can then be easily represented by an array of arrays structure (matrix) like the following

val  world = World(Array(
      Array(1 ,  1,  1,  1, 1),
      Array(99, 99, 99, 99, 1),
      Array(1 ,  1,  1,  1, 1),
      Array(1 , 99, 99, 99, 99),
      Array(1 ,  1,  1,  1, 1)
    ))

What we have presented here is the structure of a Z path. The walls -of high cost - are represented by the number 99. So a natural path to leave the maze from the upper left corner to bottom right corner would be

List(
  Spot(0,0), Spot(1,0), Spot(2,0), Spot(3,0), Spot(4,0), 
  Spot(4,1), 
  Spot(4,2), Spot(3,2), Spot(2,2), Spot(1,2), Spot(0,2),
  Spot(0,3), 
  Spot(0,4), Spot(1,4), Spot(2,4), Spot(3,4), Spot(4,4))

The astute reader will have noticed that we have kept the standard x, y notations used in computer science. The x axis values increase from the left to the right, and the y values increase from the top to the bottom.

The upper left corner is represented by the Spot(0,0) while the bottom right is represented by Spot(4,4).

This working context can be mapped to a graph problem where the Spot abstractions are the nodes and the matrix (array of arrays) contains the weight of the edges linking the nodes between them. The weight (or cost) at certain position in the matrix expresses how much it would cost you to get to the position wherever you come from. The so called weight function regularly invoked in the
graph search algorithms reduces itself to a look-up of value into the matrix.

So what is a spot ? Some abstraction that matches position coordinates. As we will rambling through the maze we will have to find out for neighbors of spot using incremental deltas of position (the steps). The test describing the expected behavior is:

import org.junit.Test
import org.scalatest.junit.{ShouldMatchersForJUnit, JUnitSuite}

final class TestSpot 
    extends JUnitSuite with ShouldMatchersForJUnit{

  @Test
  def spot_WithCoordinates_ShouldBindXCoordinate() {
    Spot(0, 0).x.should(be(0))
    Spot(7, 0).x.should(be(7))
  }

  @Test
  def spot_WithCoordinates_ShouldBindYCoordinate() {
    Spot(0, 7).y.should(be(7))
    Spot(7, 11).y.should(be(11))
  }

  @Test
  def spot_WithDelta_ShouldBindCoordinates() {
    (Spot(0, 7) + Delta(1,1)).x.should(be(1))
    (Spot(0, 7) + Delta(1,1)).y.should(be(8))

  }

 
  @Test
  def spot_InSmallWorld_ShouldNotBeInRange() {
    Spot(7,7).inBoundaries(5, 5).should(be(false))
  }

  @Test
  def spot_InBiggerWorld_ShouldBeInRange() {
    Spot(7,7).inBoundaries(10, 10).should(be(true))
  }
}

Nothing terrible up this point. We have added tests to check whether spots were into the boundaries of a world(or maze). This set of tests lead to the following implementation:

case class Delta(x: Int, y: Int)

case  class Spot(x: Int, y: Int) {

  def inBoundaries(width: Int, height: Int): Boolean = {
    (0 <= x && x < width) && (0 <= y && y < height)
  }

  def + (delta: Delta): Spot = {
    Spot(x + delta.x, y + delta.y)
  }
}
I chose case classes, in order to ease spots comparisons and look-up into tables, sets etc... In the spirit of functional programming, a spot incremented by a delta creates a new Spot. Before digging in to a (very) small explanation of the algorithm, a little more code. I already known I need a World abstraction where to define my costs, and maybe allowing me to find the neighbors of spot during my exploration. In order to create it I defined the following tests:  

import org.scalatest.junit.{ShouldMatchersForJUnit, JUnitSuite}
import org.junit.Test
import scala.Array._

final class TestWorld extends JUnitSuite with ShouldMatchersForJUnit{

  @Test
  def world_WithSingleCell_ShouldFindNoNeighbours() {
    World(Array.ofDim[Int](1, 1)).neighborsOf(Spot(0,0)).should(be(List()))
  }

  @Test
  def origin_WithTwoCellWorld_ShouldHaveTwoNeighbours() {
    World(ofDim[Int](2, 2)).neighborsOf(Spot(0,0)).size.should(be(2))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(0,0)).contains(Spot(0,1)).should(be(true))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(0,0)).contains(Spot(1,0)).should(be(true))
  }

  @Test
  def leftBottomCornerSpot_WithFourCellWorld_ShouldHaveTwoNeighbours() {
    World(ofDim[Int](2, 2)).neighborsOf(Spot(0,1)).size.should(be(2))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(0,1)).contains(Spot(0,0)).should(be(true))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(0,1)).contains(Spot(1,1)).should(be(true))
  }

  @Test
  def rightBottomCornerSpot_WithFourCellWorld_ShouldHaveTwoNeighbours() {
    World(ofDim[Int](2, 2)).neighborsOf(Spot(1,1)).size.should(be(2))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(1,1)).contains(Spot(1,0)).should(be(true))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(1,1)).contains(Spot(0,1)).should(be(true))
  }

  @Test
  def rightTopCornerSpot_WithFourCellWorld_ShouldHaveTwoNeighbours() {
    World(ofDim[Int](2, 2)).neighborsOf(Spot(1,0)).size.should(be(2))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(1,0)).contains(Spot(0,0)).should(be(true))
    World(ofDim[Int](2, 2)).neighborsOf(Spot(1,0)).contains(Spot(1,1)).should(be(true))
  }

  @Test
  def cost_WithWeightWorld_ShouldBeProperlyFound() {
    World(Array(Array(1,2), Array(3, 4))).costAt(Spot(0,0)).should(be(1))
    World(Array(Array(1,2), Array(3, 4))).costAt(Spot(1,0)).should(be(2))
    World(Array(Array(1,2), Array(3, 4))).costAt(Spot(0,1)).should(be(3))
    World(Array(Array(1,2), Array(3, 4))).costAt(Spot(1,1)).should(be(4))
  }
}

Basically I set small worlds, asserting on the cost of positions, finding the neighbors of very specific corner points. Standard unitary test for a matrix exploration.
This is where I discovered the useful Array.ofDim() Scala factory method. A wisely chosen import helps writing a self explanatory tests like the one into rightTopCornerSpot_WithFourCellWorld_ShouldHaveTwoNeighbours.
The matching implementation of a world is :

final class World(val definition: Array[Array[Int]]) {
  val selector  = List(Delta(0, -1), Delta(0, 1), Delta(1, 0), Delta(-1, 0))

  private val width = definition(0).length
  private val height = definition.length

  def neighborsOf(spot: Spot): List[Spot] = {
    selector.map(spot + _).filter(_.inBoundaries(width, height));
  }

  def costAt(spot: Spot): Int = {
    definition(spot.y)(spot.x)
  }
}

object World {
  def apply(definition: Array[Array[Int]]) = {
    new World(definition)
  }
}

A World abstraction accepts a topology definition and exposes two query methods neighborsOf and costAt respectively providing the possible Spot neighbors for some input spot and allowing some client code to get the cost (or weight) at a certain position indexed by a Spot. We assume being able to step uniquely on horizontal and vertical positions so the selector used to identify the neighbors is composed of only four deltas:

val selector  = List(Delta(0, -1), Delta(0, 1), Delta(1, 0), Delta(-1, 0))

Assumption has been made that all the arrays into the main array definition have the same size. The class is immutable. So far so good...

 Now we have to find our way in the world abstractions. Graph search lays onto a sets of lemmas found by smart people like Dijkstra and these lemmas are nice guides when it comes to the search for short paths. These lemmas can be taken for granted as one can intuitively "feel" their correctness. I invite you to check some algorithm book to become familiar with them.
I got one of these "Ah-Ah" moments when I understood the following (over simplified here):

Given a weighted directed graph G, with a weight function, and let p be a shortest path, than any sub-path extracted from p will be itself a shortest path

 So if at some points in our research we have found a shortest path then we are sure all the sub-path will be shortest paths too.

Looking for a shortest path will be a progressive move starting in the vicinity of the starting node. Then expanding from neighbor to neighbor, we will reevaluate when necessary an estimates of shortest path measures.

The technique used is called relaxation. Let start saying that all the shortest path to all the Spots except the starting have an infinite cost. If during our progression we can improve these shortest path estimates we will relax these infinite values. Let say that d[v] is the shortest path to the spot v from the spot s (the start) and that at some time we can estimate d[u] the shortest path from s to u a neighbor of v. Given w(u,v) the weight (or cost) of the edge (or step) between u and v than it is possible to reduce the value of d[v] this way:

if (d[v] > d[u] + w(u,v))
    then d[v] = d[u] + w(u,v)
         u becomes a predecessor of v

We have a beginning of solution. Dijkstra's helps us. He proposed a greedy algorithm, always providing a solution The idea is to enrich a set of already identified set of found shortest paths (S) while progressively emptying a priority queue (Q) of paths to be explored. The paths to be explored are sorted versus their (corrected or not) weight (or cost) values. The paths to be explored immediately in Q are the paths with the lowest weight. The correction of the weight is progressively achieved by relaxation. Dijkstra's algorithm is grossly described by the following pseudo code

Set all paths weight to infinite
Path weight at Starting Spot is 0
S is empty
Let Q be all Spots 
while Q is not empty
    do let u = peek-first-in(Q)
       let S = S U {u}
       for all neighbors v of u
            relax v with u path estimation and w(u,v) 

At the very beginning all the hypothetical paths in Q are supposed to be infinite except the starting point. Each neighbor path cost estimate will be progressively corrected.
I have adapted the starting conditions. The Q of to-do paths will start containing the starting path only (no point in introducing all the path at all points with infinite costs). Will be corrected the paths to-do in Q and added new corrected paths if they are not in the queue. So at each step we need weighted Path abstractions, matching a path to a spot Spot, holding a weight estimate and a list of predecessors to the Spot. These are the tests qualifying the Path abstraction:

import org.scalatest.junit.{ShouldMatchersForJUnit, JUnitSuite}
import org.junit.Test
final class TestPath extends JUnitSuite with ShouldMatchersForJUnit{

  @Test
  def weight_InPath_ShouldBeBound() {
    Path(3, Spot(4,5), List()).weight.should(be(3))
  }


  @Test
  def spot_InPath_ShouldBeBound() {
    Path(3, Spot(4,5), List()).spot.should(be(Spot(4,5)))
  }

  @Test
  def walk_InPath_ShouldBeBound() {
    Path(3, Spot(4,5), List(Spot(4,5),Spot(4,4), Spot(4,3)))
      .walk.should(be(List(Spot(4,5),Spot(4,4), Spot(4,3))))
  }

  @Test
  def relax_WithWorsePredecessor_ShouldNotRelax() {
    val pathToRelax = Path(3, Spot(3,4), List(Spot(3,4), Spot(3,5)))
    pathToRelax.relax(Path(2, Spot(2,4), List(Spot(2,4), Spot(1,5))), 7)
      .should(be(pathToRelax))
  }

  @Test
  def relax_WithBestPredecessor_ShouldNotRelax() {
    val pathToRelax = Path(5, Spot(3,4), List(Spot(3,5)))
    pathToRelax.relax(Path(2, Spot(2,4), List(Spot(2,4), Spot(2,3))), 1)
      .should(be(Path(3, Spot(3,4), List(Spot(3,4), Spot(2,4), Spot(2,3)))))
  }
}

One should note that the hold walk is reversed and always contains the current spot to which the path is attached. A natural implementation becomes:

case class Path(weight: Int, spot: Spot, walk: List[Spot]) {

  def chainedTo(previous: Path, withWeight: Int): Path = {
    Path(withWeight + previous.weight, spot, spot :: previous.walk)
  }

  def relax(previous: Path, withWeight: Int): Path = {
    if (weight > previous.weight + withWeight) {
      chainedTo(previous, withWeight)
    } else {
      this
    }
  }
}

I have the bricks. What do I want to test ? Well paths into some mazes. So go ahead for the tests:

import org.scalatest.junit.{ShouldMatchersForJUnit, JUnitSuite}
import com.promindis.graphs.PathFinder._
import org.junit.Test

final class TestPathFinder extends JUnitSuite with ShouldMatchersForJUnit {

  def pathIn(world: World, from: Spot, to: Spot): Path = {
    PathFinder(world).shortestPath(from, to)
  }

  def found(path: Path) = {
    path.walk.reverse
  }

  @Test
  def path_InFourthCellWorld_ShouldBeTwoSteps() {

    val world = World(Array(
      Array(1, 1),
      Array(99, 1)
    ))

    found(pathIn(world, from(0, 0), to(1, 1)))
      .should(be(List(Spot(0, 0), Spot(1, 0), Spot(1, 1))))
  }

  @Test
  def path_InDirectedWorld_ShouldMatchUniquePath() {

    val world = World(Array(
      Array(1, 1, 1, 1, 1),
      Array(99, 99, 99, 99, 1),
      Array(1, 1, 1, 1, 1),
      Array(1, 99, 99, 99, 99),
      Array(1, 1, 1, 1, 1)
    ))

    found(pathIn(world, from(0, 0), to(4, 4))).should(be(
      List(Spot(0, 0), Spot(1, 0), Spot(2, 0), Spot(3, 0), Spot(4, 0),
        Spot(4, 1),
        Spot(4, 2), Spot(3, 2), Spot(2, 2), Spot(1, 2), Spot(0, 2),
        Spot(0, 3),
        Spot(0, 4), Spot(1, 4), Spot(2, 4), Spot(3, 4), Spot(4, 4))
    ))
  }


  @Test
  def path_InUndirectedCellWorld_ShouldHaveCorrectSize() {

    val world = World(Array(
      Array(1, 1, 1, 2, 1),
      Array(1, 1, 1, 99, 1),
      Array(1, 1, 1, 99, 1),
      Array(1, 1, 1, 99, 1),
      Array(1, 1, 1, 1, 1)
    ))

    found(pathIn(world, from(0, 0), to(4, 4))).size.should(be(9))
  }

  @Test
  def path_InOtherDirectedCellWorld_ShouldMatch() {

    val world = World(Array(
      Array(1, 1, 1, 2, 1),
      Array(1, 1, 1, 99, 1),
      Array(1, 1, 1, 99, 1),
      Array(1, 1, 1, 99, 1),
      Array(1, 1, 1, 99, 1)
    ))

    found(pathIn(world, from(0, 0), to(4, 4))).should(be(
      List(
        Spot(0, 0), Spot(1, 0), Spot(2, 0), Spot(3, 0),
        Spot(4, 0),
        Spot(4, 1), Spot(4, 2), Spot(4, 3), Spot(4, 4))
    ))
  }
}

We start by a small world made of four Spots where the shortest path is unique. All the tests except path_InUndirectedCellWorld_ShouldHaveCorrectSize have clearly identified or forced paths. The latter one must find a path of the correct size even if there are multiple solutions. Here is the matching implementation:

final class PathFinder (val world: World) {

  def excludeFrom(neighbors: List[Spot], spotsInSet: List[Spot]): List[Spot] = {
    neighbors.filter(neighbor => !spotsInSet.contains(neighbor))
  }

  def not(done: Map[Spot, Path]) : Spot => Boolean= {
    neighbor: Spot => !done.contains(neighbor)
  }

  def accessibleNeighborsOf(current: Path, criteria: Spot => Boolean): List[Spot] = {
    world.neighborsOf(current.spot).filter(criteria)
  }

  def relax(pathStillToDo: List[Path], currentPath: Path): List[Path] = {
    pathStillToDo.map(path => path.relax(currentPath, world.costAt(path.spot)))
  }

  def spotsIn(pathStillToDo: List[Path]): List[Spot] = {
    pathStillToDo.map(path => path.spot)
  }

  def newPathFrom (current: Path): Spot => Path = {
    neighbor: Spot =>
        val cost = world.costAt(neighbor)
        Path(cost + current.weight, neighbor, neighbor::current.walk)
  }

  def relaxed(current: Path, awaiting: List[Path], alreadyDone: Map[Spot, Path]) = {
    val neighbors = accessibleNeighborsOf(current, not(alreadyDone))
    val awaitingPaths = awaiting.filter(path => neighbors.contains(path.spot))
    val refreshedPaths = relax(awaitingPaths, current)

    val newPathTodo: List[Path] =
      excludeFrom(neighbors, spotsIn(awaitingPaths)).map{newPathFrom(current)}

    (newPathTodo ::: (refreshedPaths ::: (awaiting -- awaitingPaths)))
      .sortWith(_.weight < _.weight)
  }

  def find(todo: List[Path], alreadyDone: Map[Spot, Path], to: Spot): Map[Spot, Path] = {
    todo match {
      case List() => alreadyDone
      case currentPath::remaining if currentPath == to =>
        alreadyDone + (currentPath.spot -> currentPath)
      case currentPath::remaining =>
        val done = alreadyDone + (currentPath.spot -> currentPath)
        find(relaxed(currentPath, remaining, done), done, to)
    }
  }

  def shortestPath(from: Spot, to: Spot): Path= {
    find(List(Path(world.costAt(from), from, List(from))), Map(), to)(to)
  }
}

object PathFinder{
  def to(x: Int, y: Int) = Spot(x,y)

  def from(x: Int, y: Int): Spot= Spot(x,y)

  def  apply(world: World) = {
    new PathFinder(world)
  }
}

The most important methods are relax and the recursive find, the others being tool methods. I used the companion object

object PathFinder

to define fluent factory methods like from and to. The entry point is the shortestPath method

def shortestPath(from: Spot, to: Spot): Path= {
    find(List(Path(world.costAt(from), from, List(from))), Map(), to)(to)
  }

There I start my exploration creating my list of path to be improved or settled with the from point. Naturally at this very moment I pass an empty Map of already done paths. The destination to will be used as a break point to terminate the process. In the find method, lays the logic targeting my exploration choices: did I find the destination ? Is my list empty ? etc..

def find(todo: List[Path], alreadyDone: Map[Spot, Path], to: Spot): Map[Spot, Path] = {
    todo match {
      case List() => alreadyDone
      case currentPath::remaining if currentPath == to =>
        alreadyDone + (currentPath.spot -> currentPath)
      case currentPath::remaining =>
        val done = alreadyDone + (currentPath.spot -> currentPath)
        find(relaxed(currentPath, remaining, done), done, to)
    }
  }

This decision scenario is perfectly supported by a Scala case strategy application, where the breaking condition is handled by the second match. I kept recursion at the end in order to take benefit from tail recursion optimization. The recursion process lays onto the relax method implementation:

def relaxed(current: Path, awaiting: List[Path], alreadyDone: Map[Spot, Path]) = {
    val neighbors = accessibleNeighborsOf(current, not(alreadyDone))
    val awaitingPaths = awaiting.filter(path => neighbors.contains(path.spot))
    val refreshedPaths = relax(awaitingPaths, current)

    val newPathTodo: List[Path] =
      excludeFrom(neighbors, spotsIn(awaitingPaths)).map{newPathFrom(current)}

    (newPathTodo ::: (refreshedPaths ::: (awaiting -- awaitingPaths)))
      .sortWith(_.weight < _.weight)
  }

This is where the algorithm is implemented.
At that moment I am working with a specified Path located at some Spot and already weighted. I simply look up for the valid spot neighbors. A valid neighbor is a neighbor Spot not already stored in the alreadyDone Map.
In order to proceed using a fluent language I defined the following tool methods

def not(done: Map[Spot, Path]) : Spot => Boolean= {
    neighbor: Spot => !done.contains(neighbor)
  }

  def accessibleNeighborsOf(current: Path, criteria: Spot => Boolean): List[Spot] = {
    world.neighborsOf(current.spot).filter(criteria)
  }

The not methods being a first class Function closing onto the alreadyDone Map (very Lisp-ish, I like that :)) Then I process the paths awaiting to be analyzed:

def relaxed(current: Path, awaiting: List[Path], alreadyDone: Map[Spot, Path]) = {
//..........
    val awaitingPaths = awaiting.filter(path => neighbors.contains(path.spot))
    val refreshedPaths = relax(awaitingPaths, current)
//................
  }

 def relax(pathStillToDo: List[Path], currentPath: Path): List[Path] = {
    pathStillToDo.map(path => path.relax(currentPath, world.costAt(path.spot)))
  }

If some of the neighbor Paths are not already awaiting, then I prepare a list of new paths to be processed:

def relaxed(current: Path, awaiting: List[Path], alreadyDone: Map[Spot, Path]) = {
//..........
  val newPathTodo: List[Path] =
      excludeFrom(neighbors, spotsIn(awaitingPaths)).map{newPathFrom(current)}

//................
  }

 def excludeFrom(neighbors: List[Spot], spotsInSet: List[Spot]): List[Spot] = {
    neighbors.filter(neighbor => !spotsInSet.contains(neighbor))
  }

  def spotsIn(pathStillToDo: List[Path]): List[Spot] = {
    pathStillToDo.map(path => path.spot)
  }

  def newPathFrom (current: Path): Spot => Path = {
    neighbor: Spot =>
        val cost = world.costAt(neighbor)
        Path(cost + current.weight, neighbor, neighbor::current.walk)
  }

All the purpose of the tool methods is to apply comprehensions or to create reusable first class functions so the algorithm in the relax method is more fluent. Finally one built up the lists of refreshed path to be handled and new paths to added, the whole resulting list is built up and sorted again:

(newPathTodo ::: (refreshedPaths ::: (awaiting -- awaitingPaths)))
      .sortWith(_.weight < _.weight)


Of course there are flaws in the implementations, mainly due to the use of list and a the late sorting after each relax operation. The program aims to be improved and advices will be welcomed .

As I have less time for Scala, a new client (remember? J2EE blah blah blah), I leave you.

Be Seeing you !

2 comments:

Post a Comment