Added initial documentation tree using doxygen. More tweaks on the license text ensur...
[lwes-dotnet/github-mirror.git] / Org.Lwes / Listener / EventListenerBase.cs
blob3f967b31a8bd3e01c0b0372a1a34695e8fc993af
1 //
2 // This file is part of the LWES .NET Binding (LWES.net)
3 //
4 // COPYRIGHT© 2009, Phillip Clark (cerebralkungfu[at*g mail[dot*com)
5 // original .NET implementation
6 //
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the Lesser GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // Lesser GNU General Public License for more details.
17 // You should have received a copy of the Lesser GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org.Lwes.Listener
22 using System;
23 using System.Collections.Generic;
24 using System.Diagnostics;
25 using System.Linq;
26 using System.Net;
27 using System.Net.Sockets;
28 using System.Threading;
30 using Org.Lwes.DB;
31 using Org.Lwes.Properties;
33 /// <summary>
34 /// Base class for event listeners.
35 /// </summary>
36 public class EventListenerBase : IEventListener, ITraceable
38 #region Fields
40 const int CDisposeBackgroundThreadWaitTimeMS = 200;
41 const int LeadNotifier = 1;
43 List<RegistrationKey> _additions = new List<RegistrationKey>();
44 Action<RegistrationKey, Exception> _cacheHandleErrorsDelegate;
45 int _consolidationVotes = 0;
46 IEventTemplateDB _db;
47 IPEndPoint _endpoint;
48 ListenerGarbageHandling _garbageHandling;
49 Dictionary<TrafficTrackingKey, TrafficTrackingRec> _garbageTracking;
50 Object _garbageTrackingLock;
51 IListener _listener;
52 ReaderWriterLockSlim _notifications = new ReaderWriterLockSlim();
53 int _notifiers = 0;
54 RegistrationKey[] _registrations = new RegistrationKey[0];
56 #endregion Fields
58 #region Constructors
60 /// <summary>
61 /// Creates a new instance.
62 /// </summary>
63 protected EventListenerBase()
65 _cacheHandleErrorsDelegate = new Action<RegistrationKey, Exception>(HandleErrorsOnEventSink);
68 /// <summary>
69 /// Destructor ensuring dispose is called.
70 /// </summary>
71 ~EventListenerBase()
73 Dispose(false);
76 #endregion Constructors
78 #region Enumerations
80 enum ListenerState
82 Unknown = 0,
83 Active = 1,
84 Suspending = 2,
85 Suspended = 3,
86 StopSignaled = 4,
87 Stopping = 5,
88 Stopped = 6,
91 #endregion Enumerations
93 #region Nested Interfaces
95 interface IListener : IDisposable
97 void Start(IEventTemplateDB db
98 , IPEndPoint listenEP
99 , Action<Socket, IPEndPoint> finishSocket
100 , EventListenerBase listener);
103 #endregion Nested Interfaces
105 #region Properties
107 /// <summary>
108 /// Indicates whether the listener has been initialized.
109 /// </summary>
110 public virtual bool IsInitialized
112 get { return _listener != null; }
115 #endregion Properties
117 #region Methods
119 /// <summary>
120 /// Disposes of the emitter and frees any resources held.
121 /// </summary>
122 public void Dispose()
124 Dispose(true);
125 GC.SuppressFinalize(this);
128 /// <summary>
129 /// Registers an event sink and activates it.
130 /// </summary>
131 /// <param name="sink">the event sink to register</param>
132 /// <param name="handback">a handback object - this object is opaque to the listener
133 /// and will be attached to the registration key prior to activation</param>
134 /// <returns>A registration key for the event sink.</returns>
135 public IEventSinkRegistrationKey RegisterAndActivateEventSink(IEventSink sink, object handback)
137 IEventSinkRegistrationKey key = RegisterEventSink(sink);
138 key.Handback = handback;
139 key.Activate();
140 return key;
143 /// <summary>
144 /// Registers an event sink with the listener without activating the
145 /// event sink.
146 /// </summary>
147 /// <param name="sink">the event sink to register</param>
148 /// <returns>A registration key for the event sink</returns>
149 public IEventSinkRegistrationKey RegisterEventSink(IEventSink sink)
151 if (sink == null) throw new ArgumentNullException("sink");
152 RegistrationKey key = new RegistrationKey(this, sink);
153 AddRegistration(key);
154 return key;
157 internal void PerformEventArrival(Event ev)
159 int notifier = Interlocked.Increment(ref _notifiers);
162 if (notifier == LeadNotifier) _notifications.EnterUpgradeableReadLock();
163 else _notifications.EnterReadLock();
166 foreach (var r in _registrations)
168 if (r.PerformEventArrival(ev, _cacheHandleErrorsDelegate))
170 Interlocked.Increment(ref _consolidationVotes);
173 if (notifier == LeadNotifier && Thread.VolatileRead(ref _consolidationVotes) > 0)
175 SafeConsolidateRegistrations();
178 finally
180 if (notifier == LeadNotifier) _notifications.ExitUpgradeableReadLock();
181 else _notifications.ExitReadLock();
184 finally
186 Interlocked.Decrement(ref _notifiers);
190 internal GarbageHandlingVote PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage)
192 GarbageHandlingVote strategy = GarbageHandlingVote.None;
193 int notifier = Interlocked.Increment(ref _notifiers);
196 if (notifier == LeadNotifier) _notifications.EnterUpgradeableReadLock();
197 else _notifications.EnterReadLock();
200 foreach (var r in _registrations)
202 GarbageHandlingVote strategyVote = r.PerformGarbageArrival(
203 remoteEndPoint,
204 priorGarbageCountForEndpoint,
205 garbage,
206 _cacheHandleErrorsDelegate
208 if (strategyVote > strategy)
210 strategy = strategyVote;
213 if (notifier == LeadNotifier && Thread.VolatileRead(ref _consolidationVotes) > 0)
215 SafeConsolidateRegistrations();
218 finally
220 if (notifier == LeadNotifier) _notifications.ExitUpgradeableReadLock();
221 else _notifications.ExitReadLock();
224 finally
226 Interlocked.Decrement(ref _notifiers);
228 return strategy;
231 /// <summary>
232 /// Ensures the emitter has been initialized.
233 /// </summary>
234 /// <exception cref="InvalidOperationException">thrown if the emitter has not yet been initialized.</exception>
235 protected void CheckInitialized()
237 if (IsInitialized) throw new InvalidOperationException(Resources.Error_NotYetInitialized);
240 /// <summary>
241 /// Disposes of the emitter.
242 /// </summary>
243 /// <param name="disposing">Indicates whether the object is being disposed</param>
244 protected virtual void Dispose(bool disposing)
246 Util.Dispose(ref _listener);
249 /// <summary>
250 /// Initializes the base class.
251 /// </summary>
252 /// <param name="db">template database used when creating events</param>
253 /// <param name="endpoint">an IP endpoint where listening will occur</param>
254 /// <param name="parallel">whether the listener will listen and dispatch events in parallel</param>
255 /// <param name="garbageHandling">indicates the garbage handling strategy the listener will use</param>
256 /// <param name="finishSocket">callback method used to complete the setup of the socket
257 /// connected to the given <paramref name="endpoint"/></param>
258 protected void Initialize(IEventTemplateDB db
259 , IPEndPoint endpoint
260 , bool parallel
261 , ListenerGarbageHandling garbageHandling
262 , Action<Socket, IPEndPoint> finishSocket)
264 if (db == null) throw new ArgumentNullException("db");
265 if (endpoint == null) throw new ArgumentNullException("endpoint");
266 if (finishSocket == null) throw new ArgumentNullException("finishSocket");
268 _db = db;
269 _endpoint = endpoint;
270 IListener listener = (parallel)
271 ? (IListener)new ParallelListener()
272 : (IListener)new BackgroundThreadListener();
274 _garbageHandling = garbageHandling;
275 if (_garbageHandling > ListenerGarbageHandling.FailSilently)
277 _garbageTracking = new Dictionary<TrafficTrackingKey, TrafficTrackingRec>();
278 _garbageTrackingLock = new Object();
281 listener.Start(db, endpoint, finishSocket, this);
282 _listener = listener;
285 private void AddRegistration(RegistrationKey key)
287 int notifier = Interlocked.Increment(ref _notifiers);
290 if (notifier == LeadNotifier)
292 if (_notifications.TryEnterWriteLock(20))
296 lock (_additions)
298 _additions.Add(key);
299 UnsafeConsolidateRegistrations();
301 return;
303 finally
305 _notifications.ExitWriteLock();
310 // We couldn't get the writelock so we're gonna have to schedule
311 // the key to be added later...
312 lock (_additions)
314 _additions.Add(key);
315 Interlocked.Increment(ref _consolidationVotes);
318 finally
320 Interlocked.Decrement(ref _notifiers);
324 private GarbageHandlingVote GetTrafficStrategyForEndpoint(EndPoint ep)
326 if (_garbageHandling == ListenerGarbageHandling.FailSilently)
328 return GarbageHandlingVote.None;
330 else
332 IPEndPoint ipep = (IPEndPoint)ep;
333 TrafficTrackingKey key = new TrafficTrackingKey(ep);
334 TrafficTrackingRec tracking;
335 lock (_garbageTrackingLock)
337 if (_garbageTracking.TryGetValue(key, out tracking))
339 return tracking.Strategy;
342 return GarbageHandlingVote.Default;
346 private void HandleErrorsOnEventSink(RegistrationKey key, Exception e)
348 this.TraceData(TraceEventType.Error, Resources.Error_EventSinkThrewException, key, e);
349 // TODO: Strategies for event sinks that cause exceptions.
352 private void HandleGarbageData(EndPoint ep, byte[] buffer, int offset, int bytesTransferred)
354 this.TraceData(TraceEventType.Verbose, new Func<object[]>(() =>
356 return new object[] { ((IPEndPoint)ep).ToString(), Util.BytesToOctets(buffer, offset, bytesTransferred) };
360 if (_garbageHandling > ListenerGarbageHandling.FailSilently)
362 IPEndPoint ipep = (IPEndPoint)ep;
363 TrafficTrackingKey key = new TrafficTrackingKey(ep);
364 TrafficTrackingRec tracking;
365 lock (_garbageTrackingLock)
367 if (!_garbageTracking.TryGetValue(key, out tracking))
369 tracking = new TrafficTrackingRec(ep);
370 _garbageTracking.Add(key, tracking);
373 if (_garbageHandling == ListenerGarbageHandling.AskEventSinksToVoteOnStrategy
374 && tracking.Strategy != GarbageHandlingVote.IgnoreAllTrafficFromEndpoint)
376 PerformGarbageDataNotification(tracking, ep, buffer, offset, bytesTransferred);
381 private void PerformGarbageDataNotification(TrafficTrackingRec tracking, EndPoint rcep, byte[] buffer, int offset, int bytesTransferred)
383 byte[] copy = new byte[bytesTransferred];
384 Array.Copy(buffer, copy, bytesTransferred);
385 tracking.Strategy = PerformGarbageArrival(rcep, tracking.IncrementGarbageCount(), copy);
388 private void SafeConsolidateRegistrations()
390 _notifications.EnterWriteLock();
393 lock (_additions)
395 UnsafeConsolidateRegistrations();
398 finally
400 _notifications.ExitWriteLock();
404 private void UnsafeConsolidateRegistrations()
406 #if DEBUG
407 Debug.Assert(_notifications.IsWriteLockHeld);
408 #endif
409 _registrations = (from r in _registrations
410 where r.Status != EventSinkStatus.Canceled
411 select r).Concat(from r in _additions
412 where r.Status != EventSinkStatus.Canceled
413 select r).ToArray();
414 _additions.Clear();
416 Thread.VolatileWrite(ref _consolidationVotes, 0);
419 #endregion Methods
421 #region Nested Types
423 struct TrafficTrackingKey
425 #region Fields
427 public uint Address;
428 public int AddressFamily;
429 public int Port;
431 #endregion Fields
433 #region Constructors
435 public TrafficTrackingKey(EndPoint ep)
437 IPEndPoint ipep = (IPEndPoint)ep;
438 Address = BitConverter.ToUInt32(ipep.Address.GetAddressBytes(), 0);
439 Port = ipep.Port;
440 AddressFamily = (int)ipep.AddressFamily;
443 #endregion Constructors
446 /// <remarks>
447 /// Uses background threads to receive events from LWES. This class uses two
448 /// threads, one to listen and deserialize the events and another to perform
449 /// the notifications.
450 /// </remarks>
451 class BackgroundThreadListener : IListener
453 #region Fields
455 EndPoint _anyEP;
456 byte[] _buffer;
457 IEventTemplateDB _db;
458 SimpleLockFreeQueue<Event> _eventQueue = new SimpleLockFreeQueue<Event>();
459 UdpEndpoint _listenEP;
460 EventListenerBase _listener;
461 Thread _notifier;
462 Status<ListenerState> _notifierState;
463 Object _notifierWaitObject;
464 Thread _reciever;
465 Status<ListenerState> _recieverState;
467 #endregion Fields
469 #region Constructors
471 ~BackgroundThreadListener()
473 Dispose(false);
476 #endregion Constructors
478 #region Methods
480 public void Dispose()
482 Dispose(true);
483 GC.SuppressFinalize(this);
486 /// <summary>
487 /// Starts the listener in multi-threaded mode. In this mode the listener
488 /// consumes from 1 to 2 threads from the threadpool. A thread is used for
489 /// receiving bytes and deserializing LWES events and another thread is
490 /// scheduled to perform event notification only when LWES events have
491 /// been received.
492 /// </summary>
493 /// <param name="db">event template DB used during deserialization</param>
494 /// <param name="listenEP">a IP endpoint where listening should occur</param>
495 /// <param name="finishSocket"></param>
496 /// <param name="listener"></param>
497 public void Start(IEventTemplateDB db
498 , IPEndPoint listenEP
499 , Action<Socket, IPEndPoint> finishSocket
500 , EventListenerBase listener)
502 _db = db;
503 _listener = listener;
504 _anyEP = (listenEP.AddressFamily == AddressFamily.InterNetworkV6)
505 ? new IPEndPoint(IPAddress.IPv6Any, listenEP.Port)
506 : new IPEndPoint(IPAddress.Any, listenEP.Port);
507 _buffer = BufferManager.AcquireBuffer(null);
508 _listenEP = new UdpEndpoint(listenEP).Initialize(finishSocket);
509 // Start a dedicated background thread to handle the receiving...
510 _reciever = new Thread(Background_Receiver);
511 _reciever.IsBackground = true;
512 _reciever.Start();
514 // Start a dedicated background thread to perform event notification...
515 _notifierWaitObject = new Object();
516 _notifier = new Thread(Background_Notifier);
517 _notifier.IsBackground = true;
518 _notifier.Start();
521 internal void Stop()
523 if (_recieverState.TryTransition(ListenerState.StopSignaled, ListenerState.Active))
525 // Close the listener, this will cause the receiver thread to wakeup
526 // if it is blocked waiting for IO on the socket.
527 Util.Dispose(ref _listenEP);
528 _reciever.Join();
532 private void Background_Notifier(object unused_state)
534 _notifierState.SetState(ListenerState.Active);
535 while (_notifierState.CurrentState < ListenerState.StopSignaled)
537 Event ev;
538 if (!_eventQueue.TryDequeue(out ev))
540 lock (_notifierWaitObject)
541 { // double-check that the queue is empty
542 // this strategy catches the race condition when the
543 // reciever queue's an event while we're acquiring the lock.
544 _notifierState.SetState(ListenerState.Suspending);
545 if (!_eventQueue.TryDequeue(out ev))
547 _notifierState.SetState(ListenerState.Suspended);
548 Monitor.Wait(_notifierWaitObject);
549 continue;
551 // If the stop signal arrived during a wait then bail out...
552 if (_notifierState.CurrentState == ListenerState.StopSignaled)
554 _notifierState.SetState(ListenerState.Stopped);
555 break;
557 // otherwise we're active again
558 _notifierState.SetState(ListenerState.Active);
561 _listener.PerformEventArrival(ev);
565 private void Background_Receiver(object unused_state)
567 if (_recieverState.TryTransition(ListenerState.Active, ListenerState.Unknown))
571 // Continue until signaled to stop...
572 while (_recieverState.CurrentState == ListenerState.Active)
574 EndPoint rcep = _anyEP;
575 // Perform a blocking receive...
576 int bytesTransferred = _listenEP.ReceiveFrom(ref rcep, _buffer, 0, _buffer.Length);
577 if (bytesTransferred > 0)
579 GarbageHandlingVote handling = _listener.GetTrafficStrategyForEndpoint(rcep);
580 if (handling == GarbageHandlingVote.None)
582 PerformEventDeserializationAndQueueForNotification(rcep, _buffer, 0, bytesTransferred);
584 else if (handling == GarbageHandlingVote.TreatTrafficFromEndpointAsGarbage)
586 _listener.HandleGarbageData(rcep, _buffer, 0, bytesTransferred);
588 // Otherwise the handling was GarbageHandlingStrategy.FailfastForTrafficOnEndpoint
589 // and we're going to ignore it altogether.
593 catch (SocketException se)
595 if (se.ErrorCode != 10004)
596 throw se;
598 if (_recieverState.TryTransition(ListenerState.Stopping, ListenerState.StopSignaled))
600 // Cascade the stop signal to the notifier and wait for it to exit...
601 _notifierState.SetState(ListenerState.StopSignaled);
602 _notifier.Join();
607 private void Dispose(bool disposing)
609 // Signal background threads...
610 _recieverState.TryTransition(ListenerState.StopSignaled, ListenerState.Active, () =>
612 Util.Dispose(ref _listenEP);
613 _reciever.Join(CDisposeBackgroundThreadWaitTimeMS);
614 BufferManager.ReleaseBuffer(_buffer);
615 _buffer = null;
619 private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
620 , byte[] buffer
621 , int offset, int bytesTransferred)
623 IPEndPoint ep = (IPEndPoint)rcep;
626 // For received events, set MetaEventInfo.ReciptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
627 Event ev = Event.BinaryDecode(_db, buffer, offset, bytesTransferred);
628 ev.SetValue(Constants.MetaEventInfoAttributes.ReceiptTime.Name, Constants.DateTimeToLwesTimeTicks(DateTime.UtcNow));
629 ev.SetValue(Constants.MetaEventInfoAttributes.SenderIP.Name, ep.Address);
630 ev.SetValue(Constants.MetaEventInfoAttributes.SenderPort.Name, ep.Port);
631 _eventQueue.Enqueue(ev);
633 catch (BadLwesDataException)
635 _listener.HandleGarbageData(rcep, buffer, offset, bytesTransferred);
638 if (_notifierState.CurrentState > ListenerState.Active)
640 // notifier thread is suspended;
641 // wake it up...
642 lock (_notifierWaitObject)
644 Monitor.Pulse(_notifierWaitObject);
649 #endregion Methods
652 /// <remarks>
653 /// Uses the threadpool and overlapped IO on the recieving socket. This listener
654 /// will consume between 0 and 3 threads from the threadpool, depending on which
655 /// jobs are active. The jobs may consist of the following:
656 /// <ul>
657 /// <li>Receiver - invoked by the socket on a threadpool thread when input is received</li>
658 /// <li>Deserializer - scheduled for a threadpool thread and runs as long as buffers are in the receive queue</li>
659 /// <li>Notifier - scheduled for a threadpool thread and runs as long as Events are in the notification queue</li>
660 /// </ul>
661 /// </remarks>
662 class ParallelListener : IListener
664 #region Fields
666 EndPoint _anyEP;
667 IEventTemplateDB _db;
668 int _deserializers;
669 SimpleLockFreeQueue<Event> _eventQueue = new SimpleLockFreeQueue<Event>();
670 UdpEndpoint _listenEP;
671 EventListenerBase _listener;
672 Status<ListenerState> _listenerState;
673 int _notifiers;
674 SimpleLockFreeQueue<ReceiveCapture> _receiveQueue;
676 #endregion Fields
678 #region Constructors
680 ~ParallelListener()
682 Dispose(false);
685 #endregion Constructors
687 #region Methods
689 public void Dispose()
691 Dispose(true);
692 GC.SuppressFinalize(this);
695 /// <summary>
696 /// Starts the listener.
697 /// </summary>
698 /// <param name="db">an event template DB</param>
699 /// <param name="listenEP">the listening endpoint</param>
700 /// <param name="finishSocket">a callback method that is called upon to finish the listening socket</param>
701 /// <param name="owner">the owner</param>
702 public void Start(IEventTemplateDB db
703 , IPEndPoint listenEP
704 , Action<Socket, IPEndPoint> finishSocket
705 , EventListenerBase owner)
707 _db = db;
708 _listener = owner;
709 _anyEP = (listenEP.AddressFamily == AddressFamily.InterNetworkV6)
710 ? new IPEndPoint(IPAddress.IPv6Any, 0)
711 : new IPEndPoint(IPAddress.Any, 0);
713 _receiveQueue = new SimpleLockFreeQueue<ReceiveCapture>();
715 _listenEP = new UdpEndpoint(listenEP).Initialize(finishSocket);
716 ParallelReceiver();
719 internal void Stop()
721 _listenerState.TryTransition(ListenerState.StopSignaled, ListenerState.Active, () =>
723 while (Thread.VolatileRead(ref _deserializers) > 0)
725 Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
727 while (Thread.VolatileRead(ref _notifiers) > 0)
729 Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
732 Util.Dispose(ref _listenEP);
734 _listenerState.SpinWaitForState(ListenerState.Stopped, () => Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS));
738 private void Background_Deserializer(object unused_state)
740 // Called within the thread pool:
742 // Drains the recieve queue of capture records and
743 // transforms those records into Event objects by deserialization.
748 ReceiveCapture input;
749 while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _receiveQueue.TryDequeue(out input))
751 GarbageHandlingVote handling = _listener.GetTrafficStrategyForEndpoint(input.RemoteEndPoint);
752 if (handling == GarbageHandlingVote.None)
754 PerformEventDeserializationAndQueueForNotification(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred);
756 else if (handling == GarbageHandlingVote.TreatTrafficFromEndpointAsGarbage)
758 _listener.HandleGarbageData(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred);
760 // Otherwise the handling was GarbageHandlingStrategy.FailfastForTrafficOnEndpoint
761 // and we're going to ignore it altogether.
764 finally
766 int z = Interlocked.Decrement(ref _deserializers);
767 if (z == 0 && !_receiveQueue.IsEmpty)
768 EnsureDeserializerIsActive();
772 private void Background_Notifier(object unused_state)
775 // Drains the event queue and performs notification
779 Event ev;
780 while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _eventQueue.TryDequeue(out ev))
782 _listener.PerformEventArrival(ev);
785 finally
787 int z = Interlocked.Decrement(ref _notifiers);
788 if (z == 0 && !_receiveQueue.IsEmpty)
789 EnsureNotifierIsActive();
793 private void Background_ParallelReceiver(object unused_state)
795 // Continue until signalled to stop...
796 if (_listenerState.IsLessThan(ListenerState.StopSignaled))
798 // Acquiring a buffer may block until a buffer
799 // becomes available.
800 byte[] buffer = BufferManager.AcquireBuffer(() => _listenerState.IsGreaterThan(ListenerState.Active));
802 // If the buffer is null then the stop-signal was received while acquiring a buffer
803 if (buffer != null)
805 _listenEP.ReceiveFromAsync(_anyEP, buffer, 0, buffer.Length, (op) =>
807 if (op.SocketError == SocketError.Success)
809 // Reschedule the receiver before pulling the buffer out, we want to catch receives
810 // in the tightest loop possible, although we don't want to keep a threadpool thread
811 // *forever* and possibly cause thread-starvation in for other jobs so we continually
812 // put the job back in the queue - this way our parallelism plays nicely with other
813 // jobs - now, if only the other jobs were programmed to give up their threads periodically
814 // too... hmmm!
815 ThreadPool.QueueUserWorkItem(new WaitCallback(Background_ParallelReceiver));
816 if (op.BytesTransferred > 0)
818 _receiveQueue.Enqueue(new ReceiveCapture(op.RemoteEndPoint, op.Buffer, op.BytesTransferred));
820 EnsureDeserializerIsActive();
823 else if (op.SocketError == SocketError.OperationAborted)
825 // This is the dispose or stop call. fall through
826 CascadeStopSignal();
829 return false;
830 }, null);
832 return;
836 // We get here if the receiver is signaled to stop.
837 CascadeStopSignal();
840 private void CascadeStopSignal()
842 _listenerState.TryTransition(ListenerState.Stopping, ListenerState.StopSignaled, () =>
844 while (Thread.VolatileRead(ref _deserializers) > 0)
846 Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
848 while (Thread.VolatileRead(ref _notifiers) > 0)
850 Thread.Sleep(CDisposeBackgroundThreadWaitTimeMS);
852 _listenerState.SetState(ListenerState.Stopped);
856 private void Dispose(bool disposing)
858 if (_listenerState.CurrentState == ListenerState.Active)
859 Stop();
862 private void EnsureDeserializerIsActive()
864 int current = -1, value = Thread.VolatileRead(ref _deserializers);
865 if (value < 1)
867 WaitCallback cb = new WaitCallback(Background_Deserializer);
868 while (true)
870 current = value;
871 value = Interlocked.CompareExchange(ref _deserializers, value + 1, current);
872 if (value == current)
874 ThreadPool.QueueUserWorkItem(cb);
875 break;
881 private void EnsureNotifierIsActive()
883 int current = -1, value = Thread.VolatileRead(ref _notifiers);
884 if (value < 1)
886 WaitCallback cb = new WaitCallback(Background_Notifier);
887 while (true)
889 current = value;
890 value = Interlocked.CompareExchange(ref _notifiers, value + 1, current);
891 if (value == current)
893 ThreadPool.QueueUserWorkItem(cb);
894 break;
900 private void ParallelReceiver()
902 // Only startup once.
903 if (_listenerState.TryTransition(ListenerState.Active, ListenerState.Unknown))
905 Background_ParallelReceiver(null);
909 private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
910 , byte[] buffer
911 , int offset, int bytesTransferred)
913 IPEndPoint ep = (IPEndPoint)rcep;
916 // For received events, set MetaEventInfo.ReciptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
917 Event ev = Event.BinaryDecode(_db, buffer, offset, bytesTransferred);
918 ev.SetValue(Constants.MetaEventInfoAttributes.ReceiptTime.Name, Constants.DateTimeToLwesTimeTicks(DateTime.UtcNow));
919 ev.SetValue(Constants.MetaEventInfoAttributes.SenderIP.Name, ep.Address);
920 ev.SetValue(Constants.MetaEventInfoAttributes.SenderPort.Name, ep.Port);
921 _eventQueue.Enqueue(ev);
923 catch (BadLwesDataException)
925 _listener.HandleGarbageData(rcep, buffer, offset, bytesTransferred);
928 BufferManager.ReleaseBuffer(buffer);
929 EnsureNotifierIsActive();
932 #endregion Methods
934 #region Nested Types
936 struct ReceiveCapture
938 #region Fields
940 public byte[] Buffer;
941 public int BytesTransferred;
942 public EndPoint RemoteEndPoint;
944 #endregion Fields
946 #region Constructors
948 public ReceiveCapture(EndPoint ep, byte[] data, int transferred)
950 this.RemoteEndPoint = ep;
951 this.Buffer = data;
952 this.BytesTransferred = transferred;
955 #endregion Constructors
958 #endregion Nested Types
961 class RegistrationKey : IEventSinkRegistrationKey
963 #region Fields
965 bool _disableGarbageNotification;
966 Status<EventSinkStatus> _status = new Status<EventSinkStatus>(EventSinkStatus.Suspended);
967 bool _threadSafe;
969 #endregion Fields
971 #region Constructors
973 public RegistrationKey(EventListenerBase listener, IEventSink sink)
975 Listener = listener;
976 Sink = sink;
977 _threadSafe = sink.IsThreadSafe;
980 #endregion Constructors
982 #region Properties
984 public object Handback
986 get;
987 set;
990 public IEventListener Listener
992 get;
993 private set;
996 public IEventSink Sink
998 get;
999 private set;
1002 public EventSinkStatus Status
1004 get { return _status.CurrentState; }
1007 #endregion Properties
1009 #region Methods
1011 public bool Activate()
1013 return _status.SetStateIfLessThan(EventSinkStatus.Active, EventSinkStatus.Canceled);
1016 public void Cancel()
1018 _status.SetState(EventSinkStatus.Canceled);
1021 public void DisableGarbageNotification()
1023 Thread.MemoryBarrier();
1024 _disableGarbageNotification = true;
1025 Thread.MemoryBarrier();
1028 public bool Suspend()
1030 return _status.SetStateIfLessThan(EventSinkStatus.Suspended, EventSinkStatus.Canceled);
1033 internal bool PerformEventArrival(Event ev, Action<RegistrationKey, Exception> errorHandler)
1035 if (!_threadSafe)
1037 if (_status.SpinToggleState(EventSinkStatus.Notifying, EventSinkStatus.Active))
1041 Sink.HandleEventArrival(this, ev);
1042 _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying);
1044 catch (Exception e)
1046 errorHandler(this, e);
1050 else
1054 EventSinkStatus s = _status.CompareExchange(EventSinkStatus.Notifying, EventSinkStatus.Active);
1055 if (s == EventSinkStatus.Active || s == EventSinkStatus.Notifying)
1057 Sink.HandleEventArrival(this, ev);
1058 _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying);
1061 catch (Exception e)
1063 errorHandler(this, e);
1066 return _status.CurrentState == EventSinkStatus.Canceled;
1069 internal GarbageHandlingVote PerformGarbageArrival(EndPoint remoteEndPoint, int priorGarbageCountForEndpoint, byte[] garbage,
1070 Action<RegistrationKey, Exception> errorHandler)
1072 Thread.MemoryBarrier();
1073 bool ignoring = _disableGarbageNotification;
1075 GarbageHandlingVote strategy = GarbageHandlingVote.None;
1076 if (!ignoring)
1078 if (!_threadSafe)
1080 if (_status.SpinToggleState(EventSinkStatus.Active, EventSinkStatus.Notifying))
1084 strategy = Sink.HandleGarbageData(this, remoteEndPoint, priorGarbageCountForEndpoint, garbage);
1086 catch (Exception e)
1088 errorHandler(this, e);
1090 _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying);
1093 else
1097 if (_status.TryTransition(EventSinkStatus.Notifying, EventSinkStatus.Active))
1099 strategy = Sink.HandleGarbageData(this, remoteEndPoint, priorGarbageCountForEndpoint, garbage);
1100 _status.TryTransition(EventSinkStatus.Active, EventSinkStatus.Notifying);
1103 catch (Exception e)
1105 errorHandler(this, e);
1109 return strategy;
1112 #endregion Methods
1115 class TrafficTrackingRec
1117 #region Fields
1119 int _garbageCount = 0;
1121 #endregion Fields
1123 #region Constructors
1125 public TrafficTrackingRec(EndPoint ep)
1127 RemoteEndPoint = ep;
1130 #endregion Constructors
1132 #region Properties
1134 public bool IsEmpty
1136 get { return RemoteEndPoint == null; }
1139 public int PreviousGargageDataCount
1141 get { return _garbageCount; }
1144 public EndPoint RemoteEndPoint
1146 get;
1147 private set;
1150 public GarbageHandlingVote Strategy
1152 get;
1153 set;
1156 #endregion Properties
1158 #region Methods
1160 public int IncrementGarbageCount()
1162 return Interlocked.Increment(ref _garbageCount);
1165 #endregion Methods
1168 #endregion Nested Types