2 // This file is part of the LWES .NET Binding (LWES.net)
4 // COPYRIGHT© 2009, Phillip Clark (cerebralkungfu[at*g mail[dot*com)
5 // original .NET implementation
7 // LWES.net is free software: you can redistribute it and/or modify
8 // it under the terms of the Lesser GNU General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
12 // LWES.net is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // Lesser GNU General Public License for more details.
17 // You should have received a copy of the Lesser GNU General Public License
18 // along with LWES.net. If not, see <http://www.gnu.org/licenses/>.
20 namespace Org
.Lwes
.Listener
23 using System
.Collections
.Generic
;
24 using System
.Diagnostics
;
27 using System
.Net
.Sockets
;
28 using System
.Threading
;
31 using Org
.Lwes
.Properties
;
34 /// Base class for event listeners.
36 public class EventListenerBase
: IEventListener
, ITraceable
40 const int CDisposeBackgroundThreadWaitTimeMS
= 200;
41 const int LeadNotifier
= 1;
43 List
<RegistrationKey
> _additions
= new List
<RegistrationKey
>();
44 Action
<RegistrationKey
, Exception
> _cacheHandleErrorsDelegate
;
45 int _consolidationVotes
= 0;
48 ListenerGarbageHandling _garbageHandling
;
49 Dictionary
<TrafficTrackingKey
, TrafficTrackingRec
> _garbageTracking
;
50 Object _garbageTrackingLock
;
52 ReaderWriterLockSlim _notifications
= new ReaderWriterLockSlim();
54 RegistrationKey
[] _registrations
= new RegistrationKey
[0];
61 /// Creates a new instance.
63 protected EventListenerBase()
65 _cacheHandleErrorsDelegate
= new Action
<RegistrationKey
, Exception
>(HandleErrorsOnEventSink
);
69 /// Destructor ensuring dispose is called.
76 #endregion Constructors
91 #endregion Enumerations
93 #region Nested Interfaces
95 interface IListener
: IDisposable
97 void Start(IEventTemplateDB db
99 , Action
<Socket
, IPEndPoint
> finishSocket
100 , EventListenerBase listener
);
103 #endregion Nested Interfaces
108 /// Indicates whether the listener has been initialized.
110 public virtual bool IsInitialized
112 get { return _listener != null; }
115 #endregion Properties
/// Disposes of the listener and frees any resources held.
122 public void Dispose()
125 GC
.SuppressFinalize(this);
129 /// Registers an event sink and activates it.
131 /// <param name="sink">the event sink to register</param>
132 /// <param name="handback">a handback object - this object is opaque to the listener
133 /// and will be attached to the registration key prior to activation</param>
134 /// <returns>A registration key for the event sink.</returns>
135 public IEventSinkRegistrationKey
RegisterAndActivateEventSink(IEventSink sink
, object handback
)
137 IEventSinkRegistrationKey key
= RegisterEventSink(sink
);
138 key
.Handback
= handback
;
/// Registers an event sink with the listener without activating the event sink.
147 /// <param name="sink">the event sink to register</param>
148 /// <returns>A registration key for the event sink</returns>
149 public IEventSinkRegistrationKey
RegisterEventSink(IEventSink sink
)
151 if (sink
== null) throw new ArgumentNullException("sink");
152 RegistrationKey key
= new RegistrationKey(this, sink
);
153 AddRegistration(key
);
157 internal void PerformEventArrival(Event ev
)
159 int notifier
= Interlocked
.Increment(ref _notifiers
);
162 if (notifier
== LeadNotifier
) _notifications
.EnterUpgradeableReadLock();
163 else _notifications
.EnterReadLock();
166 foreach (var r
in _registrations
)
168 if (r
.PerformEventArrival(ev
, _cacheHandleErrorsDelegate
))
170 Interlocked
.Increment(ref _consolidationVotes
);
173 if (notifier
== LeadNotifier
&& Thread
.VolatileRead(ref _consolidationVotes
) > 0)
175 SafeConsolidateRegistrations();
180 if (notifier
== LeadNotifier
) _notifications
.ExitUpgradeableReadLock();
181 else _notifications
.ExitReadLock();
186 Interlocked
.Decrement(ref _notifiers
);
190 internal GarbageHandlingVote
PerformGarbageArrival(EndPoint remoteEndPoint
, int priorGarbageCountForEndpoint
, byte[] garbage
)
192 GarbageHandlingVote strategy
= GarbageHandlingVote
.None
;
193 int notifier
= Interlocked
.Increment(ref _notifiers
);
196 if (notifier
== LeadNotifier
) _notifications
.EnterUpgradeableReadLock();
197 else _notifications
.EnterReadLock();
200 foreach (var r
in _registrations
)
202 GarbageHandlingVote strategyVote
= r
.PerformGarbageArrival(
204 priorGarbageCountForEndpoint
,
206 _cacheHandleErrorsDelegate
208 if (strategyVote
> strategy
)
210 strategy
= strategyVote
;
213 if (notifier
== LeadNotifier
&& Thread
.VolatileRead(ref _consolidationVotes
) > 0)
215 SafeConsolidateRegistrations();
220 if (notifier
== LeadNotifier
) _notifications
.ExitUpgradeableReadLock();
221 else _notifications
.ExitReadLock();
226 Interlocked
.Decrement(ref _notifiers
);
/// <summary>
/// Ensures the listener has been initialized.
/// </summary>
/// <exception cref="InvalidOperationException">thrown if the listener has not yet been initialized.</exception>
protected void CheckInitialized()
{
    // The guard previously fired when the listener WAS initialized, which
    // contradicted both the documented contract and the error message used.
    if (!IsInitialized)
    {
        throw new InvalidOperationException(Resources.Error_NotYetInitialized);
    }
}
/// Disposes of the listener.
243 /// <param name="disposing">Indicates whether the object is being disposed</param>
244 protected virtual void Dispose(bool disposing
)
246 Util
.Dispose(ref _listener
);
250 /// Initializes the base class.
252 /// <param name="db">template database used when creating events</param>
253 /// <param name="endpoint">an IP endpoint where listening will occur</param>
254 /// <param name="parallel">whether the listener will listen and dispatch events in parallel</param>
255 /// <param name="garbageHandling">indicates the garbage handling strategy the listener will use</param>
256 /// <param name="finishSocket">callback method used to complete the setup of the socket
257 /// connected to the given <paramref name="endpoint"/></param>
258 protected void Initialize(IEventTemplateDB db
259 , IPEndPoint endpoint
261 , ListenerGarbageHandling garbageHandling
262 , Action
<Socket
, IPEndPoint
> finishSocket
)
264 if (db
== null) throw new ArgumentNullException("db");
265 if (endpoint
== null) throw new ArgumentNullException("endpoint");
266 if (finishSocket
== null) throw new ArgumentNullException("finishSocket");
269 _endpoint
= endpoint
;
270 IListener listener
= (parallel
)
271 ? (IListener
)new ParallelListener()
272 : (IListener
)new BackgroundThreadListener();
274 _garbageHandling
= garbageHandling
;
275 if (_garbageHandling
> ListenerGarbageHandling
.FailSilently
)
277 _garbageTracking
= new Dictionary
<TrafficTrackingKey
, TrafficTrackingRec
>();
278 _garbageTrackingLock
= new Object();
281 listener
.Start(db
, endpoint
, finishSocket
, this);
282 _listener
= listener
;
285 private void AddRegistration(RegistrationKey key
)
287 int notifier
= Interlocked
.Increment(ref _notifiers
);
290 if (notifier
== LeadNotifier
)
292 if (_notifications
.TryEnterWriteLock(20))
299 UnsafeConsolidateRegistrations();
305 _notifications
.ExitWriteLock();
310 // We couldn't get the writelock so we're gonna have to schedule
311 // the key to be added later...
315 Interlocked
.Increment(ref _consolidationVotes
);
320 Interlocked
.Decrement(ref _notifiers
);
/// <summary>
/// Looks up the garbage handling strategy currently recorded for a remote endpoint.
/// </summary>
/// <param name="ep">the remote endpoint the traffic was received from</param>
/// <returns>
/// <see cref="GarbageHandlingVote.None"/> when the listener is configured to fail silently;
/// otherwise the strategy stored on the endpoint's tracking record, or
/// <see cref="GarbageHandlingVote.Default"/> when the endpoint has no tracking record.
/// </returns>
private GarbageHandlingVote GetTrafficStrategyForEndpoint(EndPoint ep)
{
    // When failing silently no per-endpoint tracking is consulted.
    if (_garbageHandling == ListenerGarbageHandling.FailSilently)
    {
        return GarbageHandlingVote.None;
    }

    // The key constructor performs the IPEndPoint cast itself, so the
    // separate (unused) local cast that used to live here was dropped.
    TrafficTrackingKey key = new TrafficTrackingKey(ep);
    TrafficTrackingRec tracking;
    lock (_garbageTrackingLock)
    {
        if (_garbageTracking.TryGetValue(key, out tracking))
        {
            return tracking.Strategy;
        }
    }
    return GarbageHandlingVote.Default;
}
346 private void HandleErrorsOnEventSink(RegistrationKey key
, Exception e
)
348 this.TraceData(TraceEventType
.Error
, Resources
.Error_EventSinkThrewException
, key
, e
);
349 // TODO: Strategies for event sinks that cause exceptions.
352 private void HandleGarbageData(EndPoint ep
, byte[] buffer
, int offset
, int bytesTransferred
)
354 this.TraceData(TraceEventType
.Verbose
, new Func
<object[]>(() =>
356 return new object[] { ((IPEndPoint)ep).ToString(), Util.BytesToOctets(buffer, offset, bytesTransferred) }
;
360 if (_garbageHandling
> ListenerGarbageHandling
.FailSilently
)
362 IPEndPoint ipep
= (IPEndPoint
)ep
;
363 TrafficTrackingKey key
= new TrafficTrackingKey(ep
);
364 TrafficTrackingRec tracking
;
365 lock (_garbageTrackingLock
)
367 if (!_garbageTracking
.TryGetValue(key
, out tracking
))
369 tracking
= new TrafficTrackingRec(ep
);
370 _garbageTracking
.Add(key
, tracking
);
373 if (_garbageHandling
== ListenerGarbageHandling
.AskEventSinksToVoteOnStrategy
374 && tracking
.Strategy
!= GarbageHandlingVote
.IgnoreAllTrafficFromEndpoint
)
376 PerformGarbageDataNotification(tracking
, ep
, buffer
, offset
, bytesTransferred
);
/// <summary>
/// Passes a copy of the received garbage bytes to the registered event sinks and
/// stores the strategy they vote for on the endpoint's tracking record.
/// </summary>
/// <param name="tracking">tracking record for the remote endpoint; its garbage count is incremented</param>
/// <param name="rcep">the remote endpoint the data was received from</param>
/// <param name="buffer">buffer containing the received data</param>
/// <param name="offset">index in <paramref name="buffer"/> where the received data begins</param>
/// <param name="bytesTransferred">number of received bytes</param>
private void PerformGarbageDataNotification(TrafficTrackingRec tracking, EndPoint rcep, byte[] buffer, int offset, int bytesTransferred)
{
    // Sinks are handed a right-sized copy rather than the receive buffer itself.
    // The copy must start at 'offset'; the previous three-argument Array.Copy
    // always started at index 0 and silently ignored the offset parameter.
    byte[] copy = new byte[bytesTransferred];
    Array.Copy(buffer, offset, copy, 0, bytesTransferred);

    tracking.Strategy = PerformGarbageArrival(rcep, tracking.IncrementGarbageCount(), copy);
}
388 private void SafeConsolidateRegistrations()
390 _notifications
.EnterWriteLock();
395 UnsafeConsolidateRegistrations();
400 _notifications
.ExitWriteLock();
404 private void UnsafeConsolidateRegistrations()
407 Debug
.Assert(_notifications
.IsWriteLockHeld
);
409 _registrations
= (from r
in _registrations
410 where r
.Status
!= EventSinkStatus
.Canceled
411 select r
).Concat(from r
in _additions
412 where r
.Status
!= EventSinkStatus
.Canceled
416 Thread
.VolatileWrite(ref _consolidationVotes
, 0);
423 struct TrafficTrackingKey
428 public int AddressFamily
;
435 public TrafficTrackingKey(EndPoint ep
)
437 IPEndPoint ipep
= (IPEndPoint
)ep
;
438 Address
= BitConverter
.ToUInt32(ipep
.Address
.GetAddressBytes(), 0);
440 AddressFamily
= (int)ipep
.AddressFamily
;
443 #endregion Constructors
447 /// Uses background threads to receive events from LWES. This class uses two
448 /// threads, one to listen and deserialize the events and another to perform
449 /// the notifications.
451 class BackgroundThreadListener
: IListener
457 IEventTemplateDB _db
;
458 SimpleLockFreeQueue
<Event
> _eventQueue
= new SimpleLockFreeQueue
<Event
>();
459 UdpEndpoint _listenEP
;
460 EventListenerBase _listener
;
462 Status
<ListenerState
> _notifierState
;
463 Object _notifierWaitObject
;
465 Status
<ListenerState
> _recieverState
;
471 ~
BackgroundThreadListener()
476 #endregion Constructors
480 public void Dispose()
483 GC
.SuppressFinalize(this);
/// Starts the listener in multi-threaded mode. In this mode the listener
/// uses two dedicated background threads. One thread is used for
/// receiving bytes and deserializing LWES events and the other performs
/// event notification, waking only when LWES events have been received.
493 /// <param name="db">event template DB used during deserialization</param>
494 /// <param name="listenEP">a IP endpoint where listening should occur</param>
495 /// <param name="finishSocket"></param>
496 /// <param name="listener"></param>
497 public void Start(IEventTemplateDB db
498 , IPEndPoint listenEP
499 , Action
<Socket
, IPEndPoint
> finishSocket
500 , EventListenerBase listener
)
503 _listener
= listener
;
504 _anyEP
= (listenEP
.AddressFamily
== AddressFamily
.InterNetworkV6
)
505 ? new IPEndPoint(IPAddress
.IPv6Any
, listenEP
.Port
)
506 : new IPEndPoint(IPAddress
.Any
, listenEP
.Port
);
507 _buffer
= BufferManager
.AcquireBuffer(null);
508 _listenEP
= new UdpEndpoint(listenEP
).Initialize(finishSocket
);
509 // Start a dedicated background thread to handle the receiving...
510 _reciever
= new Thread(Background_Receiver
);
511 _reciever
.IsBackground
= true;
514 // Start a dedicated background thread to perform event notification...
515 _notifierWaitObject
= new Object();
516 _notifier
= new Thread(Background_Notifier
);
517 _notifier
.IsBackground
= true;
523 if (_recieverState
.TryTransition(ListenerState
.StopSignaled
, ListenerState
.Active
))
525 // Close the listener, this will cause the receiver thread to wakeup
526 // if it is blocked waiting for IO on the socket.
527 Util
.Dispose(ref _listenEP
);
532 private void Background_Notifier(object unused_state
)
534 _notifierState
.SetState(ListenerState
.Active
);
535 while (_notifierState
.CurrentState
< ListenerState
.StopSignaled
)
538 if (!_eventQueue
.TryDequeue(out ev
))
540 lock (_notifierWaitObject
)
541 { // double-check that the queue is empty
542 // this strategy catches the race condition when the
// receiver queues an event while we're acquiring the lock.
544 _notifierState
.SetState(ListenerState
.Suspending
);
545 if (!_eventQueue
.TryDequeue(out ev
))
547 _notifierState
.SetState(ListenerState
.Suspended
);
548 Monitor
.Wait(_notifierWaitObject
);
551 // If the stop signal arrived during a wait then bail out...
552 if (_notifierState
.CurrentState
== ListenerState
.StopSignaled
)
554 _notifierState
.SetState(ListenerState
.Stopped
);
557 // otherwise we're active again
558 _notifierState
.SetState(ListenerState
.Active
);
561 _listener
.PerformEventArrival(ev
);
565 private void Background_Receiver(object unused_state
)
567 if (_recieverState
.TryTransition(ListenerState
.Active
, ListenerState
.Unknown
))
571 // Continue until signaled to stop...
572 while (_recieverState
.CurrentState
== ListenerState
.Active
)
574 EndPoint rcep
= _anyEP
;
575 // Perform a blocking receive...
576 int bytesTransferred
= _listenEP
.ReceiveFrom(ref rcep
, _buffer
, 0, _buffer
.Length
);
577 if (bytesTransferred
> 0)
579 GarbageHandlingVote handling
= _listener
.GetTrafficStrategyForEndpoint(rcep
);
580 if (handling
== GarbageHandlingVote
.None
)
582 PerformEventDeserializationAndQueueForNotification(rcep
, _buffer
, 0, bytesTransferred
);
584 else if (handling
== GarbageHandlingVote
.TreatTrafficFromEndpointAsGarbage
)
586 _listener
.HandleGarbageData(rcep
, _buffer
, 0, bytesTransferred
);
588 // Otherwise the handling was GarbageHandlingStrategy.FailfastForTrafficOnEndpoint
589 // and we're going to ignore it altogether.
593 catch (SocketException se
)
595 if (se
.ErrorCode
!= 10004)
598 if (_recieverState
.TryTransition(ListenerState
.Stopping
, ListenerState
.StopSignaled
))
600 // Cascade the stop signal to the notifier and wait for it to exit...
601 _notifierState
.SetState(ListenerState
.StopSignaled
);
607 private void Dispose(bool disposing
)
609 // Signal background threads...
610 _recieverState
.TryTransition(ListenerState
.StopSignaled
, ListenerState
.Active
, () =>
612 Util
.Dispose(ref _listenEP
);
613 _reciever
.Join(CDisposeBackgroundThreadWaitTimeMS
);
614 BufferManager
.ReleaseBuffer(_buffer
);
619 private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
621 , int offset
, int bytesTransferred
)
623 IPEndPoint ep
= (IPEndPoint
)rcep
;
// For received events, set MetaEventInfo.ReceiptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
627 Event ev
= Event
.BinaryDecode(_db
, buffer
, offset
, bytesTransferred
);
628 ev
.SetValue(Constants
.MetaEventInfoAttributes
.ReceiptTime
.Name
, Constants
.DateTimeToLwesTimeTicks(DateTime
.UtcNow
));
629 ev
.SetValue(Constants
.MetaEventInfoAttributes
.SenderIP
.Name
, ep
.Address
);
630 ev
.SetValue(Constants
.MetaEventInfoAttributes
.SenderPort
.Name
, ep
.Port
);
631 _eventQueue
.Enqueue(ev
);
633 catch (BadLwesDataException
)
635 _listener
.HandleGarbageData(rcep
, buffer
, offset
, bytesTransferred
);
638 if (_notifierState
.CurrentState
> ListenerState
.Active
)
640 // notifier thread is suspended;
642 lock (_notifierWaitObject
)
644 Monitor
.Pulse(_notifierWaitObject
);
/// Uses the threadpool and overlapped IO on the receiving socket. This listener
654 /// will consume between 0 and 3 threads from the threadpool, depending on which
655 /// jobs are active. The jobs may consist of the following:
657 /// <li>Receiver - invoked by the socket on a threadpool thread when input is received</li>
658 /// <li>Deserializer - scheduled for a threadpool thread and runs as long as buffers are in the receive queue</li>
659 /// <li>Notifier - scheduled for a threadpool thread and runs as long as Events are in the notification queue</li>
662 class ParallelListener
: IListener
667 IEventTemplateDB _db
;
669 SimpleLockFreeQueue
<Event
> _eventQueue
= new SimpleLockFreeQueue
<Event
>();
670 UdpEndpoint _listenEP
;
671 EventListenerBase _listener
;
672 Status
<ListenerState
> _listenerState
;
674 SimpleLockFreeQueue
<ReceiveCapture
> _receiveQueue
;
685 #endregion Constructors
689 public void Dispose()
692 GC
.SuppressFinalize(this);
696 /// Starts the listener.
698 /// <param name="db">an event template DB</param>
699 /// <param name="listenEP">the listening endpoint</param>
700 /// <param name="finishSocket">a callback method that is called upon to finish the listening socket</param>
701 /// <param name="owner">the owner</param>
702 public void Start(IEventTemplateDB db
703 , IPEndPoint listenEP
704 , Action
<Socket
, IPEndPoint
> finishSocket
705 , EventListenerBase owner
)
709 _anyEP
= (listenEP
.AddressFamily
== AddressFamily
.InterNetworkV6
)
710 ? new IPEndPoint(IPAddress
.IPv6Any
, 0)
711 : new IPEndPoint(IPAddress
.Any
, 0);
713 _receiveQueue
= new SimpleLockFreeQueue
<ReceiveCapture
>();
715 _listenEP
= new UdpEndpoint(listenEP
).Initialize(finishSocket
);
721 _listenerState
.TryTransition(ListenerState
.StopSignaled
, ListenerState
.Active
, () =>
723 while (Thread
.VolatileRead(ref _deserializers
) > 0)
725 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
727 while (Thread
.VolatileRead(ref _notifiers
) > 0)
729 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
732 Util
.Dispose(ref _listenEP
);
734 _listenerState
.SpinWaitForState(ListenerState
.Stopped
, () => Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
));
/// <summary>
/// Thread pool job (queued by EnsureDeserializerIsActive) that drains the receive
/// queue and turns each capture record into an Event or routes it to garbage handling.
/// </summary>
/// <param name="unused_state">ignored; present to satisfy the WaitCallback signature</param>
private void Background_Deserializer(object unused_state)
{
    try
    {
        ReceiveCapture input;
        while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _receiveQueue.TryDequeue(out input))
        {
            GarbageHandlingVote handling = _listener.GetTrafficStrategyForEndpoint(input.RemoteEndPoint);
            if (handling == GarbageHandlingVote.None)
            {
                // This call releases input.Buffer back to the BufferManager itself.
                PerformEventDeserializationAndQueueForNotification(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred);
            }
            else
            {
                try
                {
                    if (handling == GarbageHandlingVote.TreatTrafficFromEndpointAsGarbage)
                    {
                        _listener.HandleGarbageData(input.RemoteEndPoint, input.Buffer, 0, input.BytesTransferred);
                    }
                    // Any other vote means traffic from the endpoint is ignored altogether.
                }
                finally
                {
                    // The buffer was acquired from the BufferManager by the receiver;
                    // these paths previously never gave it back.
                    BufferManager.ReleaseBuffer(input.Buffer);
                }
            }
        }
    }
    finally
    {
        int remaining = Interlocked.Decrement(ref _deserializers);

        // Restart only if captures arrived after the loop saw an empty queue.
        // Once a stop has been signaled a restarted job would exit its loop
        // immediately and land here again, so do not re-queue in that case.
        if (remaining == 0
            && _listenerState.IsLessThan(ListenerState.StopSignaled)
            && !_receiveQueue.IsEmpty)
        {
            EnsureDeserializerIsActive();
        }
    }
}
/// <summary>
/// Thread pool job (queued by EnsureNotifierIsActive) that drains the event queue
/// and notifies the owning listener of each event.
/// </summary>
/// <param name="unused_state">ignored; present to satisfy the WaitCallback signature</param>
private void Background_Notifier(object unused_state)
{
    try
    {
        Event ev;
        while (_listenerState.IsLessThan(ListenerState.StopSignaled) && _eventQueue.TryDequeue(out ev))
        {
            _listener.PerformEventArrival(ev);
        }
    }
    finally
    {
        int remaining = Interlocked.Decrement(ref _notifiers);

        // The notifier consumes _eventQueue, so that is the queue that decides
        // whether another notifier is needed (the check used to look at
        // _receiveQueue, which belongs to the deserializer).
        // Once a stop has been signaled a restarted job would exit its loop
        // immediately and land here again, so do not re-queue in that case.
        if (remaining == 0
            && _listenerState.IsLessThan(ListenerState.StopSignaled)
            && !_eventQueue.IsEmpty)
        {
            EnsureNotifierIsActive();
        }
    }
}
793 private void Background_ParallelReceiver(object unused_state
)
795 // Continue until signalled to stop...
796 if (_listenerState
.IsLessThan(ListenerState
.StopSignaled
))
798 // Acquiring a buffer may block until a buffer
799 // becomes available.
800 byte[] buffer
= BufferManager
.AcquireBuffer(() => _listenerState
.IsGreaterThan(ListenerState
.Active
));
802 // If the buffer is null then the stop-signal was received while acquiring a buffer
805 _listenEP
.ReceiveFromAsync(_anyEP
, buffer
, 0, buffer
.Length
, (op
) =>
807 if (op
.SocketError
== SocketError
.Success
)
809 // Reschedule the receiver before pulling the buffer out, we want to catch receives
810 // in the tightest loop possible, although we don't want to keep a threadpool thread
811 // *forever* and possibly cause thread-starvation in for other jobs so we continually
812 // put the job back in the queue - this way our parallelism plays nicely with other
813 // jobs - now, if only the other jobs were programmed to give up their threads periodically
815 ThreadPool
.QueueUserWorkItem(new WaitCallback(Background_ParallelReceiver
));
816 if (op
.BytesTransferred
> 0)
818 _receiveQueue
.Enqueue(new ReceiveCapture(op
.RemoteEndPoint
, op
.Buffer
, op
.BytesTransferred
));
820 EnsureDeserializerIsActive();
823 else if (op
.SocketError
== SocketError
.OperationAborted
)
825 // This is the dispose or stop call. fall through
836 // We get here if the receiver is signaled to stop.
840 private void CascadeStopSignal()
842 _listenerState
.TryTransition(ListenerState
.Stopping
, ListenerState
.StopSignaled
, () =>
844 while (Thread
.VolatileRead(ref _deserializers
) > 0)
846 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
848 while (Thread
.VolatileRead(ref _notifiers
) > 0)
850 Thread
.Sleep(CDisposeBackgroundThreadWaitTimeMS
);
852 _listenerState
.SetState(ListenerState
.Stopped
);
856 private void Dispose(bool disposing
)
858 if (_listenerState
.CurrentState
== ListenerState
.Active
)
862 private void EnsureDeserializerIsActive()
864 int current
= -1, value = Thread
.VolatileRead(ref _deserializers
);
867 WaitCallback cb
= new WaitCallback(Background_Deserializer
);
871 value = Interlocked
.CompareExchange(ref _deserializers
, value + 1, current
);
872 if (value == current
)
874 ThreadPool
.QueueUserWorkItem(cb
);
881 private void EnsureNotifierIsActive()
883 int current
= -1, value = Thread
.VolatileRead(ref _notifiers
);
886 WaitCallback cb
= new WaitCallback(Background_Notifier
);
890 value = Interlocked
.CompareExchange(ref _notifiers
, value + 1, current
);
891 if (value == current
)
893 ThreadPool
.QueueUserWorkItem(cb
);
900 private void ParallelReceiver()
902 // Only startup once.
903 if (_listenerState
.TryTransition(ListenerState
.Active
, ListenerState
.Unknown
))
905 Background_ParallelReceiver(null);
/// <summary>
/// Decodes an LWES event from the received bytes, stamps it with receipt metadata and
/// queues it for notification. Data rejected with <see cref="BadLwesDataException"/>
/// is passed to the listener's garbage handling instead.
/// </summary>
/// <param name="rcep">the remote endpoint the data was received from</param>
/// <param name="buffer">buffer containing the received data; released to the BufferManager before returning</param>
/// <param name="offset">index in <paramref name="buffer"/> where the received data begins</param>
/// <param name="bytesTransferred">number of received bytes</param>
private void PerformEventDeserializationAndQueueForNotification(EndPoint rcep
    , byte[] buffer
    , int offset, int bytesTransferred)
{
    try
    {
        IPEndPoint ep = (IPEndPoint)rcep;

        // For received events, set MetaEventInfo.ReceiptTime, MetaEventInfo.SenderIP, and MetaEventInfo.SenderPort...
        Event ev = Event.BinaryDecode(_db, buffer, offset, bytesTransferred);
        ev.SetValue(Constants.MetaEventInfoAttributes.ReceiptTime.Name, Constants.DateTimeToLwesTimeTicks(DateTime.UtcNow));
        ev.SetValue(Constants.MetaEventInfoAttributes.SenderIP.Name, ep.Address);
        ev.SetValue(Constants.MetaEventInfoAttributes.SenderPort.Name, ep.Port);
        _eventQueue.Enqueue(ev);
    }
    catch (BadLwesDataException)
    {
        _listener.HandleGarbageData(rcep, buffer, offset, bytesTransferred);
    }
    finally
    {
        // Release in a finally block so the buffer is returned even when the
        // cast, the decode or the garbage handling throws something unexpected.
        BufferManager.ReleaseBuffer(buffer);
    }

    EnsureNotifierIsActive();
}
936 struct ReceiveCapture
940 public byte[] Buffer
;
941 public int BytesTransferred
;
942 public EndPoint RemoteEndPoint
;
948 public ReceiveCapture(EndPoint ep
, byte[] data
, int transferred
)
950 this.RemoteEndPoint
= ep
;
952 this.BytesTransferred
= transferred
;
955 #endregion Constructors
958 #endregion Nested Types
961 class RegistrationKey
: IEventSinkRegistrationKey
965 bool _disableGarbageNotification
;
966 Status
<EventSinkStatus
> _status
= new Status
<EventSinkStatus
>(EventSinkStatus
.Suspended
);
973 public RegistrationKey(EventListenerBase listener
, IEventSink sink
)
977 _threadSafe
= sink
.IsThreadSafe
;
980 #endregion Constructors
984 public object Handback
990 public IEventListener Listener
996 public IEventSink Sink
1002 public EventSinkStatus Status
1004 get { return _status.CurrentState; }
1007 #endregion Properties
1011 public bool Activate()
1013 return _status
.SetStateIfLessThan(EventSinkStatus
.Active
, EventSinkStatus
.Canceled
);
1016 public void Cancel()
1018 _status
.SetState(EventSinkStatus
.Canceled
);
1021 public void DisableGarbageNotification()
1023 Thread
.MemoryBarrier();
1024 _disableGarbageNotification
= true;
1025 Thread
.MemoryBarrier();
1028 public bool Suspend()
1030 return _status
.SetStateIfLessThan(EventSinkStatus
.Suspended
, EventSinkStatus
.Canceled
);
1033 internal bool PerformEventArrival(Event ev
, Action
<RegistrationKey
, Exception
> errorHandler
)
1037 if (_status
.SpinToggleState(EventSinkStatus
.Notifying
, EventSinkStatus
.Active
))
1041 Sink
.HandleEventArrival(this, ev
);
1042 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1046 errorHandler(this, e
);
1054 EventSinkStatus s
= _status
.CompareExchange(EventSinkStatus
.Notifying
, EventSinkStatus
.Active
);
1055 if (s
== EventSinkStatus
.Active
|| s
== EventSinkStatus
.Notifying
)
1057 Sink
.HandleEventArrival(this, ev
);
1058 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1063 errorHandler(this, e
);
1066 return _status
.CurrentState
== EventSinkStatus
.Canceled
;
1069 internal GarbageHandlingVote
PerformGarbageArrival(EndPoint remoteEndPoint
, int priorGarbageCountForEndpoint
, byte[] garbage
,
1070 Action
<RegistrationKey
, Exception
> errorHandler
)
1072 Thread
.MemoryBarrier();
1073 bool ignoring
= _disableGarbageNotification
;
1075 GarbageHandlingVote strategy
= GarbageHandlingVote
.None
;
1080 if (_status
.SpinToggleState(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
))
1084 strategy
= Sink
.HandleGarbageData(this, remoteEndPoint
, priorGarbageCountForEndpoint
, garbage
);
1088 errorHandler(this, e
);
1090 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1097 if (_status
.TryTransition(EventSinkStatus
.Notifying
, EventSinkStatus
.Active
))
1099 strategy
= Sink
.HandleGarbageData(this, remoteEndPoint
, priorGarbageCountForEndpoint
, garbage
);
1100 _status
.TryTransition(EventSinkStatus
.Active
, EventSinkStatus
.Notifying
);
1105 errorHandler(this, e
);
1115 class TrafficTrackingRec
1119 int _garbageCount
= 0;
1123 #region Constructors
1125 public TrafficTrackingRec(EndPoint ep
)
1127 RemoteEndPoint
= ep
;
1130 #endregion Constructors
1136 get { return RemoteEndPoint == null; }
1139 public int PreviousGargageDataCount
1141 get { return _garbageCount; }
1144 public EndPoint RemoteEndPoint
1150 public GarbageHandlingVote Strategy
1156 #endregion Properties
1160 public int IncrementGarbageCount()
1162 return Interlocked
.Increment(ref _garbageCount
);
1168 #endregion Nested Types