Brian's Waste of Time

Sat, 23 Sep 2006

Trying out Grizzly ARP

I recently had the opportunity to play with Grizzly and, more specifically, its ARP (asynchronous request processing) support. Along the way, Jean-Francois Arcand was kind enough to give me enough pointers for something ~workable to emerge. Thank you!

Step one looked like:

package org.skife.grizzly;

import com.sun.enterprise.web.connector.grizzly.SelectorThread;
import java.io.IOException;

public class App
{
    public static void main(String[] args) throws IOException, 
                                                  InstantiationException
    {
        SelectorThread sel = new SelectorThread();
        sel.setPort(8000);

        // XXX

        sel.setDisplayConfiguration(true);
        sel.initEndpoint();
        sel.startEndpoint();
    }
}

Where I had no clue what to do in XXX. This sets up and starts the basic server, but it doesn't actually do anything (except accept connections). What I wanted was something like this:

def handle(request, responder)
    instructions = extractInstructions(request)

    LongRunningThing.submit(instructions, lambda(arg) {
        responder.submit(lambda(response) {
            response.write(arg.responseBody)
            response.complete()
        })
    })
end
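
For comparison, here is a rough rendering of that wish in plain Java (8 or later, for the lambdas), with no Grizzly involved. Every type in it (Response, Responder, LongRunningThing) is a hypothetical stand-in invented for this sketch, and the "extract instructions" step is reduced to trimming a string. It only shows the shape: handle returns straight away, and the response is completed later from a callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of the wished-for API; none of these types come from Grizzly.
class WishedForHandler
{
    /** Stand-in for the response: collects the body and records completion. */
    static final class Response
    {
        final StringBuilder body = new StringBuilder();
        final CompletableFuture<String> done = new CompletableFuture<>();

        void write(String text) { body.append(text); }
        void complete() { done.complete(body.toString()); }
    }

    /** Stand-in for the responder: hands the response to whoever wants to write it. */
    static final class Responder
    {
        final Response response = new Response();

        void submit(Consumer<Response> writer) { writer.accept(response); }
    }

    /** Stand-in for the slow back end: works on another thread, then calls back. */
    static final class LongRunningThing
    {
        private static final ExecutorService POOL = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);   // do not keep the JVM alive for this demo
            return t;
        });

        static void submit(String instructions, Consumer<String> callback)
        {
            POOL.submit(() -> callback.accept("did: " + instructions));
        }
    }

    /** Returns immediately; the response is written and completed from the callback. */
    static void handle(String request, Responder responder)
    {
        String instructions = request.trim();
        LongRunningThing.submit(instructions, result ->
            responder.submit(response -> {
                response.write(result);
                response.complete();
            }));
    }

    public static void main(String[] args) throws Exception
    {
        Responder responder = new Responder();
        handle(" fetch report ", responder);
        // prints "did: fetch report"
        System.out.println(responder.response.done.get(5, TimeUnit.SECONDS));
    }
}
```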

The next step, then, was to add the async processing filter. This is a filter on the execution chain, not on the HTTP stream (as servlet filters are). The code becomes:

public static void main(String[] args) throws IOException, InstantiationException
{
    SelectorThread sel = new SelectorThread();
    sel.setPort(8000);
    sel.setEnableAsyncExecution(true);

    AsyncHandler handler = new DefaultAsyncHandler();
    handler.addAsyncFilter(new AsyncFilter()
    {
        public boolean doFilter(AsyncExecutor executor)
        {
            return false;
        }
    });

    sel.setAsyncHandler(handler);

    sel.setDisplayConfiguration(true);
    sel.initEndpoint();
    sel.startEndpoint();
}

And I am stuck again :-) The AsyncFilter is passed the HTTP processing state and a handle to the network event handling doodad. To do anything useful, we need to grab the next step in the chain and, er, delay it a bit. Our filter becomes:

handler.addAsyncFilter(new AsyncFilter()
{
    public boolean doFilter(final AsyncExecutor executor)
    {
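        // The task to resubmit later, and the handler that will take it back.
        final AsyncTask asyncTask = executor.getAsyncTask();
        final AsyncHandler asyncHandler = executor.getAsyncHandler();

        // The processor task gives us access to the request being handled.
        final DefaultProcessorTask processorTask =
                (DefaultProcessorTask) executor.getAsyncTask().getProcessorTask();

        // Stash a callback on the request; running it resubmits the task.
        processorTask.getRequest().setAttribute(CALLBACK_KEY, new Runnable() {
            public void run()
            {
                asyncHandler.handle(asyncTask);
            }
        });
        // Run the Adapter now; it picks the callback up from the request.
        processorTask.invokeAdapter();
        // false: the chain should not continue normally, we resume it ourselves.
        return false;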
    }
});

Phew, a chunk of confusing code in there. I'll attempt to explain my understanding of it, and hope Jean-Francois will correct my glaring mistakes. He gave me the snippet that is the basis for this, and I don't fully get it, not having fully driven through Grizzly's innards (yet).

  1. First we grab the AsyncTask from the AsyncExecutor. This represents, I think, the next step in the chain. We hold onto that; we'll use it in just a moment.
  2. Next we grab the AsyncHandler. I'm not sure of the details of this, but it seems to control the processing of the async tasks that make up the request processing life cycle. We'll be using this with the AsyncTask.
  3. We now get ahold of the ProcessorTask. This is the thing that is responsible for processing this specific request. From it we can obtain (and we do) the Request instance whose processing chain we are intercepting.
  4. Having grabbed the request, we add an attribute which is a callback that will "finish" the request processing. The callback, a Runnable in this case, just submits the AsyncTask we have obtained back for further execution. When this callback is invoked, the request processing will pick back up based on whatever scheduling algorithm is in use (just a queue, I believe). We store the callback on the request so that it will be available to our handler.
  5. Finally, we invoke the Adapter for this processor task/request. The adapter is, in a typical situation, the servlet container. In this case it is a custom adapter, as I didn't muck with a servlet container here. That would make it all more complicated because of the servlet API's unfortunate contract about committing responses when Servlet#service returns. I digress.
  6. Really finally, we return false from the filter to indicate that the process chain should not continue normally -- we signal that we are going to take care of things. Woot!
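
The steps above can be modelled in a few lines of plain Java, which helped me picture the control flow. This is a toy, not Grizzly: Task, Filter and Handler are hypothetical stand-ins, the "queue" is a simple ArrayDeque (the real scheduling is only my guess), and the adapter call is reduced to a log entry. What it does show is the trick itself: stash a resume callback, return false to suspend, and run the callback later to hand the task back.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model of suspend/resume. Every type here is hypothetical, not Grizzly's.
class SuspendResumeSketch
{
    static final String CALLBACK_KEY = "FINISH_REQUEST";

    /** Stand-in for a task plus its request: attributes and a log of events. */
    static final class Task
    {
        final Map<String, Object> attributes = new HashMap<>();
        final List<String> log = new ArrayList<>();
    }

    /** Returns true to carry on normally, false to mean "I will resume this myself". */
    interface Filter
    {
        boolean doFilter(Task task, Handler handler);
    }

    /** Stand-in for the handler: runs the filter and finishes resubmitted tasks. */
    static final class Handler
    {
        private final Queue<Task> resubmitted = new ArrayDeque<>();
        private final Filter filter;

        Handler(Filter filter) { this.filter = filter; }

        void start(Task task)
        {
            if (filter.doFilter(task, this)) {
                task.log.add("finished");
            }
            else {
                task.log.add("suspended");
            }
        }

        /** Called by the stored callback to hand the task back. */
        void handle(Task task)
        {
            task.log.add("resubmitted");
            resubmitted.add(task);
        }

        /** Finishes whatever has been handed back, in queue order. */
        void drain()
        {
            for (Task t = resubmitted.poll(); t != null; t = resubmitted.poll()) {
                t.log.add("finished");
            }
        }
    }

    static List<String> run()
    {
        Filter filter = (t, h) -> {
            // Stash a callback that hands the task back to the handler.
            Runnable resume = () -> h.handle(t);
            t.attributes.put(CALLBACK_KEY, resume);
            t.log.add("adapter invoked");   // where the real filter calls invokeAdapter()
            return false;                   // suspend: we will resume it ourselves
        };

        Handler handler = new Handler(filter);
        Task task = new Task();
        handler.start(task);

        // Later, whoever holds the request pulls the callback out and runs it.
        ((Runnable) task.attributes.get(CALLBACK_KEY)).run();
        handler.drain();
        return task.log;
    }

    public static void main(String[] args)
    {
        // prints "[adapter invoked, suspended, resubmitted, finished]"
        System.out.println(run());
    }
}
```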

Okay, so we have stopped the normal HTTP processing chain for this request, and created a function that will let us resume it when we want to. Sweet! Now we need an Adapter so that we can actually do something with the response. We use a silly little adapter, like this:

private static class MyAdapter implements Adapter
{
    // Response body, encoded once when the class is loaded.
    static final byte[] CONTENT = ByteChunk.convertToBytes("hello world!\n");

    private static final ScheduledExecutorService exec = 
        Executors.newScheduledThreadPool(2);

    public void service(final Request req, final Response res) 
        throws Exception
    {
        exec.schedule(new Callable<Void>() {
            public Void call() throws Exception
            {
                res.setStatus(200);
                
                byte[] bytes = CONTENT;
                ByteChunk chunk = new ByteChunk();
                res.setContentLength(bytes.length);
                res.setContentType("text/plain");
                chunk.append(bytes, 0, bytes.length);
                res.doWrite(chunk);

                Runnable completion = 
                    (Runnable) req.getAttribute(CALLBACK_KEY);
                completion.run();
                return null;
            }
        }, 10, TimeUnit.SECONDS);
    }

    public void afterService(Request req, Response res) throws Exception
    {
        req.action(ActionCode.ACTION_POST_REQUEST, null);
    }

    public void fireAdapterEvent(String type, Object data)
    {
    }
}

Now, this is a Coyote Adapter, Tomcat style. In our adapter we do something fun: we immediately schedule a Callable to be invoked 10 seconds later! This just simulates some slow asynchronous process going on. The service method schedules this and returns. It returns to our filter, via a couple of intermediary returns, where the filter returns false to indicate that we are going to restart things later, when we feel like it.
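
The schedule-and-return part is plain java.util.concurrent and can be tried on its own. The sketch below is only an illustration under my own assumptions: the class and method names are made up, a 200 ms delay stands in for the 10 seconds, and a CountDownLatch plays the role of the completion callback. The demo waits on the latch only so that it can print both events; a server thread would simply move on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration; the class and method names are made up for this sketch.
class ScheduleAndReturn
{
    /** Schedules the "slow" work and returns at once; the task signals completion itself. */
    static void service(ScheduledExecutorService exec, long delayMillis,
                        List<String> events, Runnable completion)
    {
        exec.schedule(() -> {
            events.add("response written");
            completion.run();
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    static List<String> run(long delayMillis)
    {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch completed = new CountDownLatch(1);
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        try {
            service(exec, delayMillis, events, completed::countDown);
            events.add("service returned");   // the caller is not blocked by the delay
            completed.await();                // only this demo waits; a server would not
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            exec.shutdown();
        }
        return events;
    }

    public static void main(String[] args)
    {
        System.out.println(run(200));
    }
}
```

With a delay of that size, "service returned" should in practice be recorded before "response written", which is the whole point: the calling thread is free long before the response is ready.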

Okay, so ten seconds later we set up our response with a chunk of bytes. I am assured that writing to the Response object at this point is okay. I want to dig in and see if I can get it to do chunked responses from here so as to be able to stream and lower the memory footprint, but that is for later. For now we just fire a small response. After setting up the response we pull the callback out of the request and invoke it.

Invoking the callback submits the async task we had earlier grabbed to be finished, and the response gets shoved down the wire at the next opportunity. Cool!

The final form looks like this:

package org.skife.grizzly;

import com.sun.enterprise.web.connector.grizzly.*;
import com.sun.enterprise.web.connector.grizzly.async.DefaultAsyncHandler;
import org.apache.coyote.Adapter;
import org.apache.coyote.Request;
import org.apache.coyote.Response;
import org.apache.coyote.ActionCode;
import org.apache.tomcat.util.buf.ByteChunk;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class App
{
    private static final String CALLBACK_KEY = "org.skife.grizzly.FINISH_REQUEST";

    public static void main(String[] args) throws IOException, InstantiationException
    {
        SelectorThread sel = new SelectorThread();
        sel.setPort(8000);

        sel.setAdapter(new MyAdapter());

        final AsyncHandler handler = new DefaultAsyncHandler();
        handler.addAsyncFilter(new AsyncFilter()
        {
            public boolean doFilter(final AsyncExecutor executor)
            {
                final AsyncTask asyncTask = executor.getAsyncTask();
                final AsyncHandler asyncHandler = executor.getAsyncHandler();

                final DefaultProcessorTask processorTask =
                    (DefaultProcessorTask) executor.getAsyncTask().getProcessorTask();
                processorTask.getRequest().setAttribute(CALLBACK_KEY, new Runnable() {
                    public void run()
                    {
                        asyncHandler.handle(asyncTask);
                    }
                });
                processorTask.invokeAdapter();
                return false;
            }
        });
        sel.setAsyncHandler(handler);
        sel.setEnableAsyncExecution(true);
        sel.setDisplayConfiguration(true);
        sel.initEndpoint();
        sel.startEndpoint();
    }

    private static class MyAdapter implements Adapter
    {
        // Response body, encoded once when the class is loaded.
        static final byte[] CONTENT = ByteChunk.convertToBytes("hello world!\n");

        private static final ScheduledExecutorService exec = 
            Executors.newScheduledThreadPool(2);

        public void service(final Request req, final Response res) 
            throws Exception
        {
            exec.schedule(new Callable<Void>() {
                public Void call() throws Exception
                {
                    res.setStatus(200);

                    byte[] bytes = CONTENT;
                    ByteChunk chunk = new ByteChunk();
                    res.setContentLength(bytes.length);
                    res.setContentType("text/plain");
                    chunk.append(bytes, 0, bytes.length);
                    res.doWrite(chunk);

                    Runnable completion = 
                        (Runnable) req.getAttribute(CALLBACK_KEY);
                    completion.run();
                    return null;
                }
            }, 10, TimeUnit.SECONDS);
        }

        public void afterService(Request req, Response res) throws Exception
        {
            req.action(ActionCode.ACTION_POST_REQUEST, null);
        }

        public void fireAdapterEvent(String type, Object data)
        {
        }
    }
}

It doesn't do a lot, but it does demonstrate how it works, or at least makes a start at it. There are a number of things I'm not sure of yet, such as the reason for the specific Adapter#afterService setup (Jean-Francois said to do it; I haven't yet dug into why). Fun start, though!
