bump product version to 5.0.4.1
[LibreOffice.git] / jurt / com / sun / star / lib / uno / environments / remote / JobQueue.java
blob9d964761b187a561ca766b9e5416c81f088556ab
1 /*
2 * This file is part of the LibreOffice project.
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 * This file incorporates work covered by the following license notice:
10 * Licensed to the Apache Software Foundation (ASF) under one or more
11 * contributor license agreements. See the NOTICE file distributed
12 * with this work for additional information regarding copyright
13 * ownership. The ASF licenses this file to you under the Apache
14 * License, Version 2.0 (the "License"); you may not use this file
15 * except in compliance with the License. You may obtain a copy of
16 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
19 package com.sun.star.lib.uno.environments.remote;
21 import com.sun.star.lang.DisposedException;
23 /**
24 * The <code>JobQueue</code> implements a queue for jobs.
26 * <p>For every job's thread id there exists a job queue, which is registered
27 * at the <code>ThreadPool</code>.</p>
29 * <p>A JobQueue is split into a sync job queue and an async job queue.
30 * The sync job queue is the registered queue; it delegates async jobs
31 * (put by <code>putJob</code>) into the async queue, which is only
32 * known by the sync queue.</p>
34 * @see com.sun.star.lib.uno.environments.remote.ThreadPool
35 * @see com.sun.star.lib.uno.environments.remote.Job
36 * @see com.sun.star.lib.uno.environments.remote.ThreadId
37 * @since UDK1.0
39 public class JobQueue {
40 /**
41 * When set to true, enables various debugging output.
43 private static final boolean DEBUG = false;
45 protected Job _head; // the head of the job list
46 protected Job _tail; // the tail of the job list
48 protected ThreadId _threadId; // the thread id of the queue
49 protected int _ref_count = 0; // the stack deepness
50 protected boolean _createThread; // create a worker thread, if needed
51 protected boolean _createThread_now; // create a worker thread, if needed
52 protected Thread _worker_thread; // the thread that does the jobs
54 protected Object _disposeId; // the active dispose id
55 protected Object _doDispose = null;
56 protected Throwable _throwable;
58 protected JobQueue _async_jobQueue; // chaining job qeueus for asyncs
59 protected JobQueue _sync_jobQueue; // chaining job qeueus for syncs
61 protected boolean _active = false;
63 protected JavaThreadPoolFactory _javaThreadPoolFactory;
65 /**
66 * A thread for dispatching jobs.
68 class JobDispatcher extends Thread {
69 Object _disposeId;
71 JobDispatcher(Object disposeId) {
72 super("JobDispatcher");
74 if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId);
76 _disposeId = disposeId;
79 ThreadId getThreadId() {
80 return _threadId;
83 @Override
84 public void run() {
85 if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread());
87 try {
88 enter(2000, _disposeId);
89 } catch(Throwable throwable) {
90 if(_head != null || _active) { // there was a job in progress, so give a stack
91 System.err.println(getClass().getName() + " - exception occurred:" + throwable);
92 throwable.printStackTrace(System.err);
95 finally {
96 release();
99 if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId);
105 * Constructs a async job queue with the given thread id which belongs to
106 * the given sync job queue.
108 * @param threadId the thread id.
109 * @see com.sun.star.lib.uno.environments.remote.ThreadId
111 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) {
112 _javaThreadPoolFactory = javaThreadPoolFactory;
113 _threadId = ThreadId.createFresh();
115 _sync_jobQueue = javaThreadPoolFactory.getJobQueue(threadId);
116 if(_sync_jobQueue == null) {
117 _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true);
118 _sync_jobQueue.acquire();
121 _sync_jobQueue._async_jobQueue = this;
123 _createThread = true;
124 _createThread_now = true;
126 acquire();
128 if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId);
/**
 * Constructs a sync job queue for the given thread id.
 *
 * @param javaThreadPoolFactory the factory this queue adds itself to and
 *        removes itself from.
 * @param threadId the thread id.
 * @param createThread if true, the queue creates a worker thread if needed.
 * @see com.sun.star.lib.uno.environments.remote.ThreadId
 */
JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread) {
    this._javaThreadPoolFactory = javaThreadPoolFactory;
    this._threadId = threadId;

    // both flags start out with the same value
    this._createThread = createThread;
    this._createThread_now = createThread;

    if (DEBUG) {
        System.err.println("##### " + getClass().getName() + " - init:" + _threadId + " " + createThread);
    }
}
148 * Gives the thread id of this queue.
150 * @return the thread id.
151 * @see com.sun.star.lib.uno.environments.remote.ThreadId
153 ThreadId getThreadId() {
154 return _threadId;
157 synchronized void acquire() {
158 // add only synchronous queues .
159 if(_ref_count <= 0 && _sync_jobQueue == null )
160 _javaThreadPoolFactory.addJobQueue(this);
162 ++ _ref_count;
165 synchronized void release() {
166 -- _ref_count;
168 if(_ref_count <= 0) {
169 // only synchronous queues needs to be removed .
170 if( _sync_jobQueue == null )
171 _javaThreadPoolFactory.removeJobQueue(this);
174 if(_sync_jobQueue != null) {
175 _sync_jobQueue._async_jobQueue = null;
176 _sync_jobQueue.release();
182 * Removes a job from the queue.
184 * @param waitTime the maximum amount of time to wait for a job.
185 * @return a job or null if timed out.
187 private Job removeJob(int waitTime) {
188 if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId);
190 Job job = null;
191 synchronized (this) {
192 // wait max. waitTime time for a job to enter the queue
193 boolean waited = false;
194 while(_head == null && (waitTime == 0 || !waited)) {
195 if(_doDispose == _disposeId) {
196 _doDispose = null;
197 throw (DisposedException)
198 new DisposedException().initCause(_throwable);
201 // notify sync queues
202 notifyAll();
204 try {
205 // wait for new job
206 wait(waitTime);
207 } catch(InterruptedException interruptedException) {
208 throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
211 // signal that we have already waited once
212 waited = true;
216 if(_head != null) {
217 Job current = _head;
218 _head = _head._next;
220 if(_head == null)
221 _tail = null;
223 job = current;
224 _active = true;
228 // always wait for asynchron jobqueue to be finished !
229 if(job != null && _async_jobQueue != null) {
230 synchronized(_async_jobQueue) {
231 // wait for async queue to be empty and last job to be done
232 while(_async_jobQueue._active || _async_jobQueue._head != null) {
233 if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " + _async_jobQueue._worker_thread);
235 if(_doDispose == _disposeId) {
236 _doDispose = null;
237 throw (DisposedException)
238 new DisposedException().initCause(_throwable);
241 try {
242 _async_jobQueue.wait();
243 } catch(InterruptedException interruptedException) {
244 throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
250 return job;
254 * Puts a job into the queue.
256 * @param job the job.
257 * @param disposeId a dispose id.
259 synchronized void putJob(Job job, Object disposeId) {
260 if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job);
262 if(_tail != null)
263 _tail._next = job;
264 else
265 _head = job;
267 _tail = job;
269 if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one
271 acquire();
273 _createThread_now = false;
274 new JobDispatcher(disposeId).start();
277 // always notify possible waiters
278 notifyAll();
282 * Enters the job queue.
284 * @param disposeId a dispose id.
285 * @return the result of the final job (reply).
287 Object enter(Object disposeId) throws Throwable {
288 return enter(0, disposeId); // wait infinitly
292 * Enters the job queue.
294 * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly).
295 * @param disposeId a dispose id.
296 * @return the result of the final job (reply).
298 Object enter(int waitTime, Object disposeId) throws Throwable {
299 if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId);
301 boolean quit = false;
303 Object hold_disposeId = _disposeId;
304 _disposeId = disposeId;
306 Object result = null;
308 Thread hold_worker_thread = _worker_thread;
309 _worker_thread = Thread.currentThread();
311 while(!quit) {
312 Job job = null;
314 try {
315 job = removeJob(waitTime);
317 if(job != null) {
318 try {
319 result = job.execute();
321 finally {
322 _active = false;
325 if (!job.isRequest()) {
326 job.dispose();
328 quit = true;
331 job = null;
333 else
334 quit = true;
338 finally { // ensure that this queue becomes disposed, if necessary
339 if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result);
341 synchronized(this) {
342 if(job != null || (quit && _head == null)) {
343 _worker_thread = hold_worker_thread;
345 _createThread_now = true;
347 _disposeId = hold_disposeId;
349 if(_sync_jobQueue != null)
350 notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
352 else
353 quit = false;
359 return result;
363 * If the given disposeId is registered, interrups the worker thread.
365 * @param disposeId the dispose id.
367 synchronized void dispose(Object disposeId, Throwable throwable) {
368 if(_sync_jobQueue == null) { // dispose only sync queues
369 _doDispose = disposeId;
370 _throwable = throwable;
372 // get thread out of wait and let it throw the throwable
373 if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread");
375 notifyAll();