2 // This file is part of the LWES .NET Binding (LWES.net)
4 // COPYRIGHT© 2009, Phillip Clark (phillip[at*flitbit[dot*org)
5 // original .NET implementation
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the Lesser GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // Lesser GNU General Public License for more details.
17 // You should have received a copy of the Lesser GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org
.Lwes
.Emitter
23 using System
.Diagnostics
;
25 using System
.Net
.Sockets
;
27 using System
.Threading
;
30 using Org
.Lwes
.Properties
;
34 /// Base class for event emitters.
36 public abstract class EventEmitterBase
: IEventEmitter
, ITraceable
40 const int CDisposeBackgroundThreadWaitTimeMS
= 200;
45 SupportedEncoding _enc
;
48 Status
<EmitterState
> _status
;
56 /// Creates a new instance.
58 protected EventEmitterBase()
63 /// Destroys the instance; completes the IDisposable pattern.
70 #endregion Constructors
86 #endregion Enumerations
88 #region Nested Interfaces
90 interface IEmitter
: IDisposable
94 void Start(IEventTemplateDB db
96 , Action
<Socket
, IPEndPoint
> finishSocket
);
99 #endregion Nested Interfaces
104 /// The ip address to which events are emitted.
106 public IPAddress Address
114 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
120 /// The character encoding used when performing event IO.
122 public SupportedEncoding Encoding
127 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
129 _encoding
= Constants
.GetEncoding((short)value);
/// <summary>
/// Indicates whether the emitter has been initialized; true only while the
/// emitter's status is <see cref="EmitterState.Active"/>.
/// </summary>
public virtual bool IsInitialized
{
    get
    {
        EmitterState current = _status.CurrentState;
        return current == EmitterState.Active;
    }
}
142 /// The ip port to which events are emitted.
152 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
158 /// The event template database used when creating events.
160 public IEventTemplateDB TemplateDB
165 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
171 /// Indicates whether events issued from the factory will validate
172 /// when they are written to.
176 get { return _validate; }
179 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
185 /// Indicates whether the emitter is using a parallel emit strategy.
187 protected bool IsParallel
192 #endregion Properties
/// <summary>
/// Creates an event of the named type using the emitter's own validation
/// setting and encoding.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <returns>a new LWES event instance</returns>
public Event CreateEvent(string eventName)
{
    // Delegate to the most specific overload, filling in both defaults.
    Event created = this.CreateEvent(eventName, _validate, _enc);
    return created;
}
/// <summary>
/// Creates an event of the named type using the given encoding and the
/// emitter's own validation setting.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <param name="enc">encoding used when performing IO on the event</param>
/// <returns>a new LWES event instance</returns>
public Event CreateEvent(string eventName, SupportedEncoding enc)
{
    // Delegate to the most specific overload, filling in the validation default.
    Event created = this.CreateEvent(eventName, _validate, enc);
    return created;
}
/// <summary>
/// Creates an event of the named type using the given validation setting
/// and the emitter's own encoding.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <param name="validate">whether the event is validated</param>
/// <returns>a new LWES event instance</returns>
public Event CreateEvent(string eventName, bool validate)
{
    // Delegate to the most specific overload, filling in the encoding default.
    Event created = this.CreateEvent(eventName, validate, _enc);
    return created;
}
/// <summary>
/// Creates an event of the named type. The event is created from the
/// template database when the database knows the name; otherwise an
/// event is built from a fresh, empty template.
/// </summary>
/// <param name="eventName">the event type's name</param>
/// <param name="validate">whether the event is validated</param>
/// <param name="enc">encoding used when performing IO on the event</param>
/// <returns>a new LWES event instance</returns>
/// <exception cref="InvalidOperationException">the emitter has not been initialized</exception>
/// <exception cref="ArgumentNullException"><paramref name="eventName"/> is null</exception>
/// <exception cref="ArgumentException"><paramref name="eventName"/> is empty</exception>
public Event CreateEvent(string eventName, bool validate, SupportedEncoding enc)
{
    if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    if (eventName == null) throw new ArgumentNullException("eventName");
    if (eventName.Length == 0) throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");

    this.TraceData(TraceEventType.Verbose, () => { return new object[] { String.Concat("CreateEvent: ", eventName
        , Environment.NewLine, "\twith validate = ", validate
        , Environment.NewLine, "\tand encoding = ", enc) }; });

    Event result;
    if (!_db.TryCreateEvent(eventName, out result, validate, enc))
    {
        this.TraceData(TraceEventType.Verbose, () => { return new object[] { String.Concat("CreateEvent, event not found in db: ", eventName) }; });

        // The name is unknown to the template database, so fall back to an
        // unvalidated event on an empty template. Honor the caller's requested
        // encoding here as well; previously this path used the emitter-wide
        // _enc and silently ignored the enc argument.
        result = new Event(new EventTemplate(false, eventName), false, enc);
    }
    return result;
}
257 /// Disposes of the emitter and frees any resources held.
259 public void Dispose()
262 GC
.SuppressFinalize(this);
266 /// Emits an event to the event system.
268 /// <param name="evt">the event being emitted</param>
269 public void Emit(Event evt
)
271 if (!IsInitialized
) throw new InvalidOperationException(Resources
.Error_NotYetInitialized
);
277 /// Initializes the emitter.
// Initializes the emitter: rejects a second initialization, moves the status
// to Initializing, runs the derived class's PerformInitialization, and then
// attempts the Initializing -> Active transition.
// Throws InvalidOperationException (Resources.Error_AlreadyInitialized) when
// IsInitialized is already true.
279 public void Initialize()
281 if (IsInitialized
) throw new InvalidOperationException(Resources
.Error_AlreadyInitialized
);
// Presumably true only for the caller that actually advanced the status to
// Initializing, so that PerformInitialization runs once — confirm against Status<T>.
283 if (_status
.SetStateIfLessThan(EmitterState
.Initializing
, EmitterState
.Initializing
))
287 PerformInitialization();
// NOTE(review): the lines between PerformInitialization() and this transition
// are not visible in this excerpt. If the transition runs even when
// PerformInitialization throws (for example from a finally block), a failed
// initialization would still leave the emitter reported as Active — confirm.
291 _status
.TryTransition(EmitterState
.Active
, EmitterState
.Initializing
);
/// <summary>
/// Disposes of the emitter by disposing the underlying emitter instance.
/// </summary>
/// <param name="disposing">Indicates whether the object is being disposed; trace
/// messages are written only when this is true</param>
protected virtual void Dispose(bool disposing)
{
    if (disposing)
    {
        this.TraceData(TraceEventType.Verbose, "EventEmitterBase - disposing");
    }

    // Performed regardless of the disposing flag.
    Util.Dispose(ref _emitter);

    if (disposing)
    {
        this.TraceData(TraceEventType.Verbose, "EventEmitterBase - disposed");
    }
}
308 /// Finishes initialization of the emitter.
310 /// <param name="endpoint">An IP endpoint where events will be emitted</param>
311 /// <param name="finishSocket">callback method used to finish setup of the socket</param>
// Validates the arguments and the emitter's configuration, then creates the
// concrete emitter (ParallelEmitter when IsParallel is true, otherwise
// DirectEmitter) and starts it against the given endpoint.
// Throws ArgumentNullException for a null endpoint or finishSocket, and
// InvalidOperationException when the status is not Initializing or when the
// template database or encoding has not been set.
312 protected void FinishInitialize(IPEndPoint endpoint
, Action
<Socket
, IPEndPoint
> finishSocket
)
314 if (endpoint
== null) throw new ArgumentNullException("endpoint");
315 if (finishSocket
== null) throw new ArgumentNullException("finishSocket");
// Only legal while Initialize() is in progress.
// NOTE(review): the message below misspells "initializing"; it is runtime text
// and is left untouched here.
317 if (_status
.CurrentState
!= EmitterState
.Initializing
)
318 throw new InvalidOperationException("only valid while initialzing");
320 if (_db
== null) throw new InvalidOperationException("TemplateDB must be set before initialization");
321 if (_encoding
== null) throw new InvalidOperationException("Encoding must be set before initialization");
// Choose the emit strategy.
323 IEmitter emitter
= (IsParallel
)
324 ? (IEmitter
)new ParallelEmitter()
325 : (IEmitter
)new DirectEmitter();
// NOTE(review): the statement that stores this emitter for later use is not
// visible in this excerpt; presumably it is assigned to _emitter after Start — confirm.
327 emitter
.Start(_db
, endpoint
, finishSocket
);
333 /// Performs initialization of the emitter. Derived classes must implement this method
334 /// and subsequently call the <em>FinishInitialize</em> method of the base class.
336 protected abstract void PerformInitialization();
342 class DirectEmitter
: IEmitter
, ITraceable
347 IEventTemplateDB _db
;
350 Status
<EmitterState
> _senderState
;
361 #endregion Constructors
365 public void Dispose()
368 GC
.SuppressFinalize(this);
/// <summary>
/// Serializes the event and sends the resulting bytes to the configured
/// endpoint directly, without queuing.
/// </summary>
/// <param name="ev">the event being emitted</param>
/// <exception cref="InvalidOperationException">the sender state is beyond
/// <see cref="EmitterState.Active"/> (shutdown has begun)</exception>
public void Emit(Event ev)
{
    bool shuttingDown = _senderState.IsGreaterThan(EmitterState.Active);
    if (shuttingDown)
    {
        throw new InvalidOperationException(Resources.Error_EmitterHasEnteredShutdownState);
    }

    byte[] payload = LwesSerializer.Serialize(ev);

    // The trace payload is built lazily inside the callback.
    this.TraceData(TraceEventType.Verbose, () =>
    {
        string description = String.Concat("EventEmitterBase.DirectEmitter - Emitting to ", _sendToEP, ": ", ev.ToString(true));
        string octets = String.Concat(" octets: ", Util.BytesToOctets(payload, 0, payload.Length));
        return new object[] { description, octets };
    });

    _emitEP.SendTo(_sendToEP, payload);
}
// Starts the direct emitter: records the destination endpoint, acquires a
// buffer from the BufferManager, creates and initializes the UDP endpoint with
// the finishSocket callback, and sets the sender state to Active.
// NOTE(review): no visible statement stores the db argument (the class declares
// a _db field); one line of this method is missing from the excerpt — confirm
// whether db is retained.
385 public void Start(IEventTemplateDB db
, IPEndPoint sendToEP
, Action
<Socket
, IPEndPoint
> finishSocket
)
387 this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.DirectEmitter - Starting");
389 _sendToEP
= sendToEP
;
// This buffer is handed back to the BufferManager in Dispose(bool).
390 _buffer
= BufferManager
.AcquireBuffer(null);
391 _emitEP
= new UdpEndpoint(sendToEP
).Initialize(finishSocket
);
392 _senderState
.SetState(EmitterState
.Active
);
393 this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.DirectEmitter - Started");
/// <summary>
/// Requests the Active to StopSignaled transition and supplies the cleanup
/// callback that disposes the UDP endpoint and returns the buffer to the
/// BufferManager.
/// </summary>
/// <param name="disposing">when true, trace messages are written before and
/// after the cleanup</param>
private void Dispose(bool disposing)
{
    if (disposing)
    {
        this.TraceData(TraceEventType.Verbose, "EventEmitterBase.DirectEmitter - disposing, sending stop signal");
    }

    // Cleanup is handed to TryTransition rather than being run unconditionally.
    _senderState.TryTransition(EmitterState.StopSignaled, EmitterState.Active, () =>
    {
        Util.Dispose(ref _emitEP);
        BufferManager.ReleaseBuffer(_buffer);
    });

    if (disposing)
    {
        this.TraceData(TraceEventType.Verbose, "EventEmitterBase.DirectEmitter - disposed");
    }
}
412 class ParallelEmitter
: IEmitter
, ITraceable
416 SimpleLockFreeQueue
<byte[]> _dataQueue
= new SimpleLockFreeQueue
<byte[]>();
417 IEventTemplateDB _db
;
419 Status
<EmitterState
> _emitterState
;
432 #endregion Constructors
436 public void Dispose()
439 GC
.SuppressFinalize(this);
/// <summary>
/// Serializes the event into a memory buffer, queues it for the background
/// sender, and makes sure a background sender is running.
/// </summary>
/// <param name="ev">the event being emitted</param>
/// <exception cref="InvalidOperationException">the emitter state is beyond
/// <see cref="EmitterState.Active"/> (shutdown has begun)</exception>
public void Emit(Event ev)
{
    // Same guard as DirectEmitter.Emit. Once stop has been signaled the
    // background sender no longer dequeues, so anything queued from here on
    // would never be sent and its buffer would never be released.
    if (_emitterState.IsGreaterThan(EmitterState.Active))
        throw new InvalidOperationException(Resources.Error_EmitterHasEnteredShutdownState);

    this.TraceData(TraceEventType.Verbose, () =>
    {
        return new object[] { String.Concat("EventEmitterBase.ParallelEmitter - Queuing for ", _sendToEP, ": ", ev.ToString(true)) };
    });
    _dataQueue.Enqueue(LwesSerializer.SerializeToMemoryBuffer(ev));
    EnsureSenderIsActive();
}
// Starts the parallel emitter: records the destination endpoint, creates and
// initializes the UDP endpoint with the finishSocket callback, and sets the
// emitter state to Active. No background sender is queued here; Emit does that
// through EnsureSenderIsActive.
// NOTE(review): no visible statement stores the db argument (the class declares
// a _db field); one line of this method is missing from the excerpt — confirm
// whether db is retained.
452 public void Start(IEventTemplateDB db
, IPEndPoint sendToEP
, Action
<Socket
, IPEndPoint
> finishSocket
)
454 this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.ParallelEmitter - Starting");
456 _sendToEP
= sendToEP
;
457 _emitEP
= new UdpEndpoint(sendToEP
).Initialize(finishSocket
);
458 _emitterState
.SetState(EmitterState
.Active
);
459 this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.ParallelEmitter - Started");
// Thread-pool callback (queued by EnsureSenderIsActive) that sends queued
// buffers to the endpoint. The unused_state argument is not read by any
// visible statement.
// NOTE(review): several lines of this method (declarations and any
// try/finally/else structure) are not visible in this excerpt.
462 void Background_Sender(object unused_state
)
465 // Drains the event queue and performs notification
// Keep sending while stop has not been signaled and the queue yields data;
// each buffer is returned to the BufferManager after it is sent.
470 while (_emitterState
.IsLessThan(EmitterState
.StopSignaled
) && _dataQueue
.TryDequeue(out data
))
472 this.TraceData(TraceEventType
.Verbose
, () =>
474 return new object[] { String.Concat("EventEmitterBase.ParallelEmitter - Background_Sender - Sending to ", _sendToEP, " octects: ", Util.BytesToOctets(data, 0, data.Length)) }
;
476 _emitEP
.SendTo(_sendToEP
, data
);
477 BufferManager
.ReleaseBuffer(data
);
// This sender is finished: decrement the sender count and keep the new value.
482 int z
= Interlocked
.Decrement(ref _senders
);
// Trace which of the two loop conditions ended the run.
// NOTE(review): the two traces are presumably the branches of an if/else whose
// "else" line is not visible — confirm.
483 if (_emitterState
.IsLessThan(EmitterState
.StopSignaled
))
484 this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.ParallelEmitter - Background_Sender stopped because queue is empty");
486 this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.ParallelEmitter - Background_Sender stopped sending because it was signaled to stop");
// If this was the last sender, stop has not been signaled, and the queue is
// no longer empty (data arrived after the loop ended), request a sender again
// so that data is not left waiting.
488 if (z
== 0 && _emitterState
.IsLessThan(EmitterState
.StopSignaled
) && !_dataQueue
.IsEmpty
)
489 EnsureSenderIsActive();
// Requests the Active -> StopSignaled transition and supplies a cleanup
// callback that waits for background senders to finish, releases any buffers
// still queued, and disposes the UDP endpoint.
// When disposing is true, trace messages are written.
// NOTE(review): the line closing the callback is not visible, so it cannot be
// told from this excerpt whether the final "disposed" trace is inside or after
// the callback.
493 private void Dispose(bool disposing
)
495 if (disposing
) this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.ParallelEmitter - disposing, sending stop signal");
496 // Signal background threads...
497 _emitterState
.TryTransition(EmitterState
.StopSignaled
, EmitterState
.Active
, () =>
// Poll until the sender count reaches zero, sleeping
// CDisposeBackgroundThreadWaitTimeMS milliseconds between checks.
499 while (Thread
.VolatileRead(ref _senders
) > 0)
501 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
// Return every buffer still in the queue to the BufferManager; these were
// never sent.
504 while (_dataQueue
.TryDequeue(out b
))
506 BufferManager
.ReleaseBuffer(b
);
508 Util
.Dispose(ref _emitEP
);
509 if (disposing
) this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.ParallelEmitter - disposed");
// Queues Background_Sender on the thread pool after incrementing the sender
// count with a compare-and-exchange; called from Emit and from
// Background_Sender itself.
// NOTE(review): the lines between the read of _senders and the creation of the
// callback are not visible in this excerpt; presumably they hold a condition
// that limits how many senders may run at once — confirm.
513 private void EnsureSenderIsActive()
515 var value = Thread
.VolatileRead(ref _senders
);
518 WaitCallback cb
= new WaitCallback(Background_Sender
);
// Only the caller whose exchange succeeds (the count was still the value read
// above) starts a sender; the visible code does not retry on failure.
519 if (Interlocked
.CompareExchange(ref _senders
, value + 1, value) == value)
521 this.TraceData(TraceEventType
.Verbose
, "EventEmitterBase.ParallelEmitter - Background_Sender started on demand");
522 ThreadPool
.QueueUserWorkItem(cb
);
530 #endregion Nested Types