2 * This file is part of the LibreOffice project.
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 * This file incorporates work covered by the following license notice:
10 * Licensed to the Apache Software Foundation (ASF) under one or more
11 * contributor license agreements. See the NOTICE file distributed
12 * with this work for additional information regarding copyright
13 * ownership. The ASF licenses this file to you under the Apache
14 * License, Version 2.0 (the "License"); you may not use this file
15 * except in compliance with the License. You may obtain a copy of
16 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
19 package com
.sun
.star
.lib
.uno
.environments
.remote
;
21 import com
.sun
.star
.lang
.DisposedException
;
24 * The <code>JobQueue</code> implements a queue for jobs.
26 * <p>For every jobs thread id exists a job queue which is registered
27 * at the <code>ThreadPool</code>.</p>
29 * <p>A JobQueue is splitted in a sync job queue and an async job queue.
30 * The sync job queue is the registered queue, it delegates async jobs
31 * (put by <code>putjob</code>) into the async queue, which is only
32 * known by the sync queue.</p>
34 * @see com.sun.star.lib.uno.environments.remote.ThreadPool
35 * @see com.sun.star.lib.uno.environments.remote.Job
36 * @see com.sun.star.lib.uno.environments.remote.ThreadId
39 public class JobQueue
{
41 * When set to true, enables various debugging output.
43 private static final boolean DEBUG
= false;
45 protected Job _head
; // the head of the job list
46 protected Job _tail
; // the tail of the job list
48 protected ThreadId _threadId
; // the thread id of the queue
49 protected int _ref_count
= 0; // the stack deepness
50 protected boolean _createThread
; // create a worker thread, if needed
51 protected boolean _createThread_now
; // create a worker thread, if needed
52 protected Thread _worker_thread
; // the thread that does the jobs
54 protected Object _disposeId
; // the active dispose id
55 protected Object _doDispose
= null;
56 protected Throwable _throwable
;
58 protected JobQueue _async_jobQueue
; // chaining job qeueus for asyncs
59 protected JobQueue _sync_jobQueue
; // chaining job qeueus for syncs
61 protected boolean _active
= false;
63 protected JavaThreadPoolFactory _javaThreadPoolFactory
;
66 * A thread for dispatching jobs.
68 class JobDispatcher
extends Thread
{
71 JobDispatcher(Object disposeId
) {
72 super("JobDispatcher");
74 if(DEBUG
) System
.err
.println("JobQueue$JobDispatcher.<init>:" + _threadId
);
76 _disposeId
= disposeId
;
79 ThreadId
getThreadId() {
85 if(DEBUG
) System
.err
.println("ThreadPool$JobDispatcher.run: " + Thread
.currentThread());
88 enter(2000, _disposeId
);
89 } catch(Throwable throwable
) {
90 if(_head
!= null || _active
) { // there was a job in progress, so give a stack
91 System
.err
.println(getClass().getName() + " - exception occurred:" + throwable
);
92 throwable
.printStackTrace(System
.err
);
99 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".run - exit:" + _threadId
);
105 * Constructs a async job queue with the given thread id which belongs to
106 * the given sync job queue.
108 * @param threadId the thread id.
109 * @see com.sun.star.lib.uno.environments.remote.ThreadId
111 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory
, ThreadId threadId
) {
112 _javaThreadPoolFactory
= javaThreadPoolFactory
;
113 _threadId
= ThreadId
.createFresh();
115 _sync_jobQueue
= javaThreadPoolFactory
.getJobQueue(threadId
);
116 if(_sync_jobQueue
== null) {
117 _sync_jobQueue
= new JobQueue(javaThreadPoolFactory
, threadId
, true);
118 _sync_jobQueue
.acquire();
121 _sync_jobQueue
._async_jobQueue
= this;
123 _createThread
= true;
124 _createThread_now
= true;
128 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + " - init:" + _threadId
);
132 * Constructs a sync job queue with the given thread id and the given thread.
134 * @param threadId the thread id.
135 * @param createThread if true, the queue creates a worker thread if needed.
136 * @see com.sun.star.lib.uno.environments.remote.ThreadId
138 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory
, ThreadId threadId
, boolean createThread
){
139 _javaThreadPoolFactory
= javaThreadPoolFactory
;
140 _threadId
= threadId
;
141 _createThread
= createThread
;
142 _createThread_now
= createThread
;
144 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + " - init:" + _threadId
+ " " + createThread
);
148 * Gives the thread id of this queue.
150 * @return the thread id.
151 * @see com.sun.star.lib.uno.environments.remote.ThreadId
153 ThreadId
getThreadId() {
157 synchronized void acquire() {
158 // add only synchronous queues .
159 if(_ref_count
<= 0 && _sync_jobQueue
== null )
160 _javaThreadPoolFactory
.addJobQueue(this);
165 synchronized void release() {
168 if(_ref_count
<= 0) {
169 // only synchronous queues needs to be removed .
170 if( _sync_jobQueue
== null )
171 _javaThreadPoolFactory
.removeJobQueue(this);
174 if(_sync_jobQueue
!= null) {
175 _sync_jobQueue
._async_jobQueue
= null;
176 _sync_jobQueue
.release();
182 * Removes a job from the queue.
184 * @param waitTime the maximum amount of time to wait for a job.
185 * @return a job or null if timed out.
187 private Job
removeJob(int waitTime
) {
188 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".removeJob:" + _head
+ " " + _threadId
);
191 synchronized (this) {
192 // wait max. waitTime time for a job to enter the queue
193 boolean waited
= false;
194 while(_head
== null && (waitTime
== 0 || !waited
)) {
195 if(_doDispose
== _disposeId
) {
197 throw (DisposedException
)
198 new DisposedException().initCause(_throwable
);
201 // notify sync queues
207 } catch(InterruptedException interruptedException
) {
208 throw new com
.sun
.star
.uno
.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException
);
211 // signal that we have already waited once
228 // always wait for asynchron jobqueue to be finished !
229 if(job
!= null && _async_jobQueue
!= null) {
230 synchronized(_async_jobQueue
) {
231 // wait for async queue to be empty and last job to be done
232 while(_async_jobQueue
._active
|| _async_jobQueue
._head
!= null) {
233 if(DEBUG
) System
.err
.println("waiting for async:" + _async_jobQueue
._head
+ " " + _async_jobQueue
._worker_thread
);
235 if(_doDispose
== _disposeId
) {
237 throw (DisposedException
)
238 new DisposedException().initCause(_throwable
);
242 _async_jobQueue
.wait();
243 } catch(InterruptedException interruptedException
) {
244 throw new com
.sun
.star
.uno
.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException
);
254 * Puts a job into the queue.
256 * @param job the job.
257 * @param disposeId a dispose id.
259 synchronized void putJob(Job job
, Object disposeId
) {
260 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job
);
269 if(_worker_thread
== null && _createThread
&& _createThread_now
) { // if there is no thread, which dispatches and if shall create one, create one
273 _createThread_now
= false;
274 new JobDispatcher(disposeId
).start();
277 // always notify possible waiters
282 * Enters the job queue.
284 * @param disposeId a dispose id.
285 * @return the result of the final job (reply).
287 Object
enter(Object disposeId
) throws Throwable
{
288 return enter(0, disposeId
); // wait infinitly
292 * Enters the job queue.
294 * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly).
295 * @param disposeId a dispose id.
296 * @return the result of the final job (reply).
298 Object
enter(int waitTime
, Object disposeId
) throws Throwable
{
299 if(DEBUG
) System
.err
.println("#####" + getClass().getName() + ".enter: " + _threadId
);
301 boolean quit
= false;
303 Object hold_disposeId
= _disposeId
;
304 _disposeId
= disposeId
;
306 Object result
= null;
308 Thread hold_worker_thread
= _worker_thread
;
309 _worker_thread
= Thread
.currentThread();
315 job
= removeJob(waitTime
);
319 result
= job
.execute();
325 if (!job
.isRequest()) {
338 finally { // ensure that this queue becomes disposed, if necessary
339 if(DEBUG
) System
.err
.println("##### " + getClass().getName() + ".enter leaving: " + _threadId
+ " " + _worker_thread
+ " " + hold_worker_thread
+ " " + result
);
342 if(job
!= null || (quit
&& _head
== null)) {
343 _worker_thread
= hold_worker_thread
;
345 _createThread_now
= true;
347 _disposeId
= hold_disposeId
;
349 if(_sync_jobQueue
!= null)
350 notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
363 * If the given disposeId is registered, interrups the worker thread.
365 * @param disposeId the dispose id.
367 synchronized void dispose(Object disposeId
, Throwable throwable
) {
368 if(_sync_jobQueue
== null) { // dispose only sync queues
369 _doDispose
= disposeId
;
370 _throwable
= throwable
;
372 // get thread out of wait and let it throw the throwable
373 if(DEBUG
) System
.err
.println(getClass().getName() + ".dispose - notifying thread");