Wednesday, November 14, 2012

Why the Clojure life of Brian matters...

Hi, 

Here I am, back after a few months of life away from my blog. Leaving the comfort zone was not easy, and it was specifically not made easy. That story will be told another time. 

Fortunately my number of lives is unlimited. I come back more determined than ever about my choice of Lisp languages and my quest for a Lisp mentor. 

I do not bring code this time, but I'd like to point out the two events that led me definitively into the Lisp world, although I am a Scala coder by day. First, I met the Amsterdam Clojurians and their meeting initiatives, and then I read Brian Marick's book: Functional Programming for the Object Oriented developer.

The book uses Clojure as a vehicle to illustrate various aspects of functional programming, from the basics of functions as first-class citizens to a presentation of more expert monadic structures (yes, in Clojure!). 
Astute readers will recognize some tributes to the SICP book, specifically when an object-oriented language is embedded from the very start of the book, or when lazy sequences are presented, as close as they are to Scheme streams. 
But in my humble opinion Mr Marick's book is not just "yet another book" on Clojure. Of course you learn about Clojure, but the same ideas could be pictured in Ruby, Scheme, Haskell etc. For example, the intellectual exercise of embedding an OOP approach exposes a paradigm for what it is: a specific programmatic interface aiming to serve a group of concepts for the better - or sometimes for the worse. The monadic structure examples explain clearly, concisely, and pragmatically why and how you should use some approaches of the monadic ecosystem. It is so far the best explanation of Monads I have ever read.
Brian Marick does not merely popularize functional programming for the average developer; he pulls her up to a new level of perception. As in his zipper example, he magnifies some aspects of mutable data structures and function application. Whether you are new to functional programming or already an experienced functional programmer, you will certainly find meat for the brain in this book. 

Brian Marick shares understandable knowledge, and it shows that sharing matters to him. Each line, each page reveals a true and deep commitment to making the functional paradigm accessible to every willing developer. 

Brian Marick's work matters today more than ever. In a time when rock star developers with oversized egos seem to invade Computer Science, it is becoming harder and harder to find humble people committed to knowledge sharing. One can be a very clever hard-core developer producing infinite volumes of open source code praised by a blind community; that cleverness serves no purpose if one is incapable of sharing the knowledge and pulling people up to one's level. The essence of progress is in the ability to pull the world with you, not to make it serve your purpose and flatter you. Unfortunately what I mainly witness today are exchanges between people who spend their coding lifetimes congratulating each other for the sake of their own egos. 

 There were times when we had masters: humble, educated, sharing. It seems that some very rare people like Brian Marick keep the tradition alive. I would like to meet more of them, and it occurs to me that some remain in the Lisp community while new ones are appearing in the Clojure communities, Clojure being something of a new Common Lisp for the JVM. Maybe one day an old Lisper who wishes to mentor, who knows... 

 See you pals with more code next time. Be seeing you !!! :):):)

Saturday, March 31, 2012

Where we explore the basics of Iteratees - Part I

Houdy.

It's been a long time since we have exchanged on interesting stuff in Scala or Clojure. I propose today that we keep working on Scala examples. I have not forgotten about Clojure or Lisp. I am just trying to find a very gifted Lisper with 3 or 4 decades of practice and the patience to teach me the Lisp way and functional programming from the bottom up. If you know someone, don't hesitate to put us in touch. I need guidance now more than ever.

I recently moved to a new position, surrounded by guys more gifted in Scala than I am, and I have to follow... far behind, as best I can. That's life: from the comfort zone of a JEE expert to the tough land of a beginner. By the way, there will be no coming back.
Our functional ramblings led us to the adoption of the Play framework. Pleasant and smart, the Play framework has been lifted up (may I say :)) as of version 2.0 and exhibits a lot of features linked to hot functional programming topics like Iteratees. 

When experts present Iteratees they generally target complete IO examples. You can find high-level examples here and here. Having been exploring Scala for a few months only (barely three massively interrupted years), I cannot go that far in one run. I wanted at least to explore the basics of the pattern. That is today's purpose. 

Choosing as a starting point the canonical article from The Monad.Reader Issue #16 named "Teaching an Old Fold New Tricks", I propose you join me on a first journey into Iteratees. In his nice presentation, John W. Lato reminds us of the classical way of handling IO using cursors and manipulating chunks of data, leading to a very imperative style of programming. 
In addition, the use of cursors favours everything except the composition of functions. At some point while working with IO we are forced to dig deeper into the API, forcing ourselves into handling lower-level methods and pulling data from our streams. Enumeration-based IO seems to provide us with a way to embrace a more composable, higher-order approach. The idea is simple, inspired by the behaviour of the foldLeft function as defined in every functional programming language implementation. A foldLeft function typically takes three arguments, as suggested by the imaginary foldLeft function sketched after this list, derived from the GenTraversableOnce interface in Scala:

  • a starting value for the accumulator (z)
  • a list of elements used for the accumulation (l)
  • a function combining the accumulated value (or accumulator) with each of the elements (f)
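A rough sketch of such a three-argument foldLeft (the real GenTraversableOnce method takes the collection as its receiver rather than as the l parameter):

def foldLeft[A, B](z: B, l: List[A])(f: (B, A) => B): B = l match {
  case Nil     => z                        // nothing left: return the accumulated value
  case x :: xs => foldLeft(f(z, x), xs)(f) // combine the accumulator with the head, recurse on the tail
}
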
The foldLeft function pushes the sequence elements into the accumulated value using the combining function. Still following Lato's reasoning, we can consider that we enumerate over a stream of input values using an accumulator in order to maintain a kind of computational state. Let's consider processing streams of data the same way.
The accumulator we call an Iteratee, and we give it the role of a stream processor. While enumerating our stream content, we pass the Iteratee an element to be consumed and then get rid of that element, passing the following element and so on. Whatever happens, once the element has been pushed to the Iteratee there is no coming back, whether the Iteratee effectively processes the element or not, period. Sticking to the article so as to learn, and reproducing its streamG abstract data type, we provide the following representation of a stream:
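
A plausible sketch of such a representation (the marker names EMPTY and EOF come from the text; the element wrapper name El is mine):

sealed trait StreamG[+E]
final case class El[E](element: E) extends StreamG[E]
case object EMPTY extends StreamG[Nothing]   // no element right now, but more may come
case object EOF extends StreamG[Nothing]     // the stream is closed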

As we will push the elements one by one, there is no need to use the standard approach of representing streams as a flow of chunks of data. In our implementation, we implement exactly the same stream markers as in the Monad.Reader issue. EMPTY notifies an Iteratee that the stream does not currently present an element but is expected to provide upcoming elements, and EOF tells an Iteratee that the stream is closed. As nicely explained by Rúnar in his post - obviously derived from the same article - this abstract data type is arbitrary and very IO oriented, as we could define different stream markers. A computation (aka an Iteratee) can represent two states (I use the word state on purpose):
  • Done, and as-is it will offer a result
  • or expecting an upcoming continuation
that leads to the following implementation:
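
A sketch of those two states (the names follow the text):

sealed trait Iteratee[E, A]
final case class Done[E, A](result: A, remaining: StreamG[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: StreamG[E] => Iteratee[E, A]) extends Iteratee[E, A] {
  def apply(in: StreamG[E]): Iteratee[E, A] = k(in)   // run the continuation on the incoming stream element
}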

Regarding the Cont case class, the apply method provides a simple way to extract the result of the continuation from the incoming stream element. As with the foldLeft method, we want to be able to enumerate over our values and feed the Iteratee so it can proceed with the computation:
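
A sketch of such an enumerator, assuming the Iteratee and StreamG sketches above:

@annotation.tailrec
def enumerate[E, A](el: List[E], iter: Iteratee[E, A]): Iteratee[E, A] = (el, iter) match {
  case (Nil, _)           => iter                      // no more input
  case (_, Done(_, _))    => iter                      // the Iteratee is already done
  case (x :: xs, Cont(k)) => enumerate(xs, k(El(x)))   // feed one element and carry on
}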

We recognize here a classic tail recursive process, stopping when the input sequence el is empty or the input Iteratee iter is done. Otherwise, the enumeration goes on and on... But an enumeration produces an Iteratee possibly holding our expected result. We must then apply some kind of run operation in order to retrieve that result. We face two situations:
  • we are done and we then get the result
  • we are not done yet and we ask for the result by providing an EOF marker. At that stage we can be provided with a result or no result at all, hence an optional result
Implemented in Scala:
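
A sketch matching the two situations listed above:

def run[E, A](iter: Iteratee[E, A]): Option[A] = iter match {
  case Done(result, _) => Some(result)       // we are done: just get the result
  case Cont(k) => k(EOF) match {             // not done yet: signal EOF and ask again
    case Done(result, _) => Some(result)
    case _               => None
  }
}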

For this first approach we will stick to simple stuff, no fluff. Let us try to play with basic Iteratees. The following extract of code provides canonical examples:
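
A possible rendering of these three canonical Iteratees (the exact signatures in the project may differ):

def head[E]: Iteratee[E, Option[E]] = {
  def step(in: StreamG[E]): Iteratee[E, Option[E]] = in match {
    case El(e) => Done(Some(e), EMPTY)   // first element found: we are done
    case EMPTY => Cont(step)             // nothing yet: keep waiting
    case EOF   => Done(None, EOF)        // closed stream: no head
  }
  Cont(step)
}

def drop[E](n: Int): Iteratee[E, Unit] = {
  def step(in: StreamG[E]): Iteratee[E, Unit] = in match {
    case El(_) => drop(n - 1)            // one more element swallowed
    case EMPTY => Cont(step)
    case EOF   => Done((), EOF)
  }
  if (n <= 0) Done((), EMPTY) else Cont(step)   // the Iteratee decides by itself when it is done
}

def length[E]: Iteratee[E, Int] = {
  def step(acc: Int)(in: StreamG[E]): Iteratee[E, Int] = in match {
    case El(_) => Cont(step(acc + 1))
    case EMPTY => Cont(step(acc))
    case EOF   => Done(acc, EOF)         // only an EOF marker can complete this Iteratee
  }
  Cont(step(0))
}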

respectively: getting the head of a stream, dropping a given number of elements, and calculating the length of a stream. Although we kept the implementation of these first functions simple, one should notice a very interesting facet of this approach. Running the sample:
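
A hypothetical driver along these lines (assuming the definitions sketched above) would be consistent with the output that follows:

val source = List(1, 2, 3)
println(run(enumerate(source, length[Int])))   // Some(3)
println(run(enumerate(source, head[Int])))     // Some(Some(1))
println(enumerate(source, drop[Int](2)))       // Done((),EMPTY)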

provides the following result

[info] Running com.promindis.user.UseIteratees
Some(3)
Some(Some(1))
Done((),EMPTY)

Having scrutinized the case matching in the function bodies, one might notice that
  • an Iteratee can return a Done result only when receiving an EOF marker (length function)
  • an Iteratee can return a Done result by itself (drop function)
So control over the processing can come from the input stream of data, from the Iteratee... or both. The consequence is that one can pair the usage of a "never-ending" Iteratee with multiple sources, or combine multiple Iteratees processing the same source. When done, an Iteratee can let a "successor" Iteratee take over the processing. Here comes the magic word of combination, which sounds like the word Monad. From the project derived from our previous posts we can derive the monadic behaviour of the Iteratees from the Applicative trait, getting for free the Functor, Monad and Applicative Functor extensions. As a reminder, both the Functor and Monad definitions were given by:
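
(reproduced from the Applicative post further down this page)

trait Functor[M[_]] {
  def map[T, U](source: M[T])(f: T => U): M[U];
}

trait Monad[M[_]] {

  def apply[T](data: T): M[T]

  def flatten[T](m: M[M[T]]): M[T]

  def flatMap [T, U](source: M[T])(t: T => M[U])(implicit f: Functor[M]): M[U] = {
    flatten(f.map(source)(t))
  }
}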

Basically we must implement the map and flatten functions and provide a kestrel combinator to automatically pair an Iteratee with a matching applicative functor instance:
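
A sketch of what the factory described below (iterateesToApplicative in the text) could look like: a type lambda fixes the element type E so that an Iteratee fits the single-parameter traits, and the kestrel combinator (iterateeOnSteroid) would pair any Iteratee with this instance, much like the toApplicative conversion of the Applicative post:

implicit def iterateesToApplicative[E] =
  new Applicative[({type λ[α] = Iteratee[E, α]})#λ]
    with Functor[({type λ[α] = Iteratee[E, α]})#λ]
    with Monad[({type λ[α] = Iteratee[E, α]})#λ] {

    def map[T, U](source: Iteratee[E, T])(f: T => U): Iteratee[E, U] = source match {
      case Done(result, remaining) => Done(f(result), remaining)
      case Cont(k)                 => Cont(in => map(k(in))(f))   // propagate the effect through the continuation
    }

    def apply[T](data: T): Iteratee[E, T] = Done(data, EMPTY)

    def flatten[T](m: Iteratee[E, Iteratee[E, T]]): Iteratee[E, T] = m match {
      case Done(Done(result, _), s) => Done(result, s)            // done twice: pass the state as-is
      case Done(Cont(k), s)         => k(s)                       // feed the leftover element to the inner continuation
      case Cont(k)                  => Cont(in => flatten(k(in))) // flatten again until a Done Iteratee is met
    }
  }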

The iterateesToApplicative function acts as a factory providing an Applicative Functor for Iteratees handling StreamG elements of type E. The type lambda projection allows for the extraction of the result type of an Iteratee computation.
The iterateeOnSteroid function is the kestrel combinator implementation, implicitly converting an Iteratee instance into a monadic structure.
The map implementation remains trivial as usual: we apply the mapping function to the result of the Iteratee. Done, may I say :). Notice that in the case of a continuation we propagate the map effect as another continuation.
Regarding the implementation of the flatten method, we literally translated our previous expectations. When the flatten function is provided with a Done(Done(_, _), s) data structure, the computation is also in state Done for the resulting Iteratee and we pass the state as-is. A Done(Cont(f), s) data structure expresses that the outer computation is done and we can apply the previous continuation to the leftover stream element. Obviously, dealing with a continuation leads us to recreate a new continuation to be flattened again until a Done Iteratee is met.

I agree this part is not easy and I have to come back to it on a regular basis. At this step, the last typical example of combination to provide the reader with is the famous alternate example. Imagine we were provided with a list of numbers and wanted to pick every second element. Basically we would compose the head and drop Iteratee functions presented earlier: 
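
A sketch of that composition with the instance above (dropAndHead is my name for it; with the project's implicit conversions in scope it could be written as the for comprehension for { _ <- drop[E](1); h <- head[E] } yield h):

def dropAndHead[E] = {
  val M = iterateesToApplicative[E]
  M.flatMap[Unit, Option[E]](drop[E](1))((_: Unit) => head[E])(M)   // drop one element, then take the next one
}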

and replicate that behaviour a certain number of times. Bang, we are doomed.
We never thought so far of implementing a replicate function. We face a situation where applying implicit conversions will save the day once more. We define first the concept of a Replicable object, for both simple object instances and object instances embedded in a context:

with the help of Monoid definitions for both simple objects and single parameter constructor types:

We can then provide an implicit conversion to Replicable for both simple objects and objects wrapped in contexts:

and a function allowing us to convert a traversable structure of replicated monadic values into a monadic traversable structure of values (aka apply a sequence, as we did last time):

There we are, ready to provide the famous alternate example:

providing:

[info] Running com.promindis.user.UseIteratees
...
List(2, 4, 6, 8, 10)

Hurray, we reached the end of the first part of the seminal article. The next leap into the world of Iteratees should be longer. So far we have learned what Iteratees are. We discovered that they bring us a new vision of the way we deal with streams of data. We brought to them some elements of composition. This implementation allowed us to play with our previous experiments on traversable structures and helped us leverage our skills in reusing monadic patterns.
In essence, all together we progressed.

 Before getting to IO applications I will soon digress into parser combinators. Some common aspects between Iteratees, parser combinators and the State Monad should help in understanding and applying all these concepts. Until next time: learn, hack, progress and never be satisfied with what you know.
Don't let anybody prevent you from leveraging and growing your knowledge. Improve, get better !!!
The project can be found at the same location as usual.

Be seeing you !!! :)

Sunday, March 4, 2012

Where we traverse, accumulate and collect in Scala

Houdy,

In a previous post targeting Applicative ramblings, we studied the ability to transform a list of monadic values into a monadic list of values. In situations where one wishes to convert a list of futures into a future of a list, or a list of optional values into an optional list of values during a validation process, this ability can become very handy and self explanatory. 
Recently, while watching the wonderful presentation by Rúnar Bjarnason here on functional programming in Scala with Scalaz, I noticed a more generic definition of the sequence function we defined earlier. Starting from the usual code project here, from:
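
that is, the sequence function we wrote for lists (it also appears in the Applicative post further down the page):

def sequence[T, A[_]](input: List[A[T]])(implicit applicative: Applicative[A]): A[List[T]] = {
  import applicative._
  input match {
    case (x :: xs) =>
      def cons(head: T, list: List[T]): List[T] = head :: list
      liftA2(cons, x, sequence(xs))
    case _ => pure(List.empty[T])
  }
}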

we could abstract the concept into the following trait:
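
A sketch of what that trait could look like (the exact signatures in the project may differ slightly):

trait Traverse[C[_]] {
  def traverse[T, U, M[_]](source: C[T])(f: T => M[U])(implicit applicative: Applicative[M]): M[C[U]]

  def sequence[T, M[_]](source: C[M[T]])(implicit applicative: Applicative[M]): M[C[T]] =
    traverse[M[T], T, M](source)(identity)   // traversing with identity turns C[M[T]] into M[C[T]]
}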

where the type constructor C plays the part of a "traversable" structure. This more complete definition also provides a sequence method, but this time it takes benefit from a more generic traverse method. Quoting the McBride and Paterson seminal article on applicative programming with effects, let us say the Traverse trait captures data structures, built with the C type constructor, through which we can apply applicative computations. 
In essence the traverse method allows us to traverse a C[T] structure and produce an M[C[U]] structure with the help of a f: (T) ⇒ M[U] function. Viewing Applicative Functors as a way to lift functions into the world of monadic values, one can sense that using the traverse function we are going to distribute monadic applications all over the data structure. 
Doing so, we will be able to convert a structure of possible values into a possible structure of values, and the same with a structure of Futures, Lists and so on, whatever structure I can define as traversable using an ad-hoc definition.
I provided two definitions of traversable: one to reproduce the list example, and a second to play with a tree structure. Concerning the list definition:
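
A hypothetical reconstruction (the member names mirror the expression quoted below; the recursion is spelled out with liftA2 where the original uses the :@:/:*: sugar):

trait ListLike[C[_]] {
  def empty[T]: C[T]
  def isEmpty[T](source: C[T]): Boolean
  def value[T](source: C[T]): T      // head of the structure
  def rest[T](source: C[T]): C[T]    // tail of the structure
  def cons[T](head: T, tail: C[T]): C[T]
}

trait TraverseListLike[C[_]] extends Traverse[C] {
  self: ListLike[C] =>

  def traverse[T, U, M[_]](source: C[T])(f: T => M[U])(implicit applicative: Applicative[M]): M[C[U]] =
    if (isEmpty(source)) applicative.pure(empty[U])
    else Applicative.liftA2(cons[U] _, f(value(source)), traverse(rest(source))(f))
}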

As lists belong to a more generic category of traversable structures per se, I freely defined a ListLike trait gathering the basic functions required during the construction of a list. Then I explicitly self-referenced this first trait in a second trait, TraverseListLike, reusing all the basic methods of the first trait. We witness here the real power of the applicative style, allowing us to lift all the tool methods into the world of monadic values while keeping the construction of the new monadic structure specifically clear. 
The expression

  cons[U] _ :@: f(value) :*: traverse(rest(source))(f) 

nicely reproduces the pattern of construction of a list like structure:  

(cons value (rest))  

in Lisp-ish form. The same procedure can apply to another structure I wanted to play with, trees:
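
A hypothetical reconstruction of the tree and its Traverse instance, again spelling out the sugar with liftA3 (the names EmptyTree and Node are mine):

sealed trait Tree[+T]
case object EmptyTree extends Tree[Nothing]
final case class Node[T](value: T, left: Tree[T], right: Tree[T]) extends Tree[T]

object TraverseTree extends Traverse[Tree] {
  def traverse[T, U, M[_]](source: Tree[T])(f: T => M[U])(implicit applicative: Applicative[M]): M[Tree[U]] =
    source match {
      case EmptyTree => applicative.pure[Tree[U]](EmptyTree)
      case Node(v, left, right) =>
        def node(value: U, l: Tree[U], r: Tree[U]): Tree[U] = Node(value, l, r)
        Applicative.liftA3(node _, f(v), traverse(left)(f), traverse(right)(f))
    }
}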

Both the definition of a Tree (an algebraic data type) and of the Traverse object speak for themselves. Here, we can still perceive, through the applicative style form  

cons[U] _ :@: f(v) :*: traverse(left)(f) :*: traverse(right)(f)  

the pattern of construction of nodes in a tree 

(cons value left-node right-node)  

Exercising our new toys provides the following:

At this step we have implemented the interface kindly exposed by Rúnar. The traverse function reveals itself to be very powerful. One example of application provided in the McBride and Paterson article proposes to accumulate information while we are traversing the structure. The accumulated value can evolve depending on each of the structure's items. An elegant way to do it consists in accumulating values using the add method of a Monoïd given by the type of the values:

Accumulating a value means changing a state while we are traversing the tree, the changing state type being the type of the accumulated value. De facto we think about using the State Monad, which we extended with the applicative definition:

I propose the following implementation for an accumulate function:

The function distributed all over the structure,  

  (x: A) ⇒ toAccumulator(x, f, monoid) 

impersonates the f: (T) ⇒ M[U] function expected as one of the traverse function parameters. 
The toAccumulator function provides a State instance depending both on the input value (the current item in the tree) and the Monoïd used for the accumulating effect. The Monoïd combines the result of applying f to the current item with any provided external state. 
Traversing the structure means chaining all the accumulated state changes, each change depending on a local item value. 

The function f: (T) ⇒ M[U] expects a single parameter type constructor M[_]. So, instead of defining yet another type lambda, I defined a local Projection[X] type synonym allowing me to dictate to the traverse function the [Projection, A, A] parameters, where Projection matches a single parameter type constructor. In exchange, the Scala compiler, not fooled by my trick, demands an explicit specification of the returned type: State[T[A], U]. I'd really love Scala type gurus to provide me with an explanation of this behaviour, and another about the fact that the compiler needs a local definition of stateApplicative[U]() although the stateApplicative function is already defined implicitly. 

Suppose I want to check some property on any of the elements in a Tree. The input function f: (T) ⇒ U in the accumulate function becomes a predicate producing a boolean value that we accumulate using an OR operation. Wishing to verify whether there are integers greater than 8 in a tree of integers, we provide:

The Any definition here is a logical-OR-centric Monoid. Both the traversableTree applicative functor and the Any Monoid are explicitly provided, as the Any Monoid definition is not implicit. We could also cumulate all the values greater than 8:

Here the toListMonoid definition is implicit, so it is omitted. Building on top of the accumulate function we can reduce (aka crush) all the values in a tree using a Monoïd given by the type of the elements in the tree:

In one of his wonderful posts, Debasish Ghosh presents Scala variations on The Essence of the Iterator Pattern article by Jeremy Gibbons and Bruno C. d. S. Oliveira. 
Among all the variations, one function called collect specifically modifies the elements of a data structure purely and independently of the accumulation process. The example provided converts a list of integers into a new list while counting the number of elements independently. Here is the implementation of his example:

The implementation, very similar to the accumulate function definition, splits in the state definition (f(x), g(s)) both the effect of transforming the tree by applying f(x) to the current item, and the effect of cumulating a state with g(s).

Today we started taking benefit from the applicative style, leveraging our ability to traverse data structures, cumulating effects, rebuilding our tree, or doing both in just one shot (code project here). 
 I have to go and work on CQRS now. The night is going to be short.

Be seeing you !!! :)

Thursday, February 23, 2012

Can I Read with my Monad ?

Hello again.

 Last time we talked about the State Monad and how we solved a typing problem using a type lambda. 
I am sorry to say I did it again, following a strange path. During one of my morning fitness sessions I watched Rúnar Bjarnason's presentation about Scalaz and functional programming in Scala. As I am progressively understanding new concepts in Scala, I can now start getting interested in a small selection of libraries. 
In my humble opinion, this is the way things should work: before using any dependency, one should make sure one understands (more than) the basic bricks of theory and the potential applications at the origin of the library's creation. 
Getting my hands into Functors, Monads, Applicative Functors and their applications can now open the door to a (small) range of selected libraries (not frameworks of course). Scalaz comes as a natural selection to be used... soon. 

Naturally, before that I have to understand a little bit more. Rambling in search of a concept implementation derived from the talk, I realized I had never taken the time to write a Monad "concept" - in the concept pattern parlance - targeting functions. 
Approaching functions as Functors, Monads and Applicative Functors demands a lot of effort from a lot of people (including me). But I would not say that creating the Monad implementation for functions is as hard as working with State Monads, which I have to come back to on a regular basis. And guess what, my understanding progresses each time. 
The starting question we should ask - from my point of view - is: if we assume a Monad manages a context for embedded values, what kind of context can a function be? Looking at the type of a function, one can consider the returned result as the embedded value extracted from a context, the context being the input value. Then, what would a map function on such a context look like:



In school we learnt very early a way to manage this kind of mapping between functions, thanks to function composition:
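
A sketch of the idea (the project's actual definitions may differ):

// mapping over a "function context" A => T is just function composition
def map[A, T, U](source: A => T)(f: T => U): A => U = source andThen f

// an equivalent alternate version, composing the other way around
def mapAlt[A, T, U](source: A => T)(f: T => U): A => U = f compose source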



In the final version I just took benefit from the contravariance in the type definition of the applied transforming function f. An alternate version simply writes the composition the other way around, with compose instead of andThen.



So far, so good. 
From the previous posts, and while playing with our project located here, we agreed on the following definitions for both Monads and Functors:



With a simple map implementation for Functors, and having provided a generic flatMap definition, the only implementations we should worry about are the apply and flatten methods:



The easiest one, the apply function, embeds an input value in a function context. This fresh context should be able to restore the value at any time: a sort of constant function. 

Regarding the flatten method, what we typically expect is to flatten a function producing a function ((A) ⇒ (A) ⇒ T, seen as (A) ⇒ ((A) ⇒ T)) into a function producing a simple value. That is exactly the purpose of the new function we create, applying the possible input value x successively to each function layer. 

Armed with our implementations we can now derive a full Applicative Functor implementation. As for the State Monad pattern, the Applicative Functor implementation accepts a single type parameter: Applicative[M[_]].
The type of the functions we are working with is a double parameter type constructor. In our working context, the type of the returned values is the embedded type in our Monad definitions. The function input parameter type defines the context from which the embedded type is extracted, therefore we must freeze the input type somehow. This is when the type lambda comes into place:
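
A sketch of what the resulting instance could look like, assuming the Functor/Monad/Applicative traits of the project (the name readerApplicative is mine):

implicit def readerApplicative[A] =
  new Applicative[({type λ[T] = A => T})#λ]
    with Functor[({type λ[T] = A => T})#λ]
    with Monad[({type λ[T] = A => T})#λ] {
    def map[T, U](source: A => T)(f: T => U): A => U = source andThen f
    def apply[T](data: T): A => T = _ => data                // the constant function: restore the value any time
    def flatten[T](m: A => (A => T)): A => T = x => m(x)(x)  // apply the same input to both layers
  }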



We freeze the function input type A while the type lambda projection (through #) allows us to pass a single parameter container to our Applicative Functor definition. But what's the use? The Learn You a Haskell canonical example remains very simple:
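
In Scala, a sketch of the same example, calling the instance above directly (the project's :@:/:*: sugar or a for comprehension could be layered on top):

val reader = readerApplicative[Int]

// two operations reading from the same input source, combined with a lifted addition
val addStuff: Int => Int =
  reader.applyA(reader.mapA((a: Int) => (b: Int) => a + b, (_: Int) * 2))((_: Int) + 10)

println(addStuff(3))   // 19, that is (3 * 2) + (3 + 10)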



We basically define two operations, each accepting an input parameter, and return the result of the combined functions. The more interesting part is that each function reads from the same input source. I guess that's why we call the pattern the "Reader" Monad.

How many times have we encountered this situation, where we had to apply different operations to an input context from which we aimed to extract and then process values? 
The most recent example that came to my mind was setting up an environment from accessible configuration information. In his book, Joshua Suereth explains how to produce and then combine optional Config instances while building an application. Starting from there and using our new pattern, just see how we can simulate the whole scenario:




In our scenario we build some environment using a connection, a datastore, and an application configuration. The configuration (aka the Config case class) is a container manageable using an implicit Applicative Functor.

The configBuilder method implements a complete environment setup process. We use our Reader Monad implementation in order to extract from a unique source
  • a connection configuration
  • a datastore configuration derived from the connection configuration
  • an application configuration
Then we build a complete environment from a single source of properties (a Map here). The environment is set using the sugar syntax we defined around Applicative Functors in a previous post.


Now we know how to separate the concern of what we create from a single source from how we process it. Although the example remains simplistic, we have, in our everyday life as programmers, opportunities to clutter the code while applying different processes to the same immutable input (input analysis, input validation, information display etc.). The Reader Monad can save the day by gathering the whole process in the same comprehension.

Code can be found here.

That's all folks. Not a big one this time. I need to do some Clojure too.

Be seeing you !!! :)

Wednesday, February 15, 2012

Variation on the State Monad in Scala

Houdy,

a brief one this evening, but I need to communicate. A friend of mine who recently read the post on the State Monad pattern asked me why I did not extend the generic Monad trait I was working on. 
Astute question. It took me some time to come up with an answer and provide him with a compiling version reproducing the canonical stack sample. I come today with a version that I submit to your judgement, in need of feedback. As a reminder, my previous proposal for both a Monad and a Functor trait led me to:


No big deal. When I came to define the State Monad I bumped into a wall because of the expected behaviour of the definition of the State Monad (from what I learnt from Learn You a Haskell). Roughly quoting the book, a stateful computation may be viewed as a function taking as input some state and returning a value paired with some new state:


No matter how I took the problem I was facing two impediments:
  • neither Function1[S, (A, S)] nor (S) => (A, S) are type constructors
  • whatever the solution, I would have to work with a single argument type constructor M[_] while implementing Monad[M]
The first problem I solved by mimicking Haskell and creating a State class:
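
A minimal sketch of such a class, mimicking the Haskell s -> (a, s) shape (the first type parameter is the produced value, the second the threaded state):

trait State[A, S] {
  def apply(state: S): (A, S)   // run the stateful computation on an input state
}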


having at hand a type constructor... alas with two type arguments. 

 The deal then was to somehow freeze one of the parameters in order to give the Monad trait a single type parameter to implement: from State[_, _] to M[_].
The method definitions in the generic Functor and Monad traits already fix the type parameter constraint as the type of the Monad's returned value (in Haskell parlance). So obviously my Monad trait would have to be parameterized (<- neologism :)) with some kind of State[_, S] thing. 

Wait a minute. The idea would be to define my single argument type constructor under the scope of my State Monad definition, supposedly having "already" bound the State type (there is a blurry notion of "already" for me). It seems that the solution lies in the use of a type lambda. As in the referenced article, I have to "curry" somehow my double argument State[_, _] type constructor: 
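
A sketch of what the resulting instance could look like, assuming the Monad and Functor traits above (the name stateMonad is mine):

def stateMonad[S] =
  new Monad[({type λ[α] = State[α, S]})#λ] with Functor[({type λ[α] = State[α, S]})#λ] {
    def map[T, U](source: State[T, S])(f: T => U) = new State[U, S] {
      def apply(state: S) = {
        val (value, next) = source(state)
        (f(value), next)
      }
    }

    def apply[T](data: T) = new State[T, S] {
      def apply(state: S) = (data, state)   // return the value, leave the state untouched
    }

    def flatten[T](m: State[State[T, S], S]) = new State[T, S] {
      def apply(state: S) = {
        val (inner, next) = m(state)
        inner(next)                          // thread the intermediate state into the inner computation
      }
    }
  }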


The expression ({type λ[α] = State[α, S]}) looks to me like an anonymous structural type, the type I want being State[α, S]. The type S is "already" (still blurry) bound to the State Monad implementation. We then use a type projection (aka #) in order to refer explicitly to the λ type we expect as the single argument constructor. Roughly said, a type projection allows one to refer to an inner type of a type. All this is new to me and any pointer, in addition to Joshua Suereth's book, will be welcome. 
All we need then is a kestrel combinator (look here and there for detailed explanations):
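
A sketch of such a combinator, pairing any State instance with map and flatMap so that it can live in a for comprehension, in the spirit of the toApplicative conversion of the Applicative post (names are mine):

implicit def stateToMonadic[A, S](source: State[A, S]) = new {
  val monad = stateMonad[S]
  def map[B](f: A => B): State[B, S] = monad.map(source)(f)
  def flatMap[B](f: A => State[B, S]): State[B, S] = monad.flatMap(source)(f)(monad)
}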


in order to use our State Monad in a for comprehension:
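
For instance, the canonical stack sample could then read (a hypothetical usage, assuming the pieces above):

type Stack = List[Int]

def push(x: Int) = new State[Unit, Stack] {
  def apply(stack: Stack) = ((), x :: stack)
}

def pop = new State[Int, Stack] {
  def apply(stack: Stack) = (stack.head, stack.tail)
}

val program = for {
  _ <- push(3)
  a <- pop
  b <- pop
} yield (a, b)

println(program(List(5, 8, 2, 1)))   // ((3,5),List(8, 2, 1))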


The complete code project can be found here.

The implementation is naive and this field of Scala ramblings is new to me. Do not hesitate to give feedback or pointers. 

Be seeing you !!! :)

Tuesday, February 7, 2012

Applicative ramblings in Scala

Hello again.

I was looking (and am still looking) for a good way to present the results of my ramblings on Applicative Functors without hurting the feelings of people who would like to start a Scala journey. 
As a "young" Scala explorer I would like to tell them that I really started understanding the full potential of Scala when I started dealing with category theory concepts and typing concepts. 

Reading Learn You a Haskell (LYAH) is a good start, and trying to reproduce some of the examples from Haskell in Scala is a nice move. I only started reading my first book on conceptual mathematics after reading LYAH and it is helping. David Greco told me recently that patterns like Monads, Functors, Applicative Functors etc. could be viewed as a kind of mathematically proven pattern. I stick to his point of view, claiming in addition that these are mathematically proven programming patterns (as opposed to architectural patterns). 
As such, learning these mathematical concepts represents a big part of our programming job. Nevertheless, I also claim that it is possible, if not recommended, to adopt both the mathematical understanding approach and the practical approach. 
We saw in previous posts that monads can be seen as contexts, allowing us to safely manipulate their content with higher level functions with the help of functors:


trait Functor[M[_]] {
  def map[T, U](source: M[T])(f: T => U): M[U];
}

trait Monad[M[_]] {

  def apply[T](data: T): M[T]

  def flatten[T](m: M[M[T]]): M[T]

  def flatMap [T, U](source: M[T])(t: T => M[U])(implicit f: Functor[M]): M[U] = {
    flatten(f.map(source)(t))
  }
}

where M[_] is the type constructor of the container assumed as a Monadic container. Paired with the following implicit definitions located in a package.scala file,

 
  implicit def ToFunctor[F[_] : Functor, A](ma: F[A]) = new {
    val functor = implicitly[Functor[F]]
    final def map[B](f: A => B): F[B] = functor.map(ma)(f)
  }

  implicit def ToMonad[M[_]: Functor : Monad, T](source: M[T]) = new {
    val monad = implicitly[Monad[M]]
    def flatMap[U](f: T => M[U]): M[U] = monad.flatMap(source)(f)
  }

our monadic definitions let us take benefit from for comprehensions in Scala. Having created a Writer to log traces during operations, logging a simple operation becomes:

def logNumber(x: Int): StringWriter[Int] = StringWriter[Int](x, "Got number " + x + " ")
val value = for {
      a <- logNumber(3)
      b <- logNumber(5)
    } yield a * b

which desugars into the explicit chain of flatMap and map calls:

      logNumber(3).flatMap(x =>
        logNumber(5).map(y =>
          x * y))

where the log operation - as a side effect - has been cleanly separated from the operation per se. I have slightly modified my definitions after reading Joshua Suereth's book again and after some enlightening discussions with David. I adopted a definition of Monads parameterized with the container type only.
(Another rule: when, like me, you are learning and trying to pull your level up, try to find skilled people. There is no shame in not understanding or not knowing, even at 40. There is shame only in living in the comfort zone.) 
The project source is located here

To Joshua: if you ever get the chance to read this post some day, thanks for the book (and please don't beat me for the bits of code I have stolen in my small project :)) 

Until now, we have made no progress in understanding Applicative Functors! I know. Considering our Monad definition, a basic frustration comes from the fact that we can only map a single argument function:

final def map[B](f: A => B): F[B] = functor.map(ma)(f)

In essence, what we would like to be able to do is manipulate a bunch of values embedded in structures like our Monads. It would be really interesting to work at a higher order level, allowing us to manipulate our monadic values with our simple functions in a very expressive (aka self explanatory) way, like in Haskell:

ghci>(+) <$> (Just 2) <*> (Just 3)
Just 5

Here a simple (+) operation is applied to two possible values, producing a possible value. With an impossible value argument, this would become:

ghci>(+) <$> (Just 2) <*> Nothing
Nothing

so an impossible value. 

In one line, thanks to this "applicative style", we apply an operation to two possible operands, pulling the values out of their context and pushing the new value into an identical context.

Why the Applicative name? These functions are provided by the Haskell typeclass definition:

class (Functor f) => Applicative f where  
    pure :: a -> f a  
    (<*>) :: f (a -> b) -> f a -> f b  

(<$>) :: (Functor f) => (a -> b) -> f a -> f b  
f <$> x = fmap f x  

In addition we have the benefit of managing the possible non-existence of one of the operands. 

Can we do that in Scala ? We can. We are going to produce something like:

def add = (a: Int, b: Int) => a + b

add :@: List(1, 2) :*: List(3, 4)

where :@: stands for <$> and :*: for <*>. 

Almost... because both :@: and :*: express the same intent, but mostly as "sugar" combining more elementary functions. 

The idea is simply to "lift" the applied function into the world of our container and to use function currying.

As explained in the link and in J. Suereth's book, we can view currying as splitting a function taking multiple input arguments into a function that takes a single argument and returns a function that takes a single argument, and so on:

scala> val f = (a: Int, b: Int, c: Int) => b * b - 4 * a * c
f: (Int, Int, Int) => Int = <function3>

scala> val c = f.curried
c: Int => Int => Int => Int = <function1>

scala> c(1)
res3: Int => Int => Int = <function1>

In the previous Haskell example, the <$> function lifts the (+) function into the world of Maybe Monads and partially applies it to (Just 2); then the <*> function applies the embedded partially applied function to (Just 3) (do not confuse this with partial functions in Scala). 

The following trait intends to lift up functions and apply lifted functions to "Applicatives", assimilated here to Monads and Functors:


trait Applicative[F[_]] {
  self: Functor[F] with Monad[F] =>

  def pure[T](data: T) = apply[T](data: T)

  def applyA[T, U](fs: F[T => U])(source: F[T]): F[U] = {
    implicit val f: Functor[F] = self

    flatMap(source) {
      x: T => map(fs)(f => f(x))
    }
  }

  def mapA[T, U](t: T => U, source: F[T]): F[U] = {
    applyA(pure(t))(source)
  }
}

  • Lifting is realized through the invocation of the pure function.
  • applyA extracts the function(s) embedded in the F[_] fs context and the values x embedded in the F[_] source context, and produces an F[_] result containing the output of f(x).
  • mapA lifts up an input function.

What about the currying operation? As Scala exposes 23 traits abstracting functions taking up to 22 parameters, we should take into account all the possibilities. Let us consider two of them:


  def liftA2[T, P, Q, A[_]](f: (T, P) => Q, a1: A[T], a2: A[P])(implicit applicative: Applicative[A]): A[Q] = {
    import applicative._
    applyA(mapA(f.curried, a1))(a2)
  }

  def liftA3[T, P, Q, R, A[_]](f: (T, P, Q) => R, a1: A[T], a2: A[P], a3:A[Q])(implicit applicative: Applicative[A]): A[R] = {
    import applicative._
    applyA(applyA(mapA(f.curried, a1))(a2))(a3)
  }

Basically the liftA2 function takes a function with two parameters and two monadic instances, then produces a monadic result after applying the function to the values embedded in the monadic instances. 
Scala allows for the currying of the f function through the application of the curried method on the function instance. 
This is where the mapA method gracefully lifts up the curried function instance and applies it once to the a1 argument, producing a partially applied function.
We can then apply applyA to the resulting partially applied function and a2, producing the final result.

Same for liftA3 and so on for hypothetical liftA4,5,6 etc. 

Wait a minute: applyA(mapA(f.curried, a1))(a2) does not offer any "applicative style" sugar. I struggled a lot with the sugar syntax, and reading chapter 11 of Joshua's book again provided me with the path to the solution:

  case class BuilderToApplicative[T1, A[_]](m1: A[T1])(implicit applicative: Applicative[A]) {
    def :@:[T2](f: T1 => T2): A[T2] = applicative.mapA(f, m1)

    def :*:[T2](m2: A[T2]) = BuilderToApplicative2[T2](m2)

    case class BuilderToApplicative2[T2](m2: A[T2]) {
      def :@:[T3](f: (T1, T2) => T3) = Applicative.liftA2(f, m1, m2)

      def :*:[T3](m3: A[T3]) = BuilderToApplicative3[T3](m3)

      case class BuilderToApplicative3[T3](m3: A[T3]) {
        def :@:[T4](f: (T1, T2, T3) => T4): A[T4] = Applicative.liftA3(f, m1, m2, m3)
      }
    }
  }

  implicit def toApplicative[T, A[_]](m: A[T])(implicit applicative: Applicative[A]) =
    new BuilderToApplicative[T, A](m)

This solution is almost identical to Joshua's version of his config builder. 
My first version was incomplete, as I was expecting a simple BuilderToApplicative class to handle the :@: and :*: function applications with a liftA2 method (sometimes you hate yourself ;)). 
A better understanding of Joshua's book showed me that his builder was the right answer, creating a building environment with the same number of levels as the maximum possible number of arguments of a Scala function. We provide here the first two levels. No doubt I will provide the others when needed.

In each BuilderToApplicativeX class we apply a liftAX method (Joshua, if we ever meet some day I owe you a cappuccino, a tea, whatever you want). 

 Why not apply that to lists? Defining an ad-hoc binding between lists and the Applicative trait:

implicit object ApplicativeList extends Applicative[List] with Monad[List] with Functor[List]{
  override def map[T, U](source: List[T])(f: (T) => U) = source.map(f)

  override def apply[T](from: T) = List(from)

  override def flatten[T](m: List[List[T]]) = m.flatten
}

we try:

def add = (a: Int, b: Int) => a + b
def addd = (a: Int, b: Int, c: Int) => a + b + c

def basics() {
  println(liftA2(add, List(1, 2), List(3, 4)))
  println(liftA3(addd, List(1, 2), List(3, 4), List(5, 6)))
  println(add :@: List(1, 2) :*: List(3, 4))
  println(addd :@: List(1, 2) :*: List(3, 4) :*: List(5, 6))
}

providing :

scala> import com.promindis.user._
import com.promindis.user._

scala> import UseApplicativeFunctor._
import UseApplicativeFunctor._

scala> basics()
List(4, 5, 5, 6)
List(9, 10, 10, 11, 10, 11, 11, 12)
List(4, 5, 5, 6)
List(9, 10, 10, 11, 10, 11, 11, 12)

Notice that these one-liners easily replace for comprehension expressions, leading to a clearer syntax. 

Why all that stuff? My initial intent was to reproduce a kind of sequence function like in Haskell. The sequence function produces, from a list of monadic values, a monadic list of values, and I found (and still find) that priceless:

def sequence[T, A[_]](input: List[A[T]])(implicit applicative: Applicative[A]): A[List[T]] = {
  import applicative._
  input match {
    case (x :: xs) =>
      def cons(head: T, list: List[T]): List[T] = head :: list
      liftA2(cons, x, sequence(xs))
    case _ => pure(List.empty[T])
  }
}
So List[A[T]] produces A[List[T]]. Taking Joshua's example of producing connection data from identifiers:
  def getConnection(input: List[Option[String]]) = {
      def doSomething(fromParameters: List[String]) =  fromParameters.toString()

      for {
        withParams <- input.sequenceA
      } yield doSomething(withParams)
  }

we get

scala> getConnection(List(Some("url"), Some("user"), Some("passwd")))
res5: Option[String] = Some(List(url, user, passwd))

scala> getConnection(List(Some("url"), None, Some("passwd")))
res6: Option[String] = None

only by applying the sequence method to the list using the implicit definition:

  implicit def toSequenceable[T, A[_]](list: List[A[T]])(implicit applicative: Applicative[A]) = new {
    def sequenceA = Applicative.sequence(list)
  }

We provide ad-hoc meaning to our list of Options, returning a valuable answer only when all the elements are provided. No NullPointerException, no catch, no if. Just fluent language. A more useful example? Taking a class of workers, executable in a pool, and providing a naive Applicative definition for the Future trait:

case class W(value: Int) extends Callable[Int] {
  def call() = {
    Thread.sleep(3000)
    value
  }
}

object Futures {
  implicit object ApplicativeFuture extends Applicative[Future] with Monad[Future] with Functor[Future] {
    def apply[T](data: T) = new Future[T] {
      override def get() = data

      override def get(timeout: Long, unit: TimeUnit) = get()

      override def cancel(mayInterruptIfRunning: Boolean) = false

      override def isCancelled = false

      override def isDone = true
    }

    def flatten[T](m: Future[Future[T]]) = m.get()

    def map[T, U](source: Future[T])(f: (T) => U) = new Future[U] {
      override def get() = f(source.get())

      override def get(timeout: Long, unit: TimeUnit) = f(source.get(timeout, unit))

      override def cancel(mayInterruptIfRunning: Boolean) = false

      override def isCancelled = false

      override def isDone = true
    }
  }
}

I can now transform a list of Futures into a Future of a list of the expected values:

 def exec(workers: W*)(implicit pool: ExecutorService): List[Int] = {
    workers.toList.map {pool.submit(_)}.sequenceA.get()
  }


  def futures() {
    implicit val pool = Executors.newCachedThreadPool()

    val start = System.currentTimeMillis

    val result = add :@: exec(W(1), W(2), W(3)) :*: exec(W(4), W(5), W(6))

    val ellapsed = System.currentTimeMillis - start
    println("Got " + result + " in " + ellapsed + " ms")

    pool.shutdown()
  }

For a more real-world example, check the Akka application.
 The code presented here is available there. That's all folks. See you (very) soon with my latest ramblings in the Disruptor world in Scala.

 Be seeing you !!!:)

Wednesday, January 25, 2012

A Star Trek firing Disruptors from Scala

Hello again.

 Today I want to share some of my ramblings while exploring the Disruptor framework a little bit, from a Scala eye. It is never too late. This little inspection was encouraged by David Greco, and Martin Krasser kindly suggested that I also have a look at a Scala port implemented by Jamie Allen. 

Before dissecting this last reference I decided to read the classic stuff on the subject, from the seminal technical article up to Trisha Gee's blog, but also to read again the very interesting articles written by Martin Thompson here. In order to get a better understanding I also suggest this reading and that one. Naturally, on top of the pile of all that documentation, do not forget Martin Fowler's must-read production over here.

Everything has been said about the Disruptor and, if you are interested, I really advise you to read about the subject before reading the incoming article. My summary in a few lines is going to be too harsh to be considered a serious presentation, and I hope that some of the experts will one day read the article and correct my misconceptions. 

Roughly cloning Martin Fowler's article introduction, let us say that, like all financial trading platforms, the LMAX platform relies on core business logic components that must interact with the outside world as fast as possible, supporting a throughput of dozens of millions of messages per second. 

The chosen implementation approach finds its name in the disruptive way of thinking that one must adopt in order to understand the internals of the Disruptor. Working with Disruptors involves first thinking "asynchronous", then accepting that simple and elegant solutions to exchange messages can be better than messaging based on queues. The pre-cited articles explain in a detailed manner the why and the how at the origin of the astounding level of performance of Disruptors compared to standard queue systems. 

The core business logic is surrounded by Disruptors. At the heart of the Disruptor, we find a ring buffer as the corner stone of the architecture, fed by one or more producers, and feeding one or more readers. And guess what, the one producer / many readers configuration is the optimal combination.
Readers and producers run in their own threads of execution. We achieve the communication between readers/producers and the ring buffer through barriers created by the ring buffer, although on the producer side the barrier is merged with the ring buffer itself. 
The fixed size ring buffer creates all its indexed slots during instantiation, the memory being allocated once and for all, avoiding annoying GC peaks.
The producer claims the available slot indexes, gets the matching slots, updates the slots' content, and commits them.
A reader waits through its sequence barrier for the available slots. 
On the producer side, events are created by an event factory. 
On the consumer side, event handlers process the dispatched events.

 The framework provides a lot of support in order to ease the creation of typical consumer configurations, like the pipeline configuration:


or the diamond configuration, for example:


Notice that the performance tests implement these configurations in the Disruptor source code. 

Before diving into the Scala port, I wanted to reproduce some configurations in Scala, the code exploration allowing me to get a better understanding of how things go. This is me: I need a client API point of view before understanding it all.

I decided not to adopt the Disruptor wizard approach and to dive down to the layer just under the wizard, creating by myself the barriers, handlers and so on. BatchEventProcessor remains the only generic class definition I kept in my explorations. Roughly said, a BatchEventProcessor instance, executed in its own thread of execution, takes charge of accessing the sequence barrier, getting the events, and passing them to an EventHandler instance. So a BatchEventProcessor basically impersonates a consumer.

The (little) code project I produced is located here (yes, a GitHub project finally :)). What I want to run are little scenarios reproducing some of the configurations. Compared to the LMAX tests I oversimplified the design. The (very basic) event management is hosted in the body of the EventModule object declaration:


package com.promindis.disruptor.adapters
import com.lmax.disruptor._
import java.util.concurrent.CountDownLatch
import util.PaddedLong

object EventModule {
  class ValueEvent() {
    val value = new PaddedLong()

    def setValue(newValue: Long) {
      value.set(newValue)
    }

    def getValue = value.get()
  }

  object ValueEvent {
    def apply() = new ValueEvent()
  }

  object ValueEventFactory extends EventFactory[ValueEvent]{
    def newInstance(): ValueEvent = ValueEvent()
  }

  case class Handler(name: String, expectedShoot: Long = 0, latch: Option[CountDownLatch] = None) extends EventHandler[ValueEvent] {
    var counter = 0L

    def onEvent(event: ValueEvent, sequence: Long, endOfBatch: Boolean) {
      counter += 1L
      for (l <- latch if (counter == expectedShoot) ) {
        l.countDown()
      }
    }

    override def toString = "[" + name   + ": counter => " + counter  + "]"
  }

  def fillEvent(event: ValueEvent): ValueEvent = {
    event.setValue(1234);
    event
  }

}

Our events wrap a long value using the LMAX PaddedLong class definition. Handlers count the number of received instances. I used the counter for personal manual testing, checking that all the events were passed to the handlers. I introduced an optional countdown latch, very useful to track the moment when the last consumer in my configuration has finished its duty. What we basically need is to measure execution times. So we need something like this:

package com.promindis.disruptor.adapters

import System._

object TimeMeasurement {

  implicit def toFrequency(value: Long) = new {
    def throughput(during: Long): Long = {
      (value * 1000L) / during
    }
  }

  def sampling(block: => Unit) = {
    val start = currentTimeMillis
    block
    val end = currentTimeMillis
    new {
      def provided[T](f: Long => T) = {
        f(end - start)
      }
    }
  }
}

The usage being:

 sampling {
   doYourStuff()
 } provided { elapsedTime =>
  doSomethingWith(elapsedTime)}

The implicit throughput method allows a small rate calculation to be applied to any Long in the scope of the definition. The BatchEventProcessor instances must be run in their own threads of execution.

Therefore, it would be nice if we could run our tests without having to take into account all that blurry management of
  • creating a thread executor service
  • running the batch processors
  • halting the processors
  • halting the executor
I propose something like:

package com.promindis.disruptor.adapters

import com.lmax.disruptor.BatchEventProcessor
import java.util.concurrent.Executors._

object ProcessorLifeCycle {

  def executing[T](processors: BatchEventProcessor[T]*)(block: => Unit) {
    val executor = newFixedThreadPool(processors.length)
    processors.foreach {executor.execute(_)}
    try {
      block
    } finally {
      processors.foreach{_.halt()}
      executor.shutdown()
    }
  }

}

a typical usage being:

executing(processors) {
   scenario
}

So far so good. I have all the tools I need to run a Scenario. The * character at the end of the type declaration of the first parameter expresses a variable number of processor arguments. Why not create a trait to gather all the basics:

package com.promindis.disruptor.configurations
import com.lmax.disruptor.BatchEventProcessor
import com.promindis.disruptor.adapters.TimeMeasurement._
import com.promindis.disruptor.adapters.ProcessorLifeCycle._


final case class Configuration(
  ringBufferSize: Int = 1024 * 1024,
  iterations: Long = 1000L * 1000L * 25L,
  runs: Int  = 5
)

trait Scenario {

  final def playWith[T](processors: Seq[BatchEventProcessor[T]])(bench: => Unit)(implicit config: Configuration) = {
    sampling {
      executing(processors:_*) {
        bench
      }
    } provided {
      config.iterations.throughput(_)
    }
  }

  def challenge(implicit configuration: Configuration): Long

  def run(implicit config: Configuration): Seq[Long] =  {
    for (_ <- 1 to config.runs) yield challenge(config)
  }

  def main(args: Array[String]) {
    run(Configuration())
      .foreach{value => println("Nb Op/s: " + value)}
  }
}

A Scenario allows us to play a (micro!) bench execution with a set of processors. A Configuration case class instance groups everything needed to play a fixed number of iterations several times. Notice that we apply our implicit throughput method to the result of the time measurement, so as to return, after a play, the measured number of messages per second.
The run method is the entry point of the little bench execution, returning a list of throughput values, their number also being set by the input configuration instance.

There is no better start than implementing the simplest possible example: a producer fills a ring buffer while a consumer handles the events. This code sample will provide us with an overview of the main classes involved in the process:

package com.promindis.disruptor.configurations.unicast

import com.promindis.disruptor.adapters.RingBufferFactory._
import com.lmax.disruptor.BatchEventProcessor
import com.promindis.disruptor.adapters.EventModule.{Handler, ValueEvent, ValueEventFactory}
import java.util.concurrent.CountDownLatch
import com.promindis.disruptor.adapters.{EventModule, Shooter}
import com.promindis.disruptor.configurations.{Configuration, Scenario}


object UnicastWithShooter extends Scenario{

  override def challenge(implicit config: Configuration) = {
    val rb = ringBuffer(ValueEventFactory,size = config.ringBufferSize);

    val barrier =  rb.newBarrier()
    val countDownLatch = new CountDownLatch(1)
    val handler = Handler("P1", latch = Some(countDownLatch), expectedShoot = config.iterations)
    val processor = new BatchEventProcessor[ValueEvent](rb, barrier, handler);
    rb.setGatingSequences(processor.getSequence)

    val shooter = Shooter(config.iterations, rb, EventModule.fillEvent)

    playWith(List(processor)){
      shooter ! 'fire
      countDownLatch.await()
    }
  }
}

From the creation of the barrier to the setting of the ring buffer gating sequence, the trained eye can recognize a typical Disruptor code pattern. But what is a Shooter? I defined a class able to feed the ring buffer from its own thread of execution, taking a minimal number of parameters, aka the number of iterations, a ring buffer and a way to set up a created event (the fillEvent method). 

There comes the Scala Actor "native" implementation:

package com.promindis.disruptor.adapters

import com.lmax.disruptor.RingBuffer
import actors.Actor

class Shooter[T](numberOfShoot: Long, val ringBuffer: RingBuffer[T], val eventStrategy: T => T) extends Actor {
  self =>

  implicit def adapted[T](ringBuffer: RingBuffer[T]) = new {

    def shoot(update: T => T) {
      val (sequence, event) = nextEventStructure(ringBuffer)
      update(event)
      ringBuffer.publish(sequence);
    }

    def nextEventStructure[T](rb: RingBuffer[T]): (Long, T) = {
      val sequence = rb.next();
      (sequence, rb.get(sequence));
    }
  }

  def act() {
    react {
      case 'fire =>
        for (i <- 1L to numberOfShoot) {
          ringBuffer.shoot(eventStrategy)
        }
        self.exit()
    }
  }
}

object Shooter {
  def apply[T](numberOfShoot: Long, ringBuffer: RingBuffer[T], fillEvent: T => T) =
    new Shooter(numberOfShoot, ringBuffer, fillEvent).start()
}

If you have not read Philipp Haller and Frank Sommers' book on actors, Actors in Scala... you should :). In order to shorten the size of the react method implementation I used an implicit method declaration allowing for an "ad-hoc" extension of the ring buffer API, allowing it to shoot some event update strategy by itself. 
After all, who better than a ring buffer knows how to get a sequence and publish it?
Although I am not comfortable with implicits, I do not find it disturbing to use them very close to their context of application, specifically when they help in solving a kind of expression problem.

When it comes to reusing the same patterns of code I can become very lazy (like a lot of people). Having a little time at home, I decided I could simplify a little bit the creation of other configurations. What if I could create a pipeline configuration like this:

val chain = for {
  _ <- pipe[ValueEvent](Handler("C1"), rb)
  _ <- pipe[ValueEvent](Handler("C2"), rb)
  _ <- pipe[ValueEvent](Handler("C3", latch = Some(countDownLatch), expectedShoot = config.iterations), rb)
} yield ()

in order to pipe three consumers, just getting back the BatchEventProcessors paired with their handlers for testing purposes. 
This is when I switched to panic mode. Of course we are building a stateful configuration of the ring buffer, so we have to meet again with the State Monad we studied last time. Implementing it was hard the first time, as I was not sure I had understood everything about it. I have difficulties with the State Monad, so here comes an opportunity to learn how to use the pattern... again.

Everything I need can be gathered in the pipe method, located in the Builder object:

package com.promindis.disruptor.adapters
import com.promindis.disruptor.support.StateMonad
import com.lmax.disruptor.{SequenceBarrier, BatchEventProcessor, RingBuffer, EventHandler}


object Builder {
  type Context[X] = (BatchEventProcessor[X], EventHandler[X])

  def pipe[T](handler: EventHandler[T], rb: RingBuffer[T]) = new StateMonad[SequenceBarrier, List[Context[T]] ] {
    def apply(list: List[Context[T]]) = {
      list match {
        case (p::ps)=>
          val newBarrier = rb.newBarrier(p._1.getSequence)
          val newProcessor = new BatchEventProcessor[T](rb, newBarrier, handler)
          (newBarrier, (newProcessor, handler)::p::ps)
        case _ =>
          val newBarrier = rb.newBarrier()
          val newProcessor = new BatchEventProcessor[T](rb, newBarrier, handler)
          (newBarrier, (newProcessor, handler) :: Nil)
      }
    }
  }
}

Aliasing a pair (BatchEventProcessor/EventHandler) as a Context for a given event type, I define a list of these contexts as my threaded state.
We are going to pile up each step, because we need all the created processors at the end of the configuration execution, and maybe we need the handlers too, so as to check their internal counters. The value returned by the application of the State Monad instance will be a sequence barrier. Returning a sequence barrier can be useful when attaching multiple consumers to the same barrier.

Ideally we would need only the processors, to submit them to some executor service. Piping the consumers consists in creating the barrier of one consumer using the sequence of the previous one, so that the former can handle an event only after the latter did. I advise you once more to study a little bit the articles I linked to before diving into the implementation of the tool methods used for configuration in the Builder object. As a result, the implementation of a pipeline becomes:

package com.promindis.disruptor.configurations.pipeline

import com.promindis.disruptor.adapters.RingBufferFactory._
import java.util.concurrent.CountDownLatch

import com.promindis.disruptor.adapters.EventModule.{Handler, ValueEvent, ValueEventFactory}
import com.promindis.disruptor.configurations.{Configuration, Scenario}
import com.promindis.disruptor.adapters.{Builder, Shooter, EventModule}
import Builder._


object Pipe_Line extends Scenario{

  def challenge(implicit config: Configuration) = {
    val rb = ringBuffer(ValueEventFactory,size = config.ringBufferSize);
    val countDownLatch = new CountDownLatch(1)

    val chain = for {
      _ <- pipe[ValueEvent](Handler("C1"), rb)
      _ <- pipe[ValueEvent](Handler("C2"), rb)
      _ <- pipe[ValueEvent](Handler("C3", latch = Some(countDownLatch), expectedShoot = config.iterations), rb)
    } yield ()

    val consumers = chain(List())._2
    val processors = consumers.unzip._1
    rb.setGatingSequences(processors.head.getSequence)
    val shooter = Shooter(config.iterations, rb, EventModule.fillEvent)

    playWith(processors){
      shooter ! 'fire
      countDownLatch.await()
    }

  }
}

which is far shorter than the first implementation, believe me :). 

Returning a list of tuples (BatchEventProcessor/EventHandler) requires a little trickery with the unzip method, but that clutter can easily be removed because, except for testing, we do not need to get the handlers back. 
The GitHub repository content provides two more methods used to build diamonds and, I am certain, other possible variations on configurations. 

The declarative part of the configuration of a diamond looks like:

val diamond = for {
  barrier <- fork(Handler("C1"), rb, rb.newBarrier())
  _ <- fork(Handler("C2"), rb, barrier)
  _ <- join(Handler("C2", latch = Some(countDownLatch), expectedShoot = config.iterations), rb)
}

also far shorter in length than the very first implementation.

Thank you for following me at this point. 

Running my tests provides, in terms of throughput values, the same kind of results as running the tests located in the LMAX source code. 
Having reduced the number of iterations to 25 million and generously extended the ring buffer size, depending on the tested configuration I get from 3.5 million to 5 million messages per second. 
My old machine not being sized for this kind of stress (only two cores), the result values are smaller than the benchmark results presented by the LMAX team. 
But my old laptop still runs the Disruptor tests five to ten times faster than the message queue tests. I find the result quite interesting, being sure that I do not host on this laptop any parallel desktop application that can handle 5 million messages per second, or even one million messages per second. 

Somehow, adopting a disruptive mindset could be a smart move, and not only in large-scale applications. 
That's all folks. 

 Be seeing you ! :):):)