Updated to worldwind release 20070817
[worldwind-tracker.git] / gov / nasa / worldwind / util / ThreadedTaskService.java
blobf4575e7477cecf0ed25901d79222c6bd0a268130
1 /*
2 Copyright (C) 2001, 2006 United States Government
3 as represented by the Administrator of the
4 National Aeronautics and Space Administration.
5 All Rights Reserved.
6 */
7 package gov.nasa.worldwind.util;
9 import gov.nasa.worldwind.*;
10 import gov.nasa.worldwind.avlist.AVKey;
11 import gov.nasa.worldwind.exception.WWDuplicateRequestException;
13 import java.util.concurrent.*;
15 /**
16 * @author Tom Gaskins
17 * @version $Id: ThreadedTaskService.java 2471 2007-07-31 21:50:57Z tgaskins $
19 public class ThreadedTaskService extends WWObjectImpl implements TaskService, Thread.UncaughtExceptionHandler
21 static final private int DEFAULT_CORE_POOL_SIZE = 1;
22 static final private int DEFAULT_QUEUE_SIZE = 10;
23 private static final String RUNNING_THREAD_NAME_PREFIX = Logging.getMessage(
24 "ThreadedTaskService.RunningThreadNamePrefix");
25 private static final String IDLE_THREAD_NAME_PREFIX = Logging.getMessage(
26 "ThreadedTaskService.IdleThreadNamePrefix");
27 private ConcurrentLinkedQueue<Runnable> activeTasks; // tasks currently allocated a thread
28 private TaskExecutor executor; // thread pool for running retrievers
30 public ThreadedTaskService()
32 Integer poolSize = Configuration.getIntegerValue(AVKey.TASK_POOL_SIZE, DEFAULT_CORE_POOL_SIZE);
33 Integer queueSize = Configuration.getIntegerValue(AVKey.TASK_QUEUE_SIZE, DEFAULT_QUEUE_SIZE);
35 // this.executor runs the tasks, each in their own thread
36 this.executor = new TaskExecutor(poolSize, queueSize);
38 // this.activeTasks holds the list of currently executing tasks
39 this.activeTasks = new ConcurrentLinkedQueue<Runnable>();
42 public void shutdown(boolean immediately)
44 if (immediately)
45 this.executor.shutdownNow();
46 else
47 this.executor.shutdown();
49 this.activeTasks.clear();
52 public void uncaughtException(Thread thread, Throwable throwable)
54 if (throwable instanceof WWDuplicateRequestException)
55 return;
57 String message = Logging.getMessage("ThreadedTaskService.UncaughtExceptionDuringTask", thread.getName());
58 Logging.logger().fine(message);
59 Thread.currentThread().getThreadGroup().uncaughtException(thread, throwable);
62 private class TaskExecutor extends ThreadPoolExecutor
64 private static final long THREAD_TIMEOUT = 2; // keep idle threads alive this many seconds
66 private TaskExecutor(int poolSize, int queueSize)
68 super(poolSize, poolSize, THREAD_TIMEOUT, TimeUnit.SECONDS,
69 new ArrayBlockingQueue<Runnable>(queueSize),
70 new ThreadFactory()
72 public Thread newThread(Runnable runnable)
74 Thread thread = new Thread(runnable);
75 thread.setDaemon(true);
76 thread.setPriority(Thread.MIN_PRIORITY);
77 thread.setUncaughtExceptionHandler(ThreadedTaskService.this);
78 return thread;
80 }, new ThreadPoolExecutor.DiscardPolicy() // abandon task when queue is full
82 public void rejectedExecution(Runnable runnable,
83 ThreadPoolExecutor threadPoolExecutor)
85 // Interposes logging for rejected execution
86 String message = Logging.getMessage("ThreadedTaskService.ResourceRejected", runnable);
87 Logging.logger().fine(message);
88 super.rejectedExecution(runnable, threadPoolExecutor);
90 });
93 protected void beforeExecute(Thread thread, Runnable runnable)
95 if (thread == null)
97 String msg = Logging.getMessage("nullValue.ThreadIsNull");
98 Logging.logger().fine(msg);
99 throw new IllegalArgumentException(msg);
102 if (runnable == null)
104 String msg = Logging.getMessage("nullValue.RunnableIsNull");
105 Logging.logger().fine(msg);
106 throw new IllegalArgumentException(msg);
109 if (ThreadedTaskService.this.activeTasks.contains(runnable))
111 String message = Logging.getMessage("ThreadedTaskService.CancellingDuplicateTask", runnable);
112 Logging.logger().finer(message);
113 throw new WWDuplicateRequestException(message);
116 ThreadedTaskService.this.activeTasks.add(runnable);
118 if (RUNNING_THREAD_NAME_PREFIX != null)
119 thread.setName(RUNNING_THREAD_NAME_PREFIX + runnable);
120 thread.setPriority(Thread.MIN_PRIORITY);
121 thread.setUncaughtExceptionHandler(ThreadedTaskService.this);
123 super.beforeExecute(thread, runnable);
126 protected void afterExecute(Runnable runnable, Throwable throwable)
128 if (runnable == null)
130 String msg = Logging.getMessage("nullValue.RunnableIsNull");
131 Logging.logger().fine(msg);
132 throw new IllegalArgumentException(msg);
135 super.afterExecute(runnable, throwable);
137 ThreadedTaskService.this.activeTasks.remove(runnable);
139 if (throwable == null && IDLE_THREAD_NAME_PREFIX != null)
140 Thread.currentThread().setName(IDLE_THREAD_NAME_PREFIX);
144 public synchronized boolean contains(Runnable runnable)
146 //noinspection SimplifiableIfStatement
147 if (runnable == null)
148 return false;
150 return (this.activeTasks.contains(runnable) || this.executor.getQueue().contains(runnable));
154 * Enqueues a task to run.
156 * @param runnable the task to add
157 * @throws IllegalArgumentException if <code>runnable</code> is null
159 public synchronized void addTask(Runnable runnable)
161 if (runnable == null)
163 String message = Logging.getMessage("nullValue.RunnableIsNull");
164 Logging.logger().fine(message);
165 throw new IllegalArgumentException(message);
168 // Do not queue duplicates.
169 if (this.activeTasks.contains(runnable) || this.executor.getQueue().contains(runnable))
170 return;
172 this.executor.execute(runnable);
175 public boolean isFull()
177 return this.executor.getQueue().remainingCapacity() == 0;
180 public boolean hasActiveTasks()
182 Thread[] threads = new Thread[Thread.activeCount()];
183 int numThreads = Thread.enumerate(threads);
184 for (int i = 0; i < numThreads; i++)
186 if (threads[i].getName().startsWith(RUNNING_THREAD_NAME_PREFIX))
187 return true;
189 return false;