1 /*************************************************************************
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 * Copyright 2008 by Sun Microsystems, Inc.
7 * OpenOffice.org - a multi-platform office productivity suite
9 * $RCSfile: JobQueue.java,v $
12 * This file is part of OpenOffice.org.
14 * OpenOffice.org is free software: you can redistribute it and/or modify
15 * it under the terms of the GNU Lesser General Public License version 3
16 * only, as published by the Free Software Foundation.
18 * OpenOffice.org is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU Lesser General Public License version 3 for more details
22 * (a copy is included in the LICENSE file that accompanied this code).
24 * You should have received a copy of the GNU Lesser General Public License
25 * version 3 along with OpenOffice.org. If not, see
26 * <http://www.openoffice.org/license.html>
27 * for a copy of the LGPLv3 License.
29 ************************************************************************/
31 package com
.sun
.star
.lib
.uno
.environments
.remote
;
35 * The <code>JobQueue</code> implements a queue for jobs.
36 * For every jobs thread id exists a job queue which is registered
37 * at the <code>ThreadPool</code>.
38 * A JobQueue is splitted in a sync job queue and an async job queue.
39 * The sync job queue is the registerd queue, it delegates async jobs
40 * (put by <code>putjob</code>) into the async queue, which is only
41 * known by the sync queue.
43 * @version $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $
45 * @see com.sun.star.lib.uno.environments.remote.ThreadPool
46 * @see com.sun.star.lib.uno.environments.remote.Job
47 * @see com.sun.star.lib.uno.environments.remote.ThreadID
50 public class JobQueue
{
52 * When set to true, enables various debugging output.
54 private static final boolean DEBUG
= false;
56 protected Job _head
; // the head of the job list
57 protected Job _tail
; // the tail of the job list
59 protected ThreadId _threadId
; // the thread id of the queue
60 protected int _ref_count
= 0; // the stack deepness
61 protected boolean _createThread
; // create a worker thread, if needed
62 protected boolean _createThread_now
; // create a worker thread, if needed
63 protected Thread _worker_thread
; // the thread that does the jobs
65 protected Object _disposeId
; // the active dispose id
66 protected Object _doDispose
= null;
67 protected Throwable _throwable
;
69 protected JobQueue _async_jobQueue
; // chaining job qeueus for asyncs
70 protected JobQueue _sync_jobQueue
; // chaining job qeueus for syncs
72 protected boolean _active
= false;
74 protected JavaThreadPoolFactory _javaThreadPoolFactory
;
77 * A thread for dispatching jobs
79 class JobDispatcher
extends Thread
{
82 JobDispatcher(Object disposeId
) {
83 if(DEBUG
) System
.err
.println("JobQueue$JobDispatcher.<init>:" + _threadId
);
85 _disposeId
= disposeId
;
88 ThreadId
getThreadId() {
93 if(DEBUG
) System
.err
.println("ThreadPool$JobDispatcher.run: " + Thread
.currentThread());
96 enter(2000, _disposeId
);
98 catch(Throwable throwable
) {
99 if(_head
!= null || _active
) { // there was a job in progress, so give a stack
100 System
.err
.println(getClass().getName() + " - exception occurred:" + throwable
);
101 throwable
.printStackTrace(System
.err
);
108 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".run - exit:" + _threadId
);
111 // Object object = new Object();
112 // synchronized(object) {
116 // catch(InterruptedException interruptedException) {
123 * Constructs a async job queue with the given thread id
124 * which belongs to the given sync job queue.
126 * @param threadId the thread id
127 * @param sync_jobQueue the sync queue this async queue belongs to
128 * @see com.sun.star.lib.uno.environments.remote.ThreadID
130 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory
, ThreadId threadId
) {
131 _javaThreadPoolFactory
= javaThreadPoolFactory
;
132 _threadId
= ThreadId
.createFresh();
134 _sync_jobQueue
= javaThreadPoolFactory
.getJobQueue(threadId
);
135 if(_sync_jobQueue
== null) {
136 _sync_jobQueue
= new JobQueue(javaThreadPoolFactory
, threadId
, true);
137 _sync_jobQueue
.acquire();
140 _sync_jobQueue
._async_jobQueue
= this;
142 _createThread
= true;
143 _createThread_now
= true;
147 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + " - init:" + _threadId
);
151 * Constructs a sync job queue with the given thread id and the given thread.
153 * @param threadId the thread id
154 * @param createThread if true, the queue creates a worker thread if needed
155 * @see com.sun.star.lib.uno.environments.remote.ThreadID
157 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory
, ThreadId threadId
, boolean createThread
){
158 _javaThreadPoolFactory
= javaThreadPoolFactory
;
159 _threadId
= threadId
;
160 _createThread
= createThread
;
161 _createThread_now
= createThread
;
163 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + " - init:" + _threadId
+ " " + createThread
);
167 * Gives the thread id of this queue
169 * @return the thread id
170 * @see com.sun.star.lib.uno.environments.remote.ThreadID
172 ThreadId
getThreadId() {
176 synchronized void acquire() {
177 // add only synchronous queues .
178 if(_ref_count
<= 0 && _sync_jobQueue
== null )
179 _javaThreadPoolFactory
.addJobQueue(this);
184 synchronized void release() {
187 if(_ref_count
<= 0) {
188 // only synchronous queues needs to be removed .
189 if( _sync_jobQueue
== null )
190 _javaThreadPoolFactory
.removeJobQueue(this);
193 if(_sync_jobQueue
!= null) {
194 _sync_jobQueue
._async_jobQueue
= null;
195 _sync_jobQueue
.release();
201 * Removes a job from the queue.
203 * @return a job or null if timed out
204 * @param waitTime the maximum amount of time to wait for a job
206 private Job
removeJob(int waitTime
) throws Throwable
{
207 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".removeJob:" + _head
+ " " + _threadId
);
210 synchronized (this) {
211 // wait max. waitTime time for a job to enter the queue
212 boolean waited
= false;
213 while(_head
== null && (waitTime
== 0 || !waited
)) {
214 if(_doDispose
== _disposeId
) {
219 // notify sync queues
226 catch(InterruptedException interruptedException
) {
227 throw new com
.sun
.star
.uno
.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException
);
230 // signal that we have already waited once
247 // always wait for asynchron jobqueue to be finished !
248 if(job
!= null && _async_jobQueue
!= null) {
249 synchronized(_async_jobQueue
) {
250 // wait for async queue to be empty and last job to be done
251 while(_async_jobQueue
._active
|| _async_jobQueue
._head
!= null) {
252 if(DEBUG
) System
.err
.println("waiting for async:" + _async_jobQueue
._head
+ " " + _async_jobQueue
._worker_thread
);
254 if(_doDispose
== _disposeId
) {
260 _async_jobQueue
.wait();
262 catch(InterruptedException interruptedException
) {
263 throw new com
.sun
.star
.uno
.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException
);
273 * Puts a job into the queue.
276 * @param disposeId a dispose id
278 synchronized void putJob(Job job
, Object disposeId
) {
279 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job
);
288 if(_worker_thread
== null && _createThread
&& _createThread_now
) { // if there is no thread, which dispatches and if shall create one, create one
292 _createThread_now
= false;
293 new JobDispatcher(disposeId
).start();
296 // always notify possible waiters
301 * Enters the job queue.
303 * @return the result of the final job (reply)
304 * @param disposeId a dispose id
306 Object
enter(Object disposeId
) throws Throwable
{
307 return enter(0, disposeId
); // wait infinitly
311 * Enters the job queue.
313 * @return the result of the final job (reply)
314 * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly)
315 * @param disposeId a dispose id
317 Object
enter(int waitTime
, Object disposeId
) throws Throwable
{
318 if(DEBUG
) System
.err
.println("#####" + getClass().getName() + ".enter: " + _threadId
);
320 boolean quit
= false;
322 Object hold_disposeId
= _disposeId
;
323 _disposeId
= disposeId
;
325 Object result
= null;
327 Thread hold_worker_thread
= _worker_thread
;
328 _worker_thread
= Thread
.currentThread();
334 job
= removeJob(waitTime
);
338 result
= job
.execute();
344 if (!job
.isRequest()) {
357 finally { // ensure that this queue becomes disposed, if necessary
358 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".enter leaving: " + _threadId
+ " " + _worker_thread
+ " " + hold_worker_thread
+ " " + result
);
361 if(job
!= null || (quit
&& _head
== null)) {
362 _worker_thread
= hold_worker_thread
;
364 _createThread_now
= true;
366 _disposeId
= hold_disposeId
;
368 if(_sync_jobQueue
!= null)
369 notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
382 * If the given disposeId is registered,
383 * interrups the worker thread.
385 * @param disposeId the dispose id
387 synchronized void dispose(Object disposeId
, Throwable throwable
) {
388 if(_sync_jobQueue
== null) { // dispose only sync queues
389 _doDispose
= disposeId
;
390 _throwable
= throwable
;
392 // get thread out of wait and let it throw the throwable
393 if(DEBUG
) System
.err
.println(getClass().getName() + ".dispose - notifying thread");