Added trace output to many classes.
[lwes-dotnet/github-mirror.git] / Org.Lwes / Emitter / EventEmitterBase.cs
blob74941000b226793fe9930023733374ce8a4fb727
1 //
2 // This file is part of the LWES .NET Binding (LWES.net)
3 //
4 // COPYRIGHT© 2009, Phillip Clark (phillip[at*flitbit[dot*org)
5 // original .NET implementation
6 //
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the Lesser GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // Lesser GNU General Public License for more details.
17 // You should have received a copy of the Lesser GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org.Lwes.Emitter
22 using System;
23 using System.Diagnostics;
24 using System.Net;
25 using System.Net.Sockets;
26 using System.Text;
27 using System.Threading;
28 using Org.Lwes.DB;
29 using Org.Lwes.ESF;
30 using Org.Lwes.Properties;
31 using Org.Lwes.Trace;
33 /// <summary>
34 /// Base class for event emitters.
35 /// </summary>
36 public abstract class EventEmitterBase : IEventEmitter, ITraceable
38 #region Fields
40 const int CDisposeBackgroundThreadWaitTimeMS = 200;
42 IPAddress _address;
43 IEventTemplateDB _db;
44 IEmitter _emitter;
45 SupportedEncoding _enc;
46 Encoding _encoding;
47 int _port;
48 Status<EmitterState> _status;
49 bool _validate;
51 #endregion Fields
53 #region Constructors
55 /// <summary>
56 /// Creates a new instance.
57 /// </summary>
58 protected EventEmitterBase()
62 /// <summary>
63 /// Destroys the instance; completes the IDisposable pattern.
64 /// </summary>
65 ~EventEmitterBase()
67 Dispose(false);
70 #endregion Constructors
72 #region Enumerations
74 enum EmitterState
76 Unknown = 0,
77 Initializing = 1,
78 Active = 2,
79 Suspending = 3,
80 Suspended = 4,
81 StopSignaled = 5,
82 Stopping = 6,
83 Stopped = 7,
86 #endregion Enumerations
88 #region Nested Interfaces
90 interface IEmitter : IDisposable
92 void Emit(Event ev);
94 void Start(IEventTemplateDB db
95 , IPEndPoint sendToEP
96 , Action<Socket, IPEndPoint> finishSocket);
99 #endregion Nested Interfaces
101 #region Properties
103 /// <summary>
104 /// The ip address to which events are emitted.
105 /// </summary>
106 public IPAddress Address
110 return _address;
114 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
115 _address = value;
119 /// <summary>
120 /// The character encoding used when performing event IO.
121 /// </summary>
122 public SupportedEncoding Encoding
124 get { return _enc; }
127 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
128 _enc = value;
129 _encoding = Constants.GetEncoding((short)value);
133 /// <summary>
134 /// Indicates whether the factory has been initialized.
135 /// </summary>
136 public virtual bool IsInitialized
138 get { return _status.CurrentState == EmitterState.Active; }
141 /// <summary>
142 /// The ip port to which events are emitted.
143 /// </summary>
144 public int Port
148 return _port;
152 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
153 _port = value;
157 /// <summary>
158 /// The event template database used when creating events.
159 /// </summary>
160 public IEventTemplateDB TemplateDB
162 get { return _db; }
165 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
166 _db = value;
170 /// <summary>
171 /// Indicates whether events issued from the factory will validate
172 /// when they are written to.
173 /// </summary>
174 public bool Validate
176 get { return _validate; }
179 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
180 _validate = value;
184 /// <summary>
185 /// Indicates whether the emitter is using a parallel emit strategy.
186 /// </summary>
187 protected bool IsParallel
189 get; set;
192 #endregion Properties
194 #region Methods
196 /// <summary>
197 /// Creates an event type identified by the event name.
198 /// </summary>
199 /// <param name="eventName">the event type's name</param>
200 /// <returns>a new LWES event instance</returns>
201 public Event CreateEvent(string eventName)
203 return CreateEvent(eventName, _validate, _enc);
206 /// <summary>
207 /// Creates an event type identified by the event name.
208 /// </summary>
209 /// <param name="eventName">the event type's name</param>
210 /// <param name="enc">encoding used when performing IO on the event</param>
211 /// <returns>a new LWES event instance</returns>
212 public Event CreateEvent(string eventName, SupportedEncoding enc)
214 return CreateEvent(eventName, _validate, enc);
217 /// <summary>
218 /// Creates an event type identified by the event name.
219 /// </summary>
220 /// <param name="eventName">the event type's name</param>
221 /// <param name="validate">whether the event is validated</param>
222 /// <returns>a new LWES event instance</returns>
223 public Event CreateEvent(string eventName, bool validate)
225 return CreateEvent(eventName, validate, _enc);
228 /// <summary>
229 /// Creates an event type identified by the event name.
230 /// </summary>
231 /// <param name="eventName">the event type's name</param>
232 /// <param name="validate">whether the event is validated</param>
233 /// <param name="enc">encoding used when performing IO on the event</param>
234 /// <returns>a new LWES event instance</returns>
235 public Event CreateEvent(string eventName, bool validate, SupportedEncoding enc)
237 if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
238 if (eventName == null) throw new ArgumentNullException("eventName");
239 if (eventName.Length == 0) throw new ArgumentException(Resources.Error_EmptyStringNotAllowed, "eventName");
241 this.TraceData(TraceEventType.Verbose, () => { return new object[] { String.Concat("CreateEvent: ", eventName
242 , Environment.NewLine, "\twith validate = ", validate
243 , Environment.NewLine, "\tand encoding = ", enc) }; }
246 Event result;
247 if (!_db.TryCreateEvent(eventName, out result, validate, enc))
249 this.TraceData(TraceEventType.Verbose, () => { return new object[] { String.Concat("CreateEvent, event not found in db: ", eventName) }; });
251 result = new Event(new EventTemplate(false, eventName), false, _enc);
253 return result;
256 /// <summary>
257 /// Disposes of the emitter and frees any resources held.
258 /// </summary>
259 public void Dispose()
261 Dispose(true);
262 GC.SuppressFinalize(this);
265 /// <summary>
266 /// Emits an event to the event system.
267 /// </summary>
268 /// <param name="evt">the event being emitted</param>
269 public void Emit(Event evt)
271 if (!IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
273 _emitter.Emit(evt);
276 /// <summary>
277 /// Initializes the emitter.
278 /// </summary>
279 public void Initialize()
281 if (IsInitialized) throw new InvalidOperationException(Resources.Error_AlreadyInitialized);
283 if (_status.SetStateIfLessThan(EmitterState.Initializing, EmitterState.Initializing))
287 PerformInitialization();
289 finally
291 _status.TryTransition(EmitterState.Active, EmitterState.Initializing);
296 /// <summary>
297 /// Disposes of the emitter.
298 /// </summary>
299 /// <param name="disposing">Indicates whether the object is being disposed</param>
300 protected virtual void Dispose(bool disposing)
302 if (disposing) this.TraceData(TraceEventType.Verbose, "EventEmitterBase - disposing");
303 Util.Dispose(ref _emitter);
304 if (disposing) this.TraceData(TraceEventType.Verbose, "EventEmitterBase - disposed");
307 /// <summary>
308 /// Finishes initialization of the emitter.
309 /// </summary>
310 /// <param name="endpoint">An IP endpoint where events will be emitted</param>
311 /// <param name="finishSocket">callback method used to finish setup of the socket</param>
312 protected void FinishInitialize(IPEndPoint endpoint, Action<Socket, IPEndPoint> finishSocket)
314 if (endpoint == null) throw new ArgumentNullException("endpoint");
315 if (finishSocket == null) throw new ArgumentNullException("finishSocket");
317 if (_status.CurrentState != EmitterState.Initializing)
318 throw new InvalidOperationException("only valid while initialzing");
320 if (_db == null) throw new InvalidOperationException("TemplateDB must be set before initialization");
321 if (_encoding == null) throw new InvalidOperationException("Encoding must be set before initialization");
323 IEmitter emitter = (IsParallel)
324 ? (IEmitter)new ParallelEmitter()
325 : (IEmitter)new DirectEmitter();
327 emitter.Start(_db, endpoint, finishSocket);
329 _emitter = emitter;
332 /// <summary>
333 /// Performs initialization of the emitter. Derived classes must implement this method
334 /// and subsequently call the <em>FinishInitialize</em> method of the base class.
335 /// </summary>
336 protected abstract void PerformInitialization();
338 #endregion Methods
340 #region Nested Types
342 class DirectEmitter : IEmitter, ITraceable
344 #region Fields
346 byte[] _buffer;
347 IEventTemplateDB _db;
348 UdpEndpoint _emitEP;
349 EndPoint _sendToEP;
350 Status<EmitterState> _senderState;
352 #endregion Fields
354 #region Constructors
356 ~DirectEmitter()
358 Dispose(false);
361 #endregion Constructors
363 #region Methods
365 public void Dispose()
367 this.Dispose(true);
368 GC.SuppressFinalize(this);
371 public void Emit(Event ev)
373 if (_senderState.IsGreaterThan(EmitterState.Active))
374 throw new InvalidOperationException(Resources.Error_EmitterHasEnteredShutdownState);
376 byte[] bytes = LwesSerializer.Serialize(ev);
377 this.TraceData(TraceEventType.Verbose, () =>
379 return new object[] { String.Concat("EventEmitterBase.DirectEmitter - Emitting to ", _sendToEP, ": ", ev.ToString(true)),
380 String.Concat(" octets: ", Util.BytesToOctets(bytes, 0, bytes.Length)) };
382 _emitEP.SendTo(_sendToEP, bytes);
385 public void Start(IEventTemplateDB db, IPEndPoint sendToEP, Action<Socket, IPEndPoint> finishSocket)
387 this.TraceData(TraceEventType.Verbose, "EventEmitterBase.DirectEmitter - Starting");
388 _db = db;
389 _sendToEP = sendToEP;
390 _buffer = BufferManager.AcquireBuffer(null);
391 _emitEP = new UdpEndpoint(sendToEP).Initialize(finishSocket);
392 _senderState.SetState(EmitterState.Active);
393 this.TraceData(TraceEventType.Verbose, "EventEmitterBase.DirectEmitter - Started");
396 private void Dispose(bool disposing)
398 // Signal background threads...
399 if (disposing) this.TraceData(TraceEventType.Verbose, "EventEmitterBase.DirectEmitter - disposing, sending stop signal");
400 _senderState.TryTransition(EmitterState.StopSignaled, EmitterState.Active, () =>
402 Util.Dispose(ref _emitEP);
403 BufferManager.ReleaseBuffer(_buffer);
404 _buffer = null;
405 if (disposing) this.TraceData(TraceEventType.Verbose, "EventEmitterBase.DirectEmitter - disposed");
409 #endregion Methods
412 class ParallelEmitter : IEmitter, ITraceable
414 #region Fields
416 SimpleLockFreeQueue<byte[]> _dataQueue = new SimpleLockFreeQueue<byte[]>();
417 IEventTemplateDB _db;
418 UdpEndpoint _emitEP;
419 Status<EmitterState> _emitterState;
420 EndPoint _sendToEP;
421 int _senders;
423 #endregion Fields
425 #region Constructors
427 ~ParallelEmitter()
429 Dispose(false);
432 #endregion Constructors
434 #region Methods
436 public void Dispose()
438 this.Dispose(true);
439 GC.SuppressFinalize(this);
442 public void Emit(Event ev)
444 this.TraceData(TraceEventType.Verbose, () =>
446 return new object[] { String.Concat("EventEmitterBase.ParallelEmitter - Queuing for ", _sendToEP, ": ", ev.ToString(true)) };
448 _dataQueue.Enqueue(LwesSerializer.SerializeToMemoryBuffer(ev));
449 EnsureSenderIsActive();
452 public void Start(IEventTemplateDB db, IPEndPoint sendToEP, Action<Socket, IPEndPoint> finishSocket)
454 this.TraceData(TraceEventType.Verbose, "EventEmitterBase.ParallelEmitter - Starting");
455 _db = db;
456 _sendToEP = sendToEP;
457 _emitEP = new UdpEndpoint(sendToEP).Initialize(finishSocket);
458 _emitterState.SetState(EmitterState.Active);
459 this.TraceData(TraceEventType.Verbose, "EventEmitterBase.ParallelEmitter - Started");
462 void Background_Sender(object unused_state)
465 // Drains the event queue and performs notification
469 byte[] data;
470 while (_emitterState.IsLessThan(EmitterState.StopSignaled) && _dataQueue.TryDequeue(out data))
472 this.TraceData(TraceEventType.Verbose, () =>
474 return new object[] { String.Concat("EventEmitterBase.ParallelEmitter - Background_Sender - Sending to ", _sendToEP, " octects: ", Util.BytesToOctets(data, 0, data.Length)) };
476 _emitEP.SendTo(_sendToEP, data);
477 BufferManager.ReleaseBuffer(data);
480 finally
482 int z = Interlocked.Decrement(ref _senders);
483 if (_emitterState.IsLessThan(EmitterState.StopSignaled))
484 this.TraceData(TraceEventType.Verbose, "EventEmitterBase.ParallelEmitter - Background_Sender stopped because queue is empty");
485 else
486 this.TraceData(TraceEventType.Verbose, "EventEmitterBase.ParallelEmitter - Background_Sender stopped sending because it was signaled to stop");
488 if (z == 0 && _emitterState.IsLessThan(EmitterState.StopSignaled) && !_dataQueue.IsEmpty)
489 EnsureSenderIsActive();
493 private void Dispose(bool disposing)
495 if (disposing) this.TraceData(TraceEventType.Verbose, "EventEmitterBase.ParallelEmitter - disposing, sending stop signal");
496 // Signal background threads...
497 _emitterState.TryTransition(EmitterState.StopSignaled, EmitterState.Active, () =>
499 while (Thread.VolatileRead(ref _senders) > 0)
501 Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
503 byte[] b;
504 while (_dataQueue.TryDequeue(out b))
506 BufferManager.ReleaseBuffer(b);
508 Util.Dispose(ref _emitEP);
509 if (disposing) this.TraceData(TraceEventType.Verbose, "EventEmitterBase.ParallelEmitter - disposed");
513 private void EnsureSenderIsActive()
515 var value = Thread.VolatileRead(ref _senders);
516 if (value <= 0)
518 WaitCallback cb = new WaitCallback(Background_Sender);
519 if (Interlocked.CompareExchange(ref _senders, value + 1, value) == value)
521 this.TraceData(TraceEventType.Verbose, "EventEmitterBase.ParallelEmitter - Background_Sender started on demand");
522 ThreadPool.QueueUserWorkItem(cb);
527 #endregion Methods
530 #endregion Nested Types