The fork/join process is repeated recursively until a task finds its subproblem small enough to be processed sequentially. The pseudo code found in each task, which decides whether to process the problem directly or to fork it, follows this pattern:
def compute() {
    if (problem_is_small_enough()) {
        processSequentially();
    } else {
        divideAndConquer();
    }
}
The divide-and-conquer step can then be written as:
def divideAndConquer() {
    split_the_problem()
    fork(tasks)
    join(tasks)
    merge(tasks)
}
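To make the pattern concrete, here is a minimal Java sketch (not taken from the article) that maps the two pseudo code functions onto java.util.concurrent.RecursiveTask. The problem (summing a range of a long[] array), the class name SumTask and the THRESHOLD value are hypothetical choices made for illustration:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example problem: summing a range of a long[] array.
public class SumTask extends RecursiveTask<Long> {
    // Hypothetical cut-off below which a range is summed sequentially.
    private static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int start; // inclusive
    private final int end;   // exclusive

    public SumTask(final long[] values, final int start, final int end) {
        this.values = values;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {    // problem_is_small_enough()
            return processSequentially();
        }
        return divideAndConquer();
    }

    private long processSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += values[i];
        }
        return sum;
    }

    private long divideAndConquer() {
        final int mid = start + (end - start) / 2;          // split_the_problem()
        final SumTask left = new SumTask(values, start, mid);
        final SumTask right = new SumTask(values, mid, end);
        invokeAll(left, right);                             // fork(tasks) + join(tasks)
        return left.join() + right.join();                  // merge(tasks)
    }

    public static void main(String[] args) {
        final long[] values = new long[100_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        final long total = new ForkJoinPool().invoke(new SumTask(values, 0, values.length));
        System.out.println(total); // 5000050000
    }
}
```

Here invokeAll plays the role of fork followed by join, and the addition of the two partial sums is the merge step.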
Doug Lea et al.'s implementation of the framework uses an indirect task/thread mapping: each worker thread owns a double-ended queue (deque) of tasks, but can also steal work from other threads' queues when it runs out of work. The article quoted at the beginning presents all the advantages of this approach. Specifically, note that this indirect mapping avoids clogging the system with heavy thread creation and destruction, because a limited number of threads can aggressively work on a set of tasks.
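The deque discipline can be illustrated with a deliberately simplified, single-threaded sketch: the owning worker pushes and pops tasks at one end (LIFO), while an idle thief takes tasks from the other end (FIFO). The class WorkStealingDemo and the use of java.util.ArrayDeque are illustrative assumptions; the real ForkJoinPool relies on its own concurrent deques:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Single-threaded illustration of the deque discipline only; not thread-safe
// and not how ForkJoinPool is actually implemented.
public class WorkStealingDemo {
    /** Returns {taskRunByOwner, taskStolenByThief} after the owner forks the tasks in order. */
    static String[] simulate(final String... forkedInOrder) {
        final Deque<String> workerQueue = new ArrayDeque<>();
        for (final String task : forkedInOrder) {
            workerQueue.push(task);                  // owner pushes onto the top (head)
        }
        final String ownTask = workerQueue.pop();         // owner pops the newest task (LIFO)
        final String stolenTask = workerQueue.pollLast(); // thief takes the oldest task (other end)
        return new String[] {ownTask, stolenTask};
    }

    public static void main(String[] args) {
        final String[] r = simulate("task-1", "task-2", "task-3");
        System.out.println("owner runs " + r[0] + ", thief steals " + r[1]);
        // prints: owner runs task-3, thief steals task-1
    }
}
```

In a recursive decomposition the oldest task is usually the largest one, so a single steal tends to hand the thief a big chunk of work and keeps contention between owner and thieves low.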
The two key classes of the framework are ForkJoinPool and ForkJoinTask. ForkJoinPool is an implementation of the ExecutorService interface from the concurrency API (java.util.concurrent). It provides the standard thread-management mechanisms as well as the whole work-stealing implementation. ForkJoinTask is the lightweight object representing a task that gets pushed onto a queue. This is where all the decisions concerning the fork/join strategy are taken.
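A small sketch of how the two classes relate: the pool can be driven through the plain ExecutorService interface, and ForkJoinTask.adapt turns an ordinary Callable into a ForkJoinTask that the pool schedules. The class PoolAsExecutorService and its run() method are hypothetical:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;

public class PoolAsExecutorService {
    static int[] run() {
        final ForkJoinPool pool = new ForkJoinPool();
        try {
            // ForkJoinPool implements ExecutorService, so it accepts ordinary Callables...
            final ExecutorService executor = pool;
            final Future<Integer> plain = executor.submit(() -> 6 * 7);

            // ...while ForkJoinTask is the lightweight unit of work the pool schedules;
            // ForkJoinTask.adapt wraps a Callable into such a task.
            final ForkJoinTask<Integer> adapted = ForkJoinTask.adapt(() -> 40 + 2);
            pool.execute(adapted);

            return new int[] {plain.get(), adapted.join()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        final int[] results = run();
        System.out.println(results[0] + " " + results[1]); // prints "42 42"
    }
}
```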
As a tribute to Doug Lea's original article, I decided to compute some values of the Fibonacci function (yep, I like to compute the evolution of pairs of rabbits :) ).
In an idealized world (check the previous link), the number of pairs of rabbits evolves according to the following formula:
f(n) = f(n - 1) + f(n - 2), with f(0) = 0 and f(1) = 1
To validate the algorithm against tables of known values, I implemented the following two tests:
import static java.lang.System.nanoTime;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import org.junit.Test;

// trace(...) is a logging helper of the test class (not shown); it receives the end and start nanoTime() values

@Test
public void knownValues_ShouldBeFound() {
    assertThat(Fibonacci.processor().compute(0), is(equalTo(0L)));
    assertThat(Fibonacci.processor().compute(1), is(equalTo(1L)));
    assertThat(Fibonacci.processor().compute(2), is(equalTo(1L)));
    assertThat(Fibonacci.processor().compute(3), is(equalTo(2L)));
    assertThat(Fibonacci.processor().compute(8), is(equalTo(21L)));
}

@Test
public void recursive_WithRabbitCouple_ShouldMatchExpectedResult() {
    final long fromStart = nanoTime();
    final long result = Fibonacci.processor().compute(47L);
    trace("rec: {0}", nanoTime(), fromStart);
    assertThat(result, is(equalTo(2971215073L)));
}
The first test is more of a canary test that validates the algorithm, while the second one lets me trace the execution time for a large input.
The first class I implemented, which turns the canary test green, is:
final class Fibonacci {
    private Fibonacci() {
        super();
    }

    static Fibonacci processor() {
        return new Fibonacci();
    }

    public long compute(final long upTo) {
        return sequence(upTo);
    }

    private long sequence(final long upTo) {
        if (upTo <= 1) {
            return upTo;
        } else {
            return sequence(upTo - 1) + sequence(upTo - 2);
        }
    }
}
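The recursive sequence(...) method makes an exponential number of calls, which is precisely what turns n = 47 into a meaningful workload. For the same reason, a linear-time iterative version is a handy independent oracle for the expected values used in the tests. The class name FibonacciOracle is hypothetical; note that a long overflows beyond f(92):

```java
// Linear-time, iterative Fibonacci used only to cross-check expected values.
public final class FibonacciOracle {
    private FibonacciOracle() {
    }

    static long fibonacci(final int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0");
        }
        long previous = 0; // f(0)
        long current = 1;  // f(1)
        if (n == 0) {
            return previous;
        }
        for (int i = 2; i <= n; i++) {
            final long next = previous + current; // f(i) = f(i - 1) + f(i - 2)
            previous = current;
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(fibonacci(8));  // 21
        System.out.println(fibonacci(47)); // 2971215073
    }
}
```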
So far so good, we are happy campers!
Then comes the big stuff: writing a second test that challenges parallel computing. After some code exploration, here we go:
import java.util.concurrent.ForkJoinPool;

@Test
public void forkJoin_WithRabbitCouple_ShouldMatchExpectedResult() {
    final ForkJoinPool pool = new ForkJoinPool();
    try {
        final FibonacciTask task = FibonacciTask.upTo(47L);
        final long fromStart = nanoTime();
        pool.invoke(task);
        trace("fj: {0}", nanoTime(), fromStart);
        assertThat(task.result(), is(equalTo(2971215073L)));
    } finally {
        // release the worker threads even if the assertion fails
        pool.shutdown();
    }
}
We create a ForkJoinPool instance and invoke a new FibonacciTask on it. The invoke method returns only once the task has completed; to run the task asynchronously, use the execute method instead.
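A minimal sketch contrasting the two calls, using a hypothetical Square task: invoke blocks until the result is available, whereas execute returns immediately and the caller waits later with join() (submit behaves like execute but also returns the task):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class InvokeVersusExecute {
    // Hypothetical trivial task used only for the demonstration.
    static final class Square extends RecursiveTask<Long> {
        private final long value;

        Square(final long value) {
            this.value = value;
        }

        @Override
        protected Long compute() {
            return value * value;
        }
    }

    static long[] run() {
        final ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke: blocks the caller until the task completes, then returns its result
            final long synchronous = pool.invoke(new Square(7));

            // execute: schedules the task and returns immediately (void)
            final Square asynchronous = new Square(8);
            pool.execute(asynchronous);
            // ... the caller is free to do other work here ...
            final long later = asynchronous.join(); // now wait for the result

            return new long[] {synchronous, later};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        final long[] results = run();
        System.out.println(results[0] + " " + results[1]); // 49 64
    }
}
```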
The real magic happens in the task itself, which extends the abstract ForkJoinTask class provided by the framework:
public class FibonacciTask extends ForkJoinTask<Long>
This forces us to implement the following three abstract methods:
@Override
public Long getRawResult() {
    return result;
}

@Override
protected void setRawResult(final Long value) {
    // left empty: the result is kept in the result field
}

@Override
protected boolean exec() {
    if (number < TRESHOLD) {
        result = sequence(number);
    } else {
        divideAndConquer();
    }
    return true; // the task completed normally
}
Reading the Javadoc of the xxxRawResult methods worried me a little (any advice is welcome), so I provided an additional method to grab the result:
long result() {
    return result;
}
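Regarding the xxxRawResult methods, one common option (a suggestion, not the article's code) is to extend RecursiveTask<Long> instead of ForkJoinTask<Long>. RecursiveTask implements getRawResult, setRawResult and exec for you: its exec() calls compute() and stores the returned value, which join() or invoke() then give back. The sketch below also uses the fork-one, compute-the-other, then join idiom instead of invokeAll. The class name RecursiveFibonacciTask and the THRESHOLD value of 20 are assumptions:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch: the Fibonacci task rewritten on top of RecursiveTask<Long>.
public class RecursiveFibonacciTask extends RecursiveTask<Long> {
    private static final long THRESHOLD = 20; // assumed cut-off, tune for your machine

    private final long number;

    public RecursiveFibonacciTask(final long number) {
        this.number = number;
    }

    @Override
    protected Long compute() {
        if (number < THRESHOLD) {
            return sequence(number);
        }
        final RecursiveFibonacciTask previous = new RecursiveFibonacciTask(number - 1);
        final RecursiveFibonacciTask beforePrevious = new RecursiveFibonacciTask(number - 2);
        previous.fork();                             // schedule f(n - 1) asynchronously
        final long right = beforePrevious.compute(); // compute f(n - 2) in this thread
        return previous.join() + right;              // wait for f(n - 1), then merge
    }

    private static long sequence(final long n) {
        return n <= 1 ? n : sequence(n - 1) + sequence(n - 2);
    }

    public static void main(String[] args) {
        System.out.println(new ForkJoinPool().invoke(new RecursiveFibonacciTask(30))); // 832040
    }
}
```

No raw-result plumbing is needed: the value returned by compute() is the task's result.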
Of course, the exec() method is where we decide whether to compute sequentially or in parallel; you will recognize the pattern. Here the size of the problem is simply the number itself: below TRESHOLD, it is considered small enough to start a sequential computation. Let's move on to the divideAndConquer() method:
private void divideAndConquer() {
    final FibonacciTask previousTask = new FibonacciTask(number - 1);
    final FibonacciTask beforePreviousTask = new FibonacciTask(number - 2);
    invokeAll(previousTask, beforePreviousTask);
    merge(previousTask, beforePreviousTask);
}
Once again we find the pseudo code pattern presented at the beginning (remember?). We split the problem into two subproblems, each represented by a task:
final FibonacciTask previousTask = new FibonacciTask(number - 1);
final FibonacciTask beforePreviousTask = new FibonacciTask(number - 2);
We invoke both tasks and wait for them to complete:
invokeAll(previousTask, beforePreviousTask);
and we merge their results:
merge(previousTask, beforePreviousTask);
What's left is the merge method itself:
private void merge(
        final FibonacciTask previousTask,
        final FibonacciTask beforePreviousTask
) {
    result = previousTask.result() + beforePreviousTask.result();
}
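For readers who want to run the example end to end, here is a sketch assembling the snippets above into a single class. The fields number and result, the constructor, the upTo factory and the value of TRESHOLD are not shown in the article, so they are assumptions here:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// The article's snippets assembled into one class; fields, constructor,
// upTo() and the TRESHOLD value are assumptions.
public class FibonacciTask extends ForkJoinTask<Long> {
    private static final long TRESHOLD = 20; // assumed cut-off value

    private final long number;
    private long result;

    FibonacciTask(final long number) {
        this.number = number;
    }

    static FibonacciTask upTo(final long number) {
        return new FibonacciTask(number);
    }

    @Override
    public Long getRawResult() {
        return result;
    }

    @Override
    protected void setRawResult(final Long value) {
        // left empty: the result is kept in the result field
    }

    @Override
    protected boolean exec() {
        if (number < TRESHOLD) {
            result = sequence(number);
        } else {
            divideAndConquer();
        }
        return true; // the task completed normally
    }

    long result() {
        return result;
    }

    private void divideAndConquer() {
        final FibonacciTask previousTask = new FibonacciTask(number - 1);
        final FibonacciTask beforePreviousTask = new FibonacciTask(number - 2);
        invokeAll(previousTask, beforePreviousTask); // fork both and wait for both
        merge(previousTask, beforePreviousTask);
    }

    private void merge(final FibonacciTask previousTask, final FibonacciTask beforePreviousTask) {
        result = previousTask.result() + beforePreviousTask.result();
    }

    private static long sequence(final long n) {
        return n <= 1 ? n : sequence(n - 1) + sequence(n - 2);
    }

    public static void main(String[] args) {
        final ForkJoinPool pool = new ForkJoinPool();
        final FibonacciTask task = FibonacciTask.upTo(30);
        pool.invoke(task);
        System.out.println(task.result()); // 832040
        pool.shutdown();
    }
}
```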
And that's all, folks! As specified in Doug Lea's article, the two parameters to tune are the threshold value, which depends on the nature of the algorithm, and the size of the ForkJoinPool. I chose the pool's default constructor, which uses a parallelism level equal to the number of available processors on my little laptop. As Amdahl's law expresses it, the serial parts of a program can drastically limit the achievable speedup, so the fork and merge operations must be kept to a minimum.
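Amdahl's law can be written as S(N) = 1 / ((1 - P) + P / N), where P is the fraction of the work that can run in parallel and N the number of processors. The helper below (the Amdahl class is hypothetical) evaluates it; for instance P = 0.9 on 4 processors gives 1 / (0.1 + 0.225) ≈ 3.08, and no number of processors can push the speedup past 1 / (1 - P):

```java
// Amdahl's law: S(N) = 1 / ((1 - P) + P / N)
public final class Amdahl {
    private Amdahl() {
    }

    static double speedup(final double parallelFraction, final int processors) {
        if (parallelFraction < 0 || parallelFraction > 1 || processors < 1) {
            throw new IllegalArgumentException("need 0 <= P <= 1 and N >= 1");
        }
        return 1.0 / ((1.0 - parallelFraction) + parallelFraction / processors);
    }

    public static void main(String[] args) {
        // The default ForkJoinPool() constructor takes its parallelism from this value.
        final int processors = Runtime.getRuntime().availableProcessors();
        for (final double p : new double[] {0.5, 0.9, 0.99}) {
            System.out.printf("P = %.2f, N = %d -> speedup %.2f%n", p, processors, speedup(p, processors));
        }
    }
}
```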
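The execution of the tests gave the following results (in ms):
rec (sequential recursion): 45 081
fj (fork/join): 23 730
In other words, the fork/join version ran roughly 1.9 times faster.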
To complete my exploration, I decided to tackle another problem: sorting. I will go faster this time. I chose to work on timestamped objects that need to be sorted.
I imagined a timestamped object that can be sorted through the Comparable interface, and designed the following tests:
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;

import java.util.Calendar;
import java.util.Date;
import org.junit.Test;

// objectTimestampedAt(...) is statically imported from TimestampedObject (package not shown)
public final class TimestampedObjectTest {
    @Test
    public void objectTimestampedAt_TestDate_ShouldNotBeNull() {
        assertThat(objectTimestampedAt(new Date()), is(not(nullValue())));
    }

    @Test
    public void compareTo_WithTwoSortedTestDate_ShouldMatch() {
        final Calendar calendar = Calendar.getInstance();
        final TimestampedObject objectOne = objectTimestampedAt(dateIn(calendar));
        update(calendar);
        final TimestampedObject objectTwo = objectTimestampedAt(dateIn(calendar));
        assertThat(objectOne.compareTo(objectTwo), is(lessThan(0)));
        assertThat(objectTwo.compareTo(objectOne), is(greaterThan(0)));
        assertThat(objectTwo.compareTo(objectTimestampedAt(dateIn(calendar))), is(equalTo(0)));
    }

    private Date dateIn(final Calendar calendar) {
        return calendar.getTime();
    }

    // moves the calendar one second (1000 ms) forward
    private void update(final Calendar calendar) {
        final long date = calendar.getTimeInMillis();
        calendar.setTimeInMillis(date + 1000);
    }
}
These tests led me to the following implementation (built incrementally, of course :)):
import java.util.Date;

final class TimestampedObject implements Comparable<TimestampedObject> {
    private final Date date;

    private TimestampedObject(final Date forDate) {
        super();
        date = forDate;
    }

    static TimestampedObject objectTimestampedAt(final Date date) {
        return new TimestampedObject(date);
    }

    @Override
    public int compareTo(final TimestampedObject other) {
        // the Comparable contract recommends throwing a NullPointerException here;
        // placing null before any object is a design choice
        if (other == null) return 1;
        return date().compareTo(other.date());
    }

    private Date date() {
        assert date != null;
        return date;
    }
}
A piece of cake for the average developer.
So I imagined the following code to challenge the framework:
@Test
public void forkJoin_WithZillionsOfTimestamps_ShouldBeSorted() {
    final List<TimestampedObject> listOfTimestampedObjects =
            listOfTimestampedObjects();
    // sortTaskFor(...) and arrayFrom(...) are helpers of the test class (not shown)
    final TemporalSortTask sortingTask =
            sortTaskFor(arrayFrom(listOfTimestampedObjects));
    final TimestampedObject[] result = executed(sortingTask);
    // every element must be strictly later than its predecessor
    for (int i = 1, length = result.length; i < length; i++) {
        assertThat(result[i].compareTo(result[i - 1]), is(greaterThan(0)));
    }
}
A TemporalSortTask will be executed in the fork/join framework; its mission is to sort a big list. The list of timestamped objects is built with a calendar whose time is moved forward by 1000 ms for each object, and the list is then shuffled:
private List<TimestampedObject> listOfTimestampedObjects() {
    final List<TimestampedObject> list = new LinkedList<>();
    final Calendar calendar = Calendar.getInstance();
    for (int i = 0; i < 100000; i++) {
        list.add(objectTimestampedAt(dateIn(updated(calendar))));
    }
    shuffle(list); // java.util.Collections.shuffle
    return list;
}

private Date dateIn(final Calendar calendar) {
    return calendar.getTime();
}

private Calendar updated(final Calendar calendar) {
    final long date = calendar.getTimeInMillis();
    calendar.setTimeInMillis(date + 1000);
    return calendar;
}
objectTimestampedAt is a factory method of the TimestampedObject class. What interests us most is the central executed method called by the test:
private TimestampedObject[] executed(final TemporalSortTask sortingTask) {
    final ForkJoinPool pool = new ForkJoinPool();
    final long start = currentTimeMillis();
    pool.invoke(sortingTask);
    System.out.println(format("recursive {0}", currentTimeMillis() - start));
    final TimestampedObject[] result = sortingTask.result();
    pool.shutdown();
    return result;
}
Here again we create a ForkJoinPool and let the underlying implementation choose the number of threads. We fire the execution of the sorting task and return the result, which the test method then checks. The implementation of TemporalSortTask revolves around its compute() method, the hook that RecursiveAction's exec() calls (which suggests TemporalSortTask extends RecursiveAction):
@Override
protected void compute() {
    if (size() < TRESHOLD) {
        process();
    } else {
        divideAndConquer();
    }
}
Here the threshold is compared with size(), the number of objects in the range this task has to sort. The sequential step, process(), copies that range into the task's result array and delegates the sorting to the JDK (Arrays.sort):
private void process() {
    arraycopy(fromSource(), start(), result, 0, size());
    sort(result, 0, size());
}
The divide-and-conquer method follows exactly the same pattern as in the previous example:
private void divideAndConquer() {
    final int midSize = size() / 2;
    final TemporalSortTask leftTask =
            new TemporalSortTask(fromSource(), start(), start() + midSize);
    final TemporalSortTask rightTask =
            new TemporalSortTask(fromSource(), start() + midSize, end());
    invokeAll(leftTask, rightTask);
    merge(leftTask.result(), rightTask.result());
}
Here a left task and a right task are created, each in charge of sorting one half of the range. The trick is that the source of timestamped objects is stored in an array whose reference alone is passed from task to task; only the start/end bounds of the range are created anew for each task. Each task stores its output in its own result array of timestamped objects. The merge operation takes the results of the two subtasks and rearranges them, in order, into that result array:
private void merge(
        final TimestampedObject[] leftTask,
        final TimestampedObject[] rightTask
) {
    int resultIndex = 0;
    int leftIndex = 0;
    int rightIndex = 0;
    // take the smaller head element; on ties take the left one to keep the merge stable
    while ((leftIndex < leftTask.length)
            && (rightIndex < rightTask.length)) {
        result[resultIndex++] =
                leftTask[leftIndex].compareTo(rightTask[rightIndex]) <= 0 ?
                        leftTask[leftIndex++] : rightTask[rightIndex++];
    }
    // copy whatever remains on either side
    while (leftIndex < leftTask.length) {
        result[resultIndex++] = leftTask[leftIndex++];
    }
    while (rightIndex < rightTask.length) {
        result[resultIndex++] = rightTask[rightIndex++];
    }
}
I won't detail it, as it is the standard merge step of a merge sort. Once again Amdahl's law is verified; the measured times for arrays of 500 000 objects were (in ms):
fork/join: 337
sort: 633
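TemporalSortTask's constructor, fields and threshold are not shown in the article. The following sketch reassembles the same approach as a generic RecursiveAction so it can run on its own. The class name ParallelMergeSortTask, the generic element type and the THRESHOLD value are assumptions, and unlike the original it allocates each task's result array with Arrays.copyOfRange, which also yields an array of the right runtime type for a generic T:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sketch of a fork/join merge sort in the style of TemporalSortTask.
public class ParallelMergeSortTask<T extends Comparable<? super T>> extends RecursiveAction {
    private static final int THRESHOLD = 10_000; // assumed cut-off

    private final T[] source; // shared by every task: only the reference is passed around
    private final int start;  // inclusive
    private final int end;    // exclusive
    private T[] result;       // this task's sorted copy of source[start, end)

    public ParallelMergeSortTask(final T[] source, final int start, final int end) {
        this.source = source;
        this.start = start;
        this.end = end;
    }

    public T[] result() {
        return result;
    }

    @Override
    protected void compute() {
        final int size = end - start;
        if (size < THRESHOLD) {
            // sequential step: copy the range, then let the JDK sort it
            result = Arrays.copyOfRange(source, start, end);
            Arrays.sort(result);
        } else {
            final int mid = start + size / 2;
            final ParallelMergeSortTask<T> left = new ParallelMergeSortTask<>(source, start, mid);
            final ParallelMergeSortTask<T> right = new ParallelMergeSortTask<>(source, mid, end);
            invokeAll(left, right);
            // allocation only: merge() overwrites every slot
            result = Arrays.copyOfRange(source, start, end);
            merge(left.result, right.result);
        }
    }

    private void merge(final T[] left, final T[] right) {
        int resultIndex = 0, leftIndex = 0, rightIndex = 0;
        while (leftIndex < left.length && rightIndex < right.length) {
            // '<= 0' takes equal elements from the left first, keeping the merge stable
            result[resultIndex++] = left[leftIndex].compareTo(right[rightIndex]) <= 0
                    ? left[leftIndex++] : right[rightIndex++];
        }
        while (leftIndex < left.length) result[resultIndex++] = left[leftIndex++];
        while (rightIndex < right.length) result[resultIndex++] = right[rightIndex++];
    }

    public static void main(String[] args) {
        final Integer[] data = new Integer[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = (i * 7919) % data.length; // a permutation of 0..99999
        }
        final ParallelMergeSortTask<Integer> task = new ParallelMergeSortTask<>(data, 0, data.length);
        new ForkJoinPool().invoke(task);
        System.out.println(Arrays.equals(task.result(), Arrays.stream(data).sorted().toArray())); // true
    }
}
```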
Well, this one was too long. Hurray if you made it to the end of the article. Be seeing you! :)