2 Copyright (C) 2001, 2006 United States Government
3 as represented by the Administrator of the
4 National Aeronautics and Space Administration.
7 package gov
.nasa
.worldwind
;
10 * Performs threaded retrieval of data.
13 * @version $Id: BasicRetrievalService.java 1985 2007-06-09 00:33:37Z tgaskins $
15 public final class BasicRetrievalService
extends WWObjectImpl
16 implements RetrievalService
, Thread
.UncaughtExceptionHandler
18 // These constants are last-ditch values in case Configuration lacks defaults
19 private static final int DEFAULT_QUEUE_SIZE
= 100;
20 private static final int DEFAULT_POOL_SIZE
= 5;
21 private static final long DEFAULT_STALE_REQUEST_LIMIT
= 30000; // milliseconds
22 private static final int DEFAULT_TIME_PRIORITY_GRANULARITY
= 500; // milliseconds
24 private static final String RUNNING_THREAD_NAME_PREFIX
= WorldWind
.retrieveMessage(
25 "BasicRetrievalService.RUNNING_THREAD_NAME_PREFIX", "ThreadStrings");
26 private static final String IDLE_THREAD_NAME_PREFIX
= WorldWind
.retrieveMessage(
27 "BasicRetrievalService.IDLE_THREAD_NAME_PREFIX", "ThreadStrings");
29 private RetrievalExecutor executor
; // thread pool for running retrievers
30 private java
.util
.concurrent
.ConcurrentLinkedQueue
<RetrievalTask
> activeTasks
; // tasks currently allocated a thread
31 private int queueSize
; // maximum queue size
34 * Encapsulates a single threaded retrieval as a {@link java.util.concurrent.FutureTask}.
36 private static class RetrievalTask
extends java
.util
.concurrent
.FutureTask
<Retriever
>
37 implements RetrievalFuture
, Comparable
<RetrievalTask
>
39 private Retriever retriever
;
40 private double priority
; // retrieval secondary priority (primary priority is submit time)
42 private RetrievalTask(Retriever retriever
, double priority
)
45 this.retriever
= retriever
;
46 this.priority
= priority
;
49 public double getPriority()
54 public Retriever
getRetriever()
56 return this.retriever
;
59 public void setException(Throwable throwable
)
61 super.setException(throwable
);
67 if (this.isDone() || this.isCancelled())
74 * @param that the task to compare with this one
75 * @return 0 if task priorities are equal, -1 if priority of this is less than that, 1 otherwise
76 * @throws IllegalArgumentException if <code>that</code> is null
78 public int compareTo(RetrievalTask that
)
82 String msg
= WorldWind
.retrieveErrMsg("nullValue.RetrieverIsNull");
83 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, msg
);
84 throw new IllegalArgumentException(msg
);
87 if (this.priority
> 0 && that
.priority
> 0) // only secondary priority used if either is negative
89 // Requests submitted within different time-granularity periods are ordered exclusive of their
90 // client-specified priority.
91 long now
= System
.currentTimeMillis();
92 long thisElapsedTime
= now
- this.retriever
.getSubmitTime();
93 long thatElapsedTime
= now
- that
.retriever
.getSubmitTime();
94 if (((thisElapsedTime
- thatElapsedTime
) / DEFAULT_TIME_PRIORITY_GRANULARITY
) != 0)
95 return thisElapsedTime
< thatElapsedTime ?
-1 : 1;
98 // The client-pecified priority is compared for requests submitted within the same granularity period.
99 return this.priority
== that
.priority ?
0 : this.priority
< that
.priority ?
-1 : 1;
102 public boolean equals(Object o
)
106 if (o
== null || getClass() != o
.getClass())
109 final RetrievalTask that
= (RetrievalTask
) o
;
111 // Tasks are equal if their retrievers are equivalent
112 return this.retriever
.equals(that
.retriever
);
113 // Priority and submint time are not factors in equality
116 public int hashCode()
118 return this.retriever
.getName().hashCode();
122 public void uncaughtException(Thread thread
, Throwable throwable
)
124 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.UncaughtExceptionDuringRetrieval")
126 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
);
129 private class RetrievalExecutor
extends java
.util
.concurrent
.ThreadPoolExecutor
131 private static final long THREAD_TIMEOUT
= 2; // keep idle threads alive this many seconds
132 private long staleRequestLimit
; // reject requests older than this
134 private RetrievalExecutor(int poolSize
, int queueSize
)
136 super(poolSize
, poolSize
, THREAD_TIMEOUT
, java
.util
.concurrent
.TimeUnit
.SECONDS
,
137 new java
.util
.concurrent
.PriorityBlockingQueue
<Runnable
>(queueSize
),
138 new java
.util
.concurrent
.ThreadFactory()
140 public Thread
newThread(Runnable runnable
)
142 Thread thread
= new Thread(runnable
);
143 thread
.setDaemon(true);
144 thread
.setPriority(Thread
.MIN_PRIORITY
);
145 thread
.setUncaughtExceptionHandler(BasicRetrievalService
.this);
148 }, new java
.util
.concurrent
.ThreadPoolExecutor
.DiscardPolicy() // abandon task when queue is full
150 // This listener is invoked only when the executor queue is a bounded queue and runs out of room.
151 // If the queue is a java.util.concurrent.PriorityBlockingQueue, this listener is never invoked.
152 public void rejectedExecution(Runnable runnable
,
153 java
.util
.concurrent
.ThreadPoolExecutor threadPoolExecutor
)
155 // Interposes logging for rejected execution
156 RetrievalTask task
= (RetrievalTask
) runnable
;
157 String name
= task
.getRetriever().getName();
158 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.ResourceRejected") + name
;
159 WorldWind
.logger().log(java
.util
.logging
.Level
.FINER
, message
);
161 super.rejectedExecution(runnable
, threadPoolExecutor
);
165 this.staleRequestLimit
= Configuration
.getLongValue(AVKey
.RETRIEVAL_QUEUE_STALE_REQUEST_LIMIT
,
166 DEFAULT_STALE_REQUEST_LIMIT
);
170 * @param thread the thread the task is running on
171 * @param runnable the <code>Retriever</code> running on the thread
172 * @throws IllegalArgumentException if either <code>thread</code> or <code>runnable</code> is null
174 protected void beforeExecute(Thread thread
, Runnable runnable
)
176 WorldWind
.logger().log(java
.util
.logging
.Level
.FINEST
, WorldWind
.retrieveErrMsg(
177 "BasicRetrievalService.EnteringBeforeExecute"));
181 String msg
= WorldWind
.retrieveErrMsg("nullValue.ThreadIsNull");
182 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, msg
);
183 throw new IllegalArgumentException(msg
);
185 if (runnable
== null)
187 String msg
= WorldWind
.retrieveErrMsg("nullValue.RunnableIsNull");
188 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, msg
);
189 throw new IllegalArgumentException(msg
);
192 RetrievalTask task
= (RetrievalTask
) runnable
;
194 task
.retriever
.setBeginTime(System
.currentTimeMillis());
195 long limit
= task
.retriever
.getStaleRequestLimit() >= 0
196 ? task
.retriever
.getStaleRequestLimit() : this.staleRequestLimit
;
197 if (task
.retriever
.getBeginTime() - task
.retriever
.getSubmitTime() > limit
)
199 // Task has been sitting on the queue too long
200 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.CancellingTooOldRetrieval")
201 + task
.getRetriever().getName();
202 WorldWind
.logger().log(java
.util
.logging
.Level
.FINER
, message
);
206 if (BasicRetrievalService
.this.activeTasks
.contains(task
))
208 // Task is a duplicate
209 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.CancellingDuplicateRetrieval") + task
210 .getRetriever().getName();
211 WorldWind
.logger().log(java
.util
.logging
.Level
.FINER
, message
);
215 BasicRetrievalService
.this.activeTasks
.add(task
);
217 thread
.setName(RUNNING_THREAD_NAME_PREFIX
+ task
.getRetriever().getName());
218 thread
.setPriority(Thread
.MIN_PRIORITY
); // Subordinate thread priority to rendering
219 thread
.setUncaughtExceptionHandler(BasicRetrievalService
.this);
221 super.beforeExecute(thread
, runnable
);
223 WorldWind
.logger().log(java
.util
.logging
.Level
.FINEST
, WorldWind
.retrieveErrMsg(
224 "BasicRetrievalService.LeavingBeforeExecute"));
228 * @param runnable the <code>Retriever</code> running on the thread
229 * @param throwable an exception thrown during retrieval, will be null if no exception occurred
230 * @throws IllegalArgumentException if <code>runnable</code> is null
232 protected void afterExecute(Runnable runnable
, Throwable throwable
)
234 WorldWind
.logger().log(java
.util
.logging
.Level
.FINEST
, WorldWind
.retrieveErrMsg(
235 "BasicRetrievalService.EnteringAfterExecute"));
237 if (runnable
== null)
239 String msg
= WorldWind
.retrieveErrMsg("nullValue.RunnableIsNull");
240 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, msg
);
241 throw new IllegalArgumentException(msg
);
244 super.afterExecute(runnable
, throwable
);
246 RetrievalTask task
= (RetrievalTask
) runnable
;
247 BasicRetrievalService
.this.activeTasks
.remove(task
);
248 task
.retriever
.setEndTime(System
.currentTimeMillis());
252 if (throwable
!= null)
254 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.ExceptionDuringRetrieval") + task
255 .getRetriever().getName();
256 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
, throwable
);
259 task
.get(); // Wait for task to finish, cancel or break
261 catch (java
.util
.concurrent
.ExecutionException e
)
263 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.ExecutionExceptionDuringRetrieval")
264 + task
.getRetriever().getName();
265 if (e
.getCause() instanceof java
.net
.SocketTimeoutException
)
268 .log(java
.util
.logging
.Level
.FINE
, message
+ " " + e
.getCause().getLocalizedMessage());
272 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
, e
);
275 catch (InterruptedException e
)
277 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.RetrievalOf_1")
278 + task
.getRetriever().getName() + WorldWind
279 .retrieveErrMsg("BasicRetrievalService.WasInterrupted_2");
281 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
, e
);
283 catch (java
.util
.concurrent
.CancellationException e
)
285 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.RetrievalOf_1")
286 + task
.getRetriever().getName() + WorldWind
.retrieveErrMsg("BasicRetrievalService.WasCancelled_2");
288 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
);
293 Thread
.currentThread().setName(IDLE_THREAD_NAME_PREFIX
);
295 WorldWind
.logger().log(java
.util
.logging
.Level
.FINEST
, WorldWind
.retrieveErrMsg(
296 "BasicRetrievalService.LeavingAfterExecute"));
/**
 * Creates a retrieval service. The thread-pool size and queue size are requested from
 * <code>Configuration</code>, passing <code>DEFAULT_POOL_SIZE</code> and <code>DEFAULT_QUEUE_SIZE</code> as the
 * last-ditch values.
 */
