From 1528e9d8a70cef36366413ccf332d5a655f0f46a Mon Sep 17 00:00:00 2001 From: Frank Maritato Date: Wed, 30 Jun 2010 20:56:42 +0000 Subject: [PATCH] merged in queueSize from 0.2.2 release git-svn-id: https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-java/trunk@511 a2f82657-cdd2-4550-bd36-68a8e7111808 --- .../org/lwes/listener/ThreadedEventListener.java | 185 +++++---- .../java/org/lwes/listener/ThreadedProcessor.java | 460 +++++++++++---------- 2 files changed, 354 insertions(+), 291 deletions(-) rewrite src/main/java/org/lwes/listener/ThreadedEventListener.java (92%) rewrite src/main/java/org/lwes/listener/ThreadedProcessor.java (84%) diff --git a/src/main/java/org/lwes/listener/ThreadedEventListener.java b/src/main/java/org/lwes/listener/ThreadedEventListener.java dissimilarity index 92% index 3560ae7..811657c 100644 --- a/src/main/java/org/lwes/listener/ThreadedEventListener.java +++ b/src/main/java/org/lwes/listener/ThreadedEventListener.java @@ -1,80 +1,105 @@ -package org.lwes.listener; - -import org.lwes.EventSystemException; - -public abstract class ThreadedEventListener implements EventListener { - /* the processor for handling events */ - protected ThreadedProcessor processor = new ThreadedProcessor(); - - /* the event enqueuer and dequeuer */ - private ThreadedEnqueuer enqueuer = null; - private ThreadedDequeuer dequeuer = null; - - /** - * Default constructor. - */ - public ThreadedEventListener() { - } - - /** - * Gets the enqueuer being used by this listener - * @return the enqueuer - */ - public ThreadedEnqueuer getEnqueuer() { - return enqueuer; - } - - /** - * Sets the enqueuer to use for this listener - * @param enqueuer the enqueuer to set - */ - public void setEnqueuer(ThreadedEnqueuer enqueuer) { - this.enqueuer = enqueuer; - } - - /** - * Gets the dequeuer being used by this listener - * @return the dequeuer - */ - public ThreadedDequeuer getDequeuer() { - return dequeuer; - } - - /** - * Sets the dequeuer used by this listener - * @param dequeuer the dequeuer to set - */ - public void setDequeuer(ThreadedDequeuer dequeuer) { - this.dequeuer = dequeuer; - } - - /** - * Add an EventHandler to handle events for processing. - * @param handler the EventHandler to add - */ - public void addHandler(EventHandler handler) { - - } - - /** - * Initializes this listener, and starts the processor threads - * @throws EventSystemException if there is a problem initializing the listener - */ - public void initialize() throws EventSystemException { - if(processor == null) throw new EventSystemException("No processor exists"); - if(enqueuer == null) throw new EventSystemException("No enqueuer exists"); - if(dequeuer == null) throw new EventSystemException("No dequeuer exists"); - - processor.setEnqueuerPriority(Thread.MAX_PRIORITY); - processor.setDequeuerPriority(Thread.NORM_PRIORITY); - processor.setEnqueuer(enqueuer); - processor.setDequeuer(dequeuer); - processor.initialize(); - } - - public void shutdown() throws EventSystemException { - if(processor == null) throw new EventSystemException("No processor exists"); - processor.shutdown(); - } - -} +package org.lwes.listener; + +import org.lwes.EventSystemException; + +public abstract class ThreadedEventListener implements EventListener { + /* the processor for handling events */ + protected ThreadedProcessor processor = new ThreadedProcessor(); + + /* the event enqueuer and dequeuer */ + private ThreadedEnqueuer enqueuer = null; + private ThreadedDequeuer dequeuer = null; + + private int queueSize = -1; + + /** + * Default constructor. + */ + public ThreadedEventListener() { + } + + public int getQueueSize() { + return queueSize; + } + + public void setQueueSize(int queueSize) { + this.queueSize = queueSize; + } + + /** + * Gets the enqueuer being used by this listener + * + * @return the enqueuer + */ + public ThreadedEnqueuer getEnqueuer() { + return enqueuer; + } + + /** + * Sets the enqueuer to use for this listener + * + * @param enqueuer the enqueuer to set + */ + public void setEnqueuer(ThreadedEnqueuer enqueuer) { + this.enqueuer = enqueuer; + } + + /** + * Gets the dequeuer being used by this listener + * + * @return the dequeuer + */ + public ThreadedDequeuer getDequeuer() { + return dequeuer; + } + + /** + * Sets the dequeuer used by this listener + * + * @param dequeuer the dequeuer to set + */ + public void setDequeuer(ThreadedDequeuer dequeuer) { + this.dequeuer = dequeuer; + } + + /** + * Add an EventHandler to handle events for processing. + * + * @param handler the EventHandler to add + */ + public void addHandler(EventHandler handler) { + + } + + /** + * Initializes this listener, and starts the processor threads + * + * @throws EventSystemException if there is a problem initializing the listener + */ + public void initialize() throws EventSystemException { + if (processor == null) { + throw new EventSystemException("No processor exists"); + } + if (enqueuer == null) { + throw new EventSystemException("No enqueuer exists"); + } + if (dequeuer == null) { + throw new EventSystemException("No dequeuer exists"); + } + + processor.setQueueSize(queueSize); + processor.setEnqueuerPriority(Thread.MAX_PRIORITY); + processor.setDequeuerPriority(Thread.NORM_PRIORITY); + processor.setEnqueuer(enqueuer); + processor.setDequeuer(dequeuer); + processor.initialize(); + } + + public void shutdown() throws EventSystemException { + if (processor == null) { + throw new EventSystemException("No processor exists"); + } + processor.shutdown(); + } + +} diff --git a/src/main/java/org/lwes/listener/ThreadedProcessor.java b/src/main/java/org/lwes/listener/ThreadedProcessor.java dissimilarity index 84% index 9e3a955..f44f399 100644 --- a/src/main/java/org/lwes/listener/ThreadedProcessor.java +++ b/src/main/java/org/lwes/listener/ThreadedProcessor.java @@ -1,211 +1,249 @@ -package org.lwes.listener; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.lwes.EventSystemException; - -import java.util.concurrent.LinkedBlockingQueue; - -/** - * A threaded, queueing event processor. This class requires setting a class to - * enqueue events (for example, a network listener) and a class to dequeue - * events (for example, writing to disk). - * - * @author Anthony Molinaro - * @author Michael P. Lum - */ -public class ThreadedProcessor implements Runnable { - - private static transient Log log = LogFactory.getLog(ThreadedProcessor.class); - - /* a flag to tell whether or not the thread is running */ - private boolean running = false; - - /* the number of seconds to sleep */ - private int seconds = 30; - - /* the thread placing events into the queue */ - private ThreadedEnqueuer enqueuer = null; - - /* the thread dispatching events from the queue */ - private ThreadedDequeuer dequeuer = null; - - /* the enqueuer thread */ - private Thread enqueuerThread = null; - - /* the dequeuer thread */ - private Thread dequeuerThread = null; - - /* a watcher thread (myself) */ - private Thread watcherThread = null; - - /* the queue for events */ - private LinkedBlockingQueue queue = null; - - /* the priority for the enqueuing thread */ - int enqueuerPriority = Thread.NORM_PRIORITY; - - /* the priority for the dequeuing thread */ - int dequeuerPriority = Thread.NORM_PRIORITY; - - /* the priority for the watcher thread */ - int watcherPriority = Thread.MIN_PRIORITY; - - /** - * Default constructor. - */ - public ThreadedProcessor() { - } - - /** - * Gets the enqueuer being used by this event processor - * @return the ThreadedEnqueuer being used by this processor - */ - public ThreadedEnqueuer getEnqueuer() { - return this.enqueuer; - } - - /** - * Sets the enqueuer to use for this event processor. - * @param enqueuer the ThreadedEnqueuer to use - */ - public void setEnqueuer(ThreadedEnqueuer enqueuer) { - this.enqueuer = enqueuer; - } - - /** - * Gets the dequeuer being used by this event processor - * @return the ThreadedDequeuer being used by this processor - */ - public ThreadedDequeuer getDequeuer() { - return this.dequeuer; - } - - /** - * Sets the dequeuer to use for this event processor. - * @param dequeuer the ThreadedDequeuer to use - */ - public void setDequeuer(ThreadedDequeuer dequeuer) { - this.dequeuer = dequeuer; - } - - /** - * Returns the List being used as the queue - * @return the List object - */ - public synchronized LinkedBlockingQueue getQueue() { - return this.queue; - } - - /** - * Sets the List being used as the queue. - * Warning: this list needs to be thread-synchronized! - * @param queue the List to use for this processor - */ - public synchronized void setQueue(LinkedBlockingQueue queue) { - this.queue = queue; - } - - /** - * Returns the thread priority of the enqueuer. - * @return the thread priority - */ - public int getEnqueuerPriority() { - return this.enqueuerPriority; - } - - /** - * Sets the thread priority of the enqueuer. - * @param priority the thread priority to use - */ - public void setEnqueuerPriority(int priority) { - this.enqueuerPriority = priority; - } - - /** - * Returns the thread priority of the dequeuer. - * @return the thread priority - */ - public int getDequeuerPriority() { - return this.dequeuerPriority; - } - - /** - * Sets the thread priority of the dequeuer - * @param priority the thread priority to use - */ - public void setDequeuerPriority(int priority) { - this.dequeuerPriority = priority; - } - - /** - * Initializes the processor to handle events. Starts the enqueuer and - * dequeuer threads. - * - * @throws EventSystemException - * if there is a problem setting up the processor - */ - public void initialize() throws EventSystemException { - if (enqueuer == null) { - throw new EventSystemException( - "Event enqueuer is not set, call setEnqueuer() first"); - } - - if (dequeuer == null) { - throw new EventSystemException( - "Event dequeuer is not set call setDequeuer() first"); - } - - /* create a queue if it doesn't exist */ - if (queue == null) { - queue = new LinkedBlockingQueue(); - } - - /* make the queue available to the enqueuer and dequeuer */ - dequeuer.setQueue(queue); - enqueuer.setQueue(queue); - - try { - dequeuer.initialize(); - dequeuerThread = new Thread(dequeuer, "Dequeueing Thread"); - dequeuerThread.setPriority(dequeuerPriority); - dequeuerThread.start(); - - enqueuer.initialize(); - enqueuerThread = new Thread(enqueuer, "Enqueueing Thread"); - enqueuerThread.setPriority(enqueuerPriority); - enqueuerThread.start(); - - watcherThread = new Thread(this, "Watcher Thread"); - watcherThread.setPriority(watcherPriority); - watcherThread.start(); - } catch (Exception ie) { - throw new EventSystemException("Unable to start ThreadedProcessor", - ie); - } - } - - /** - * Shuts down the event listener. Stops the enqueuer and dequeuer threads. - */ - public synchronized void shutdown() { - running = false; - dequeuer.shutdown(); - enqueuer.shutdown(); - } - - /** - * The thread's execution loop. Doesn't do much because the enqueue and - * dequeue threads do the heavy lifting. - */ - public final void run() { - running = true; - while (running) { - try { - Thread.sleep(seconds * 1000L); - } catch (InterruptedException ie) { - log.warn("ThreadedProcessor interrupted", ie); - } - } - } -} +package org.lwes.listener; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.lwes.EventSystemException; + +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A threaded, queueing event processor. This class requires setting a class to + * enqueue events (for example, a network listener) and a class to dequeue + * events (for example, writing to disk). + * + * @author Anthony Molinaro + * @author Michael P. Lum + */ +public class ThreadedProcessor implements Runnable { + + private static transient Log log = LogFactory.getLog(ThreadedProcessor.class); + + /* a flag to tell whether or not the thread is running */ + private boolean running = false; + + /* the number of seconds to sleep */ + private int seconds = 30; + + /* the thread placing events into the queue */ + private ThreadedEnqueuer enqueuer = null; + + /* the thread dispatching events from the queue */ + private ThreadedDequeuer dequeuer = null; + + /* the enqueuer thread */ + private Thread enqueuerThread = null; + + /* the dequeuer thread */ + private Thread dequeuerThread = null; + + /* a watcher thread (myself) */ + private Thread watcherThread = null; + + /* the queue for events */ + private LinkedBlockingQueue queue = null; + + /* the priority for the enqueuing thread */ + int enqueuerPriority = Thread.NORM_PRIORITY; + + /* the priority for the dequeuing thread */ + int dequeuerPriority = Thread.NORM_PRIORITY; + + /* the priority for the watcher thread */ + int watcherPriority = Thread.MIN_PRIORITY; + + /** + * The maximum size of the queue. If you don't set it will be unlimited. + */ + private int queueSize = -1; + + /** + * Default constructor. + */ + public ThreadedProcessor() { + } + + /** + * Gets the enqueuer being used by this event processor + * + * @return the ThreadedEnqueuer being used by this processor + */ + public ThreadedEnqueuer getEnqueuer() { + return this.enqueuer; + } + + /** + * Sets the enqueuer to use for this event processor. + * + * @param enqueuer the ThreadedEnqueuer to use + */ + public void setEnqueuer(ThreadedEnqueuer enqueuer) { + this.enqueuer = enqueuer; + } + + /** + * Gets the dequeuer being used by this event processor + * + * @return the ThreadedDequeuer being used by this processor + */ + public ThreadedDequeuer getDequeuer() { + return this.dequeuer; + } + + /** + * Sets the dequeuer to use for this event processor. + * + * @param dequeuer the ThreadedDequeuer to use + */ + public void setDequeuer(ThreadedDequeuer dequeuer) { + this.dequeuer = dequeuer; + } + + /** + * Returns the List being used as the queue + * + * @return the List object + */ + public synchronized LinkedBlockingQueue getQueue() { + return this.queue; + } + + /** + * Sets the List being used as the queue. + * Warning: this list needs to be thread-synchronized! + * + * @param queue the List to use for this processor + */ + public synchronized void setQueue(LinkedBlockingQueue queue) { + this.queue = queue; + } + + /** + * Returns the thread priority of the enqueuer. + * + * @return the thread priority + */ + public int getEnqueuerPriority() { + return this.enqueuerPriority; + } + + /** + * Sets the thread priority of the enqueuer. + * + * @param priority the thread priority to use + */ + public void setEnqueuerPriority(int priority) { + this.enqueuerPriority = priority; + } + + /** + * Returns the thread priority of the dequeuer. + * + * @return the thread priority + */ + public int getDequeuerPriority() { + return this.dequeuerPriority; + } + + /** + * Sets the thread priority of the dequeuer + * + * @param priority the thread priority to use + */ + public void setDequeuerPriority(int priority) { + this.dequeuerPriority = priority; + } + + /** + * @return the maximum queue size + */ + public int getQueueSize() { + return queueSize; + } + + /** + * Use this to set an upper bound on how big the queue can get. If you + * don't set, it will be unbounded and you risk OOME. If you do set you + * risk dropping events. + * + * @param queueSize Sets the maximum size for the internal queue. + */ + public void setQueueSize(int queueSize) { + this.queueSize = queueSize; + } + + /** + * Initializes the processor to handle events. Starts the enqueuer and + * dequeuer threads. + * + * @throws EventSystemException if there is a problem setting up the processor + */ + public void initialize() throws EventSystemException { + if (enqueuer == null) { + throw new EventSystemException( + "Event enqueuer is not set, call setEnqueuer() first"); + } + + if (dequeuer == null) { + throw new EventSystemException( + "Event dequeuer is not set call setDequeuer() first"); + } + + /* create a queue if it doesn't exist */ + if (queue == null) { + if (queueSize > 0) { + queue = new LinkedBlockingQueue(queueSize); + } + else { + queue = new LinkedBlockingQueue(); + } + } + + /* make the queue available to the enqueuer and dequeuer */ + dequeuer.setQueue(queue); + enqueuer.setQueue(queue); + + try { + dequeuer.initialize(); + dequeuerThread = new Thread(dequeuer, "Dequeueing Thread"); + dequeuerThread.setPriority(dequeuerPriority); + dequeuerThread.start(); + + enqueuer.initialize(); + enqueuerThread = new Thread(enqueuer, "Enqueueing Thread"); + enqueuerThread.setPriority(enqueuerPriority); + enqueuerThread.start(); + + watcherThread = new Thread(this, "Watcher Thread"); + watcherThread.setPriority(watcherPriority); + watcherThread.start(); + } + catch (Exception ie) { + throw new EventSystemException("Unable to start ThreadedProcessor", + ie); + } + } + + /** + * Shuts down the event listener. Stops the enqueuer and dequeuer threads. + */ + public synchronized void shutdown() { + running = false; + dequeuer.shutdown(); + enqueuer.shutdown(); + } + + /** + * The thread's execution loop. Doesn't do much because the enqueue and + * dequeue threads do the heavy lifting. + */ + public final void run() { + running = true; + while (running) { + try { + Thread.sleep(seconds * 1000L); + } catch (InterruptedException ie) { + log.warn("ThreadedProcessor interrupted", ie); + } + } + } +} -- 2.11.4.GIT