Here is an attempt at a generic implementation for solving this kind of issue. It reuses the Executor framework provided by Java and exposes the same Executor interface to each client for submitting its tasks.
There are two classes: SerialExecutor, named after the class of the same name presented in the Executor API documentation, and a factory class (SharedPoolSerialExecutorFactory in the sample below), which simplifies the creation of SerialExecutors that share an underlying Executor.
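To make the idea concrete, here is a minimal sketch of a serial executor, modeled on the SerialExecutor example in the java.util.concurrent.Executor Javadoc. It is not the class from the repository: the name SerialExecutorSketch is made up for illustration, and it implements only Executor, without the shutdown and awaitTermination support the sample relies on.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/**
 * Illustrative sketch (hypothetical name): runs submitted tasks one at a
 * time, in submission order, on a shared underlying Executor.
 */
class SerialExecutorSketch implements Executor {
    // Tasks waiting for their turn; guarded by the monitor of this object
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor underlying;
    // The task currently handed to the underlying executor, or null if idle
    private Runnable active;

    SerialExecutorSketch(Executor underlying) {
        this.underlying = underlying;
    }

    @Override
    public synchronized void execute(final Runnable task) {
        // Wrap the task so that finishing it schedules the next queued one
        tasks.add(() -> {
            try {
                task.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            underlying.execute(active);
        }
    }
}
```

Because at most one task per serial executor is ever handed to the shared pool at a time, many serial executors can share a small pool while each still sees its own tasks run strictly in order.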
The full code is available on GitHub.
Sample usage: this sample shows how the factory and the SerialExecutor can be used. In a real scenario, each source would typically be associated with its own executor, so that every incoming message from a given source is scheduled for processing on that source's executor.
package org.muralx.concurrent.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class SerialExecutorSample {

    /**
     * Number of tasks per serial executor
     */
    private static final int TASKS_COUNT = 1000;

    /**
     * Number of serial executors
     */
    private static final int EXECUTORS_COUNT = 200;

    /**
     * Keeps sum total for each serial executor
     */
    private static int[] totals = new int[EXECUTORS_COUNT];

    /**
     * The SerialExecutors we will use for scheduling tasks
     */
    private static ExecutorService[] executors = new ExecutorService[EXECUTORS_COUNT];

    public static void main(String[] args) throws InterruptedException {
        // Create the factory using the predefined values we know for TASKS and
        // EXECUTORS, and use 10 - 20 threads in the underlying
        // ThreadPoolExecutor
        SharedPoolSerialExecutorFactory factory = new SharedPoolSerialExecutorFactory(
                TASKS_COUNT, TASKS_COUNT, EXECUTORS_COUNT, 10, 20, 10, TimeUnit.SECONDS);

        // Create the executors
        for (int i = 0; i < EXECUTORS_COUNT; i++) {
            executors[i] = factory.createSerialExecutor();
        }

        // Submit the tasks to the proper executor
        for (int i = 0; i < EXECUTORS_COUNT; i++) {
            final int index = i;
            for (int j = 0; j < TASKS_COUNT; j++) {
                final int valueAdd = j;
                executors[i].execute(new Runnable() {
                    public void run() {
                        totals[index] += valueAdd;
                    }
                });
            }
        }

        // Wait for termination of all executors and print results
        for (int i = 0; i < EXECUTORS_COUNT; i++) {
            executors[i].shutdown();
            executors[i].awaitTermination(10000, TimeUnit.SECONDS);
            System.out.println(totals[i]);
        }

        // Shutdown underlying executor
        ((ExecutorService) factory.getUnderlyingExecutor()).shutdown();
    }
}
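The association between sources and executors mentioned before the sample could look roughly like the sketch below. Everything here is an assumption made for illustration: the SourceDispatcher class, the use of a String as the source identifier, and the Supplier that stands in for a call such as factory.createSerialExecutor().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * Hypothetical helper: routes each message handler to the executor
 * owned by the source the message came from.
 */
class SourceDispatcher {
    // One executor per source, created lazily on the first message
    private final Map<String, Executor> executorsBySource = new ConcurrentHashMap<>();
    // Creates a new executor for a source seen for the first time
    private final Supplier<Executor> executorFactory;

    SourceDispatcher(Supplier<Executor> executorFactory) {
        this.executorFactory = executorFactory;
    }

    /** Schedules the handler on the executor that belongs to sourceId. */
    void dispatch(String sourceId, Runnable handler) {
        executorsBySource
                .computeIfAbsent(sourceId, id -> executorFactory.get())
                .execute(handler);
    }

    /** Number of sources that currently have an executor. */
    int sourceCount() {
        return executorsBySource.size();
    }
}
```

With the factory from the sample, the dispatcher would presumably be created as new SourceDispatcher(factory::createSerialExecutor), so that messages from the same source are processed in order while different sources share the thread pool.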