From 37a40594bafb4a80fb65b9b36503346d21f9f00d Mon Sep 17 00:00:00 2001 From: Frank Maritato Date: Thu, 13 Aug 2009 23:53:08 +0000 Subject: [PATCH] Added Heartbeat, Startup and Shutdown events git-svn-id: https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-java/trunk@194 a2f82657-cdd2-4550-bd36-68a8e7111808 --- src/org/lwes/emitter/AbstractEventEmitter.java | 114 +++++ src/org/lwes/emitter/MulticastEventEmitter.java | 531 +++++++++++---------- .../lwes/emitter/MockMulticastEventEmitter.java | 32 ++ .../lwes/emitter/MulticastEventEmitterTest.java | 82 ++++ 4 files changed, 496 insertions(+), 263 deletions(-) create mode 100644 src/org/lwes/emitter/AbstractEventEmitter.java rewrite src/org/lwes/emitter/MulticastEventEmitter.java (85%) create mode 100644 tests/org/lwes/emitter/MockMulticastEventEmitter.java create mode 100644 tests/org/lwes/emitter/MulticastEventEmitterTest.java diff --git a/src/org/lwes/emitter/AbstractEventEmitter.java b/src/org/lwes/emitter/AbstractEventEmitter.java new file mode 100644 index 0000000..a72efa8 --- /dev/null +++ b/src/org/lwes/emitter/AbstractEventEmitter.java @@ -0,0 +1,114 @@ +package org.lwes.emitter; +/** + * @author fmaritato + */ + +import org.lwes.Event; +import org.lwes.EventFactory; +import org.lwes.EventSystemException; +import org.lwes.util.Log; + +import java.io.IOException; + +public abstract class AbstractEventEmitter implements EventEmitter { + + private EventFactory factory = new EventFactory(); + + private boolean emitHeartbeat = false; + private long eventCount = 0; + private long totalEventCount = 0; + private long frequency = 60000; + private long lastBeatTime = 0; + private long sequence = 0; + + public void initialize() throws IOException { + try { + factory.initialize(); + lastBeatTime = System.currentTimeMillis(); + Event e = factory.createEvent("System::Startup", false); + emit(e); + } + catch (EventSystemException e) { + Log.error(e.getMessage(), e); + } + } + + public void shutdown() throws IOException { + try { + Event e = factory.createEvent("System::Shutdown", false); + long time = System.currentTimeMillis(); + long freqThisPeriod = time - lastBeatTime; + sendEventWithStatistics(e, freqThisPeriod); + } + catch (EventSystemException e) { + Log.error(e.getMessage(), e); + } + } + + public void collectStatistics() throws EventSystemException, IOException { + + eventCount++; + totalEventCount++; + long time = System.currentTimeMillis(); + long freqThisPeriod = time - lastBeatTime; + + if (emitHeartbeat && (freqThisPeriod >= frequency)) { + Event e = factory.createEvent("System::Heartbeat", false); + sendEventWithStatistics(e, freqThisPeriod); + eventCount = 0; + lastBeatTime = time; + } + } + + public void sendEventWithStatistics(Event e, long freq) + throws EventSystemException, IOException { + e.setInt64("freq", freq); + e.setInt64("seq", ++sequence); + e.setInt64("count", eventCount); + e.setInt64("total", totalEventCount); + emit(e.serialize()); + } + + protected abstract void emit(byte[] bytes) throws IOException; + + public boolean isEmitHeartbeat() { + return emitHeartbeat; + } + + public void setEmitHeartbeat(boolean emitHeartbeat) { + this.emitHeartbeat = emitHeartbeat; + } + + public long getEventCount() { + return eventCount; + } + + public EventFactory getFactory() { + return factory; + } + + public void setFactory(EventFactory factory) { + this.factory = factory; + } + + public long getFrequency() { + return frequency; + } + + public void setFrequency(long frequency) { + this.frequency = frequency; + } + + public long getLastBeatTime() { + return lastBeatTime; + } + + public long getSequence() { + return sequence; + } + + public long getTotalEventCount() { + return totalEventCount; + } + +} diff --git a/src/org/lwes/emitter/MulticastEventEmitter.java b/src/org/lwes/emitter/MulticastEventEmitter.java dissimilarity index 85% index ef65b45..0f78ed6 100644 --- a/src/org/lwes/emitter/MulticastEventEmitter.java +++ b/src/org/lwes/emitter/MulticastEventEmitter.java @@ -1,263 +1,268 @@ -package org.lwes.emitter; - -import java.io.InputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.MulticastSocket; - -import org.lwes.Event; -import org.lwes.EventFactory; -import org.lwes.EventSystemException; -import org.lwes.util.Log; - -/** - * MulticastEventEmitter emits events to multicast groups on the network. This is the most common - * class used by users of the Light Weight Event System. - * - * Example code: - *
- * MulticastEventEmitter emitter = new MulticastEventEmitter();
- * emitter.setESFFilePath("/path/to/esf/file");
- * emitter.setMulticastAddress(InetAddress.getByName("224.0.0.69"));
- * emitter.setMulticastPort(9191);
- * emitter.initialize();
- *
- * Event e = emitter.createEvent("MyEvent", false);
- * e.setString("key","value");
- * emitter.emit(e);
- * 
- * - * @author Michael P. Lum - * @author Anthony Molinaro - * @since 0.0.1 - */ -public class MulticastEventEmitter implements EventEmitter { - /* an EventFactory */ - private EventFactory factory = new EventFactory(); - - /* the actual multicast socket being used */ - private MulticastSocket socket = null; - - /* the multicast address */ - private InetAddress address = null; - - /* the multicast port */ - private int port = 9191; - - /* the multicast interface */ - private InetAddress iface = null; - - /* the multicast time-to-live */ - private int ttl = 31; - - /* a lock variable to synchronize events */ - private Object lock = new Object(); - - /** - * Default constructor. - */ - public MulticastEventEmitter() { - } - - /** - * Sets the multicast address for this emitter. - * - * @param address the multicast address - * - */ - public void setMulticastAddress(InetAddress address) { - this.address = address; - } - - /** - * Gets the multicast address for this emitter. - * - * @return the address - */ - public InetAddress getMulticastAddress() { - return this.address; - } - - /** - * Sets the multicast port for this emitter. - * - * @param port the multicast port - * - */ - public void setMulticastPort(int port) { - this.port = port; - } - - /** - * Gets the multicast port for this emitter. - * - * @return the multicast port - */ - public int getMulticastPort() { - return this.port; - } - - /** - * Sets the network interface for this emitter. - * - * @param iface the network interface - * - */ - public void setInterface(InetAddress iface) { - this.iface = iface; - } - - /** - * Gets the network interface for this emitter. - * - * @return the interface address - */ - public InetAddress getInterface() { - return this.iface; - } - - /** - * Sets the multicast time-to-live for this emitter. - * - * @param ttl the time to live - * - */ - public void setTimeToLive(int ttl) { - this.ttl = ttl; - } - - /** - * Gets the multicast time-to-live for this emitter. - * - * @return the time to live - */ - public int getTimeToLive() { - return this.ttl; - } - - /** - * Sets the ESF file used for event validation. - * @param esfFilePath the path of the ESF file - */ - public void setESFFilePath(String esfFilePath) { - if(factory != null) { - factory.setESFFilePath(esfFilePath); - } - } - - /** - * Gets the ESF file used for event validation - * @return the ESF file path - */ - public String getESFFilePath() { - if(factory != null) { - return factory.getESFFilePath(); - } else { - return null; - } - } - - /** - * Sets an InputStream to be used for event validation. - * @param esfInputStream an InputStream used for event validation - */ - public void setESFInputStream(InputStream esfInputStream) { - if(factory != null) { - factory.setESFInputStream(esfInputStream); - } - } - - /** - * Gets the InputStream being used for event validation. - * @return the InputStream of the ESF validator - */ - public InputStream getESFInputStream() { - if(factory != null) { - return factory.getESFInputStream(); - } else { - return null; - } - } - - /** - * Initializes the emitter. - */ - public void initialize() throws IOException { - try { - factory.initialize(); - socket = new MulticastSocket(); - - if(iface != null) { - socket.setInterface(iface); - } - - socket.setTimeToLive(ttl); - } catch(IOException ie) { - throw ie; - } catch(Exception e) { - Log.error("Unable to initialize MulticastEventEmitter", e); - } - } - - /** - * Shuts down the emitter. - */ - public void shutdown() throws IOException { - socket.close(); - } - - /** - * Creates a new event named eventName. - * @param eventName the name of the event to be created - * @return a new Event - * @exception EventSystemException if there is a problem creating the event - */ - public Event createEvent(String eventName) throws EventSystemException { - return createEvent(eventName, true); - } - - /** - * Creates a new event named eventName. - * @param eventName the name of the event to be created - * @param validate whether or not to validate the event against the EventTemplateDB - * @return a new Event - * @exception EventSystemException if there is a problem creating the event - */ - public Event createEvent(String eventName, boolean validate) throws EventSystemException { - if(factory != null) { - return factory.createEvent(eventName, validate); - } else { - throw new EventSystemException("EventFactory not initialized"); - } - } - - /** - * Emits the event to the network. - * - * @param event the event to emit - * @exception IOException throws an IOException is there is a network error. - */ - public void emit(Event event) throws IOException { - byte[] msg = event.serialize(); - - synchronized(lock) { - emit(msg); - } - } - - /** - * Emits a byte array to the network. - * - * @param bytes the byte array to emit - * @exception IOException throws an IOException if there is a network error. - */ - protected void emit(byte[] bytes) throws IOException { - /* don't bother with empty arrays */ - if(bytes == null) return; - - /* construct a datagram */ - DatagramPacket dp = new DatagramPacket(bytes, bytes.length, address, port); - socket.send(dp); - } -} +package org.lwes.emitter; + +import org.lwes.Event; +import org.lwes.EventSystemException; +import org.lwes.util.Log; + +import java.io.IOException; +import java.io.InputStream; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; + +/** + * MulticastEventEmitter emits events to multicast groups on the network. This is the most common + * class used by users of the Light Weight Event System. + *

+ * Example code: + *

+ * MulticastEventEmitter emitter = new MulticastEventEmitter();
+ * emitter.setESFFilePath("/path/to/esf/file");
+ * emitter.setMulticastAddress(InetAddress.getByName("224.0.0.69"));
+ * emitter.setMulticastPort(9191);
+ * emitter.initialize();
+ * 

+ * Event e = emitter.createEvent("MyEvent", false); + * e.setString("key","value"); + * emitter.emit(e); + *

+ * + * @author Michael P. Lum + * @author Anthony Molinaro + * @since 0.0.1 + */ +public class MulticastEventEmitter extends AbstractEventEmitter { + + /* the actual multicast socket being used */ + private MulticastSocket socket = null; + + /* the multicast address */ + private InetAddress address = null; + + /* the multicast port */ + private int port = 9191; + + /* the multicast interface */ + private InetAddress iface = null; + + /* the multicast time-to-live */ + private int ttl = 31; + + /* a lock variable to synchronize events */ + private final Object lock = new Object(); + + /** + * Default constructor. + */ + public MulticastEventEmitter() { + } + + /** + * Sets the multicast address for this emitter. + * + * @param address the multicast address + */ + public void setMulticastAddress(InetAddress address) { + this.address = address; + } + + /** + * Gets the multicast address for this emitter. + * + * @return the address + */ + public InetAddress getMulticastAddress() { + return this.address; + } + + /** + * Sets the multicast port for this emitter. + * + * @param port the multicast port + */ + public void setMulticastPort(int port) { + this.port = port; + } + + /** + * Gets the multicast port for this emitter. + * + * @return the multicast port + */ + public int getMulticastPort() { + return this.port; + } + + /** + * Sets the network interface for this emitter. + * + * @param iface the network interface + */ + public void setInterface(InetAddress iface) { + this.iface = iface; + } + + /** + * Gets the network interface for this emitter. + * + * @return the interface address + */ + public InetAddress getInterface() { + return this.iface; + } + + /** + * Sets the multicast time-to-live for this emitter. + * + * @param ttl the time to live + */ + public void setTimeToLive(int ttl) { + this.ttl = ttl; + } + + /** + * Gets the multicast time-to-live for this emitter. + * + * @return the time to live + */ + public int getTimeToLive() { + return this.ttl; + } + + /** + * Sets the ESF file used for event validation. + * + * @param esfFilePath the path of the ESF file + */ + public void setESFFilePath(String esfFilePath) { + if (getFactory() != null) { + getFactory().setESFFilePath(esfFilePath); + } + } + + /** + * Gets the ESF file used for event validation + * + * @return the ESF file path + */ + public String getESFFilePath() { + if (getFactory() != null) { + return getFactory().getESFFilePath(); + } + else { + return null; + } + } + + /** + * Sets an InputStream to be used for event validation. + * + * @param esfInputStream an InputStream used for event validation + */ + public void setESFInputStream(InputStream esfInputStream) { + if (getFactory() != null) { + getFactory().setESFInputStream(esfInputStream); + } + } + + /** + * Gets the InputStream being used for event validation. + * + * @return the InputStream of the ESF validator + */ + public InputStream getESFInputStream() { + if (getFactory() != null) { + return getFactory().getESFInputStream(); + } + else { + return null; + } + } + + /** + * Initializes the emitter. + */ + public void initialize() throws IOException { + super.initialize(); + socket = new MulticastSocket(); + + if (iface != null) { + socket.setInterface(iface); + } + + socket.setTimeToLive(ttl); + } + + /** + * Shuts down the emitter. + */ + public void shutdown() throws IOException { + socket.close(); + super.shutdown(); + } + + /** + * Creates a new event named eventName. + * + * @param eventName the name of the event to be created + * @return a new Event + * @throws EventSystemException if there is a problem creating the event + */ + public Event createEvent(String eventName) throws EventSystemException { + return createEvent(eventName, true); + } + + /** + * Creates a new event named eventName. + * + * @param eventName the name of the event to be created + * @param validate whether or not to validate the event against the EventTemplateDB + * @return a new Event + * @throws EventSystemException if there is a problem creating the event + */ + public Event createEvent(String eventName, boolean validate) throws EventSystemException { + if (getFactory() != null) { + return getFactory().createEvent(eventName, validate); + } + else { + throw new EventSystemException("EventFactory not initialized"); + } + } + + /** + * Emits the event to the network. + * + * @param event the event to emit + * @throws IOException throws an IOException is there is a network error. + */ + public void emit(Event event) throws IOException { + byte[] msg = event.serialize(); + + synchronized (lock) { + emit(msg); + try { + collectStatistics(); + } + catch (EventSystemException e) { + Log.error(e.getMessage(), e); + } + } + } + + /** + * Emits a byte array to the network. + * + * @param bytes the byte array to emit + * @throws IOException throws an IOException if there is a network error. + */ + protected void emit(byte[] bytes) throws IOException { + /* don't bother with empty arrays */ + if (bytes == null) { + return; + } + + /* construct a datagram */ + DatagramPacket dp = new DatagramPacket(bytes, bytes.length, address, port); + socket.send(dp); + } +} diff --git a/tests/org/lwes/emitter/MockMulticastEventEmitter.java b/tests/org/lwes/emitter/MockMulticastEventEmitter.java new file mode 100644 index 0000000..1054fcd --- /dev/null +++ b/tests/org/lwes/emitter/MockMulticastEventEmitter.java @@ -0,0 +1,32 @@ +package org.lwes.emitter; +/** + * @author fmaritato + */ + +import org.lwes.Event; +import org.lwes.EventSystemException; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class MockMulticastEventEmitter extends MulticastEventEmitter { + + private LinkedList events = new LinkedList(); + + @Override + protected void emit(byte[] bytes) throws IOException { + try { + events.add(getFactory().createEvent(bytes, false)); + } + catch (EventSystemException e) { + e.printStackTrace(); + } + } + + public List getEvents() { + List l = new LinkedList(); + l.addAll(events); + return l; + } +} diff --git a/tests/org/lwes/emitter/MulticastEventEmitterTest.java b/tests/org/lwes/emitter/MulticastEventEmitterTest.java new file mode 100644 index 0000000..72b4d03 --- /dev/null +++ b/tests/org/lwes/emitter/MulticastEventEmitterTest.java @@ -0,0 +1,82 @@ +package org.lwes.emitter; +/** + * @author fmaritato + */ + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import org.junit.Test; +import org.lwes.Event; +import org.lwes.NoSuchAttributeException; +import org.lwes.NoSuchAttributeTypeException; +import org.lwes.NoSuchEventException; +import org.lwes.db.EventTemplateDB; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; + +public class MulticastEventEmitterTest { + + @Test + public void testMulticastEmitter() throws Exception { + MockMulticastEventEmitter emitter = new MockMulticastEventEmitter(); + emitter.setEmitHeartbeat(true); + emitter.setFrequency(1000l); + emitter.setMulticastAddress(InetAddress.getByName("224.0.0.69")); + emitter.setMulticastPort(9191); + + emitter.initialize(); + emitter.emit(createTestEvent()); + Thread.sleep(1000); + emitter.emit(createTestEvent()); + emitter.emit(createTestEvent()); + emitter.emit(createTestEvent()); + Thread.sleep(1000); + emitter.emit(createTestEvent()); + emitter.shutdown(); + + List events = emitter.getEvents(); + assertNotNull(events); + System.out.println(events); + + for (int i = 0; i < events.size(); i++) { + Event e = events.get(i); + if (i == 0) { + assertEquals("First event was not startup.", + "System::Startup", e.getEventName()); + } + else if (i == events.size() - 1) { + assertEquals("Second to last event was not shutdown", + "System::Shutdown", e.getEventName()); + assertEquals("Shutdown count was incorrect", 0, e.getInt64("count")); + assertEquals("Shutdown total was incorrect", 6, e.getInt64("total")); + } + else if ("System::Heartbeat".equals(e.getEventName())) { + if (e.getInt64("seq") == 1) { + assertEquals("Heartbeat count was incorrect", 3, e.getInt64("count")); + assertEquals("Heartbeat total was incorrect", 3, e.getInt64("total")); + } + } + } + + } + + public Event createTestEvent() + throws NoSuchAttributeException, + NoSuchAttributeTypeException, + NoSuchEventException, + UnknownHostException { + + EventTemplateDB evtDb = new EventTemplateDB(); + evtDb.initialize(); + Event evt = new Event("TestEvent", false, evtDb); + evt.setIPAddress("SenderIP", InetAddress.getByName("192.168.1.1")); + evt.setUInt16("SenderPort", 9191); + evt.setInt64("ReceiptTime", System.currentTimeMillis()); + evt.setUInt16("SiteID", 0); + evt.setString("field1", "testing"); + evt.setInt32("intField1", 256); + return evt; + } +} -- 2.11.4.GIT