Updated to worldwind release 20070817
[worldwind-tracker.git] / gov / nasa / worldwind / retrieve / BasicRetrievalService.java
blob84c2f3ead672f32a21573c7bc4ea30c389b757d6
1 /*
2 Copyright (C) 2001, 2006 United States Government
3 as represented by the Administrator of the
4 National Aeronautics and Space Administration.
5 All Rights Reserved.
6 */
7 package gov.nasa.worldwind.retrieve;
9 import gov.nasa.worldwind.*;
10 import gov.nasa.worldwind.avlist.AVKey;
11 import gov.nasa.worldwind.util.Logging;
13 import java.net.SocketTimeoutException;
14 import java.util.concurrent.*;
15 import java.util.logging.Level;
17 /**
18 * Performs threaded retrieval of data.
20 * @author Tom Gaskins
21 * @version $Id: BasicRetrievalService.java 2471 2007-07-31 21:50:57Z tgaskins $
23 public final class BasicRetrievalService extends WWObjectImpl
24 implements RetrievalService, Thread.UncaughtExceptionHandler
26 // These constants are last-ditch values in case Configuration lacks defaults
27 private static final int DEFAULT_QUEUE_SIZE = 100;
28 private static final int DEFAULT_POOL_SIZE = 5;
29 private static final long DEFAULT_STALE_REQUEST_LIMIT = 30000; // milliseconds
30 private static final int DEFAULT_TIME_PRIORITY_GRANULARITY = 500; // milliseconds
32 private static final String RUNNING_THREAD_NAME_PREFIX = Logging.getMessage(
33 "BasicRetrievalService.RunningThreadNamePrefix");
34 private static final String IDLE_THREAD_NAME_PREFIX = Logging.getMessage(
35 "BasicRetrievalService.IdleThreadNamePrefix");
37 private RetrievalExecutor executor; // thread pool for running retrievers
38 private ConcurrentLinkedQueue<RetrievalTask> activeTasks; // tasks currently allocated a thread
39 private int queueSize; // maximum queue size
41 /**
42 * Encapsulates a single threaded retrieval as a {@link java.util.concurrent.FutureTask}.
44 private static class RetrievalTask extends FutureTask<Retriever>
45 implements RetrievalFuture, Comparable<RetrievalTask>
47 private Retriever retriever;
48 private double priority; // retrieval secondary priority (primary priority is submit time)
50 private RetrievalTask(Retriever retriever, double priority)
52 super(retriever);
53 this.retriever = retriever;
54 this.priority = priority;
57 public double getPriority()
59 return priority;
62 public Retriever getRetriever()
64 return this.retriever;
67 @Override
68 public void run()
70 if (this.isDone() || this.isCancelled())
71 return;
73 super.run();
76 /**
77 * @param that the task to compare with this one
78 * @return 0 if task priorities are equal, -1 if priority of this is less than that, 1 otherwise
79 * @throws IllegalArgumentException if <code>that</code> is null
81 public int compareTo(RetrievalTask that)
83 if (that == null)
85 String msg = Logging.getMessage("nullValue.RetrieverIsNull");
86 Logging.logger().fine(msg);
87 throw new IllegalArgumentException(msg);
90 if (this.priority > 0 && that.priority > 0) // only secondary priority used if either is negative
92 // Requests submitted within different time-granularity periods are ordered exclusive of their
93 // client-specified priority.
94 long now = System.currentTimeMillis();
95 long thisElapsedTime = now - this.retriever.getSubmitTime();
96 long thatElapsedTime = now - that.retriever.getSubmitTime();
97 if (((thisElapsedTime - thatElapsedTime) / DEFAULT_TIME_PRIORITY_GRANULARITY) != 0)
98 return thisElapsedTime < thatElapsedTime ? -1 : 1;
101 // The client-pecified priority is compared for requests submitted within the same granularity period.
102 return this.priority == that.priority ? 0 : this.priority < that.priority ? -1 : 1;
105 public boolean equals(Object o)
107 if (this == o)
108 return true;
109 if (o == null || getClass() != o.getClass())
110 return false;
112 final RetrievalTask that = (RetrievalTask) o;
114 // Tasks are equal if their retrievers are equivalent
115 return this.retriever.equals(that.retriever);
116 // Priority and submint time are not factors in equality
119 public int hashCode()
121 return this.retriever.getName().hashCode();
125 public void uncaughtException(Thread thread, Throwable throwable)
127 Logging.logger().fine(Logging.getMessage("BasicRetrievalService.UncaughtExceptionDuringRetrieval",
128 thread.getName()));
131 private class RetrievalExecutor extends ThreadPoolExecutor
133 private static final long THREAD_TIMEOUT = 2; // keep idle threads alive this many seconds
134 private long staleRequestLimit; // reject requests older than this
136 private RetrievalExecutor(int poolSize, int queueSize)
138 super(poolSize, poolSize, THREAD_TIMEOUT, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(queueSize),
139 new ThreadFactory()
141 public Thread newThread(Runnable runnable)
143 Thread thread = new Thread(runnable);
144 thread.setDaemon(true);
145 thread.setPriority(Thread.MIN_PRIORITY);
146 thread.setUncaughtExceptionHandler(BasicRetrievalService.this);
147 return thread;
149 }, new ThreadPoolExecutor.DiscardPolicy() // abandon task when queue is full
151 // This listener is invoked only when the executor queue is a bounded queue and runs out of room.
152 // If the queue is a java.util.concurrent.PriorityBlockingQueue, this listener is never invoked.
153 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor)
155 // Interposes logging for rejected execution
156 Logging.logger().finer(Logging.getMessage("BasicRetrievalService.ResourceRejected",
157 ((RetrievalTask) runnable).getRetriever().getName()));
159 super.rejectedExecution(runnable, threadPoolExecutor);
163 this.staleRequestLimit = Configuration.getLongValue(AVKey.RETRIEVAL_QUEUE_STALE_REQUEST_LIMIT,
164 DEFAULT_STALE_REQUEST_LIMIT);
168 * @param thread the thread the task is running on
169 * @param runnable the <code>Retriever</code> running on the thread
170 * @throws IllegalArgumentException if either <code>thread</code> or <code>runnable</code> is null
172 protected void beforeExecute(Thread thread, Runnable runnable)
174 if (thread == null)
176 String msg = Logging.getMessage("nullValue.ThreadIsNull");
177 Logging.logger().fine(msg);
178 throw new IllegalArgumentException(msg);
180 if (runnable == null)
182 String msg = Logging.getMessage("nullValue.RunnableIsNull");
183 Logging.logger().fine(msg);
184 throw new IllegalArgumentException(msg);
187 RetrievalTask task = (RetrievalTask) runnable;
189 task.retriever.setBeginTime(System.currentTimeMillis());
190 long limit = task.retriever.getStaleRequestLimit() >= 0
191 ? task.retriever.getStaleRequestLimit() : this.staleRequestLimit;
192 if (task.retriever.getBeginTime() - task.retriever.getSubmitTime() > limit)
194 // Task has been sitting on the queue too long
195 Logging.logger().finer(Logging.getMessage("BasicRetrievalService.CancellingTooOldRetrieval",
196 task.getRetriever().getName()));
197 task.cancel(true);
200 if (BasicRetrievalService.this.activeTasks.contains(task))
202 // Task is a duplicate
203 Logging.logger().finer(Logging.getMessage("BasicRetrievalService.CancellingDuplicateRetrieval",
204 task.getRetriever().getName()));
205 task.cancel(true);
208 BasicRetrievalService.this.activeTasks.add(task);
210 thread.setName(RUNNING_THREAD_NAME_PREFIX + task.getRetriever().getName());
211 thread.setPriority(Thread.MIN_PRIORITY); // Subordinate thread priority to rendering
212 thread.setUncaughtExceptionHandler(BasicRetrievalService.this);
214 super.beforeExecute(thread, runnable);
218 * @param runnable the <code>Retriever</code> running on the thread
219 * @param throwable an exception thrown during retrieval, will be null if no exception occurred
220 * @throws IllegalArgumentException if <code>runnable</code> is null
222 protected void afterExecute(Runnable runnable, Throwable throwable)
224 if (runnable == null)
226 String msg = Logging.getMessage("nullValue.RunnableIsNull");
227 Logging.logger().fine(msg);
228 throw new IllegalArgumentException(msg);
231 super.afterExecute(runnable, throwable);
233 RetrievalTask task = (RetrievalTask) runnable;
234 BasicRetrievalService.this.activeTasks.remove(task);
235 task.retriever.setEndTime(System.currentTimeMillis());
239 if (throwable != null)
241 Logging.logger().log(Level.FINE,
242 Logging.getMessage("BasicRetrievalService.ExceptionDuringRetrieval",
243 task.getRetriever().getName()), throwable);
246 task.get(); // Wait for task to finish, cancel or break
248 catch (java.util.concurrent.ExecutionException e)
250 String message = Logging.getMessage("BasicRetrievalService.ExecutionExceptionDuringRetrieval",
251 task.getRetriever().getName());
252 if (e.getCause() instanceof SocketTimeoutException)
254 Logging.logger().fine(message + " " + e.getCause().getLocalizedMessage());
256 else
258 Logging.logger().log(Level.FINE, message, e);
261 catch (InterruptedException e)
263 Logging.logger().log(Level.FINE, Logging.getMessage("BasicRetrievalService.RetrievalInterrupted",
264 task.getRetriever().getName()), e);
266 catch (java.util.concurrent.CancellationException e)
268 Logging.logger().fine(Logging.getMessage("BasicRetrievalService.RetrievalCancelled",
269 task.getRetriever().getName()));
271 finally
274 Thread.currentThread().setName(IDLE_THREAD_NAME_PREFIX);
/**
 * Creates a retrieval service whose thread-pool size and queue size are read from {@link Configuration},
 * falling back to this class's built-in defaults.
 */