public BasicRetrievalService()
{
    Integer threadCount = Configuration.getIntegerValue(AVKey.RETRIEVAL_POOL_SIZE, DEFAULT_POOL_SIZE);
    this.queueSize = Configuration.getIntegerValue(AVKey.RETRIEVAL_QUEUE_SIZE, DEFAULT_QUEUE_SIZE);

    // Tracks the tasks that are currently executing (*not* those still pending on the executor's queue).
    this.activeTasks = new java.util.concurrent.ConcurrentLinkedQueue<RetrievalTask>();

    // The executor runs the retrievers, each on a thread of its pool.
    this.executor = new RetrievalExecutor(threadCount, this.queueSize);
}
314 * @param retriever the retriever to run
315 * @return a future object that can be used to query the request status of cancel the request.
316 * @throws IllegalArgumentException if <code>retrieer</code> is null or has no name
318 public RetrievalFuture
runRetriever(Retriever retriever
)
320 if (retriever
== null)
322 String msg
= WorldWind
.retrieveErrMsg("nullValue.RetrieverIsNull");
323 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, msg
);
324 throw new IllegalArgumentException(msg
);
326 if (retriever
.getName() == null)
328 String message
= WorldWind
.retrieveErrMsg("nullValue.RetrieverNameIsNull");
329 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
);
330 throw new IllegalArgumentException(message
);
333 // Add with secondary priority that removes most recently added requests first.
334 return this.runRetriever(retriever
, (double) (Long
.MAX_VALUE
- System
.currentTimeMillis()));
338 * @param retriever the retriever to run
339 * @param priority the secondary priority of the retriever, or negative if it is to be the primary priority
340 * @return a future object that can be used to query the request status or cancel the request.
341 * @throws IllegalArgumentException if <code>retriever</code> is null or has no name
343 public synchronized RetrievalFuture
runRetriever(Retriever retriever
, double priority
)
345 if (retriever
== null)
347 String message
= WorldWind
.retrieveErrMsg("nullValue.RetrieverIsNull");
348 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
);
349 throw new IllegalArgumentException(message
);
352 if (retriever
.getName() == null)
354 String message
= WorldWind
.retrieveErrMsg("nullValue.RetrieverNameIsNull");
355 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
);
356 throw new IllegalArgumentException(message
);
361 String name
= retriever
.getName();
362 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.ResourceRejected") + name
;
363 WorldWind
.logger().log(java
.util
.logging
.Level
.FINER
, message
);
366 RetrievalTask task
= new RetrievalTask(retriever
, priority
);
367 retriever
.setSubmitTime(System
.currentTimeMillis());
369 // Do not queue duplicates.
370 if (this.activeTasks
.contains(task
) || this.executor
.getQueue().contains(task
))
373 this.executor
.execute(task
);
379 * @param poolSize the number of threads in the thread pool
380 * @throws IllegalArgumentException if <code>poolSize</code> is non-positive
382 public void setRetrieverPoolSize(int poolSize
)
386 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.RetrieverPoolSizeIsLessThanOne");
387 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
);
388 throw new IllegalArgumentException(message
);
391 this.executor
.setCorePoolSize(poolSize
);
392 this.executor
.setMaximumPoolSize(poolSize
);
395 public int getRetrieverPoolSize()
397 return this.executor
.getCorePoolSize();
400 private boolean hasRetrievers()
402 Thread
[] threads
= new Thread
[Thread
.activeCount()];
403 int numThreads
= Thread
.enumerate(threads
);
404 for (int i
= 0; i
< numThreads
; i
++)
406 if (threads
[i
].getName().startsWith(RUNNING_THREAD_NAME_PREFIX
))
412 public boolean hasActiveTasks()
414 return this.hasRetrievers();
417 public boolean isFull()
419 return this.executor
.getQueue().size() >= this.queueSize
;
422 public int getNumRetrieversPending()
424 // Could use same method to determine active tasks as hasRetrievers() above, but this method only advisory.
425 return this.activeTasks
.size() + this.executor
.getQueue().size();
429 * @param retriever the retriever to check
430 * @return <code>true</code> if the retriever is being run or pending execution
431 * @throws IllegalArgumentException if <code>retriever</code> is null
433 public boolean contains(Retriever retriever
)
435 if (retriever
== null)
437 String msg
= WorldWind
.retrieveErrMsg("nullValue.RetrieverIsNull");
438 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, msg
);
439 throw new IllegalArgumentException(msg
);
441 RetrievalTask task
= new RetrievalTask(retriever
, 0d
);
442 return (this.activeTasks
.contains(task
) || this.executor
.getQueue().contains(task
));
445 public double getProgress()
447 int totalContentLength
= 0;
448 int totalBytesRead
= 0;
450 for (RetrievalTask task
: this.activeTasks
)
455 Retriever retriever
= task
.getRetriever();
458 double tcl
= retriever
.getContentLength();
461 totalContentLength
+= tcl
;
462 totalBytesRead
+= retriever
.getContentLengthRead();
467 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.ExcptnRetrievingContentSizes") + (
468 retriever
.getName() != null ? retriever
.getName() : "");
469 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
, e
);
473 for (Runnable runnable
: this.executor
.getQueue())
475 gov
.nasa
.worldwind
.BasicRetrievalService
.RetrievalTask task
=
476 (gov
.nasa
.worldwind
.BasicRetrievalService
.RetrievalTask
) runnable
;
478 Retriever retriever
= task
.getRetriever();
481 double tcl
= retriever
.getContentLength();
484 totalContentLength
+= tcl
;
485 totalBytesRead
+= retriever
.getContentLengthRead();
490 String message
= WorldWind
.retrieveErrMsg("BasicRetrievalService.ExcptnRetrievingContentSizes") + (
491 retriever
.getName() != null ? retriever
.getName() : "");
492 WorldWind
.logger().log(java
.util
.logging
.Level
.FINE
, message
, e
);
496 // Compute an aggregated progress notification.
500 if (totalContentLength
< 1)
503 progress
= Math
.min(100.0, 100.0 * (double) totalBytesRead
/ (double) totalContentLength
);