Saturday, July 7, 2012

Serial Executor over a Shared Thread Pool

From time to time while developing server applications, I needed to process incoming messages, packets or events from many clients using a pool of threads. The messages from each client had to be processed one at a time and in the exact same order in which they were received. This sounds simple, and a very usual thing that may happen when you are working outside of the JEE stack. Nevertheless, we solved these situations using custom thread pools and queue handling that were implemented for the specific needs of the project.

Here is an attempt for creating a generic implementation for solving this kind of issue. This implementation reuse the Executor framework provided by Java and export the same Executor interface for submitting tasks of each client.

There are 2 classes, a SerialExecutor, named this way since it is presented like that in the Executor API documentation, and a Factory class, which is actually there to simplify the creation of SerialExecutors that share an underlying Executor.

You can get the full code in GitHub

Sample usage: This sample shows how the factory and the SerialExecutor can be used. In a real scenario, it is possible to have an association between the specific sources and their executors, so each incoming message from a given source can be scheduled for processing in its executor.
package org.muralx.concurrent.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class SerialExecutorSample {
    /**
     * Number of tasks per serial executor
     */
    private static final int TASKS_COUNT = 1000;
    /**
     * Number of serial executors
     */
    private static final int EXECUTORS_COUNT = 200;
    /**
     * Keeps sum total for each serial executor
     */
    private static int[] totals = new int[EXECUTORS_COUNT];
    /**
     * The SerialExecutors we will use for scheduling tasks
     */
    private static ExecutorService[] executors = new ExecutorService[EXECUTORS_COUNT];

    public static void main(String[] args) throws InterruptedException {
        // Create the factory using the predefined values we know for TASKS and
        // EXECUTORS, and use 10 - 20 threads in the underlying
        // ThreadPoolExecutor
        SharedPoolSerialExecutorFactory factory = new SharedPoolSerialExecutorFactory(
                TASKS_COUNT, TASKS_COUNT, EXECUTORS_COUNT, 10, 20, 10,
                TimeUnit.SECONDS);

        // Create the executors
        for (int i = 0; i < EXECUTORS_COUNT; i++) {
            executors[i] = factory.createSerialExecutor();
        }

        // Submit the tasks to the proper executor
        for (int i = 0; i < EXECUTORS_COUNT; i++) {
            final int index = i;
            for (int j = 0; j < TASKS_COUNT; j++) {
                final int valueAdd = j;
                executors[i].execute(new Runnable() {
                    public void run() {
                        totals[index] += valueAdd;
                    }
                });
            }
        }

        // Wait for termination of all executors and print results
        for (int i = 0; i < EXECUTORS_COUNT; i++) {
            executors[i].shutdown();
            executors[i].awaitTermination(10000, TimeUnit.SECONDS);
            System.out.println(totals[i]);
        }

        // Shutdown underlying executor
        ((ExecutorService) factory.getUnderlyingExecutor()).shutdown();
    }
}

No comments:

Post a Comment