Tuesday, May 3, 2011

Callback handler in JDK7 NIO.2

So, to quote the Wikipedia article on asynchronous I/O: "Input and output (I/O) operations on a computer can be extremely slow compared to the processing of data." Recent improvements in computer hardware, multicore CPUs in particular, make it practical to hand long I/O operations (multi-gigabyte backups, for example) to dedicated threads of execution while your main work continues.

JDK7 NIO.2 adds new capabilities for both file and socket operations. I decided to talk a little about this new feature because it caught my eye at first glance. I expect a lot of feedback on the subject, as I have a hunch that the underlying plumbing implements patterns like reactor/proactor. This is open for discussion, as my exploration started only a few hours ago. I will work with tests written in an IntelliJ/Maven/JUnit environment running on JDK7 build ea-b140. Today I will exercise the file abstractions.

JDK7 offers two styles of programming. The first way of firing an asynchronous request returns a reference to the now well-known Future object, exactly the same as the one in the concurrent package. I was attracted by novelty, so I went for the second paradigm: the completion handler implementation.
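For comparison, here is a minimal sketch of the first style, the one I did not explore. The class name FutureStyleRead, the method readHead and the 512-byte buffer are my own choices for illustration, and I assume a UTF-8 text file:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper: reads the head of a file using the Future style.
class FutureStyleRead {

    // Reads up to 512 bytes from the start of the file and returns them as text.
    static String readHead(Path file)
            throws IOException, InterruptedException, ExecutionException {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(512);
            Future<Integer> pending = channel.read(buffer, 0);
            // ... the calling thread is free to do other work here ...
            int read = pending.get(); // blocks until the read is done; -1 means end of file
            if (read < 0) {
                return "";
            }
            buffer.flip(); // expose only the bytes that were actually read
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("nio2-future", ".txt");
        Files.write(file, "hello NIO.2".getBytes(StandardCharsets.UTF_8));
        System.out.println(readHead(file));
        Files.delete(file);
    }
}
```

With a Future the caller decides when to collect the result; with a completion handler the result is pushed to the handler as soon as it is available.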

The completion handler implements a callback approach (hello C++ :) ). A completion handler is called back each time an I/O operation finishes, so that it can consume the result.

I started my exploration with a small read scenario. I wanted to read the content of a simple file and check that it had been fully loaded, while tracing its content to standard output.

The following is the main test method:
@Test
public void completionHandler_WithTestFile_ShouldNotifyEndOfProcess()
        throws IOException, InterruptedException {
    try (final AsynchronousFileChannel channel =
             withProvider()
                 .newAsynchronousFileChannel(
                     targetFile,
                     withOptions(),
                     newSingleThreadExecutor())
    )
    {
        final ByteBuffer buffer = allocateDirect(512);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final RunToCompletionHandler handler =
            new RunToCompletionHandler(
                channel, buffer, countDownLatch
            );
        channel.read(buffer, 0, allocateDirect(512), handler);
        countDownLatch.await();
        assertThat(handler.isComplete(), is(true));
    }
}

The test code looks long, but I tried to keep the lines narrow to avoid awkward line wraps.
The asynchronous channel is opened in a try-with-resources scope, so it will be closed automatically as a Closeable resource.
This channel is opened with standard open options:

private Set<StandardOpenOption> withOptions() {
    final Set<StandardOpenOption> options = new TreeSet<>();
    options.add(READ);
    options.add(WRITE);
    return options;
}

I basically used the standard read/write options. An interesting point here is that you can provide the Executor service from the concurrent package on which the underlying asynchronous mechanics rely. I chose a standard:

Executors.newSingleThreadExecutor()

implementation as provided by the concurrent framework.
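To see what supplying the executor changes, here is a small sketch that records the name of the thread running the callback. The class name, the method handlerThreadName and the thread name "io-callbacks" are mine, and the sketch assumes the handler is dispatched on the supplied pool, which is how I read the channel documentation:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: which thread runs the completion handler?
class ExecutorBackedChannel {

    static String handlerThreadName(Path file) throws IOException, InterruptedException {
        // Single-threaded pool whose only thread carries a recognizable name.
        ExecutorService pool = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "io-callbacks");
            }
        });
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<String> seen = new AtomicReference<>();
        Set<StandardOpenOption> options = EnumSet.of(StandardOpenOption.READ);
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(file, options, pool)) {
            channel.read(ByteBuffer.allocate(16), 0, null,
                new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer result, Void attachment) {
                        seen.set(Thread.currentThread().getName());
                        done.countDown();
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        done.countDown();
                    }
                });
            done.await(); // wait for the callback before closing the channel
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("nio2-executor", ".txt");
        System.out.println(handlerThreadName(file));
        Files.delete(file);
    }
}
```

A pool with more threads would let several callbacks run concurrently, at the price of making the handlers responsible for their own thread safety.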

The following code is standard NIO.1 data reading. I chose to allocate a buffer for reading:

final ByteBuffer buffer = allocateDirect(512);

and then started the asynchronous read with the completion handler I built:

channel.read(buffer, 0, allocateDirect(512), handler);

My main test thread can carry on with whatever processing is required while the file is read asynchronously. In order to wait peacefully for the end of the asynchronous I/O process, I let my test thread wait on a countdown latch:
countDownLatch.await();
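A plain await() blocks forever if the callback never fires. A hedged variation, not what my test does, is to bound the wait; the class and method names below are made up for the illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: wait for the asynchronous side, but not forever.
class BoundedWait {

    // Returns true if the latch reached zero within the timeout, false otherwise.
    static boolean awaitCompletion(CountDownLatch latch, long timeoutMillis)
            throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        // Stand-in for the asynchronous I/O callback releasing the latch.
        new Thread(new Runnable() {
            @Override
            public void run() {
                latch.countDown();
            }
        }).start();
        System.out.println(awaitCompletion(latch, 5000));
    }
}
```

In a test, the boolean can feed an assertion so that a lost callback shows up as a failure instead of a hung build.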

The interesting part is in the completion handler (it really does remind me of the reactor and/or proactor pattern). The interface to implement is CompletionHandler (here parameterized as CompletionHandler<Integer, ByteBuffer>), exposing two contract methods:

completed(final Integer result, final ByteBuffer buffer)
and

failed(final Throwable exc, final ByteBuffer attachment)


The names are self explanatory :) .


This is the way I implemented it:

final class RunToCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private volatile boolean completed;
    private AsynchronousFileChannel channel;
    private ByteBuffer destination;
    private int alreadyRead;
    private final CountDownLatch latch;

    public RunToCompletionHandler(final AsynchronousFileChannel channel, final ByteBuffer buffer, CountDownLatch countDownLatch) {
        super();
        this.channel = channel;
        destination = buffer;
        latch = countDownLatch;
    }

    @Override
    public void completed(final Integer result, final ByteBuffer buffer) {
        if (result > 0) {
            process(result, buffer);
        } else {
            end();
        }
    }

    private void end() {
        completed = true;
        latch.countDown();
    }

    private void process(Integer result, ByteBuffer buffer) {
        alreadyRead += result;
        destination.flip();  // limit = bytes just read, position = 0
        trace();
        destination.clear(); // make the whole buffer available for the next read
        reQueue(buffer);
    }

    private void reQueue(ByteBuffer buffer) {
        channel.read(destination, alreadyRead, buffer, this);
    }

    private void trace() {
        System.out.print(forName(encoding()).decode(destination));
    }

    private String encoding() {
        return System.getProperty("file.encoding");
    }

    @Override
    public void failed(final Throwable exc, final ByteBuffer attachment) {
        latch.countDown();
    }

    public boolean isComplete() throws InterruptedException {
        return completed;
    }
}

On completion I receive both the result of the operation (the number of bytes read, for a file read) and the object initially attached. The attachment plays no role here: I had no need to identify my callback, since there is only one read in flight on one asynchronous thread, so null would have done just as well.

What I do know on completion, when the result is greater than zero, is that data has been read into my initial buffer. I stored it as an instance variable (destination), so I can trace its content using the standard decoding method available since JDK 1.4:

private void process(Integer result, ByteBuffer buffer) {
    alreadyRead += result;
    destination.flip();  // limit = bytes just read, position = 0
    trace();
    destination.clear(); // make the whole buffer available for the next read
    reQueue(buffer);
}
Otherwise, my operation is over and I can release my count down latch.
private void end() {
    completed = true;
    latch.countDown();
}
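One detail of the buffer handling deserves a closer look. After a read, the buffer position sits just past the last byte read; flip() turns that into the limit, so decoding stops at the fresh data, whereas rewind() alone leaves the limit at the capacity. A small sketch, with names of my own and UTF-8 assumed:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the flip / decode / clear cycle on a filled buffer.
class FlipBeforeDecode {

    // Decodes exactly the bytes written so far, then resets the buffer for reuse.
    static String decodeFilled(ByteBuffer buffer) {
        buffer.flip();   // limit = bytes written, position = 0
        String text = StandardCharsets.UTF_8.decode(buffer).toString();
        buffer.clear();  // position = 0, limit = capacity
        return text;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put("abc".getBytes(StandardCharsets.UTF_8)); // simulates a 3-byte read
        System.out.println(decodeFilled(buffer)); // prints "abc"
    }
}
```

This matters mostly for the last chunk of a file, which is usually shorter than the buffer.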

The most interesting part is the reQueue invocation. Of course I have processed my filled destination buffer, but I have a hunch this is not the end of the story and there is more data to come, so the handler re-invokes a read operation with itself as the handler until there is no more data to read. Then the latch is released and the test is over.
private void reQueue(ByteBuffer buffer) {
    channel.read(destination, alreadyRead, buffer, this);
}
Done. It really looks like a run-to-completion (RTC) process, where the task re-enqueues itself until completion (there is a really nice description of RTC in Uncle Bob's Agile Software Development book).
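The same re-enqueueing idea can be condensed into one self-contained class. This is only a sketch under my own assumptions: the names ChainedRead and readAll are invented, the buffer travels as the attachment instead of living in a field, and the bytes are collected in memory rather than printed:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;

// Hypothetical run-to-completion reader: each callback schedules the next read.
class ChainedRead implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousFileChannel channel;
    private final ByteArrayOutputStream collected = new ByteArrayOutputStream();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable failure;
    private long position;

    ChainedRead(AsynchronousFileChannel channel) {
        this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (result < 0) {          // -1 signals end of file
            done.countDown();
            return;
        }
        buffer.flip();             // expose only the bytes just read
        byte[] chunk = new byte[buffer.remaining()];
        buffer.get(chunk);
        collected.write(chunk, 0, chunk.length);
        position += result;
        buffer.clear();            // reuse the same buffer for the next chunk
        channel.read(buffer, position, buffer, this); // re-enqueue ourselves
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        failure = exc;
        done.countDown();
    }

    static byte[] readAll(Path file) throws IOException, InterruptedException {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ChainedRead reader = new ChainedRead(channel);
            ByteBuffer buffer = ByteBuffer.allocate(64);
            channel.read(buffer, 0, buffer, reader);
            reader.done.await();   // keep the channel open until the chain ends
            if (reader.failure != null) {
                throw new IOException(reader.failure);
            }
            return reader.collected.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("nio2-chain", ".bin");
        Files.write(file, new byte[1000]);
        System.out.println(readAll(file).length);
        Files.delete(file);
    }
}
```

Only one read is ever in flight, so the handler's fields are touched by one callback at a time.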

This was a purely abstract exercise, so I decided to exercise a read/write of a reference file in another test class.

The starting test took the form:

@Test
public void completionHandler_WithTestFile_ShouldNotifyEndOfProcess()
        throws IOException, InterruptedException {
    try (
        final AsynchronousFileChannel sourceChannel
            = withProvider().newAsynchronousFileChannel(
                source, withReadOptions(), newSingleThreadExecutor());
        final AsynchronousFileChannel targetChannel
            = withProvider().newAsynchronousFileChannel(
                target, withCreateOptions(), newSingleThreadExecutor())
    )
    {
        final ByteBuffer buffer = allocateDirect(65536);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final SourceCompletionHandler handler =
            new SourceCompletionHandler(
                sourceChannel, targetChannel, buffer, countDownLatch
            );
        sourceChannel.read(buffer, 0, null, handler);
        countDownLatch.await();
    }
}
where I opened two channels, each one served by a separate single-threaded pool (yep, a little lazy, I know...). I am still using the try-with-(closeable)-resources scope management.

But this time I use two completion handlers.
A reading completion handler whose content is really close to the previous one:

final class SourceCompletionHandler
        implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousFileChannel channel;
    private ByteBuffer destination;
    private int alreadyRead;
    private TargetCompletionHandler target;

    public SourceCompletionHandler(
            final AsynchronousFileChannel channel,
            final AsynchronousFileChannel targetChannel,
            final ByteBuffer buffer,
            final CountDownLatch countDownLatch) {
        super();
        this.channel = channel;
        destination = buffer;
        target = new TargetCompletionHandler(targetChannel, countDownLatch);
    }

    @Override
    public void completed(final Integer result, final ByteBuffer buffer) {
        if (result > 0) {
            process(result, buffer);
        } else {
            // End of file: push an empty buffer as an end-of-copy marker.
            target.push(allocate(0));
        }
    }

    private void process(Integer result, ByteBuffer buffer) {
        alreadyRead += result;
        destination.flip();       // expose only the bytes just read
        target.push(destination); // push copies the bytes before returning
        destination.clear();      // make the whole buffer available again
        reQueue(buffer);
    }

    private void reQueue(ByteBuffer buffer) {
        channel.read(destination, alreadyRead, buffer, this);
    }

    // Note: a read failure is ignored here, so the waiting test is never released.
    @Override
    public void failed(final Throwable exc, final ByteBuffer attachment) {}
}
And a handler for the writing operation

final class TargetCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousFileChannel channel;
    private volatile long position;
    private final CountDownLatch latch;

    public TargetCompletionHandler(AsynchronousFileChannel targetChannel, CountDownLatch countDownLatch) {
        super();
        channel = targetChannel;
        latch = countDownLatch;
    }

    @Override
    public void completed(final Integer result, final ByteBuffer attachment) {
        // System.out.println(currentThread().getName());
        // A zero-byte write is the end-of-copy marker pushed by the source handler.
        // Note: this assumes the earlier writes have completed by the time the marker does.
        if (result == 0) {
            latch.countDown();
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {latch.countDown();}

    public void push(final ByteBuffer source) {
        // Copy the chunk so the source buffer can be reused immediately.
        final ByteBuffer buffer = allocateDirect(source.limit()).put(source);
        buffer.flip();
        withChannel().write(buffer, position, null, this);
        // After put(), source.position() equals the number of bytes copied.
        position += source.position();
    }

    private AsynchronousFileChannel withChannel() {
        return channel;
    }
}

The source handler asynchronously fires the write operations.
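The write handler above treats every write as complete. As far as I understand the contract, a write may transfer fewer bytes than requested, so a more defensive handler could reissue the write until the buffer is drained. The following is a sketch of that idea, not what my test does; the names DrainingWriteHandler and writeFully are invented:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;

// Hypothetical write handler that keeps writing until its buffer is empty.
class DrainingWriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousFileChannel channel;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable failure;
    private long position;

    DrainingWriteHandler(AsynchronousFileChannel channel) {
        this.channel = channel;
    }

    @Override
    public void completed(Integer written, ByteBuffer buffer) {
        position += written;
        if (buffer.hasRemaining()) {
            // Partial write: continue from the new file position.
            channel.write(buffer, position, buffer, this);
        } else {
            done.countDown();
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        failure = exc;
        done.countDown();
    }

    static void writeFully(Path target, byte[] data)
            throws IOException, InterruptedException {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                 target,
                 StandardOpenOption.CREATE,
                 StandardOpenOption.WRITE,
                 StandardOpenOption.TRUNCATE_EXISTING)) {
            DrainingWriteHandler handler = new DrainingWriteHandler(channel);
            ByteBuffer buffer = ByteBuffer.wrap(data);
            channel.write(buffer, 0, buffer, handler);
            handler.done.await(); // keep the channel open until everything is written
            if (handler.failure != null) {
                throw new IOException(handler.failure);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path target = Files.createTempFile("nio2-write", ".bin");
        writeFully(target, new byte[4096]);
        System.out.println(Files.size(target));
        Files.delete(target);
    }
}
```

Here the buffer itself is the attachment, which is one way to give the callback access to what remains to be written.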

Once the test has completed, the copy is identical to the source. Done. I initiated an asynchronous file copy using a publisher/subscriber pattern between two asynchronous channels.
Of course the test has no purpose in itself except playing with the new API, and benchmarks would probably show that this second exercise is not as fast as copying the file directly with a standard NIO.1 copy... But it's fun ;) I welcome your comments, of course.
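For reference, the direct copy I have in mind could look like the sketch below, based on FileChannel.transferTo. The class and method names are mine, and I have not benchmarked it against the asynchronous version:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical baseline: synchronous channel-to-channel copy.
class PlainCopy {

    // Copies source to target and returns the number of bytes transferred.
    static long copy(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(
                 target,
                 StandardOpenOption.CREATE,
                 StandardOpenOption.WRITE,
                 StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long transferred = 0;
            // transferTo may move fewer bytes than asked, hence the loop.
            while (transferred < size) {
                transferred += in.transferTo(transferred, size - transferred, out);
            }
            return transferred;
        }
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("nio-copy-src", ".bin");
        Path target = Files.createTempFile("nio-copy-dst", ".bin");
        Files.write(source, new byte[100000]);
        System.out.println(copy(source, target));
        Files.delete(source);
        Files.delete(target);
    }
}
```

JDK7 also ships Files.copy, which hides the loop entirely.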

Be seeing you !




1 comments:

katharine said...

nice article about the NIO! although I did not see a bigger advantage between future/callable vs NIO/Callback
