From 258aa8a1f0575393884cdcf8eda18d1f72ce3073 Mon Sep 17 00:00:00 2001 From: Frank Maritato Date: Fri, 16 Apr 2010 18:04:59 +0000 Subject: [PATCH] use commons-logging git-svn-id: https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-java/trunk@444 a2f82657-cdd2-4550-bd36-68a8e7111808 --- .../java/org/lwes/listener/DatagramDequeuer.java | 22 +- .../java/org/lwes/listener/DatagramEnqueuer.java | 372 +++++++++++---------- .../org/lwes/listener/ThreadedEventDispatcher.java | 20 +- .../java/org/lwes/listener/ThreadedProcessor.java | 8 +- 4 files changed, 226 insertions(+), 196 deletions(-) rewrite src/main/java/org/lwes/listener/DatagramEnqueuer.java (67%) diff --git a/src/main/java/org/lwes/listener/DatagramDequeuer.java b/src/main/java/org/lwes/listener/DatagramDequeuer.java index b870d1f..906aed8 100644 --- a/src/main/java/org/lwes/listener/DatagramDequeuer.java +++ b/src/main/java/org/lwes/listener/DatagramDequeuer.java @@ -1,14 +1,18 @@ package org.lwes.listener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.lwes.Event; import org.lwes.EventFactory; import org.lwes.util.IPAddress; -import org.lwes.util.Log; import java.io.IOException; import java.net.DatagramPacket; public class DatagramDequeuer extends ThreadedDequeuer { + + private static transient Log log = LogFactory.getLog(DatagramDequeuer.class); + private boolean running = false; /* an event factory */ @@ -32,14 +36,16 @@ public class DatagramDequeuer extends ThreadedDequeuer { try { QueueElement element = null; element = queue.take(); - Log.trace("Removed from queue: "+element); - handleElement((DatagramQueueElement)element); + if (log.isTraceEnabled()) { + log.trace("Removed from queue: " + element); + } + handleElement((DatagramQueueElement) element); } catch (UnsupportedOperationException uoe) { // not a problem, someone grabbed the event before we did } catch (Exception e) { - Log.error("Error in dequeueing event for processing", e); + log.error("Error in dequeueing event for processing", e); } } } @@ -78,13 +84,15 @@ public class DatagramDequeuer extends ThreadedDequeuer { event.setInt64(Event.RECEIPT_TIME, timestamp); event.setIPAddress(Event.SENDER_IP, address); event.setUInt16(Event.SENDER_PORT, port); - if (Log.isLogTrace()) { - Log.trace("Dispatching event " + event.toString()); + if (log.isTraceEnabled()) { + log.trace("Dispatching event " + event.toString()); } dispatchEvent(event); } catch (Exception e) { - Log.warning("Unable to deserialize event in handleElement()", e); + if (log.isWarnEnabled()) { + log.warn("Unable to deserialize event in handleElement()", e); + } } } } diff --git a/src/main/java/org/lwes/listener/DatagramEnqueuer.java b/src/main/java/org/lwes/listener/DatagramEnqueuer.java dissimilarity index 67% index 8e26dde..d47c136 100644 --- a/src/main/java/org/lwes/listener/DatagramEnqueuer.java +++ b/src/main/java/org/lwes/listener/DatagramEnqueuer.java @@ -1,179 +1,193 @@ -package org.lwes.listener; - -import org.lwes.util.Log; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.MulticastSocket; - -/** - * This class listens to packets sent via UDP, and enqueues them for processing. - * It detects multicast addresses and listens to multicast groups if one is detected, otherwise - * it listens for unicast datagrams. - * - * @author Anthony Molinaro - * @author Michael P. Lum - * - */ -public class DatagramEnqueuer extends ThreadedEnqueuer { - /* max datagram size in bytes */ - private static final int MAX_DATAGRAM_SIZE = 65535; - private String DEFAULT_ADDRESS = "224.0.0.69"; - - /* the default network settings */ - private InetAddress address = null; - private int port = 9191; - private InetAddress iface = null; - private int ttl = 31; - - /* the network socket */ - private DatagramSocket socket = null; - - /* a running buffer */ - private byte[] buffer = null; - - /* thread control */ - private boolean running = false; - - public DatagramEnqueuer() { - super(); - buffer = new byte[MAX_DATAGRAM_SIZE]; - } - - /** - * Gets the network address being used for this listener. - * @return the address - */ - public InetAddress getAddress() { - return address; - } - - /** - * Sets the address being used for this listener. - * @param address the address to use - */ - public void setAddress(InetAddress address) { - this.address = address; - } - - /** - * Gets the port being used for this listener. - * @return the port number - */ - public int getPort() { - return port; - } - - /** - * Sets the port being used for this listener. - * @param port the port number - */ - public void setPort(int port) { - this.port = port; - } - - /** - * Gets the network interface being used for this listener. - * @return the interface - */ - public InetAddress getInterface() { - return iface; - } - - /** - * Sets the network interface to use for this listener. - * @param iface the interface - */ - public void setInterface(InetAddress iface) { - this.iface = iface; - } - - /** - * Returns the multicast TTL (if applicable). - * Applies to multicast listeners only. - * @return the TTL value - */ - public int getTimeToLive() { - return ttl; - } - - /** - * Sets the multicast TTL. This typically does not need to be modified. - * Applies to multicast listeners only. - * @param ttl the multicast TTL value. - */ - public void setTimeToLive(int ttl) { - this.ttl = ttl; - } - - public void initialize() throws IOException { - if (address == null) { - address = InetAddress.getByName(DEFAULT_ADDRESS); - } - - if (address.isMulticastAddress()) { - socket = new MulticastSocket(port); - ((MulticastSocket) socket).setTimeToLive(ttl); - if (iface != null) { - ((MulticastSocket) socket).setInterface(iface); - } - ((MulticastSocket) socket).joinGroup(address); - } - else { - if (iface != null) { - socket = new DatagramSocket(port, iface); - } - else { - socket = new DatagramSocket(port, address); - } - } - int bufSize = MAX_DATAGRAM_SIZE*50; - String bufSizeStr = System.getProperty("MulticastReceiveBufferSize"); - if (bufSizeStr != null && !"".equals(bufSizeStr)) { - bufSize = Integer.parseInt(bufSizeStr); - } - socket.setReceiveBufferSize(bufSize); - } - - public synchronized void shutdown() { - running = false; - } - - /** - * While running, repeatedly read datagrams and insert them into the queue along with the - * receipt time and other metadata. - */ - public void run() { - running = true; - - while(running) { - try { - DatagramPacket datagram = new DatagramPacket(buffer, buffer.length); - socket.receive(datagram); - Log.trace("Received datagram: "+datagram); - - /* we record the time *after* the receive because it blocks */ - long receiptTime = System.currentTimeMillis(); - - /* copy the data into a tight buffer so we can release the loose buffer */ - final byte[] tightBuffer = new byte[datagram.getLength()]; - System.arraycopy(datagram.getData(), 0, tightBuffer, 0, tightBuffer.length); - datagram.setData(tightBuffer); - - /* create an element for the queue */ - DatagramQueueElement element = new DatagramQueueElement(); - element.setPacket(datagram); - element.setTimestamp(receiptTime); - - /* add the element to the queue and notify everyone there's work to do */ - queue.put(element); - Log.trace("Enqueued: "+element); - } catch(Exception e) { - Log.warning("Unable to read datagram", e); - } - } - } - -} +package org.lwes.listener; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.MulticastSocket; + +/** + * This class listens to packets sent via UDP, and enqueues them for processing. + * It detects multicast addresses and listens to multicast groups if one is detected, otherwise + * it listens for unicast datagrams. + * + * @author Anthony Molinaro + * @author Michael P. Lum + */ +public class DatagramEnqueuer extends ThreadedEnqueuer { + + private static transient Log log = LogFactory.getLog(DatagramEnqueuer.class); + + /* max datagram size in bytes */ + private static final int MAX_DATAGRAM_SIZE = 65535; + private String DEFAULT_ADDRESS = "224.0.0.69"; + + /* the default network settings */ + private InetAddress address = null; + private int port = 9191; + private InetAddress iface = null; + private int ttl = 31; + + /* the network socket */ + private DatagramSocket socket = null; + + /* a running buffer */ + private byte[] buffer = null; + + /* thread control */ + private boolean running = false; + + public DatagramEnqueuer() { + super(); + buffer = new byte[MAX_DATAGRAM_SIZE]; + } + + /** + * Gets the network address being used for this listener. + * + * @return the address + */ + public InetAddress getAddress() { + return address; + } + + /** + * Sets the address being used for this listener. + * + * @param address the address to use + */ + public void setAddress(InetAddress address) { + this.address = address; + } + + /** + * Gets the port being used for this listener. + * + * @return the port number + */ + public int getPort() { + return port; + } + + /** + * Sets the port being used for this listener. + * + * @param port the port number + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Gets the network interface being used for this listener. + * + * @return the interface + */ + public InetAddress getInterface() { + return iface; + } + + /** + * Sets the network interface to use for this listener. + * + * @param iface the interface + */ + public void setInterface(InetAddress iface) { + this.iface = iface; + } + + /** + * Returns the multicast TTL (if applicable). + * Applies to multicast listeners only. + * + * @return the TTL value + */ + public int getTimeToLive() { + return ttl; + } + + /** + * Sets the multicast TTL. This typically does not need to be modified. + * Applies to multicast listeners only. + * + * @param ttl the multicast TTL value. + */ + public void setTimeToLive(int ttl) { + this.ttl = ttl; + } + + public void initialize() throws IOException { + if (address == null) { + address = InetAddress.getByName(DEFAULT_ADDRESS); + } + + if (address.isMulticastAddress()) { + socket = new MulticastSocket(port); + ((MulticastSocket) socket).setTimeToLive(ttl); + if (iface != null) { + ((MulticastSocket) socket).setInterface(iface); + } + ((MulticastSocket) socket).joinGroup(address); + } + else { + if (iface != null) { + socket = new DatagramSocket(port, iface); + } + else { + socket = new DatagramSocket(port, address); + } + } + int bufSize = MAX_DATAGRAM_SIZE * 50; + String bufSizeStr = System.getProperty("MulticastReceiveBufferSize"); + if (bufSizeStr != null && !"".equals(bufSizeStr)) { + bufSize = Integer.parseInt(bufSizeStr); + } + socket.setReceiveBufferSize(bufSize); + } + + public synchronized void shutdown() { + running = false; + } + + /** + * While running, repeatedly read datagrams and insert them into the queue along with the + * receipt time and other metadata. + */ + public void run() { + running = true; + + while (running) { + try { + DatagramPacket datagram = new DatagramPacket(buffer, buffer.length); + socket.receive(datagram); + if (log.isTraceEnabled()) { + log.trace("Received datagram: " + datagram); + } + /* we record the time *after* the receive because it blocks */ + long receiptTime = System.currentTimeMillis(); + + /* copy the data into a tight buffer so we can release the loose buffer */ + final byte[] tightBuffer = new byte[datagram.getLength()]; + System.arraycopy(datagram.getData(), 0, tightBuffer, 0, tightBuffer.length); + datagram.setData(tightBuffer); + + /* create an element for the queue */ + DatagramQueueElement element = new DatagramQueueElement(); + element.setPacket(datagram); + element.setTimestamp(receiptTime); + + /* add the element to the queue and notify everyone there's work to do */ + queue.put(element); + if (log.isTraceEnabled()) { + log.trace("Enqueued: " + element); + } + } catch(Exception e) { + log.warn("Unable to read datagram", e); + } + } + } + +} diff --git a/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java b/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java index 773e8f9..07c2885 100644 --- a/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java +++ b/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java @@ -1,7 +1,8 @@ package org.lwes.listener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.lwes.Event; -import org.lwes.util.Log; /** * Dispatches events to downstream handlers using threads. @@ -9,11 +10,14 @@ import org.lwes.util.Log; * @author Michael P. Lum */ public class ThreadedEventDispatcher extends Thread { + + private static transient Log log = LogFactory.getLog(ThreadedEventDispatcher.class); + /* dequeuer controlling this object */ private ThreadedDequeuer dequeuer; private EventHandler eventHandler; private Event event; - + protected ThreadedEventDispatcher(ThreadedDequeuer aDequeuer) { this.dequeuer = aDequeuer; super.start(); @@ -31,18 +35,18 @@ public class ThreadedEventDispatcher extends Thread { throw new IllegalStateException("Processor already has a listener"); } } - + public final boolean isActive() { return (! isIdle()); } - + public final boolean isIdle() { final boolean p1 = (eventHandler == null); final boolean p2 = (event == null); if(p1 == p2) return p1; else throw new IllegalStateException("Contradictory state indication"); } - + public void run() { while(true) { synchronized(this) { @@ -50,7 +54,7 @@ public class ThreadedEventDispatcher extends Thread { try { eventHandler.handleEvent(event); } catch(Exception e) { - Log.warning("Caught exception handling event", e); + log.warn("Caught exception handling event", e); } clearTask(); } else { @@ -61,13 +65,13 @@ public class ThreadedEventDispatcher extends Thread { } } } - + private void clearTask() { synchronized(this) { eventHandler = null; event = null; } - + dequeuer.makeAvailable(this); } diff --git a/src/main/java/org/lwes/listener/ThreadedProcessor.java b/src/main/java/org/lwes/listener/ThreadedProcessor.java index e398bfd..9e3a955 100644 --- a/src/main/java/org/lwes/listener/ThreadedProcessor.java +++ b/src/main/java/org/lwes/listener/ThreadedProcessor.java @@ -1,7 +1,8 @@ package org.lwes.listener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.lwes.EventSystemException; -import org.lwes.util.Log; import java.util.concurrent.LinkedBlockingQueue; @@ -14,6 +15,9 @@ import java.util.concurrent.LinkedBlockingQueue; * @author Michael P. Lum */ public class ThreadedProcessor implements Runnable { + + private static transient Log log = LogFactory.getLog(ThreadedProcessor.class); + /* a flag to tell whether or not the thread is running */ private boolean running = false; @@ -200,7 +204,7 @@ public class ThreadedProcessor implements Runnable { try { Thread.sleep(seconds * 1000L); } catch (InterruptedException ie) { - Log.warning("ThreadedProcessor interrupted", ie); + log.warn("ThreadedProcessor interrupted", ie); } } } -- 2.11.4.GIT