public BasicRetrievalService()
{
    Integer threadCount = Configuration.getIntegerValue(AVKey.RETRIEVAL_POOL_SIZE, DEFAULT_POOL_SIZE);
    this.queueSize = Configuration.getIntegerValue(AVKey.RETRIEVAL_QUEUE_SIZE, DEFAULT_QUEUE_SIZE);

    // The executor runs the retrievers on its pool of threads.
    this.executor = new RetrievalExecutor(threadCount, this.queueSize);

    // Holds only the tasks currently executing, *not* those still pending on the executor's queue.
    this.activeTasks = new ConcurrentLinkedQueue<RetrievalTask>();
}
291 public void shutdown(boolean immediately)
293 if (immediately)
294 this.executor.shutdownNow();
295 else
296 this.executor.shutdown();
298 this.activeTasks.clear();
302 * @param retriever the retriever to run
303 * @return a future object that can be used to query the request status of cancel the request.
304 * @throws IllegalArgumentException if <code>retrieer</code> is null or has no name
306 public RetrievalFuture runRetriever(Retriever retriever)
308 if (retriever == null)
310 String msg = Logging.getMessage("nullValue.RetrieverIsNull");
311 Logging.logger().fine(msg);
312 throw new IllegalArgumentException(msg);
314 if (retriever.getName() == null)
316 String message = Logging.getMessage("nullValue.RetrieverNameIsNull");
317 Logging.logger().fine(message);
318 throw new IllegalArgumentException(message);
321 // Add with secondary priority that removes most recently added requests first.
322 return this.runRetriever(retriever, (double) (Long.MAX_VALUE - System.currentTimeMillis()));
326 * @param retriever the retriever to run
327 * @param priority the secondary priority of the retriever, or negative if it is to be the primary priority
328 * @return a future object that can be used to query the request status of cancel the request.
329 * @throws IllegalArgumentException if <code>retriever</code> is null or has no name
331 public synchronized RetrievalFuture runRetriever(Retriever retriever, double priority)
333 if (retriever == null)
335 String message = Logging.getMessage("nullValue.RetrieverIsNull");
336 Logging.logger().fine(message);
337 throw new IllegalArgumentException(message);
340 if (retriever.getName() == null)
342 String message = Logging.getMessage("nullValue.RetrieverNameIsNull");
343 Logging.logger().fine(message);
344 throw new IllegalArgumentException(message);
347 if (this.isFull())
349 Logging.logger().finer(Logging.getMessage("BasicRetrievalService.ResourceRejected", retriever.getName()));
352 RetrievalTask task = new RetrievalTask(retriever, priority);
353 retriever.setSubmitTime(System.currentTimeMillis());
355 // Do not queue duplicates.
356 if (this.activeTasks.contains(task) || this.executor.getQueue().contains(task))
357 return null;
359 this.executor.execute(task);
361 return task;
365 * @param poolSize the number of threads in the thread pool
366 * @throws IllegalArgumentException if <code>poolSize</code> is non-positive
368 public void setRetrieverPoolSize(int poolSize)
370 if (poolSize < 1)
372 String message = Logging.getMessage("BasicRetrievalService.RetrieverPoolSizeIsLessThanOne");
373 Logging.logger().fine(message);
374 throw new IllegalArgumentException(message);
377 this.executor.setCorePoolSize(poolSize);
378 this.executor.setMaximumPoolSize(poolSize);
381 public int getRetrieverPoolSize()
383 return this.executor.getCorePoolSize();
386 private boolean hasRetrievers()
388 Thread[] threads = new Thread[Thread.activeCount()];
389 int numThreads = Thread.enumerate(threads);
390 for (int i = 0; i < numThreads; i++)
392 if (threads[i].getName().startsWith(RUNNING_THREAD_NAME_PREFIX))
393 return true;
395 return false;
398 public boolean hasActiveTasks()
400 return this.hasRetrievers();
403 public boolean isFull()
405 return this.executor.getQueue().size() >= this.queueSize;
408 public int getNumRetrieversPending()
410 // Could use same method to determine active tasks as hasRetrievers() above, but this method only advisory.
411 return this.activeTasks.size() + this.executor.getQueue().size();
415 * @param retriever the retriever to check
416 * @return <code>true</code> if the retriever is being run or pending execution
417 * @throws IllegalArgumentException if <code>retriever</code> is null
419 public boolean contains(Retriever retriever)
421 if (retriever == null)
423 String msg = Logging.getMessage("nullValue.RetrieverIsNull");
424 Logging.logger().fine(msg);
425 throw new IllegalArgumentException(msg);
427 RetrievalTask task = new RetrievalTask(retriever, 0d);
428 return (this.activeTasks.contains(task) || this.executor.getQueue().contains(task));
431 public double getProgress()
433 int totalContentLength = 0;
434 int totalBytesRead = 0;
436 for (RetrievalTask task : this.activeTasks)
438 if (task.isDone())
439 continue;
441 Retriever retriever = task.getRetriever();
444 double tcl = retriever.getContentLength();
445 if (tcl > 0)
447 totalContentLength += tcl;
448 totalBytesRead += retriever.getContentLengthRead();
451 catch (Exception e)
453 Logging.logger().log(Level.FINE,
454 Logging.getMessage("BasicRetrievalService.ExceptionRetrievingContentSizes",
455 retriever.getName() != null ? retriever.getName() : ""), e);
459 for (Runnable runnable : this.executor.getQueue())
461 RetrievalTask task =
462 (RetrievalTask) runnable;
464 Retriever retriever = task.getRetriever();
467 double tcl = retriever.getContentLength();
468 if (tcl > 0)
470 totalContentLength += tcl;
471 totalBytesRead += retriever.getContentLengthRead();
474 catch (Exception e)
476 String message = Logging.getMessage("BasicRetrievalService.ExceptionRetrievingContentSizes") + (
477 retriever.getName() != null ? retriever.getName() : "");
478 Logging.logger().log(Level.FINE, message, e);
482 // Compute an aggregated progress notification.
484 double progress;
486 if (totalContentLength < 1)
487 progress = 0;
488 else
489 progress = Math.min(100.0, 100.0 * (double) totalBytesRead / (double) totalContentLength);
491 return progress;