Sunday, May 29, 2011

A touch of Scala

For a few months now, the shadow of functional programming has been haunting me. I started gazing again at Scala and Ruby, and would really like to dive into Clojure. As all these languages can run on a JVM, practicing them offers an interesting blend of the object-oriented and functional programming paradigms (OMG I'd like to try LISP and Erlang some day). As it is difficult to find masters of these crafts in France (hum... some may know what I mean...), I started contacting companies in neighboring countries, but these things take time. So in order to improve my knowledge, I bought Martin Odersky's Scala reference (Programming in Scala), Venkat Subramaniam's pragmatic introduction, and a recent Artima book, Actors in Scala, by Philipp Haller and Frank Sommers (because I'd like so much to code some Erlang!!!).

I have adopted the approach suggested by Jonas Bonér in an InfoQ presentation: starting from a bird's-eye view and then going deeper into the code. This meant starting with Venkat's book, then moving to the actors book, using the big Programming in Scala book as an incredibly powerful reference (reading it teaches you a lot about Scala, Java... and yourself).

Humility implies going back, again and again, to basics, and one pleasant way to do so is watching presentations on video channels. As I was recently looking at one of Martin Odersky's presentations, he covered an interesting short topic: how to implement the JDK7 try-with-resources (TWR) construct, but as a Scala implementation. This small example is valuable for a newbie like me because it gathers several typical features of Scala.
The presentation was made in 2009, when JDK7 was not even in beta.

We talked about the TWR feature in the JDK earlier. Basically, this feature reproduces the C# language enhancement that lets you declare a closeable resource in a try expression; the compiler then ensures on your behalf that the resource is closed (it works for I/O streams, bundles, JDBC connections, etc.):

try (final AsynchronousFileChannel sourceChannel =...;
     final AsynchronousFileChannel targetChannel =...
     ) {
         final ByteBuffer buffer = allocateDirect(65536);
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         final SourceCompletionHandler handler =
                 new SourceCompletionHandler(
                         sourceChannel, targetChannel, buffer, countDownLatch
                 );
         sourceChannel.read(buffer, 0, null, handler);
         countDownLatch.await();
     }

and channels are closed by the underlying JVM plumbing.
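
For readers who have not met the construct yet, here is a minimal, compilable form of my own (the file name is just an example):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Minimal try-with-resources: the reader is closed automatically,
// even if readLine() throws.
public final class TwrDemo {
    public static void main(final String[] args) throws IOException {
        try (final BufferedReader reader =
                     new BufferedReader(new FileReader("pom.xml"))) {
            System.out.println(reader.readLine());
        }
    }
}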

How would I want to implement it in Scala?
Naturally, I started with some testing code, in order to foresee how I would like to write the invocation.
Basically, I would like to create a closeable resource, invoke what I have to invoke and bang, done!
It is a start, so I wrote:

import org.junit.Test
import org.junit.Assert._
import com.promindis.tools.CloseableUtils._


final class TestCloseUtils {

  @Test
  def using_StubCloseable_ShouldFireCloseAction() {
    val closeable = SpyingCloseable()
    
    using(closeable) {
      closeable => closeable.invoke();
    }

    assertTrue(closeable wasInvoked)
    assertTrue(closeable wasClosed)

  }
}

In order to verify both that the closeable would be invoked and that the close method would be applied, I wrote a small spying class:

class SpyingCloseable {
  var closed = false
  var invoked = false

  def close() { closed = true}
  def invoke() {invoked = true}

  def wasClosed() = closed
  def wasInvoked() = invoked

}

object SpyingCloseable {
  def apply() = {
    new SpyingCloseable
  }
}

Indeed I wrote a class definition and the implementation of its "companion" object. As there are no static methods nor fields in Scala, you can create an object instance very close to Martin Fowler's knowledge-level counterpart of your class. This object provides you with class-level utilities, and in particular gives you the opportunity to define a default behavior by implementing an apply() method. Creating an apply method allows you to create instances without the new keyword.
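
For Java-minded readers, the companion's apply() plays roughly the role a static factory method plays in Java; a rough sketch of mine of the equivalent would be:

// Rough Java analogue of a Scala companion object's apply():
// the class exposes a static factory instead of forcing "new" on callers.
public final class SpyingCloseableJava {
    private SpyingCloseableJava() {}

    public static SpyingCloseableJava create() {
        return new SpyingCloseableJava();
    }
}

Scala goes one step further: at the call site you do not even name the factory method.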

Basically the spy caches whether you've called the expected methods.

The natural elegance of the Scala syntax makes the class content easily readable, even for a novice. Purists will not be very happy with the mutable instance variables: in functional programming, mutability should have no place to live. But it is a test, so I beg your pardon.

So what does the implementing class look like? Very close to Martin Odersky's demo:

object CloseableUtils {

  def using[T <: {def close()}]
                    (resource: T)
                       (block: T => Unit){

    try {
      block(resource)
    } finally {
      if (resource != null) resource.close()
    }
  }

}

First of all, I wanted my method to accept all possible closeable resources. So my method - as with Java generics - must accept all kinds of derived definitions of closeable. In one word, I must parameterize my method with a generic type parameter: T. As all the handled resources derive from some closeable resource definition, my type is limited by an upper bound. The upper bound is expressed with the following notation: <: .

What is a closeable? Indeed, everything that exposes a close() method. This is where we can use duck typing. Remember:
"When I see a bird that walks like a duck and swims like a duck and quacks like a duck, I call that bird a duck."

Indulge me and think about this version:

"When I see a bird that walks like a duck and swims like a duck and quacks like a duck, I assume that this bird behaves like duck."

Thinking like that opens the path to role driven design. But this is another discussion...

The duck-typed implementation of my constraint - a structural type, in Scala terms - is quite simple:

{def close()}

One of my purposes in designing this class was to give the feature a more "native" look - you know, with the opening/closing braces that give an impression of fluency, as if you had extended the language definition.
Scala allows the developer to use braces instead of parentheses when a method takes a single parameter. That provides a look similar to, say, the while control structure. But my utility method has two parameters.
That's where currying and partially applied functions come into the party. A function taking multiple input parameters can be seen as a composition of multiple single-parameter functions. You can then create a partially applied function instance by fixing one of the parameters.
For example

def sum(x: Int, y: Int) = x + y

could also be written

def composedSum(x: Int)(y: Int) = x + y

You can then set

val fivePlus = composedSum(5)_

that will create fivePlus as a partially applied function instance, ready to be invoked with the remaining parameter:

fivePlus(6)

I can do exactly the same with my utility function, splitting the parameter declarations.
So I will be able to invoke

using(closeable) {
      closeable => closeable.invoke();
    }

creating my curried, partially applied function by invoking using(closeable). The resulting function accepts only one parameter, which I can provide using braces:

def using[...](resource: T)(block: T => Unit)

Tests are green. So now, as a little example, I can open my pom.xml project file and list its content, certain that my source file abstraction has been closed:

using(fromFile("pom.xml")) { data =>
  println(data.mkString)
}

Thanks to the ones who did not fall asleep =D
Be seeing you !

Sunday, May 22, 2011

Glimpse at the JDK7 MethodHandle

Like last Sunday, as a result of a tough week fighting with dumb legacy code, I found myself short of time. But as I committed to learning things and talking about them, I decided to do a small kata to better understand the new method handles in JDK7.

The big surprise of upcoming versions will be the introduction of closures, allowing for example a more data-oriented style of programming (very useful in massively parallel algorithms) while leading to less verbose code. The use of closures could also spare us declaring interfaces for single-method classes, or (at last) using the (infamous) anonymous inner classes, which were halfway to being closures. Well designed at the language level, closures can be very self-explanatory; have a look at Ruby, Groovy or Scala samples. Stunning, and so clean.
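
To make the verbosity concrete, here is the JDK7-era ceremony a closure is meant to replace (a generic Runnable example of my own):

// A whole anonymous class for a single line of behavior:
// this is what closures should shrink to a one-liner.
final Runnable task = new Runnable() {
    @Override
    public void run() {
        System.out.println("one line of behavior, five lines of syntax");
    }
};
task.run();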

But it was decided to split the set of incoming features in two, and guess what, closures are planned for version 8. But... new stuff has been introduced to prepare their arrival. Deep stuff indeed, with the extension of the bytecode by a new opcode: invokedynamic. This new opcode supports dynamically typed languages (JSR 292) by allowing late binding of method calls. As a matter of fact, dynamically typed languages don't provide type information until runtime. To make this fast - quoting Ed Ort - invokedynamic, as a new linkage mechanism, solves the problem by catching invocations of non-existing or not-yet-bound methods and redirecting them to bootstrap methods. All that happens at the bytecode level, ensuring efficiency and performance.

The detailed explanation by Benjamin J. Evans and Martijn Verburg in their upcoming book is quite a jewel and covers this aspect better than I can. I must confess that thanks to their introduction I could deepen my understanding of the JVM. I am so ashamed I did not do that before =D.

One important effect of the introduction of invokedynamic is the appearance of a set of classes involved in the feature's management, located in the java.lang.invoke package. One class attracted me specifically because it can be a real helper while laying a test harness around unbearable legacy code. Yep, that does happen to me regularly, and I hate it.
The class is MethodHandle. A method handle, from my point of view, can be seen as a pointer to a method not yet bound to a receiver (very Smalltalk-like indeed, but where does Java come from?).
When one writes:

System.out.println()

out is the receiver of the println() method.
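
As a warm-up, here is a sketch of mine (using only documented java.lang.invoke calls) of a handle to that very println, bound to its receiver only at invocation time:

import java.io.PrintStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public final class HandleDemo {
    public static void main(final String[] args) throws Throwable {
        // A handle to PrintStream.println(String), not yet bound to a receiver
        final MethodHandle println = MethodHandles.lookup().findVirtual(
                PrintStream.class, "println",
                MethodType.methodType(void.class, String.class));
        // System.out takes the receiver slot at invocation time
        println.invokeExact(System.out, "Hello, method handles!");
    }
}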

Using a method handle to query an object dynamically seems very close to operating on methods through the Reflection API, while gaining the performance of the underlying bytecode enhancement.
Having practiced Scala and Ruby, it seems quite natural to define a method abstraction and bind it later, specifically when it comes to exploratory testing.
Why? Because with such low-level code you can gain access to private methods, and when you are testing big spaghetti-code functions, that rocks.

Let us take an example inspired by a previous experience in a bank environment.

I reproduce here the two object definitions involved in my problem. We had this transaction object that strangely succeeded in transferring money with a negative amount from one account to another.
For the sake of simplicity I reproduced a basic Account implementation. First, the tests:

public final class AccountTest {

    private BigDecimal amountOf(final double money) {
        return BigDecimal.valueOf(money);
    }

    @Test
    public void openAccount_forAmountOfMoney_ShouldNotBeNull() {
        assertThat(openWithDeposit(amountOf(100.0d)), is(not(nullValue())));
    }

    @Test
    public void openAccount_forAmountOfMoney_ShouldHaveMatchingBalance() {
        assertThat(
                openWithDeposit(amountOf(100.0d)).balance(), 
                is(equalTo(amountOf(100.0d)))
        );
    }

    @Test
    public void deposit_forAmountOfMoney_ShouldHaveIncreasedBalance() {
        assertThat(
                openWithDeposit(amountOf(100.0d))
                                .withDeposited(amountOf(100.0)).balance(), 
                is(equalTo(amountOf(200.0)))
        );
    }

    @Test
    public void withdraw_forAmountOfMoney_ShouldHaveDecreasedBalance() {
        assertThat(
                openWithDeposit(amountOf(100.0d))
                            .withdrawed(amountOf(100.0)).balance(), 
                is(equalTo(amountOf(0.0)))
        );
    }
}


Then implemented:

public final class Account {
    private BigDecimal balance_;

    private Account(final BigDecimal money) {
        super();
        balance_ = money;
    }

    static Account openWithDeposit(final BigDecimal money) {
        assert money != null;
        return new Account(money);

    }

    BigDecimal balance() {
        return balance_;
    }

    Account withdrawed(final BigDecimal amount) {
        balance_ = balance_.subtract(amount);
        return this;
    }

    Account withDeposited(final BigDecimal amount) {
        balance_ = balance_.add(amount);
        return this;
    }
}

Remember... it's basic (the balance should be final, with application of the value object pattern). And please remember to use BigDecimal in banking systems (I am a client too =D).

Then comes the transaction object. We need a test fixture like the following:

public final class TransactionTest {
    private BigDecimal money;
    private Account sourceAccount;
    private Account targetAccount;

    @Before
    public void setup() {
        money = amountOf(100);
        sourceAccount = openWithDeposit(amountOf(200));
        targetAccount = openWithDeposit(amountOf(100));
    }
    
    private Account toTarget() {
        return targetAccount;
    }

    private Account fromSource() {
        return sourceAccount;
    }

    private BigDecimal money() {
        return money;
    }

    private BigDecimal amountOf(final double money) {
        return BigDecimal.valueOf(money);
    }    
//....Up to come
}

The transfer must be tested, so I wrote:

    @Test
    public void transaction_forAmountOfTestMoney_ShouldNotBeNull() {
        assertThat(Transaction.forAmountOf(money()), is(not(nullValue())));
    }

    @Test
    public void transfer_ShouldOperateOnAccounts() {
        final Transaction transaction = Transaction.forAmountOf(money());
        transaction.transfer(fromSource(), toTarget());
        assertThat(fromSource().balance(), is(equalTo(amountOf(100))));
        assertThat(toTarget().balance(), is(equalTo(amountOf(200))));
    }

The matching implementation of the Transaction object came to be:

public final class Transaction {

    private BigDecimal amount;

    private Transaction(final BigDecimal amount) {
        super();
        this.amount = amount;
    }

    public boolean transfer(
            final Account fromSource, final Account toTarget
    ) {
        boolean status = false;
        if (canWithdraw(amount(), fromSource)) {
            doTransfer(fromSource, toTarget);
            status = true;
        }
        return status;
    }

    private boolean canWithdraw(
            final BigDecimal amount, final Account fromSource
    ) {
        return true;
    }

    private void doTransfer(
            final Account fromSource, final Account toTarget
    ) {
        fromSource.withdrawed(amount());
        toTarget.withDeposited(amount());
    }

    private BigDecimal amount() {
        assert amount != null;
        return amount;
    }

    public static Transaction forAmountOf(final BigDecimal money) {
        return new Transaction(money);
    }
}

The astute reader will notice that we face a problem when checking whether the withdrawal is possible, specifically when the amount of money is negative. The problem is that the canWithdraw method is private, and I would like to challenge it without lowering its access level to package scope or protected.
This really happened a few years ago, except the method was long, big, doing remote access: all the kinds of bad stuff that happen in really messy code.

The Reflection API offers a solution that works fine:

    @Test
    public void canWithdraw_ThroughReflection_WithAccessRight_ShouldReturnFalse()
    throws Exception {
        final Transaction transaction = Transaction.forAmountOf(money());
        Method method = transaction.getClass().getDeclaredMethod(
                "canWithdraw", BigDecimal.class, Account.class
        );
        method.setAccessible(true);
        final Object result = 
                  method.invoke(transaction, amountOf(-100), fromSource());
        assertThat((Boolean) result,is(equalTo(FALSE)));
    }


I create a Method abstraction, dynamically lower the access rights, and invoke it on my object. Done!

Using the invoke package utilities I can do the same while relying on my JVM's performance. All I have to do is set a hook into my tested class, a hook that will provide me with access to all the methods in the class, whatever their access level.

This is the hook I introduced:

    public MethodHandles.Lookup hook() {
        return MethodHandles.lookup();
    }

Of course I will only use it temporarily, as a testing method. Quoting the Javadoc, a lookup "has the capability to access any method handle that the caller has access to, including direct method handles to private fields and methods". So hello, private methods!

I need it, that's clear.

Back to my test I then follow the standard procedure:

@Test
public void canWithdraw_ThroughLookupHandle_WithAccessRight_ShouldReturnFalse()
throws Throwable {
   final Transaction transaction = Transaction.forAmountOf(money());
   final MethodType methodType = withDrawMethodType();
   final MethodHandle handle =
           transaction.hook()
               .findVirtual(Transaction.class, "canWithdraw", methodType);
   assertThat(
       (boolean)handle.invokeExact(transaction, amountOf(-100), fromSource()),
        is(equalTo(FALSE))
    );
}

private MethodType withDrawMethodType() {
   return methodType(boolean.class, BigDecimal.class, Account.class);
}



I first define a MethodType, a kind of metadata for an unnamed, unbound method. What uniquely defines a method if not its return type and its arguments? That's what a method type is.

I can then ask the transaction hook to find the method handle (the pointer, if you prefer) to the method I am looking for. The helping hunting method is findVirtual, which brings me, for the Transaction class, the handle to the private canWithdraw method.
Then I simply invoke the method...

Done!! The test is red, but I can now modify or explore my tested code at a much finer level.

Very useful, and it forces us a little to understand the JVM. But that's pure pleasure =D (we are craftsmen after all). It also helps in understanding the internal mechanics, because the strict type checking involved demands an understanding of some of the bytecode syntax. I first attempted the test with:

assertThat(
    (Boolean)handle.invokeExact(transaction, amountOf(-100), fromSource()),
    is(equalTo(FALSE))
);

and got:

java.lang.invoke.WrongMethodTypeException: (Ljava/math/BigDecimal;Lcom/promindis/jdk7/handles/Account;)Z cannot be called as (Lcom/promindis/jdk7/handles/Transaction;Ljava/math/BigDecimal;Lcom/promindis/jdk7/handles/Account;)Ljava/lang/Boolean;

Of course: the method returns a boolean primitive (the Z symbol in the bytecode descriptor), and invokeExact tolerates no mismatch!!!
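
A side note, from my reading of the javadoc (to be taken with care): the plain invoke method, unlike invokeExact, adapts argument and return types on the fly via asType, so the boxed variant would have passed with it:

// invoke() applies asType conversions (boxing here), where invokeExact()
// demands an exact signature match and throws WrongMethodTypeException:
final Boolean allowed =
        (Boolean) handle.invoke(transaction, amountOf(-100), fromSource());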

Be seeing you !! =D

Sunday, May 15, 2011

Overview of the JDK7 watch service

Time's running, and I found myself with too few things to talk about this week; but becoming a craftsman requires discipline and a real will to share things with others, even small things. And of course, I still expect others to share things with me :). I made a promise a few weeks ago to talk about a real time saver, and I take this opportunity today, even if the subject is quite small.

One of this blog's subscribers (hi ze1mlp) and I worked together for a long time on the implementation of a military tactical editor (ok, they must shoot me now =D). The application had to interface with external systems in order to exchange legacy data. One of the processes required the lowest protocol of communication that can be imagined: the exchange of files deposited in a shared directory. The legacy external system could only push files; no network communication or service connection was possible.

We decided to create a watch dog. Easily said, not so easily done... with all the trivial impediments raised by file I/O manipulation, specifically when your JVM is running under Windows (or Linux, by the way). We found ourselves double-checking file deletions because of file descriptor pains, tuning polling periods at the cost of a lot of CPU, and creating all sorts of small classes on top of the set of classes of the watch dog implementation itself.

JDK7 elegantly solves the watch dog problem with a new service facility offered by the FileSystem god object (no anti-patterns here :)).

As usual I reopened my beloved IntelliJ IDE and started coding a new learning test for this facility.

Helped by the following helper methods:

@Before
public void setup() {
    fileSystem = FileSystems.getDefault();
}

private FileSystem fileSystem() {
    return fileSystem;
}

private FileSystemProvider provider() {
    return fileSystem().provider();
}

private Path directory() {
    return fileSystem().getPath("C:", "temp");
}



I came to the following:

@Test
public void test() throws IOException, InterruptedException {
    // step 1
    final WatchService service =
            monitorCreationIn(directory(), withWatchService());
    // step 2
    final String location = newFileIn(directory());
    // step 3
    final WatchKey modification = service.take();
    assertEquals(directory(), modification.watchable());
    for (final WatchEvent<?> event : modification.pollEvents()) {
        if (OVERFLOW.equals(event.kind())) continue;
        assertThat(event.count(), is(equalTo(1)));
        assertEquals(ENTRY_CREATE, event.kind());
        assertThat(
                ((Path) event.context()).endsWith(location),
                is(true)
        );
    }
    modification.reset();
}

In step 1, I first declare my intent to watch the content of the temporary directory (hence the directory() method) using a watch service.

The watch service is created like this:

private WatchService withWatchService() throws IOException {
    return fileSystem().newWatchService();
}


Magic! :) Everything is set, or nearly set. I also have to register the directory path as a Watchable object with the new service, so the service can monitor it.

private WatchService monitorCreationIn(
        final Path path, final WatchService withService
) throws IOException {
    path.register(withService, ENTRY_CREATE);
    return withService;
}

I only want to observe creations, so be it: I registered ENTRY_CREATE, the StandardWatchEventKind constant implementing WatchEvent.Kind, as the kind of event to be observed.

Then, in step 2, I create my file as a Path abstraction in the temporary directory location. Creating a file with the new I/O was a nice kata in itself. I proceeded this way:

private String newFileIn(final Path directory) throws IOException {
    final String location = format("newFile{0}.dat", currentTimeMillis());
    final Path newPath = directory.resolve(location);
    assertThat(exists(newPath), is(not(true)));
    create(newPath);
    return location;
}

private void create(final Path newPath) throws IOException {
    provider().newByteChannel(newPath, withCreationOptions()).close();
}

This new API rocks !!! (notice I did close the byte channel)

At the time I registered my watchable path instance, a watch key was registered. This key is driven by a finite state machine with the following states:

  • ready (to receive file system events)
  • signaled (file system events received)
  • invalid (self explanatory) 
Once created, the key is in the ready state, and it will switch to the signaled state once events have been attached to it.
There is one key per watchable (very close to the selection key used with non-blocking channels in NIO.1).

So from step 3 on, in practice, I would start a timed loop, polling key after key to check my watchables' life cycles. Here I created one single file, so I take the next key and assert that the watch key's watchable matches my observed directory:

final WatchKey modification = service.take();
assertEquals(directory(), modification.watchable());

So far, so good, I can now poll for the events bound to my watchable, through the key:

for (final WatchEvent<?> event : modification.pollEvents()) {

but I take care not to handle lost or discarded events:

if (OVERFLOW.equals(event.kind())) continue;

Then, I check that this is not a repeated event, that the expected kind is creation and that I find again the name of the file I have created:

assertThat(event.count(), is(equalTo(1)));
assertEquals(ENTRY_CREATE, event.kind());
assertThat(((Path) event.context()).endsWith(location), is(true));

Of course, if you want your watch key to be polled again for upcoming events, you must reset it, so that it switches back to the ready state.

Done. Test green.
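
Outside of a test, the canonical watch dog would run the take/poll/reset cycle in an endless loop. Here is a minimal sketch of mine against the final JDK7 API (note that the early-access builds spell StandardWatchEventKind without the trailing s):

import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

final class WatchDog {
    // Blocks on take() until a key is signaled, drains its events,
    // then resets the key so it can be signaled again.
    static void watch(final Path directory) throws Exception {
        final WatchService service = FileSystems.getDefault().newWatchService();
        directory.register(service, ENTRY_CREATE);
        while (true) {
            final WatchKey key = service.take();
            for (final WatchEvent<?> event : key.pollEvents()) {
                if (OVERFLOW.equals(event.kind())) continue;
                System.out.println(event.kind() + ": " + event.context());
            }
            if (!key.reset()) break; // the key became invalid: stop watching
        }
    }
}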

A few lines of code reproduce what we solved four years ago with much more code, pain, and tests. Moreover, there is a similarity with the familiar readiness-check mechanism of the NIO.1 implementation for non-blocking sockets. Of course, the whole thing is event driven and seems to rest on lower-level OS mechanisms, allowing for a kind of reactor pattern implementation (no mystery).

Thanks for following these smaller-than-usual explorations. Be seeing you !! =D

Sunday, May 8, 2011

Playing with concurrency in JDK7

As an addition to the existing concurrent package, JDK7 comes with a fork/join framework. The originating article by Doug Lea about this implementation can be found here. Some algorithms solve problems using divide-and-conquer or recursive-data patterns, dividing the original problem into sub-problems that can be handled independently. At each step of the algorithm, new independent tasks are forked to handle a sub-problem, then joined once their own execution is achieved. This kind of algorithm can be supported by a parallel framework implementation.
The fork/join process is repeated recursively until one task finds its sub-problem small enough to process sequentially. The pattern of pseudo code found in each task, in charge of deciding whether to process or to fork the problem, looks like:

def compute() {
    if (problem_is_small_enough()) {
        processSequentially();
    } else {
        divideAndConquer();
    }
}

The divide-and-conquer pseudo code can then be implemented as:

def divideAndConquer() {
    split_the_problem()
    fork(tasks)
    join
    merge(tasks)
}


Doug Lea's (and colleagues') implementation of the framework uses an indirect task/thread mapping: each thread manages a double-ended queue of tasks, but can also steal work from other threads' queues when out of work. The article quoted at the beginning presents all the advantages of this approach. Specifically, one should note that this indirect mapping prevents clogging the system with heavy thread creation/destruction, as a limited number of threads can aggressively work through a set of tasks.

The two important objects in the framework are the ForkJoinPool and the ForkJoinTask instances. The ForkJoinPool is an ExecutorService implementation of the concurrent API. It provides the standard mechanisms of thread management and the whole implementation of the work-stealing tactics. The ForkJoinTask is the lightweight object impersonating the task that will be pushed onto a queue. This is where all the decisions concerning the fork/join strategy are taken.
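
Before diving into my own task, here is a minimal sketch of mine of the API shape, using RecursiveTask, the convenience ForkJoinTask subclass that carries a result (a summing example, not from Doug Lea's article):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Minimal divide-and-conquer: sums the integers of [from, to).
final class SumTask extends RecursiveTask<Long> {
    private final int from, to;

    SumTask(final int from, final int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= 1000) {              // small enough: go sequential
            long sum = 0;
            for (int i = from; i < to; i++) sum += i;
            return sum;
        }
        final int mid = (from + to) / 2;      // otherwise split in two...
        final SumTask left = new SumTask(from, mid);
        final SumTask right = new SumTask(mid, to);
        left.fork();                          // ...fork one half,
        return right.compute() + left.join(); // compute the other, join, merge
    }
}

// usage: new ForkJoinPool().invoke(new SumTask(0, 1000000)) == 499999500000L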

As a tribute to Doug Lea's original article, I decided to compute some values of the Fibonacci function (yep, I like to compute the evolution of pairs of rabbits :)).
In an idealistic world (check the previous link) you will find that the evolution of pairs of rabbits can be computed with the following formula:

f(n) = f(n - 1) + f(n - 2)

In order to validate the algorithm against tables of data, I implemented the two following tests:

@Test
public void knownValues_ShouldBeFound() {
    assertThat(Fibonacci.processor().compute(0), is(equalTo(0L)));
    assertThat(Fibonacci.processor().compute(1), is(equalTo(1L)));
    assertThat(Fibonacci.processor().compute(2), is(equalTo(1L)));
    assertThat(Fibonacci.processor().compute(3), is(equalTo(2L)));
    assertThat(Fibonacci.processor().compute(8), is(equalTo(21L)));
}

@Test
public void recursive_WithRabbitCouple_ShouldMatchExpectedResult() {
    final long fromStart = nanoTime();
    final long result = Fibonacci.processor().compute(47L);
    trace("rec: {0}", nanoTime(), fromStart);
    assertThat(result, is(equalTo(2971215073L)));
}

The first test is more of a canary test, allowing me to validate the algorithm, and the second test lets me trace an execution time for a big number.
The first class implemented, which makes the canary test green, is:

final class Fibonacci {
    private Fibonacci() {
        super();
    }

    static Fibonacci processor() {
        return new Fibonacci();
    }

    public long compute(final long upTo) {
        return sequence(upTo);
    }

    private long sequence(final long upTo) {
        if (upTo <= 1) {
            return upTo;
        } else {
            return sequence(upTo - 1) + sequence(upTo - 2);
        }
    }
}

So far so good, we are happy campers!
Then comes the big stuff: writing the second test to challenge parallel computing. After some code exploration, here we go:

@Test
public void forkJoin_WithRabbitCouple_ShouldMatchExpectedResult() {
    final ForkJoinPool pool = new ForkJoinPool();
    final FibonacciTask task = FibonacciTask.upTo(47L);
    final long fromStart = nanoTime();
    pool.invoke(task);
    trace("fj: {0}", nanoTime(), fromStart);
    assertThat(task.result(), is(equalTo(2971215073L)));
    pool.shutdown();
}

We basically create a ForkJoinPool instance and invoke a new FibonacciTask. The invoke method returns upon completion; you might use the execute method to invoke the task asynchronously.

The real magic happens in the task itself. This task extends the abstract ForkJoinTask template provided by the framework:

public class FibonacciTask extends ForkJoinTask<Long>

This constrains us to implement the three following methods:

@Override
public Long getRawResult() {
    return result;
}

@Override
protected void setRawResult(final Long value) {
}

@Override
protected boolean exec() {
    if (number < THRESHOLD) {
        result = sequence(number);
    } else {
        divideAndConquer();
    }
    return true;
}

Reading the javadoc of the xxxRawResult methods worried me a little (I will welcome any advice), so I provided an additional method to grab the result:

long result() {
    return result;
}

Of course, the exec() method is where we decide to compute serially or in parallel. You recognize the pattern. Here the size of the problem is symbolized by a number we deem small enough to start a sequential computation. Let's go to the divideAndConquer() method:

private void divideAndConquer() {
    final FibonacciTask previousTask = new FibonacciTask(number - 1);
    final FibonacciTask beforePreviousTask = new FibonacciTask(number - 2);
    invokeAll(previousTask, beforePreviousTask);
    merge(previousTask, beforePreviousTask);
}

Once again we find the pattern of pseudo code presented at the beginning (remember?).
We split the problem into two sub-problems impersonated by tasks:

final FibonacciTask previousTask = new FibonacciTask(number - 1);
final FibonacciTask beforePreviousTask = new FibonacciTask(number - 2);

We invoke and wait:

invokeAll(previousTask, beforePreviousTask);

and we merge:

merge(previousTask, beforePreviousTask);

What's left is the merge itself:

private void merge(
        final FibonacciTask previousTask,
        final FibonacciTask beforePreviousTask
) {
    result = previousTask.result() + beforePreviousTask.result();
}

And that's all, folks!! The two tuning parameters, as specified in Doug Lea's article, are the threshold value, which depends on the nature of the algorithm, and the size of the ForkJoinPool. I chose to invoke the default constructor, so the pool creates as many threads as there are processors on my little notebook.

As Amdahl's law expresses it, the serial parts of the program can drastically constrain the execution speed-up, so the fork and merge operations must be reduced to a minimum.
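
For the record, with p the parallelizable fraction of the program and n the number of workers, the law bounds the speed-up to:

speedup(n) = 1 / ((1 - p) + p / n)

so even with p = 0.9, no number of cores can push the speed-up beyond 10.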

The execution of the tests provided the following results (in ms):

rec: 45 081
fj: 23 730

That is in concordance with Amdahl's law: I do have two cores, but the ratio is not exactly 2 because of the sequential parts. I found the optimum threshold value to be 21, versus 13 in Doug Lea's article, but I did not challenge the program much.

To complete my exploration, I decided to work on another problem: sorting. I will go faster this time. I decided to work on time-stamped objects that I would need to sort.

I imagined a time-stamped object that could be sorted using the Comparable interface. So I designed the following tests:

public final class TimestampedObjectTest {

    @Test
    public void objectTimestampedAt_TestDate_ShouldNotBeNull() {
        assertThat(objectTimestampedAt(new Date()), is(not(nullValue())));
    }

    @Test
    public void compareTo_WithTwoSortedTestDate_ShouldMatch() {
        final Calendar calendar = Calendar.getInstance();
        final TimestampedObject objectOne = objectTimestampedAt(dateIn(calendar));
        update(calendar);
        final TimestampedObject objectTwo = objectTimestampedAt(dateIn(calendar));

        assertThat(objectOne.compareTo(objectTwo), is(lessThan(0)));
        assertThat(objectTwo.compareTo(objectOne), is(greaterThan(0)));
        assertThat(objectTwo.compareTo(objectTimestampedAt(dateIn(calendar))), is(equalTo(0)));
    }

    private Date dateIn(final Calendar calendar) {
        return calendar.getTime();
    }

    private void update(final Calendar calendar) {
        final long date = calendar.getTimeInMillis();
        calendar.setTimeInMillis(date + 1000);
    }

}

which led me to the following implementation (it was incremental, of course :)):

final class TimestampedObject implements Comparable<TimestampedObject>{
    private final Date date;

    private TimestampedObject(final Date forDate) {
        super();
        date =  forDate;
    }

    static TimestampedObject objectTimestampedAt(final Date date) {
        return new TimestampedObject(date);
    }

    @Override
    public int compareTo(final TimestampedObject other) {
        if (other == null) return 1;
        return date().compareTo(other.date());
    }

    private Date date() {
        assert date != null;
        return date;
    }
}

A piece of cake for the average developer.

So I imagined the following code to challenge the framework:

@Test
public void forkJoin_WithZillionsOfTimestamps_ShouldBeSorted() {
    final List<TimestampedObject> listOfTimestampedObjects =
            listOfTimestampedObjects();
    final TemporalSortTask sortingTask =
            sortTaskFor(arrayFrom(listOfTimestampedObjects));
    final TimestampedObject[] result = executed(sortingTask);

    for (int i = 1, length = result.length; i < length; i++) {
        assertThat(result[i].compareTo(result[i - 1]), is(greaterThan(0)));
    }
}

A TemporalSortTask will be executed by the fork/join framework, its mission being to sort a big list.
The list of time-stamped objects is created using a calendar whose time in milliseconds is incremented for each object:

private List<TimestampedObject> listOfTimestampedObjects() {
    final List<TimestampedObject> list = new LinkedList<>();
    final Calendar calendar = Calendar.getInstance();
    for (int i = 0; i < 100000; i++) {
        list.add(objectTimestampedAt(dateIn(updated(calendar))));
    }
    shuffle(list);
    return list;
}

private Date dateIn(final Calendar calendar) {
    return calendar.getTime();
}

private Calendar updated(final Calendar calendar) {
    final long date = calendar.getTimeInMillis();
    calendar.setTimeInMillis(date + 1000);
    return calendar;
}

objectTimestampedAt is a factory method of the TimestampedObject class. What interests us most is the central executed method of the test:

private TimestampedObject[] executed(final TemporalSortTask sortingTask) {
    final ForkJoinPool pool = new ForkJoinPool();
    final long start = currentTimeMillis();
    pool.invoke(sortingTask);
    System.out.println(format("recursive {0}", currentTimeMillis() - start));
    final TimestampedObject[] result = sortingTask.result();
    pool.shutdown();
    return result;
}

There again we create a ForkJoinPool, letting the underlying implementation fix the number of threads. We fire the execution of the sorting task and return the result that is challenged in the test method.

The implementation of the TemporalSortTask under test revolves around its compute() method:

@Override
protected void compute() {
    if (size() < THRESHOLD) {
        process();
    } else {
        divideAndConquer();
    }
}

Here the chosen threshold is the size of the array of objects the task will be sorting.
The sequential step delegates the sorting to the underlying JDK algorithm:

private void process() {
    arraycopy(fromSource(), start(), result, 0, size());
    sort(result, 0, size());
}

while the divide-and-conquer method follows exactly the same pattern as in the previous example:

private void divideAndConquer() {
    final int midSize = size() / 2;

    final TemporalSortTask leftTask =
            new TemporalSortTask(fromSource(), start(), start() + midSize);

    final TemporalSortTask rightTask =
            new TemporalSortTask(fromSource(), start() + midSize, end());

    invokeAll(leftTask, rightTask);

    merge(leftTask.result(), rightTask.result());
}

A left task and a right task are created, each taking charge of a sorting problem divided by two in size. The trick here is that the source of time-stamped objects is stored as an array whose reference only is passed from task to task; the start/end bounds of the sort range are created for each new task. The result is stored into a result array of time-stamped objects. The merge operation takes the results of the two sub-tasks and rearranges them into the result array:

private void merge(
        final TimestampedObject[] leftTask,
        final TimestampedObject[] rightTask
) {
    int resultIndex = 0;
    int leftIndex = 0;
    int rightIndex = 0;

    while ((leftIndex < leftTask.length)
            && (rightIndex < rightTask.length)) {
        result[resultIndex++] =
                leftTask[leftIndex].compareTo(rightTask[rightIndex]) < 0 ?
                leftTask[leftIndex++] : rightTask[rightIndex++];
    }

    while (leftIndex < leftTask.length) {
        result[resultIndex++] = leftTask[leftIndex++];
    }
    while (rightIndex < rightTask.length) {
        result[resultIndex++] = rightTask[rightIndex++];
    }
}
I won't detail it, as it is a standard merging algorithm.

Once again Amdahl's law is verified; the measured times for arrays of 500 000 objects, in ms, are the following:

fork/join 337
sort 633

Well, this one was too long. Hurray if you made it to the end of the article. Be seeing you! :)

Tuesday, May 3, 2011

Callback handler in JDK7 NIO.2

To quote the Wikipedia article about asynchronous I/O: "Input and output (I/O) operations on a computer can be extremely slow compared to the processing of data." Recent improvements in computer hardware, specifically multicore CPUs, allow forking processes of execution dedicated to long I/O operations (gigabyte backups, for example) while continuing your main work.

JDK7 NIO.2 adds new capabilities for both file and socket operations. I decided to talk a little bit about this new feature, as it attracted me at first glance. I expect a lot of feedback on the subject, as I have a hunch that the underlying plumbing implements patterns like reactor/proactor. This is open for discussion, as my exploration started a few hours ago. I work on tests written in an IntelliJ/Maven/JUnit environment supported by a JDK7 ea-b140 version. I will exercise file abstractions today.

JDK7 offers two styles of asynchronous programming. The first way of firing an asynchronous request provides a reference to the now well-known Future object, exactly the same as the one in the concurrent package. I was attracted by novelty, so I chose the second paradigm: the completion handler implementation.
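
For the record, the Future flavor would look like this minimal sketch of mine (assuming an already opened AsynchronousFileChannel named channel; checked-exception handling elided):

// Future style: read() returns immediately; get() blocks until the I/O ends.
final ByteBuffer buffer = ByteBuffer.allocateDirect(512);
final Future<Integer> pending = channel.read(buffer, 0L);
final int bytesRead = pending.get(); // rejoin the asynchronous operation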

The completion handler implements a callback approach (hello C++ :)). A completion handler is called back so as to consume the result of an I/O operation.

I started my exploration with a small read scenario. I wanted to read the content of a simple file, check it had been fully loaded, and trace its content to the standard output.

The following is the main test method:

@Test
public void completionHandler_WithTestFile_ShouldNotifyEndOfProcess()
throws IOException, InterruptedException {
    try (final AsynchronousFileChannel channel =
                 withProvider()
                         .newAsynchronousFileChannel(
                                 targetFile,
                                 withOptions(),
                                 newSingleThreadExecutor())
    ) {
        final ByteBuffer buffer = allocateDirect(512);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final RunToCompletionHandler handler =
                new RunToCompletionHandler(
                        channel, buffer, countDownLatch
                );
        channel.read(buffer, 0, allocateDirect(512), handler);
        countDownLatch.await();
        assertThat(handler.isComplete(), is(true));
    }
}

The test code looks long, but I tried to reduce the text width to avoid hazardous line wrapping.
The asynchronous channel opening is achieved in a try-with-resources scope, so it will be automatically closed as a closeable resource.
The channel is opened with standard open options:

private Set<OpenOption> withOptions() {
    final Set<OpenOption> options = new TreeSet<>();
    options.add(READ);
    options.add(WRITE);
    return options;
}

I basically used standard read/write options. An interesting point here is that you can provide the concurrent executor service the underlying asynchronous mechanics will rely on. I chose a standard:

Executors.newSingleThreadExecutor()

implementation as provided by the concurrent framework.

What follows is standard NIO.1-style data reading. I chose to allocate a buffer for reading:

final ByteBuffer buffer = allocateDirect(512);

and then start the asynchronous read with the completion handler I built:

channel.read(buffer, 0, allocateDirect(512), handler);

My main test thread can do whatever processing is required while the file is asynchronously read. In order to wait peacefully for, and rejoin, the end of the asynchronous I/O process, I let my test thread wait on a countdown latch:

countDownLatch.await();

The interesting part is in the completion handler (it really does remind me of the reactor and/or proactor pattern). The interface to implement is CompletionHandler, exposing two contract methods:

completed(final Integer result, final ByteBuffer buffer)
and

failed(final Throwable exc, final ByteBuffer attachment)


The names are self-explanatory :).


This is the way I implemented it:

final class RunToCompletionHandler
        implements CompletionHandler<Integer, ByteBuffer> {
    private volatile boolean completed;
    private final AsynchronousFileChannel channel;
    private final ByteBuffer destination;
    private int alreadyRead;
    private final CountDownLatch latch;

    public RunToCompletionHandler(
            final AsynchronousFileChannel channel,
            final ByteBuffer buffer,
            final CountDownLatch countDownLatch) {
        super();
        this.channel = channel;
        destination = buffer;
        latch = countDownLatch;
    }

    @Override
    public void completed(final Integer result, final ByteBuffer buffer) {
        if (result > 0) {
            process(result, buffer);
        } else {
            end();
        }
    }

    private void end() {
        completed = true;
        latch.countDown();
    }

    private void process(final Integer result, final ByteBuffer buffer) {
        alreadyRead += result;
        destination.rewind();
        trace();
        destination.flip();
        reQueue(buffer);
    }

    private void reQueue(final ByteBuffer buffer) {
        channel.read(destination, alreadyRead, buffer, this);
    }

    private void trace() {
        System.out.print(forName(encoding()).decode(destination));
    }

    private String encoding() {
        return System.getProperty("file.encoding");
    }

    @Override
    public void failed(final Throwable exc, final ByteBuffer attachment) {
        latch.countDown();
    }

    public boolean isComplete() {
        return completed;
    }
}

In the completed operation I receive both the result of the operation (the number of bytes read) and the object initially attached. I had no real need to identify my callback here: there is only one read in one asynchronous thread.

But what I do know on a completed operation, when the result is greater than zero, is that my initial buffer has been filled. I stored it as an instance variable (destination), so I can trace its content, using the standard methods available since JDK 1.4:

private void process(final Integer result, final ByteBuffer buffer) {
    alreadyRead += result;
    destination.rewind();
    trace();
    destination.flip();
    reQueue(buffer);
}

Otherwise, my operation is over and I can release my countdown latch:

private void end() {
    completed = true;
    latch.countDown();
}

The most interesting part is the reQueue invocation. I have processed my filled destination buffer, but I have a hunch this is not the end of the story and more data is to come; so the handler re-invokes a read operation with itself as the handler, until there is no more data to read. Then the latch is released and the test is over.

private void reQueue(final ByteBuffer buffer) {
    channel.read(destination, alreadyRead, buffer, this);
}
Done. It really looks like a run-to-completion (RTC) process, where the task re-enqueues itself until completion (there is a really nice description of RTC in Uncle Bob's Agile Software Development book).

This was a purely abstract exercise, so I decided to exercise a read/write of a reference file in another test class.

The starting test took the form:

@Test
public void completionHandler_WithTestFile_ShouldNotifyEndOfProcess()
throws IOException, InterruptedException {
    try (
            final AsynchronousFileChannel sourceChannel =
                    withProvider().newAsynchronousFileChannel(
                            source, withReadOptions(), newSingleThreadExecutor());
            final AsynchronousFileChannel targetChannel =
                    withProvider().newAsynchronousFileChannel(
                            target, withCreateOptions(), newSingleThreadExecutor())
    ) {
        final ByteBuffer buffer = allocateDirect(65536);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final SourceCompletionHandler handler =
                new SourceCompletionHandler(
                        sourceChannel, targetChannel, buffer, countDownLatch
                );
        sourceChannel.read(buffer, 0, null, handler);
        countDownLatch.await();
    }
}
where I opened two channels, each one ruled by a separate single-threaded pool (yep, a little lazy, I know...). I am still using the try-with-(closeable)-resources scope management.

But this time I use two completion handlers.
First, a reading completion handler whose content is really close to the previous one:

final class SourceCompletionHandler
        implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousFileChannel channel;
    private final ByteBuffer destination;
    private int alreadyRead;
    private final TargetCompletionHandler target;

    public SourceCompletionHandler(
            final AsynchronousFileChannel channel,
            final AsynchronousFileChannel targetChannel,
            final ByteBuffer buffer,
            final CountDownLatch countDownLatch) {
        super();
        this.channel = channel;
        destination = buffer;
        target = new TargetCompletionHandler(targetChannel, countDownLatch);
    }

    @Override
    public void completed(final Integer result, final ByteBuffer buffer) {
        if (result > 0) {
            process(result, buffer);
        } else {
            target.push(allocate(0));
        }
    }

    private void process(final Integer result, final ByteBuffer buffer) {
        alreadyRead += result;
        destination.flip();
        target.push(destination);
        destination.rewind();
        reQueue(buffer);
    }

    private void reQueue(final ByteBuffer buffer) {
        channel.read(destination, alreadyRead, buffer, this);
    }

    @Override
    public void failed(final Throwable exc, final ByteBuffer attachment) {}
}

And a handler for the writing operation:

final class TargetCompletionHandler
        implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousFileChannel channel;
    private volatile long position;
    private final CountDownLatch latch;

    public TargetCompletionHandler(
            final AsynchronousFileChannel targetChannel,
            final CountDownLatch countDownLatch) {
        super();
        channel = targetChannel;
        latch = countDownLatch;
    }

    @Override
    public void completed(final Integer result, final ByteBuffer attachment) {
        if (result == 0) {
            latch.countDown();
        }
    }

    @Override
    public void failed(final Throwable exc, final ByteBuffer attachment) {
        latch.countDown();
    }

    public void push(final ByteBuffer source) {
        final ByteBuffer buffer = allocateDirect(source.limit()).put(source);
        buffer.flip();
        withChannel().write(buffer, position, null, this);
        position += source.position();
    }

    private AsynchronousFileChannel withChannel() {
        return channel;
    }
}

The source handler asynchronously fires the write operations.

Once the test completes, the copy is identical to the source. Done. I initiated an asynchronous file copy using a publisher/subscriber pattern between two asynchronous channels.
Of course, there is no purpose to the test by itself except playing with the new API, and benchmarks would show that this second exercise is certainly not as fast as directly copying the file with the standard NIO.1 copy... But it's fun ;) I welcome your comments, of course.

Be seeing you !




Sunday, May 1, 2011

Starting with JDK7 NIO.2

Hurray, here it is: starting with the JDK7 NIO.2 API. Before trying the watch service, I wanted to start with the basics. My JDK is the ea-b140 version, so a recent one. I was surprised to find samples based on a slightly different API from the one in the version I downloaded, so this small kata became a real exploratory exercise.

As usual I started with TDD. My first shot was to try to create a simple file. Why not create a directory in which to play with my files?
Everything starts in my test case with a canary test, checking that I did the setup well (that's bad, I did not use the JUnit @Rule temporary folder this time ;)):
@Test
public void testDirectory_ShouldHaveBeenCreated() throws IOException {
    assertThat(location, is(not(nullValue())));
    final BasicFileAttributes attributes =
            withProvider().readAttributes(location, BasicFileAttributes.class);
    assertThat(attributes.isDirectory(), is(true));
}

The first assert checks that the directory location available to the upcoming test cases has been effectively created. Then, using the new API, we check the BasicFileAttributes of the directory to make sure it really is a directory.

Where does the provider come from? From here:

private FileSystemProvider withProvider() {
    return fileSystem.provider();
}

where, quoting the javadoc, the file system object "provides an interface to a file system and is the factory for objects to access files and other objects in the file system".

But in order to manipulate file system artifacts (creation, copy, etc.), the file system instance is bound to a FileSystemProvider instance. A tool class, Files, is also provided, whose static methods mainly delegate to the FileSystemProvider instance. I have chosen to use one or the other, depending on the situation.

A file system is obtained using a factory method of the generic FileSystems class. This is done this way:

fileSystem = FileSystems.getDefault();

In order to make the above test go green (it needs to compile too), let's implement a setup method:

@Before
public void setup() throws IOException {
    fileSystem = FileSystems.getDefault();
    location = fileSystem.getPath(inUserHome(), forTests());
    withProvider().createDirectory(location);
}
So, as a rule of thumb, we use the file system instance to create a Path - the location - the NIO.2 abstraction for a system path: not a file, but a more generic concept that can identify any file system artifact.
We then use its bound provider to actually create the directory.

Test is green.

Ok, then I want to create a file and test that it exists. Using IntelliJ's programming-by-intention facility, I wrote the following:
@Test
public void resolve_WithTestingDirectory_ShouldHelpToCreateANewFile()
throws IOException {
    final Path newFile = location.resolve(newFileName());
    assertThat(exists(newFile), is(not(true)));
    try (final FileChannel toChannel =
                 withProvider().newFileChannel(newFile, creationOptions())) {
        write(toChannel, bufferThatRocks());
    }
    assertThat(exists(newFile), is(true));
}

private Set<OpenOption> creationOptions() {
    final Set<OpenOption> options = new TreeSet<>();
    options.add(StandardOpenOption.CREATE);
    options.add(StandardOpenOption.WRITE);
    return options;
}

private void write(final FileChannel channel, final ByteBuffer buf)
throws IOException {
    while (buf.hasRemaining()) {
        channel.write(buf);
    }
}

private ByteBuffer bufferThatRocks() {
    final ByteBuffer buf = allocate(64);
    buf.clear();
    buf.put("Jdk7 rocks !!".getBytes());
    buf.flip();
    return buf;
}


Focusing on the test: I chose to use the resolve method of the Path class to identify the about-to-be-created file in the test location:

final Path newFile = location.resolve(newFileName());

Using the tool class Files, I invoke the exists method to check that the file has not been created yet, of course! If you have heard of the Project Coin implementation in JDK7, the following try scope should not surprise you. In effect, to create the file I am going to open a file channel and write its content. So to avoid code cluttering, I use the try-with-resources feature, which will close the channel for me (the compiler's my best friend).

But when you open a channel, you open it with a set of options specifying your purpose. That's what I did: I specified that I wanted to create the file and write into it:

private Set<OpenOption> creationOptions() {
    final Set<OpenOption> options = new TreeSet<>();
    options.add(StandardOpenOption.CREATE);
    options.add(StandardOpenOption.WRITE);
    return options;
}

Working on a standard Microsoft OS, I did not want to take risks and used the StandardOpenOption enum constants for my options. The scope

try (final FileChannel toChannel =
             withProvider().newFileChannel(newFile, creationOptions())) {
}

has already been explained.


People who have programmed with NIO.1 will have recognized the standard code pattern for writing data into a file channel. I first create a "cool" byte buffer:

private ByteBuffer bufferThatRocks() {
    final ByteBuffer buf = allocate(64);
    buf.clear();
    buf.put("Jdk7 rocks !!".getBytes());
    buf.flip();
    return buf;
}

and then transfer the info into the channel:

private void write(final FileChannel channel, final ByteBuffer buf)
throws IOException {
    while (buf.hasRemaining()) {
        channel.write(buf);
    }
}

Test green, cool! The file does exist.

What if I want to copy a file? Yet another, very similar test:

@Test
public void copy_InTestingDirectory_ShouldCreateADuplicate() throws IOException {
    final Path sourceFile = location.resolve(newFileName());
    try (final FileChannel toChannel =
                 withProvider().newFileChannel(sourceFile, creationOptions())) {
        write(toChannel, bufferThatRocks());
    }
    assertThat(exists(sourceFile), is(true));
    final Path toFile = location.resolve(newFileName());
    withProvider().copy(sourceFile, toFile);
    assertThat(exists(toFile), is(true));
}

This time, copying is straightforward: I use the file system provider. You could achieve the same with the copy method of the Files tool class.
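
The Files flavor is a one-liner (sketch of mine; Files.copy also accepts optional CopyOption flags, such as StandardCopyOption.REPLACE_EXISTING):

// Same effect through the Files tool class, which delegates to the provider:
Files.copy(sourceFile, toFile);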

What's really striking is what I see when I gaze at my list of I/O imports:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.spi.FileSystemProvider;
import static java.nio.ByteBuffer.allocate;
import static java.nio.file.Files.exists;

There is only one reference to the java.io package. Everything was tested without referencing the File object even once. Not difficult; smooth and nice. I hope it was helpful, although quite simple for most of you.

Be seeing you !