Worldwind public release 0.2.1
[worldwind-tracker.git] / gov / nasa / worldwind / BasicRetrievalService.java
blob0e7bae540f6f67de820ff24dd0158c9890767f7b
1 /*
2 Copyright (C) 2001, 2006 United States Government
3 as represented by the Administrator of the
4 National Aeronautics and Space Administration.
5 All Rights Reserved.
6 */
7 package gov.nasa.worldwind;
9 /**
10 * Performs threaded retrieval of data.
12 * @author Tom Gaskins
13 * @version $Id: BasicRetrievalService.java 1985 2007-06-09 00:33:37Z tgaskins $
15 public final class BasicRetrievalService extends WWObjectImpl
16 implements RetrievalService, Thread.UncaughtExceptionHandler
18 // These constants are last-ditch values in case Configuration lacks defaults
19 private static final int DEFAULT_QUEUE_SIZE = 100;
20 private static final int DEFAULT_POOL_SIZE = 5;
21 private static final long DEFAULT_STALE_REQUEST_LIMIT = 30000; // milliseconds
22 private static final int DEFAULT_TIME_PRIORITY_GRANULARITY = 500; // milliseconds
24 private static final String RUNNING_THREAD_NAME_PREFIX = WorldWind.retrieveMessage(
25 "BasicRetrievalService.RUNNING_THREAD_NAME_PREFIX", "ThreadStrings");
26 private static final String IDLE_THREAD_NAME_PREFIX = WorldWind.retrieveMessage(
27 "BasicRetrievalService.IDLE_THREAD_NAME_PREFIX", "ThreadStrings");
29 private RetrievalExecutor executor; // thread pool for running retrievers
30 private java.util.concurrent.ConcurrentLinkedQueue<RetrievalTask> activeTasks; // tasks currently allocated a thread
31 private int queueSize; // maximum queue size
33 /**
34 * Encapsulates a single threaded retrieval as a {@link java.util.concurrent.FutureTask}.
36 private static class RetrievalTask extends java.util.concurrent.FutureTask<Retriever>
37 implements RetrievalFuture, Comparable<RetrievalTask>
39 private Retriever retriever;
40 private double priority; // retrieval secondary priority (primary priority is submit time)
42 private RetrievalTask(Retriever retriever, double priority)
44 super(retriever);
45 this.retriever = retriever;
46 this.priority = priority;
49 public double getPriority()
51 return priority;
54 public Retriever getRetriever()
56 return this.retriever;
59 public void setException(Throwable throwable)
61 super.setException(throwable);
64 @Override
65 public void run()
67 if (this.isDone() || this.isCancelled())
68 return;
70 super.run();
73 /**
74 * @param that the task to compare with this one
75 * @return 0 if task priorities are equal, -1 if priority of this is less than that, 1 otherwise
76 * @throws IllegalArgumentException if <code>that</code> is null
78 public int compareTo(RetrievalTask that)
80 if (that == null)
82 String msg = WorldWind.retrieveErrMsg("nullValue.RetrieverIsNull");
83 WorldWind.logger().log(java.util.logging.Level.FINE, msg);
84 throw new IllegalArgumentException(msg);
87 if (this.priority > 0 && that.priority > 0) // only secondary priority used if either is negative
89 // Requests submitted within different time-granularity periods are ordered exclusive of their
90 // client-specified priority.
91 long now = System.currentTimeMillis();
92 long thisElapsedTime = now - this.retriever.getSubmitTime();
93 long thatElapsedTime = now - that.retriever.getSubmitTime();
94 if (((thisElapsedTime - thatElapsedTime) / DEFAULT_TIME_PRIORITY_GRANULARITY) != 0)
95 return thisElapsedTime < thatElapsedTime ? -1 : 1;
98 // The client-pecified priority is compared for requests submitted within the same granularity period.
99 return this.priority == that.priority ? 0 : this.priority < that.priority ? -1 : 1;
102 public boolean equals(Object o)
104 if (this == o)
105 return true;
106 if (o == null || getClass() != o.getClass())
107 return false;
109 final RetrievalTask that = (RetrievalTask) o;
111 // Tasks are equal if their retrievers are equivalent
112 return this.retriever.equals(that.retriever);
113 // Priority and submint time are not factors in equality
116 public int hashCode()
118 return this.retriever.getName().hashCode();
122 public void uncaughtException(Thread thread, Throwable throwable)
124 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.UncaughtExceptionDuringRetrieval")
125 + thread.getName();
126 WorldWind.logger().log(java.util.logging.Level.FINE, message);
129 private class RetrievalExecutor extends java.util.concurrent.ThreadPoolExecutor
131 private static final long THREAD_TIMEOUT = 2; // keep idle threads alive this many seconds
132 private long staleRequestLimit; // reject requests older than this
134 private RetrievalExecutor(int poolSize, int queueSize)
136 super(poolSize, poolSize, THREAD_TIMEOUT, java.util.concurrent.TimeUnit.SECONDS,
137 new java.util.concurrent.PriorityBlockingQueue<Runnable>(queueSize),
138 new java.util.concurrent.ThreadFactory()
140 public Thread newThread(Runnable runnable)
142 Thread thread = new Thread(runnable);
143 thread.setDaemon(true);
144 thread.setPriority(Thread.MIN_PRIORITY);
145 thread.setUncaughtExceptionHandler(BasicRetrievalService.this);
146 return thread;
148 }, new java.util.concurrent.ThreadPoolExecutor.DiscardPolicy() // abandon task when queue is full
150 // This listener is invoked only when the executor queue is a bounded queue and runs out of room.
151 // If the queue is a java.util.concurrent.PriorityBlockingQueue, this listener is never invoked.
152 public void rejectedExecution(Runnable runnable,
153 java.util.concurrent.ThreadPoolExecutor threadPoolExecutor)
155 // Interposes logging for rejected execution
156 RetrievalTask task = (RetrievalTask) runnable;
157 String name = task.getRetriever().getName();
158 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.ResourceRejected") + name;
159 WorldWind.logger().log(java.util.logging.Level.FINER, message);
161 super.rejectedExecution(runnable, threadPoolExecutor);
165 this.staleRequestLimit = Configuration.getLongValue(AVKey.RETRIEVAL_QUEUE_STALE_REQUEST_LIMIT,
166 DEFAULT_STALE_REQUEST_LIMIT);
170 * @param thread the thread the task is running on
171 * @param runnable the <code>Retriever</code> running on the thread
172 * @throws IllegalArgumentException if either <code>thread</code> or <code>runnable</code> is null
174 protected void beforeExecute(Thread thread, Runnable runnable)
176 WorldWind.logger().log(java.util.logging.Level.FINEST, WorldWind.retrieveErrMsg(
177 "BasicRetrievalService.EnteringBeforeExecute"));
179 if (thread == null)
181 String msg = WorldWind.retrieveErrMsg("nullValue.ThreadIsNull");
182 WorldWind.logger().log(java.util.logging.Level.FINE, msg);
183 throw new IllegalArgumentException(msg);
185 if (runnable == null)
187 String msg = WorldWind.retrieveErrMsg("nullValue.RunnableIsNull");
188 WorldWind.logger().log(java.util.logging.Level.FINE, msg);
189 throw new IllegalArgumentException(msg);
192 RetrievalTask task = (RetrievalTask) runnable;
194 task.retriever.setBeginTime(System.currentTimeMillis());
195 long limit = task.retriever.getStaleRequestLimit() >= 0
196 ? task.retriever.getStaleRequestLimit() : this.staleRequestLimit;
197 if (task.retriever.getBeginTime() - task.retriever.getSubmitTime() > limit)
199 // Task has been sitting on the queue too long
200 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.CancellingTooOldRetrieval")
201 + task.getRetriever().getName();
202 WorldWind.logger().log(java.util.logging.Level.FINER, message);
203 task.cancel(true);
206 if (BasicRetrievalService.this.activeTasks.contains(task))
208 // Task is a duplicate
209 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.CancellingDuplicateRetrieval") + task
210 .getRetriever().getName();
211 WorldWind.logger().log(java.util.logging.Level.FINER, message);
212 task.cancel(true);
215 BasicRetrievalService.this.activeTasks.add(task);
217 thread.setName(RUNNING_THREAD_NAME_PREFIX + task.getRetriever().getName());
218 thread.setPriority(Thread.MIN_PRIORITY); // Subordinate thread priority to rendering
219 thread.setUncaughtExceptionHandler(BasicRetrievalService.this);
221 super.beforeExecute(thread, runnable);
223 WorldWind.logger().log(java.util.logging.Level.FINEST, WorldWind.retrieveErrMsg(
224 "BasicRetrievalService.LeavingBeforeExecute"));
228 * @param runnable the <code>Retriever</code> running on the thread
229 * @param throwable an exception thrown during retrieval, will be null if no exception occurred
230 * @throws IllegalArgumentException if <code>runnable</code> is null
232 protected void afterExecute(Runnable runnable, Throwable throwable)
234 WorldWind.logger().log(java.util.logging.Level.FINEST, WorldWind.retrieveErrMsg(
235 "BasicRetrievalService.EnteringAfterExecute"));
237 if (runnable == null)
239 String msg = WorldWind.retrieveErrMsg("nullValue.RunnableIsNull");
240 WorldWind.logger().log(java.util.logging.Level.FINE, msg);
241 throw new IllegalArgumentException(msg);
244 super.afterExecute(runnable, throwable);
246 RetrievalTask task = (RetrievalTask) runnable;
247 BasicRetrievalService.this.activeTasks.remove(task);
248 task.retriever.setEndTime(System.currentTimeMillis());
252 if (throwable != null)
254 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.ExceptionDuringRetrieval") + task
255 .getRetriever().getName();
256 WorldWind.logger().log(java.util.logging.Level.FINE, message, throwable);
259 task.get(); // Wait for task to finish, cancel or break
261 catch (java.util.concurrent.ExecutionException e)
263 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.ExecutionExceptionDuringRetrieval")
264 + task.getRetriever().getName();
265 if (e.getCause() instanceof java.net.SocketTimeoutException)
267 WorldWind.logger()
268 .log(java.util.logging.Level.FINE, message + " " + e.getCause().getLocalizedMessage());
270 else
272 WorldWind.logger().log(java.util.logging.Level.FINE, message, e);
275 catch (InterruptedException e)
277 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.RetrievalOf_1")
278 + task.getRetriever().getName() + WorldWind
279 .retrieveErrMsg("BasicRetrievalService.WasInterrupted_2");
281 WorldWind.logger().log(java.util.logging.Level.FINE, message, e);
283 catch (java.util.concurrent.CancellationException e)
285 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.RetrievalOf_1")
286 + task.getRetriever().getName() + WorldWind.retrieveErrMsg("BasicRetrievalService.WasCancelled_2");
288 WorldWind.logger().log(java.util.logging.Level.FINE, message);
290 finally
293 Thread.currentThread().setName(IDLE_THREAD_NAME_PREFIX);
295 WorldWind.logger().log(java.util.logging.Level.FINEST, WorldWind.retrieveErrMsg(
296 "BasicRetrievalService.LeavingAfterExecute"));
/**
 * Creates a retrieval service whose pool size and queue size are read from {@link Configuration}, falling
 * back to this class's default constants.
 */
