2 Copyright (C) 2001, 2006 United States Government
3 as represented by the Administrator of the
4 National Aeronautics and Space Administration.
7 package gov
.nasa
.worldwind
.util
;
9 import gov
.nasa
.worldwind
.*;
10 import gov
.nasa
.worldwind
.avlist
.AVKey
;
11 import gov
.nasa
.worldwind
.exception
.WWDuplicateRequestException
;
13 import java
.util
.concurrent
.*;
17 * @version $Id: ThreadedTaskService.java 2471 2007-07-31 21:50:57Z tgaskins $
19 public class ThreadedTaskService
extends WWObjectImpl
implements TaskService
, Thread
.UncaughtExceptionHandler
21 static final private int DEFAULT_CORE_POOL_SIZE
= 1;
22 static final private int DEFAULT_QUEUE_SIZE
= 10;
23 private static final String RUNNING_THREAD_NAME_PREFIX
= Logging
.getMessage(
24 "ThreadedTaskService.RunningThreadNamePrefix");
25 private static final String IDLE_THREAD_NAME_PREFIX
= Logging
.getMessage(
26 "ThreadedTaskService.IdleThreadNamePrefix");
27 private ConcurrentLinkedQueue
<Runnable
> activeTasks
; // tasks currently allocated a thread
28 private TaskExecutor executor
; // thread pool for running retrievers
30 public ThreadedTaskService()
32 Integer poolSize
= Configuration
.getIntegerValue(AVKey
.TASK_POOL_SIZE
, DEFAULT_CORE_POOL_SIZE
);
33 Integer queueSize
= Configuration
.getIntegerValue(AVKey
.TASK_QUEUE_SIZE
, DEFAULT_QUEUE_SIZE
);
35 // this.executor runs the tasks, each in their own thread
36 this.executor
= new TaskExecutor(poolSize
, queueSize
);
38 // this.activeTasks holds the list of currently executing tasks
39 this.activeTasks
= new ConcurrentLinkedQueue
<Runnable
>();
42 public void shutdown(boolean immediately
)
45 this.executor
.shutdownNow();
47 this.executor
.shutdown();
49 this.activeTasks
.clear();
52 public void uncaughtException(Thread thread
, Throwable throwable
)
54 if (throwable
instanceof WWDuplicateRequestException
)
57 String message
= Logging
.getMessage("ThreadedTaskService.UncaughtExceptionDuringTask", thread
.getName());
58 Logging
.logger().fine(message
);
59 Thread
.currentThread().getThreadGroup().uncaughtException(thread
, throwable
);
62 private class TaskExecutor
extends ThreadPoolExecutor
64 private static final long THREAD_TIMEOUT
= 2; // keep idle threads alive this many seconds
66 private TaskExecutor(int poolSize
, int queueSize
)
68 super(poolSize
, poolSize
, THREAD_TIMEOUT
, TimeUnit
.SECONDS
,
69 new ArrayBlockingQueue
<Runnable
>(queueSize
),
72 public Thread
newThread(Runnable runnable
)
74 Thread thread
= new Thread(runnable
);
75 thread
.setDaemon(true);
76 thread
.setPriority(Thread
.MIN_PRIORITY
);
77 thread
.setUncaughtExceptionHandler(ThreadedTaskService
.this);
80 }, new ThreadPoolExecutor
.DiscardPolicy() // abandon task when queue is full
82 public void rejectedExecution(Runnable runnable
,
83 ThreadPoolExecutor threadPoolExecutor
)
85 // Interposes logging for rejected execution
86 String message
= Logging
.getMessage("ThreadedTaskService.ResourceRejected", runnable
);
87 Logging
.logger().fine(message
);
88 super.rejectedExecution(runnable
, threadPoolExecutor
);
93 protected void beforeExecute(Thread thread
, Runnable runnable
)
97 String msg
= Logging
.getMessage("nullValue.ThreadIsNull");
98 Logging
.logger().fine(msg
);
99 throw new IllegalArgumentException(msg
);
102 if (runnable
== null)
104 String msg
= Logging
.getMessage("nullValue.RunnableIsNull");
105 Logging
.logger().fine(msg
);
106 throw new IllegalArgumentException(msg
);
109 if (ThreadedTaskService
.this.activeTasks
.contains(runnable
))
111 String message
= Logging
.getMessage("ThreadedTaskService.CancellingDuplicateTask", runnable
);
112 Logging
.logger().finer(message
);
113 throw new WWDuplicateRequestException(message
);
116 ThreadedTaskService
.this.activeTasks
.add(runnable
);
118 if (RUNNING_THREAD_NAME_PREFIX
!= null)
119 thread
.setName(RUNNING_THREAD_NAME_PREFIX
+ runnable
);
120 thread
.setPriority(Thread
.MIN_PRIORITY
);
121 thread
.setUncaughtExceptionHandler(ThreadedTaskService
.this);
123 super.beforeExecute(thread
, runnable
);
126 protected void afterExecute(Runnable runnable
, Throwable throwable
)
128 if (runnable
== null)
130 String msg
= Logging
.getMessage("nullValue.RunnableIsNull");
131 Logging
.logger().fine(msg
);
132 throw new IllegalArgumentException(msg
);
135 super.afterExecute(runnable
, throwable
);
137 ThreadedTaskService
.this.activeTasks
.remove(runnable
);
139 if (throwable
== null && IDLE_THREAD_NAME_PREFIX
!= null)
140 Thread
.currentThread().setName(IDLE_THREAD_NAME_PREFIX
);
144 public synchronized boolean contains(Runnable runnable
)
146 //noinspection SimplifiableIfStatement
147 if (runnable
== null)
150 return (this.activeTasks
.contains(runnable
) || this.executor
.getQueue().contains(runnable
));
154 * Enqueues a task to run.
156 * @param runnable the task to add
157 * @throws IllegalArgumentException if <code>runnable</code> is null
159 public synchronized void addTask(Runnable runnable
)
161 if (runnable
== null)
163 String message
= Logging
.getMessage("nullValue.RunnableIsNull");
164 Logging
.logger().fine(message
);
165 throw new IllegalArgumentException(message
);
168 // Do not queue duplicates.
169 if (this.activeTasks
.contains(runnable
) || this.executor
.getQueue().contains(runnable
))
172 this.executor
.execute(runnable
);
175 public boolean isFull()
177 return this.executor
.getQueue().remainingCapacity() == 0;
180 public boolean hasActiveTasks()
182 Thread
[] threads
= new Thread
[Thread
.activeCount()];
183 int numThreads
= Thread
.enumerate(threads
);
184 for (int i
= 0; i
< numThreads
; i
++)
186 if (threads
[i
].getName().startsWith(RUNNING_THREAD_NAME_PREFIX
))