Sunday, May 8, 2011

Playing with concurrency in JDK7

As an addition to the existing concurrent package, JDK7 comes with a Fork/Join framework. The original article by Doug Lea about this implementation can be found here. Some algorithms solve problems using divide-and-conquer or recursive-data patterns, dividing the original problem into sub-problems that can be handled independently. At each step of the algorithm, new independent tasks are forked to handle a sub-problem and joined once their execution completes. This kind of algorithm can be supported by a parallel framework implementation.
The fork/join process is repeated recursively until a task finds its sub-problem small enough to be processed sequentially. The pseudocode pattern found in each task, in charge of deciding whether to process or to fork the problem, looks like:

def compute() {
    if (problem_is_small_enough()) {
        processSequentially();
    } else {
        divideAndConquer();
    }
}

The divide-and-conquer pseudocode can then be implemented as:

def divideAndConquer() {
    split_the_problem()
    fork(tasks)
    join(tasks)
    merge(tasks)
}


Doug Lea et al.'s implementation of the framework uses an indirect task/thread mapping: each thread takes charge of a double-ended queue of tasks, but can also steal work from other threads' queues when it runs out of work. The article quoted at the beginning presents all the advantages of this approach. In particular, note that this indirect mapping prevents heavy thread creation/destruction from clogging the system, as a limited number of threads can aggressively work on a large set of tasks.
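
As an aside, the ForkJoinPool exposes monitoring methods that make the stealing observable. A minimal sketch (the parallelism level of 4 is just an example):

final ForkJoinPool pool = new ForkJoinPool(4); // explicit parallelism level
// ... run some ForkJoinTask instances on the pool ...
System.out.println("parallelism: " + pool.getParallelism());
System.out.println("stolen tasks: " + pool.getStealCount());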

The two important objects in the framework are the ForkJoinPool and the ForkJoinTask instances. The ForkJoinPool is an ExecutorService implementation of the concurrent API. It provides the standard thread-management mechanisms and the whole implementation of the work-stealing tactics. The ForkJoinTask is the lightweight object representing the task that will be pushed onto a queue. This is where all the decisions concerning the fork/join strategy are taken.
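
As a minimal, self-contained sketch of how the two collaborate (the task below is a trivial placeholder built on RecursiveTask, one of the ForkJoinTask subclasses shipped with the framework):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSketch {
    // A placeholder task: a real task would fork and join sub-tasks
    // inside compute() instead of returning a constant.
    static final class AnswerTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    public static void main(final String[] args) {
        final ForkJoinPool pool = new ForkJoinPool(); // one worker per processor
        System.out.println(pool.invoke(new AnswerTask())); // prints 42
        pool.shutdown();
    }
}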

As a tribute to Doug Lea's original article, I decided to compute some values of the Fibonacci function (yep, I like to compute the evolution of pairs of rabbits :) ).
In an idealized world (check the previous link) you will find that the evolution of pairs of rabbits can be computed with the following formula:

f(n) = f(n - 1) + f(n - 2)

with the base cases f(0) = 0 and f(1) = 1, which gives the sequence 0, 1, 1, 2, 3, 5, 8, 13, 21, ...

In order to validate the algorithm against tables of known values, I implemented the two following tests:

@Test
    public void knownValues_ShouldBeFound() {
        assertThat(Fibonacci.processor().compute(0), is(equalTo(0L)));
        assertThat(Fibonacci.processor().compute(1), is(equalTo(1L)));
        assertThat(Fibonacci.processor().compute(2), is(equalTo(1L)));
        assertThat(Fibonacci.processor().compute(3), is(equalTo(2L)));
        assertThat(Fibonacci.processor().compute(8), is(equalTo(21L)));
    }

    @Test
    public void recursive_WithRabbitCouple_ShouldMatchExpectedResult() {
        final long fromStart = nanoTime();
        final long result = Fibonacci.processor().compute(47L);
        trace("rec: {0}", nanoTime(), fromStart);
        assertThat(result, is(equalTo(2971215073L)));
    }

The first test is more of a canary test allowing me to validate the algorithm, while the second one lets me trace the execution time for a big number.
The first class implemented, the one that makes the canary test green, is:

final class Fibonacci {
    private Fibonacci() {
        super();
    }

    static Fibonacci processor() {
        return new Fibonacci();
    }

    public long compute(final long upTo) {
        return sequence(upTo);
    }

    private long sequence(final long upTo) {
        if (upTo <= 1) {
            return upTo;
        } else {
            return sequence(upTo - 1) + sequence(upTo - 2);
        }
    }
}

So far so good, we are happy campers!
Then comes the big stuff: writing the second test to challenge parallel computing. After some code exploration, here we go:

@Test
    public void forkJoin_WithRabbitCouple_ShouldMatchExpectedResult() {
        final ForkJoinPool pool = new ForkJoinPool();
        final FibonacciTask task = FibonacciTask.upTo(47L);
        final long fromStart = nanoTime();
        pool.invoke(task);
        trace("fj: {0}", nanoTime(), fromStart);
        assertThat(task.result(), is(equalTo(2971215073L)));
        pool.shutdown();
    }

We basically created a ForkJoinPool instance and invoked a new FibonacciTask. The invoke method returns upon completion. You could use the execute method to invoke the task asynchronously.
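
For the record, the asynchronous variant could look like this (execute returns immediately, join then blocks until the task has completed):

final ForkJoinPool pool = new ForkJoinPool();
final FibonacciTask task = FibonacciTask.upTo(47L);
pool.execute(task); // returns immediately, the task runs in the pool
// ... do something else in the meantime ...
task.join();        // blocks until the result is available
pool.shutdown();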

The real magic happens in the task itself. This task extends the abstract ForkJoinTask template provided by the framework:

public class FibonacciTask extends ForkJoinTask<Long>

This requires us to implement the following three abstract methods:

@Override
    public Long getRawResult() {
       return result;
    }

    @Override
    protected void setRawResult(final Long value) {
        // intentionally left empty: the result field is managed by the task itself
    }

    @Override
    protected boolean exec() {
        if (number < THRESHOLD) {
            result = sequence(number);
        } else {
            divideAndConquer();
        }
        return true;
    }

Reading the Javadoc of the xxxRawResult methods worried me a little (I will welcome any advice). So I provided an additional method to grab the result:

long result() {
        return result;
    }
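
For completeness, the parts of FibonacciTask not shown in the snippets (fields, constructor and the upTo() factory) could look like this sketch:

import java.util.concurrent.ForkJoinTask;

public class FibonacciTask extends ForkJoinTask<Long> {
    // below this value the problem is computed sequentially
    // (21 turned out to be the optimum on my machine, see the end of the article)
    private static final long THRESHOLD = 21L;

    private final long number;
    private long result;

    private FibonacciTask(final long number) {
        this.number = number;
    }

    static FibonacciTask upTo(final long number) {
        return new FibonacciTask(number);
    }

    // exec(), getRawResult(), setRawResult(), result(),
    // sequence() and divideAndConquer() as presented in this article
}
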
Of course, the exec() method is where we decide whether to compute serially or in parallel. You will recognize the pattern. Here the size of the problem is simply the number itself, which below the threshold is small enough for a sequential computation. Let's go to the divideAndConquer() method:

private void divideAndConquer() {
      final FibonacciTask previousTask = new FibonacciTask(number - 1);
      final FibonacciTask beforePreviousTask = new FibonacciTask(number - 2);
      invokeAll(previousTask, beforePreviousTask);
      merge(previousTask, beforePreviousTask);
  }

Once again we find the pseudocode pattern presented at the beginning (remember?).
We split the problem into two sub-problems represented by tasks:

final FibonacciTask previousTask = new FibonacciTask(number - 1);
final FibonacciTask beforePreviousTask = new FibonacciTask(number - 2);
We invoke and wait:

invokeAll(previousTask, beforePreviousTask);
and we merge:

merge(previousTask, beforePreviousTask);
What's left is the merge:

private void merge(
         final FibonacciTask previousTask, 
         final FibonacciTask beforePreviousTask
   ) {
        result = previousTask.result() + beforePreviousTask.result();
    }
And that's all, folks!! The two tuning parameters, as specified in Doug Lea's article, are the threshold value, which depends on the nature of the algorithm, and the size of the ForkJoinPool. I chose to invoke the default constructor so the pool would create as many threads as there are processors on my little notebook.

As Amdahl's law expresses it, the serial parts of a program can drastically constrain the achievable speedup, so the fork and merge operations must be reduced to a minimum.
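
For the record, with a fraction p of the program that can be parallelized over n processors, the law bounds the speedup as:

speedup(n) = 1 / ((1 - p) + p / n)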

The execution of the tests provided the following results, in ms:

rec: 45 081
fj: 23 730

That is consistent with Amdahl's law: I do have two cores, but the ratio is not exactly 2 because of the sequential parts. I found the optimum threshold value to be 21, versus 13 in Doug Lea's article, but I did not challenge the program much.

In order to complete my exploration, I decided to work on another problem: sorting. I will go faster this time. The objects to sort are timestamped objects.

I imagined a timestamped object that could be sorted through the Comparable interface, and designed the following tests:

public final class TimestampedObjectTest {

    @Test
    public void objectTimestampedAt_TestDate_ShouldNotBeNull() {
        assertThat(objectTimestampedAt(new Date()), is(not(nullValue())));
    }

    @Test
    public void compareTo_WithTwoSortedTestDate_ShouldMatch() {
        final Calendar calendar = Calendar.getInstance();
        final TimestampedObject objectOne = objectTimestampedAt(dateIn(calendar));
        update(calendar);
        final TimestampedObject objectTwo = objectTimestampedAt(dateIn(calendar));

        assertThat(objectOne.compareTo(objectTwo), is(lessThan(0)));
        assertThat(objectTwo.compareTo(objectOne), is(greaterThan(0)));
        assertThat(objectTwo.compareTo(objectTimestampedAt(dateIn(calendar))), is(equalTo(0)));
    }

    private Date dateIn(final Calendar calendar) {
        return calendar.getTime();
    }

    private void update(final Calendar calendar) {
        final long date = calendar.getTimeInMillis();
        calendar.setTimeInMillis(date + 1000);
    }

}

and they led me to the following implementation (it was incremental of course :) ):

final class TimestampedObject implements Comparable<TimestampedObject>{
    private final Date date;

    private TimestampedObject(final Date forDate) {
        super();
        date =  forDate;
    }

    static TimestampedObject objectTimestampedAt(final Date date) {
        return new TimestampedObject(date);
    }

    @Override
    public int compareTo(final TimestampedObject other) {
        if (other == null) return 1;
        return date().compareTo(other.date());
    }

    private Date date() {
        assert date != null;
        return date;
    }
}

A piece of cake for the average developer.

So I imagined the following code to challenge the framework:

@Test
    public void forkJoin_WithZillionsOfTimestamps_ShouldBeSorted() {
        final List<TimestampedObject> listOfTimestampedObjects =
                    listOfTimestampedObjects();
        final TemporalSortTask sortingTask =
                    sortTaskFor(arrayFrom(listOfTimestampedObjects));
        final TimestampedObject[] result = executed(sortingTask);

        for (int i = 1, length = result.length; i < length; i++) {
          assertThat(result[i].compareTo(result[i - 1]), is(greaterThan(0)));
        }
    }

A TemporalSortTask will be executed in the Fork/Join framework, its mission being to sort a big list.
The list of timestamped objects is created using a calendar whose time in milliseconds is incremented for each object:

private List<TimestampedObject> listOfTimestampedObjects() {
        final List<TimestampedObject> list = new LinkedList<>();
        final Calendar calendar = Calendar.getInstance();
        for (int i = 0; i < 100000; i++){
            list.add(objectTimestampedAt(dateIn(updated(calendar))));
        }
        shuffle(list);
        return list;
    }

    private Date dateIn(final Calendar calendar) {
        return calendar.getTime();
    }

    private Calendar updated(final Calendar calendar) {
        final long date = calendar.getTimeInMillis();
        calendar.setTimeInMillis(date + 1000);
        return calendar;
    }

objectTimestampedAt is a factory method of the TimestampedObject class. What interests us most is the central executed method called by the test:

private TimestampedObject[] executed(final TemporalSortTask sortingTask) {
        final ForkJoinPool pool = new ForkJoinPool();
        final long start = currentTimeMillis();
        pool.invoke(sortingTask);
        System.out.println(format("fork/join {0}", currentTimeMillis() - start));
        final TimestampedObject[] result = sortingTask.result();
        pool.shutdown();
        return result;
    }
There again we create a ForkJoinPool, letting the underlying implementation fix the number of threads. We fire the execution of the sorting task and return the result that is challenged in the test method.

The implementation of the TemporalSortTask under test revolves around its compute() method (this time the task extends RecursiveAction, the ForkJoinTask flavor that exposes compute() instead of exec()):

@Override
    protected void compute() {
        if (size() < THRESHOLD) {
            process();
        } else {
            divideAndConquer();
        }
    }
Here the chosen threshold applies to the size of the array segment the task is in charge of sorting.
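
The fields and small accessors of TemporalSortTask do not appear in the snippets; assuming the task extends RecursiveAction, they could look like this sketch (the threshold value is purely illustrative):

import java.util.concurrent.RecursiveAction;

public class TemporalSortTask extends RecursiveAction {
    private static final int THRESHOLD = 1000; // illustrative value

    private final TimestampedObject[] source; // shared by reference, never copied
    private final int start;                  // inclusive lower bound
    private final int end;                    // exclusive upper bound
    private final TimestampedObject[] result;

    TemporalSortTask(final TimestampedObject[] source, final int start, final int end) {
        this.source = source;
        this.start = start;
        this.end = end;
        this.result = new TimestampedObject[end - start];
    }

    private TimestampedObject[] fromSource() { return source; }
    private int start() { return start; }
    private int end() { return end; }
    private int size() { return end - start; }
    TimestampedObject[] result() { return result; }

    // compute(), process(), divideAndConquer() and merge() as shown here
}
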
The sequential processing step delegates sorting to the underlying JDK algorithm:

private void process() {
        arraycopy(fromSource(), start(), result, 0, size());
        sort(result, 0, size());
    }

The divide-and-conquer method follows exactly the same pattern as in the previous example:

private void divideAndConquer() {
        final int midSize = size() / 2;

        final TemporalSortTask leftTask = 
            new TemporalSortTask(fromSource(), start(), start() + midSize);

        final TemporalSortTask rightTask = 
             new TemporalSortTask(fromSource(), start() + midSize, end());

        invokeAll(leftTask, rightTask);

        merge(leftTask.result(), rightTask.result());
    }

A left task and a right task are created, each taking charge of a sorting problem half the size of the original. The trick here is that the source of timestamped objects is stored in an array whose reference only is passed from task to task; the start/end bounds of the sort range are created for each new task. Each task stores its output into a result array of timestamped objects. The merge operation takes the results of the two sub-tasks and rearranges them into the parent's result array:

private void merge(
            final TimestampedObject[] leftTask, 
            final TimestampedObject[] rightTask
    ) {
        int resultIndex = 0;
        int leftIndex = 0;
        int rightIndex = 0;

        while((leftIndex < leftTask.length) 
                && (rightIndex < rightTask.length)) {
            result[resultIndex++] = 
                    leftTask[leftIndex].compareTo(rightTask[rightIndex]) < 0 ?
                    leftTask[leftIndex++] : rightTask[rightIndex++];
        }

        while (leftIndex < leftTask.length) {
            result[resultIndex++] = leftTask[leftIndex++];
        }
        while (rightIndex < rightTask.length) {
            result[resultIndex++] = rightTask[rightIndex++];
        }
    }
 
I won't detail it, as it is a standard merge step.

Once again Amdahl's law is verified. The measured times for arrays of 500 000 objects, in ms, are:

fork/join 337
sort 633

Well, this one was too long. Hurray if you made it to the end of the article. Be seeing you! :)