public BasicRetrievalService()
{
    Integer configuredPoolSize = Configuration.getIntegerValue(AVKey.RETRIEVAL_POOL_SIZE, DEFAULT_POOL_SIZE);
    this.queueSize = Configuration.getIntegerValue(AVKey.RETRIEVAL_QUEUE_SIZE, DEFAULT_QUEUE_SIZE);

    // Tracks the tasks currently executing (*not* those still pending on the executor's queue).
    this.activeTasks = new java.util.concurrent.ConcurrentLinkedQueue<RetrievalTask>();

    // Runs the retrievers, each on its own pool thread.
    this.executor = new RetrievalExecutor(configuredPoolSize, this.queueSize);
}
314 * @param retriever the retriever to run
315 * @return a future object that can be used to query the request status of cancel the request.
316 * @throws IllegalArgumentException if <code>retrieer</code> is null or has no name
318 public RetrievalFuture runRetriever(Retriever retriever)
320 if (retriever == null)
322 String msg = WorldWind.retrieveErrMsg("nullValue.RetrieverIsNull");
323 WorldWind.logger().log(java.util.logging.Level.FINE, msg);
324 throw new IllegalArgumentException(msg);
326 if (retriever.getName() == null)
328 String message = WorldWind.retrieveErrMsg("nullValue.RetrieverNameIsNull");
329 WorldWind.logger().log(java.util.logging.Level.FINE, message);
330 throw new IllegalArgumentException(message);
333 // Add with secondary priority that removes most recently added requests first.
334 return this.runRetriever(retriever, (double) (Long.MAX_VALUE - System.currentTimeMillis()));
338 * @param retriever the retriever to run
339 * @param priority the secondary priority of the retriever, or negative if it is to be the primary priority
340 * @return a future object that can be used to query the request status of cancel the request.
341 * @throws IllegalArgumentException if <code>retriever</code> is null or has no name
343 public synchronized RetrievalFuture runRetriever(Retriever retriever, double priority)
345 if (retriever == null)
347 String message = WorldWind.retrieveErrMsg("nullValue.RetrieverIsNull");
348 WorldWind.logger().log(java.util.logging.Level.FINE, message);
349 throw new IllegalArgumentException(message);
352 if (retriever.getName() == null)
354 String message = WorldWind.retrieveErrMsg("nullValue.RetrieverNameIsNull");
355 WorldWind.logger().log(java.util.logging.Level.FINE, message);
356 throw new IllegalArgumentException(message);
359 if (this.isFull())
361 String name = retriever.getName();
362 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.ResourceRejected") + name;
363 WorldWind.logger().log(java.util.logging.Level.FINER, message);
366 RetrievalTask task = new RetrievalTask(retriever, priority);
367 retriever.setSubmitTime(System.currentTimeMillis());
369 // Do not queue duplicates.
370 if (this.activeTasks.contains(task) || this.executor.getQueue().contains(task))
371 return null;
373 this.executor.execute(task);
375 return task;
379 * @param poolSize the number of threads in the thread pool
380 * @throws IllegalArgumentException if <code>poolSize</code> is non-positive
382 public void setRetrieverPoolSize(int poolSize)
384 if (poolSize < 1)
386 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.RetrieverPoolSizeIsLessThanOne");
387 WorldWind.logger().log(java.util.logging.Level.FINE, message);
388 throw new IllegalArgumentException(message);
391 this.executor.setCorePoolSize(poolSize);
392 this.executor.setMaximumPoolSize(poolSize);
395 public int getRetrieverPoolSize()
397 return this.executor.getCorePoolSize();
400 private boolean hasRetrievers()
402 Thread[] threads = new Thread[Thread.activeCount()];
403 int numThreads = Thread.enumerate(threads);
404 for (int i = 0; i < numThreads; i++)
406 if (threads[i].getName().startsWith(RUNNING_THREAD_NAME_PREFIX))
407 return true;
409 return false;
412 public boolean hasActiveTasks()
414 return this.hasRetrievers();
417 public boolean isFull()
419 return this.executor.getQueue().size() >= this.queueSize;
422 public int getNumRetrieversPending()
424 // Could use same method to determine active tasks as hasRetrievers() above, but this method only advisory.
425 return this.activeTasks.size() + this.executor.getQueue().size();
429 * @param retriever the retriever to check
430 * @return <code>true</code> if the retriever is being run or pending execution
431 * @throws IllegalArgumentException if <code>retriever</code> is null
433 public boolean contains(Retriever retriever)
435 if (retriever == null)
437 String msg = WorldWind.retrieveErrMsg("nullValue.RetrieverIsNull");
438 WorldWind.logger().log(java.util.logging.Level.FINE, msg);
439 throw new IllegalArgumentException(msg);
441 RetrievalTask task = new RetrievalTask(retriever, 0d);
442 return (this.activeTasks.contains(task) || this.executor.getQueue().contains(task));
445 public double getProgress()
447 int totalContentLength = 0;
448 int totalBytesRead = 0;
450 for (RetrievalTask task : this.activeTasks)
452 if (task.isDone())
453 continue;
455 Retriever retriever = task.getRetriever();
458 double tcl = retriever.getContentLength();
459 if (tcl > 0)
461 totalContentLength += tcl;
462 totalBytesRead += retriever.getContentLengthRead();
465 catch (Exception e)
467 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.ExcptnRetrievingContentSizes") + (
468 retriever.getName() != null ? retriever.getName() : "");
469 WorldWind.logger().log(java.util.logging.Level.FINE, message, e);
473 for (Runnable runnable : this.executor.getQueue())
475 gov.nasa.worldwind.BasicRetrievalService.RetrievalTask task =
476 (gov.nasa.worldwind.BasicRetrievalService.RetrievalTask) runnable;
478 Retriever retriever = task.getRetriever();
481 double tcl = retriever.getContentLength();
482 if (tcl > 0)
484 totalContentLength += tcl;
485 totalBytesRead += retriever.getContentLengthRead();
488 catch (Exception e)
490 String message = WorldWind.retrieveErrMsg("BasicRetrievalService.ExcptnRetrievingContentSizes") + (
491 retriever.getName() != null ? retriever.getName() : "");
492 WorldWind.logger().log(java.util.logging.Level.FINE, message, e);
496 // Compute an aggregated progress notification.
498 double progress;
500 if (totalContentLength < 1)
501 progress = 0;
502 else
503 progress = Math.min(100.0, 100.0 * (double) totalBytesRead / (double) totalContentLength);
505 return progress;