2 Copyright (C) 2001, 2006 United States Government
3 as represented by the Administrator of the
4 National Aeronautics and Space Administration.
7 package gov
.nasa
.worldwind
.retrieve
;
9 import gov
.nasa
.worldwind
.*;
10 import gov
.nasa
.worldwind
.avlist
.AVKey
;
11 import gov
.nasa
.worldwind
.util
.Logging
;
13 import java
.net
.SocketTimeoutException
;
14 import java
.util
.concurrent
.*;
15 import java
.util
.logging
.Level
;
18 * Performs threaded retrieval of data.
21 * @version $Id: BasicRetrievalService.java 2471 2007-07-31 21:50:57Z tgaskins $
23 public final class BasicRetrievalService
extends WWObjectImpl
24 implements RetrievalService
, Thread
.UncaughtExceptionHandler
26 // These constants are last-ditch values in case Configuration lacks defaults
27 private static final int DEFAULT_QUEUE_SIZE
= 100;
28 private static final int DEFAULT_POOL_SIZE
= 5;
29 private static final long DEFAULT_STALE_REQUEST_LIMIT
= 30000; // milliseconds
30 private static final int DEFAULT_TIME_PRIORITY_GRANULARITY
= 500; // milliseconds
32 private static final String RUNNING_THREAD_NAME_PREFIX
= Logging
.getMessage(
33 "BasicRetrievalService.RunningThreadNamePrefix");
34 private static final String IDLE_THREAD_NAME_PREFIX
= Logging
.getMessage(
35 "BasicRetrievalService.IdleThreadNamePrefix");
37 private RetrievalExecutor executor
; // thread pool for running retrievers
38 private ConcurrentLinkedQueue
<RetrievalTask
> activeTasks
; // tasks currently allocated a thread
39 private int queueSize
; // maximum queue size
42 * Encapsulates a single threaded retrieval as a {@link java.util.concurrent.FutureTask}.
44 private static class RetrievalTask
extends FutureTask
<Retriever
>
45 implements RetrievalFuture
, Comparable
<RetrievalTask
>
47 private Retriever retriever
;
48 private double priority
; // retrieval secondary priority (primary priority is submit time)
50 private RetrievalTask(Retriever retriever
, double priority
)
53 this.retriever
= retriever
;
54 this.priority
= priority
;
57 public double getPriority()
62 public Retriever
getRetriever()
64 return this.retriever
;
70 if (this.isDone() || this.isCancelled())
77 * @param that the task to compare with this one
78 * @return 0 if task priorities are equal, -1 if priority of this is less than that, 1 otherwise
79 * @throws IllegalArgumentException if <code>that</code> is null
81 public int compareTo(RetrievalTask that
)
85 String msg
= Logging
.getMessage("nullValue.RetrieverIsNull");
86 Logging
.logger().fine(msg
);
87 throw new IllegalArgumentException(msg
);
90 if (this.priority
> 0 && that
.priority
> 0) // only secondary priority used if either is negative
92 // Requests submitted within different time-granularity periods are ordered exclusive of their
93 // client-specified priority.
94 long now
= System
.currentTimeMillis();
95 long thisElapsedTime
= now
- this.retriever
.getSubmitTime();
96 long thatElapsedTime
= now
- that
.retriever
.getSubmitTime();
97 if (((thisElapsedTime
- thatElapsedTime
) / DEFAULT_TIME_PRIORITY_GRANULARITY
) != 0)
98 return thisElapsedTime
< thatElapsedTime ?
-1 : 1;
101 // The client-pecified priority is compared for requests submitted within the same granularity period.
102 return this.priority
== that
.priority ?
0 : this.priority
< that
.priority ?
-1 : 1;
105 public boolean equals(Object o
)
109 if (o
== null || getClass() != o
.getClass())
112 final RetrievalTask that
= (RetrievalTask
) o
;
114 // Tasks are equal if their retrievers are equivalent
115 return this.retriever
.equals(that
.retriever
);
116 // Priority and submint time are not factors in equality
119 public int hashCode()
121 return this.retriever
.getName().hashCode();
125 public void uncaughtException(Thread thread
, Throwable throwable
)
127 Logging
.logger().fine(Logging
.getMessage("BasicRetrievalService.UncaughtExceptionDuringRetrieval",
131 private class RetrievalExecutor
extends ThreadPoolExecutor
133 private static final long THREAD_TIMEOUT
= 2; // keep idle threads alive this many seconds
134 private long staleRequestLimit
; // reject requests older than this
136 private RetrievalExecutor(int poolSize
, int queueSize
)
138 super(poolSize
, poolSize
, THREAD_TIMEOUT
, TimeUnit
.SECONDS
, new PriorityBlockingQueue
<Runnable
>(queueSize
),
141 public Thread
newThread(Runnable runnable
)
143 Thread thread
= new Thread(runnable
);
144 thread
.setDaemon(true);
145 thread
.setPriority(Thread
.MIN_PRIORITY
);
146 thread
.setUncaughtExceptionHandler(BasicRetrievalService
.this);
149 }, new ThreadPoolExecutor
.DiscardPolicy() // abandon task when queue is full
151 // This listener is invoked only when the executor queue is a bounded queue and runs out of room.
152 // If the queue is a java.util.concurrent.PriorityBlockingQueue, this listener is never invoked.
153 public void rejectedExecution(Runnable runnable
, ThreadPoolExecutor threadPoolExecutor
)
155 // Interposes logging for rejected execution
156 Logging
.logger().finer(Logging
.getMessage("BasicRetrievalService.ResourceRejected",
157 ((RetrievalTask
) runnable
).getRetriever().getName()));
159 super.rejectedExecution(runnable
, threadPoolExecutor
);
163 this.staleRequestLimit
= Configuration
.getLongValue(AVKey
.RETRIEVAL_QUEUE_STALE_REQUEST_LIMIT
,
164 DEFAULT_STALE_REQUEST_LIMIT
);
168 * @param thread the thread the task is running on
169 * @param runnable the <code>Retriever</code> running on the thread
170 * @throws IllegalArgumentException if either <code>thread</code> or <code>runnable</code> is null
172 protected void beforeExecute(Thread thread
, Runnable runnable
)
176 String msg
= Logging
.getMessage("nullValue.ThreadIsNull");
177 Logging
.logger().fine(msg
);
178 throw new IllegalArgumentException(msg
);
180 if (runnable
== null)
182 String msg
= Logging
.getMessage("nullValue.RunnableIsNull");
183 Logging
.logger().fine(msg
);
184 throw new IllegalArgumentException(msg
);
187 RetrievalTask task
= (RetrievalTask
) runnable
;
189 task
.retriever
.setBeginTime(System
.currentTimeMillis());
190 long limit
= task
.retriever
.getStaleRequestLimit() >= 0
191 ? task
.retriever
.getStaleRequestLimit() : this.staleRequestLimit
;
192 if (task
.retriever
.getBeginTime() - task
.retriever
.getSubmitTime() > limit
)
194 // Task has been sitting on the queue too long
195 Logging
.logger().finer(Logging
.getMessage("BasicRetrievalService.CancellingTooOldRetrieval",
196 task
.getRetriever().getName()));
200 if (BasicRetrievalService
.this.activeTasks
.contains(task
))
202 // Task is a duplicate
203 Logging
.logger().finer(Logging
.getMessage("BasicRetrievalService.CancellingDuplicateRetrieval",
204 task
.getRetriever().getName()));
208 BasicRetrievalService
.this.activeTasks
.add(task
);
210 thread
.setName(RUNNING_THREAD_NAME_PREFIX
+ task
.getRetriever().getName());
211 thread
.setPriority(Thread
.MIN_PRIORITY
); // Subordinate thread priority to rendering
212 thread
.setUncaughtExceptionHandler(BasicRetrievalService
.this);
214 super.beforeExecute(thread
, runnable
);
218 * @param runnable the <code>Retriever</code> running on the thread
219 * @param throwable an exception thrown during retrieval, will be null if no exception occurred
220 * @throws IllegalArgumentException if <code>runnable</code> is null
222 protected void afterExecute(Runnable runnable
, Throwable throwable
)
224 if (runnable
== null)
226 String msg
= Logging
.getMessage("nullValue.RunnableIsNull");
227 Logging
.logger().fine(msg
);
228 throw new IllegalArgumentException(msg
);
231 super.afterExecute(runnable
, throwable
);
233 RetrievalTask task
= (RetrievalTask
) runnable
;
234 BasicRetrievalService
.this.activeTasks
.remove(task
);
235 task
.retriever
.setEndTime(System
.currentTimeMillis());
239 if (throwable
!= null)
241 Logging
.logger().log(Level
.FINE
,
242 Logging
.getMessage("BasicRetrievalService.ExceptionDuringRetrieval",
243 task
.getRetriever().getName()), throwable
);
246 task
.get(); // Wait for task to finish, cancel or break
248 catch (java
.util
.concurrent
.ExecutionException e
)
250 String message
= Logging
.getMessage("BasicRetrievalService.ExecutionExceptionDuringRetrieval",
251 task
.getRetriever().getName());
252 if (e
.getCause() instanceof SocketTimeoutException
)
254 Logging
.logger().fine(message
+ " " + e
.getCause().getLocalizedMessage());
258 Logging
.logger().log(Level
.FINE
, message
, e
);
261 catch (InterruptedException e
)
263 Logging
.logger().log(Level
.FINE
, Logging
.getMessage("BasicRetrievalService.RetrievalInterrupted",
264 task
.getRetriever().getName()), e
);
266 catch (java
.util
.concurrent
.CancellationException e
)
268 Logging
.logger().fine(Logging
.getMessage("BasicRetrievalService.RetrievalCancelled",
269 task
.getRetriever().getName()));
274 Thread
.currentThread().setName(IDLE_THREAD_NAME_PREFIX
);
279 public BasicRetrievalService()
281 Integer poolSize
= Configuration
.getIntegerValue(AVKey
.RETRIEVAL_POOL_SIZE
, DEFAULT_POOL_SIZE
);
282 this.queueSize
= Configuration
.getIntegerValue(AVKey
.RETRIEVAL_QUEUE_SIZE
, DEFAULT_QUEUE_SIZE
);
284 // this.executor runs the retrievers, each in their own thread
285 this.executor
= new RetrievalExecutor(poolSize
, this.queueSize
);
287 // this.activeTasks holds the list of currently executing tasks (*not* those pending on the queue)
288 this.activeTasks
= new ConcurrentLinkedQueue
<RetrievalTask
>();
291 public void shutdown(boolean immediately
)
294 this.executor
.shutdownNow();
296 this.executor
.shutdown();
298 this.activeTasks
.clear();
302 * @param retriever the retriever to run
303 * @return a future object that can be used to query the request status of cancel the request.
304 * @throws IllegalArgumentException if <code>retrieer</code> is null or has no name
306 public RetrievalFuture
runRetriever(Retriever retriever
)
308 if (retriever
== null)
310 String msg
= Logging
.getMessage("nullValue.RetrieverIsNull");
311 Logging
.logger().fine(msg
);
312 throw new IllegalArgumentException(msg
);
314 if (retriever
.getName() == null)
316 String message
= Logging
.getMessage("nullValue.RetrieverNameIsNull");
317 Logging
.logger().fine(message
);
318 throw new IllegalArgumentException(message
);
321 // Add with secondary priority that removes most recently added requests first.
322 return this.runRetriever(retriever
, (double) (Long
.MAX_VALUE
- System
.currentTimeMillis()));
326 * @param retriever the retriever to run
327 * @param priority the secondary priority of the retriever, or negative if it is to be the primary priority
328 * @return a future object that can be used to query the request status of cancel the request.
329 * @throws IllegalArgumentException if <code>retriever</code> is null or has no name
331 public synchronized RetrievalFuture
runRetriever(Retriever retriever
, double priority
)
333 if (retriever
== null)
335 String message
= Logging
.getMessage("nullValue.RetrieverIsNull");
336 Logging
.logger().fine(message
);
337 throw new IllegalArgumentException(message
);
340 if (retriever
.getName() == null)
342 String message
= Logging
.getMessage("nullValue.RetrieverNameIsNull");
343 Logging
.logger().fine(message
);
344 throw new IllegalArgumentException(message
);
349 Logging
.logger().finer(Logging
.getMessage("BasicRetrievalService.ResourceRejected", retriever
.getName()));
352 RetrievalTask task
= new RetrievalTask(retriever
, priority
);
353 retriever
.setSubmitTime(System
.currentTimeMillis());
355 // Do not queue duplicates.
356 if (this.activeTasks
.contains(task
) || this.executor
.getQueue().contains(task
))
359 this.executor
.execute(task
);
365 * @param poolSize the number of threads in the thread pool
366 * @throws IllegalArgumentException if <code>poolSize</code> is non-positive
368 public void setRetrieverPoolSize(int poolSize
)
372 String message
= Logging
.getMessage("BasicRetrievalService.RetrieverPoolSizeIsLessThanOne");
373 Logging
.logger().fine(message
);
374 throw new IllegalArgumentException(message
);
377 this.executor
.setCorePoolSize(poolSize
);
378 this.executor
.setMaximumPoolSize(poolSize
);
381 public int getRetrieverPoolSize()
383 return this.executor
.getCorePoolSize();
386 private boolean hasRetrievers()
388 Thread
[] threads
= new Thread
[Thread
.activeCount()];
389 int numThreads
= Thread
.enumerate(threads
);
390 for (int i
= 0; i
< numThreads
; i
++)
392 if (threads
[i
].getName().startsWith(RUNNING_THREAD_NAME_PREFIX
))
398 public boolean hasActiveTasks()
400 return this.hasRetrievers();
403 public boolean isFull()
405 return this.executor
.getQueue().size() >= this.queueSize
;
408 public int getNumRetrieversPending()
410 // Could use same method to determine active tasks as hasRetrievers() above, but this method only advisory.
411 return this.activeTasks
.size() + this.executor
.getQueue().size();
415 * @param retriever the retriever to check
416 * @return <code>true</code> if the retriever is being run or pending execution
417 * @throws IllegalArgumentException if <code>retriever</code> is null
419 public boolean contains(Retriever retriever
)
421 if (retriever
== null)
423 String msg
= Logging
.getMessage("nullValue.RetrieverIsNull");
424 Logging
.logger().fine(msg
);
425 throw new IllegalArgumentException(msg
);
427 RetrievalTask task
= new RetrievalTask(retriever
, 0d
);
428 return (this.activeTasks
.contains(task
) || this.executor
.getQueue().contains(task
));
431 public double getProgress()
433 int totalContentLength
= 0;
434 int totalBytesRead
= 0;
436 for (RetrievalTask task
: this.activeTasks
)
441 Retriever retriever
= task
.getRetriever();
444 double tcl
= retriever
.getContentLength();
447 totalContentLength
+= tcl
;
448 totalBytesRead
+= retriever
.getContentLengthRead();
453 Logging
.logger().log(Level
.FINE
,
454 Logging
.getMessage("BasicRetrievalService.ExceptionRetrievingContentSizes",
455 retriever
.getName() != null ? retriever
.getName() : ""), e
);
459 for (Runnable runnable
: this.executor
.getQueue())
462 (RetrievalTask
) runnable
;
464 Retriever retriever
= task
.getRetriever();
467 double tcl
= retriever
.getContentLength();
470 totalContentLength
+= tcl
;
471 totalBytesRead
+= retriever
.getContentLengthRead();
476 String message
= Logging
.getMessage("BasicRetrievalService.ExceptionRetrievingContentSizes") + (
477 retriever
.getName() != null ? retriever
.getName() : "");
478 Logging
.logger().log(Level
.FINE
, message
, e
);
482 // Compute an aggregated progress notification.
486 if (totalContentLength
< 1)
489 progress
= Math
.min(100.0, 100.0 * (double) totalBytesRead
/ (double) totalContentLength
);