1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace Microsoft
.ServiceModel
.Samples
8 using System
.Collections
.Generic
;
9 using System
.Diagnostics
;
10 using System
.ServiceModel
;
11 using System
.ServiceModel
.Diagnostics
;
13 using System
.Threading
;
15 // ItemDequeuedCallback is called as an item is dequeued from the InputQueue. The
16 // InputQueue lock is not held during the callback. However, the user code is
17 // not notified of the item being available until the callback returns. If you
18 // are not sure if the callback blocks for a long time, then first call
19 // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
20 delegate void ItemDequeuedCallback();
23 /// Handles asynchronous interactions between producers and consumers.
24 /// Producers can dispatch available data to the input queue,
25 /// where it is dispatched to a waiting consumer or stored until a
26 /// consumer becomes available. Consumers can synchronously or asynchronously
27 /// request data from the queue, which is returned when data becomes
30 /// <typeparam name="T">The concrete type of the consumer objects that are waiting for data.</typeparam>
31 class InputQueue
<T
> : IDisposable where T
: class
33 //Stores items that are waiting to be accessed.
36 //Each IQueueReader represents some consumer that is waiting for
37 //items to appear in the queue. The readerQueue stores them
38 //in an ordered list so consumers get serviced in a FIFO manner.
39 Queue
<IQueueReader
> readerQueue
;
41 //Each IQueueWaiter represents some waiter that is waiting for
42 //items to appear in the queue. When any item appears, all
43 //waiters are signaled.
44 List
<IQueueWaiter
> waiterList
;
46 static WaitCallback onInvokeDequeuedCallback
;
47 static WaitCallback onDispatchCallback
;
48 static WaitCallback completeOutstandingReadersCallback
;
49 static WaitCallback completeWaitersFalseCallback
;
50 static WaitCallback completeWaitersTrueCallback
;
52 //Represents the current state of the InputQueue.
53 //as it transitions through its lifecycle.
54 QueueState queueState
;
64 this.itemQueue
= new ItemQueue();
65 this.readerQueue
= new Queue
<IQueueReader
>();
66 this.waiterList
= new List
<IQueueWaiter
>();
67 this.queueState
= QueueState
.Open
;
70 public int PendingCount
76 return itemQueue
.ItemCount
;
83 get { return itemQueue; }
86 public IAsyncResult
BeginDequeue(TimeSpan timeout
, AsyncCallback callback
, object state
)
88 Item item
= default(Item
);
92 if (queueState
== QueueState
.Open
)
94 if (itemQueue
.HasAvailableItem
)
96 item
= itemQueue
.DequeueAvailableItem();
100 AsyncQueueReader reader
= new AsyncQueueReader(this, timeout
, callback
, state
);
101 readerQueue
.Enqueue(reader
);
105 else if (queueState
== QueueState
.Shutdown
)
107 if (itemQueue
.HasAvailableItem
)
109 item
= itemQueue
.DequeueAvailableItem();
111 else if (itemQueue
.HasAnyItem
)
113 AsyncQueueReader reader
= new AsyncQueueReader(this, timeout
, callback
, state
);
114 readerQueue
.Enqueue(reader
);
120 InvokeDequeuedCallback(item
.DequeuedCallback
);
121 return new TypedCompletedAsyncResult
<T
>(item
.GetValue(), callback
, state
);
124 public IAsyncResult
BeginWaitForItem(TimeSpan timeout
, AsyncCallback callback
, object state
)
128 if (queueState
== QueueState
.Open
)
130 if (!itemQueue
.HasAvailableItem
)
132 AsyncQueueWaiter waiter
= new AsyncQueueWaiter(timeout
, callback
, state
);
133 waiterList
.Add(waiter
);
137 else if (queueState
== QueueState
.Shutdown
)
139 if (!itemQueue
.HasAvailableItem
&& itemQueue
.HasAnyItem
)
141 AsyncQueueWaiter waiter
= new AsyncQueueWaiter(timeout
, callback
, state
);
142 waiterList
.Add(waiter
);
148 return new TypedCompletedAsyncResult
<bool>(true, callback
, state
);
151 static void CompleteOutstandingReadersCallback(object state
)
153 IQueueReader
[] outstandingReaders
= (IQueueReader
[])state
;
155 for (int i
= 0; i
< outstandingReaders
.Length
; i
++)
157 outstandingReaders
[i
].Set(default(Item
));
161 static void CompleteWaitersFalseCallback(object state
)
163 CompleteWaiters(false, (IQueueWaiter
[])state
);
166 static void CompleteWaitersTrueCallback(object state
)
168 CompleteWaiters(true, (IQueueWaiter
[])state
);
171 static void CompleteWaiters(bool itemAvailable
, IQueueWaiter
[] waiters
)
173 for (int i
=0; i
<waiters
.Length
; i
++)
175 waiters
[i
].Set(itemAvailable
);
179 static void CompleteWaitersLater(bool itemAvailable
, IQueueWaiter
[] waiters
)
183 if (completeWaitersTrueCallback
== null)
184 completeWaitersTrueCallback
= new WaitCallback(CompleteWaitersTrueCallback
);
186 ThreadPool
.QueueUserWorkItem(completeWaitersTrueCallback
, waiters
);
190 if (completeWaitersFalseCallback
== null)
191 completeWaitersFalseCallback
= new WaitCallback(CompleteWaitersFalseCallback
);
193 ThreadPool
.QueueUserWorkItem(completeWaitersFalseCallback
, waiters
);
197 void GetWaiters(out IQueueWaiter
[] waiters
)
199 if (waiterList
.Count
> 0)
201 waiters
= waiterList
.ToArray();
212 ((IDisposable
)this).Dispose();
215 public void Shutdown()
217 IQueueReader
[] outstandingReaders
= null;
220 if (queueState
== QueueState
.Shutdown
)
223 if (queueState
== QueueState
.Closed
)
226 this.queueState
= QueueState
.Shutdown
;
228 if (readerQueue
.Count
> 0 && this.itemQueue
.ItemCount
== 0)
230 outstandingReaders
= new IQueueReader
[readerQueue
.Count
];
231 readerQueue
.CopyTo(outstandingReaders
, 0);
236 if (outstandingReaders
!= null)
238 for (int i
= 0; i
< outstandingReaders
.Length
; i
++)
240 outstandingReaders
[i
].Set(new Item((Exception
)null, null));
245 public T
Dequeue(TimeSpan timeout
)
249 if (!this.Dequeue(timeout
, out value))
251 throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout
));
257 public bool Dequeue(TimeSpan timeout
, out T
value)
259 WaitQueueReader reader
= null;
260 Item item
= new Item();
264 if (queueState
== QueueState
.Open
)
266 if (itemQueue
.HasAvailableItem
)
268 item
= itemQueue
.DequeueAvailableItem();
272 reader
= new WaitQueueReader(this);
273 readerQueue
.Enqueue(reader
);
276 else if (queueState
== QueueState
.Shutdown
)
278 if (itemQueue
.HasAvailableItem
)
280 item
= itemQueue
.DequeueAvailableItem();
282 else if (itemQueue
.HasAnyItem
)
284 reader
= new WaitQueueReader(this);
285 readerQueue
.Enqueue(reader
);
293 else // queueState == QueueState.Closed
302 return reader
.Wait(timeout
, out value);
306 InvokeDequeuedCallback(item
.DequeuedCallback
);
307 value = item
.GetValue();
312 public void Dispose()
316 GC
.SuppressFinalize(this);
319 protected void Dispose(bool disposing
)
323 bool dispose
= false;
327 if (queueState
!= QueueState
.Closed
)
329 queueState
= QueueState
.Closed
;
336 while (readerQueue
.Count
> 0)
338 IQueueReader reader
= readerQueue
.Dequeue();
339 reader
.Set(default(Item
));
342 while (itemQueue
.HasAnyItem
)
344 Item item
= itemQueue
.DequeueAnyItem();
346 InvokeDequeuedCallback(item
.DequeuedCallback
);
352 public void Dispatch()
354 IQueueReader reader
= null;
355 Item item
= new Item();
356 IQueueReader
[] outstandingReaders
= null;
357 IQueueWaiter
[] waiters
= null;
358 bool itemAvailable
= true;
362 itemAvailable
= !((queueState
== QueueState
.Closed
) || (queueState
== QueueState
.Shutdown
));
363 this.GetWaiters(out waiters
);
365 if (queueState
!= QueueState
.Closed
)
367 itemQueue
.MakePendingItemAvailable();
369 if (readerQueue
.Count
> 0)
371 item
= itemQueue
.DequeueAvailableItem();
372 reader
= readerQueue
.Dequeue();
374 if (queueState
== QueueState
.Shutdown
&& readerQueue
.Count
> 0 && itemQueue
.ItemCount
== 0)
376 outstandingReaders
= new IQueueReader
[readerQueue
.Count
];
377 readerQueue
.CopyTo(outstandingReaders
, 0);
380 itemAvailable
= false;
386 if (outstandingReaders
!= null)
388 if (completeOutstandingReadersCallback
== null)
389 completeOutstandingReadersCallback
= new WaitCallback(CompleteOutstandingReadersCallback
);
391 ThreadPool
.QueueUserWorkItem(completeOutstandingReadersCallback
, outstandingReaders
);
396 CompleteWaitersLater(itemAvailable
, waiters
);
401 InvokeDequeuedCallback(item
.DequeuedCallback
);
406 //Ends an asynchronous Dequeue operation.
407 public T
EndDequeue(IAsyncResult result
)
411 if (!this.EndDequeue(result
, out value))
413 throw new TimeoutException("Asynchronous Dequeue operation timed out.");
419 public bool EndDequeue(IAsyncResult result
, out T
value)
421 TypedCompletedAsyncResult
<T
> typedResult
= result
as TypedCompletedAsyncResult
<T
>;
423 if (typedResult
!= null)
425 value = TypedCompletedAsyncResult
<T
>.End(result
);
429 return AsyncQueueReader
.End(result
, out value);
432 public bool EndWaitForItem(IAsyncResult result
)
434 TypedCompletedAsyncResult
<bool> typedResult
= result
as TypedCompletedAsyncResult
<bool>;
435 if (typedResult
!= null)
437 return TypedCompletedAsyncResult
<bool>.End(result
);
440 return AsyncQueueWaiter
.End(result
);
443 public void EnqueueAndDispatch(T item
)
445 EnqueueAndDispatch(item
, null);
448 public void EnqueueAndDispatch(T item
, ItemDequeuedCallback dequeuedCallback
)
450 EnqueueAndDispatch(item
, dequeuedCallback
, true);
453 public void EnqueueAndDispatch(Exception exception
, ItemDequeuedCallback dequeuedCallback
, bool canDispatchOnThisThread
)
455 Debug
.Assert(exception
!= null, "exception parameter should not be null");
456 EnqueueAndDispatch(new Item(exception
, dequeuedCallback
), canDispatchOnThisThread
);
459 public void EnqueueAndDispatch(T item
, ItemDequeuedCallback dequeuedCallback
, bool canDispatchOnThisThread
)
461 Debug
.Assert(item
!= null, "item parameter should not be null");
462 EnqueueAndDispatch(new Item(item
, dequeuedCallback
), canDispatchOnThisThread
);
465 void EnqueueAndDispatch(Item item
, bool canDispatchOnThisThread
)
467 bool disposeItem
= false;
468 IQueueReader reader
= null;
469 bool dispatchLater
= false;
470 IQueueWaiter
[] waiters
= null;
471 bool itemAvailable
= true;
475 itemAvailable
= !((queueState
== QueueState
.Closed
) || (queueState
== QueueState
.Shutdown
));
476 this.GetWaiters(out waiters
);
478 if (queueState
== QueueState
.Open
)
480 if (canDispatchOnThisThread
)
482 if (readerQueue
.Count
== 0)
484 itemQueue
.EnqueueAvailableItem(item
);
488 reader
= readerQueue
.Dequeue();
493 if (readerQueue
.Count
== 0)
495 itemQueue
.EnqueueAvailableItem(item
);
499 itemQueue
.EnqueuePendingItem(item
);
500 dispatchLater
= true;
504 else // queueState == QueueState.Closed || queueState == QueueState.Shutdown
512 if (canDispatchOnThisThread
)
514 CompleteWaiters(itemAvailable
, waiters
);
518 CompleteWaitersLater(itemAvailable
, waiters
);
524 InvokeDequeuedCallback(item
.DequeuedCallback
);
530 if (onDispatchCallback
== null)
532 onDispatchCallback
= new WaitCallback(OnDispatchCallback
);
535 ThreadPool
.QueueUserWorkItem(onDispatchCallback
, this);
537 else if (disposeItem
)
539 InvokeDequeuedCallback(item
.DequeuedCallback
);
544 public bool EnqueueWithoutDispatch(T item
, ItemDequeuedCallback dequeuedCallback
)
546 Debug
.Assert(item
!= null, "EnqueueWithoutDispatch: item parameter should not be null");
547 return EnqueueWithoutDispatch(new Item(item
, dequeuedCallback
));
550 public bool EnqueueWithoutDispatch(Exception exception
, ItemDequeuedCallback dequeuedCallback
)
552 Debug
.Assert(exception
!= null, "EnqueueWithoutDispatch: exception parameter should not be null");
553 return EnqueueWithoutDispatch(new Item(exception
, dequeuedCallback
));
556 // This does not block, however, Dispatch() must be called later if this function
558 bool EnqueueWithoutDispatch(Item item
)
563 if (queueState
!= QueueState
.Closed
&& queueState
!= QueueState
.Shutdown
)
565 if (readerQueue
.Count
== 0)
567 itemQueue
.EnqueueAvailableItem(item
);
572 itemQueue
.EnqueuePendingItem(item
);
579 InvokeDequeuedCallbackLater(item
.DequeuedCallback
);
583 static void OnDispatchCallback(object state
)
585 ((InputQueue
<T
>)state
).Dispatch();
588 static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback
)
590 if (dequeuedCallback
!= null)
592 if (onInvokeDequeuedCallback
== null)
594 onInvokeDequeuedCallback
= OnInvokeDequeuedCallback
;
597 ThreadPool
.QueueUserWorkItem(onInvokeDequeuedCallback
, dequeuedCallback
);
601 static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback
)
603 if (dequeuedCallback
!= null)
609 static void OnInvokeDequeuedCallback(object state
)
611 ItemDequeuedCallback dequeuedCallback
= (ItemDequeuedCallback
)state
;
615 bool RemoveReader(IQueueReader reader
)
619 if (queueState
== QueueState
.Open
|| queueState
== QueueState
.Shutdown
)
621 bool removed
= false;
623 for (int i
= readerQueue
.Count
; i
> 0; i
--)
625 IQueueReader temp
= readerQueue
.Dequeue();
626 if (Object
.ReferenceEquals(temp
, reader
))
632 readerQueue
.Enqueue(temp
);
643 public bool WaitForItem(TimeSpan timeout
)
645 WaitQueueWaiter waiter
= null;
646 bool itemAvailable
= false;
650 if (queueState
== QueueState
.Open
)
652 if (itemQueue
.HasAvailableItem
)
654 itemAvailable
= true;
658 waiter
= new WaitQueueWaiter();
659 waiterList
.Add(waiter
);
662 else if (queueState
== QueueState
.Shutdown
)
664 if (itemQueue
.HasAvailableItem
)
666 itemAvailable
= true;
668 else if (itemQueue
.HasAnyItem
)
670 waiter
= new WaitQueueWaiter();
671 waiterList
.Add(waiter
);
678 else // queueState == QueueState.Closed
686 return waiter
.Wait(timeout
);
690 return itemAvailable
;
694 interface IQueueReader
699 interface IQueueWaiter
701 void Set(bool itemAvailable
);
704 class WaitQueueReader
: IQueueReader
707 InputQueue
<T
> inputQueue
;
709 ManualResetEvent waitEvent
;
710 object thisLock
= new object();
712 public WaitQueueReader(InputQueue
<T
> inputQueue
)
714 this.inputQueue
= inputQueue
;
715 waitEvent
= new ManualResetEvent(false);
722 return this.thisLock
;
726 public void Set(Item item
)
730 Debug
.Assert(this.item
== null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
731 Debug
.Assert(this.exception
== null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
733 this.exception
= item
.Exception
;
734 this.item
= item
.Value
;
739 public bool Wait(TimeSpan timeout
, out T
value)
741 bool isSafeToClose
= false;
744 if (timeout
== TimeSpan
.MaxValue
)
748 else if (!waitEvent
.WaitOne(timeout
, false))
750 if (this.inputQueue
.RemoveReader(this))
753 isSafeToClose
= true;
762 isSafeToClose
= true;
777 class AsyncQueueReader
: AsyncResult
, IQueueReader
779 static TimerCallback timerCallback
= new TimerCallback(AsyncQueueReader
.TimerCallback
);
782 InputQueue
<T
> inputQueue
;
786 public AsyncQueueReader(InputQueue
<T
> inputQueue
, TimeSpan timeout
, AsyncCallback callback
, object state
)
787 : base(callback
, state
)
789 this.inputQueue
= inputQueue
;
790 if (timeout
!= TimeSpan
.MaxValue
)
792 this.timer
= new Timer(timerCallback
, this, timeout
, TimeSpan
.FromMilliseconds(-1));
796 public static bool End(IAsyncResult result
, out T
value)
798 AsyncQueueReader readerResult
= AsyncResult
.End
<AsyncQueueReader
>(result
);
800 if (readerResult
.expired
)
807 value = readerResult
.item
;
812 static void TimerCallback(object state
)
814 AsyncQueueReader thisPtr
= (AsyncQueueReader
)state
;
815 if (thisPtr
.inputQueue
.RemoveReader(thisPtr
))
817 thisPtr
.expired
= true;
818 thisPtr
.Complete(false);
822 public void Set(Item item
)
824 this.item
= item
.Value
;
825 if (this.timer
!= null)
827 this.timer
.Change(-1, -1);
829 Complete(false, item
.Exception
);
837 ItemDequeuedCallback dequeuedCallback
;
839 public Item(T
value, ItemDequeuedCallback dequeuedCallback
)
840 : this(value, null, dequeuedCallback
)
844 public Item(Exception exception
, ItemDequeuedCallback dequeuedCallback
)
845 : this(null, exception
, dequeuedCallback
)
849 Item(T
value, Exception exception
, ItemDequeuedCallback dequeuedCallback
)
852 this.exception
= exception
;
853 this.dequeuedCallback
= dequeuedCallback
;
856 public Exception Exception
858 get { return this.exception; }
863 get { return value; }
866 public ItemDequeuedCallback DequeuedCallback
868 get { return dequeuedCallback; }
871 public void Dispose()
875 if (value is IDisposable
)
877 ((IDisposable
)value).Dispose();
879 else if (value is ICommunicationObject
)
881 ((ICommunicationObject
)value).Abort();
888 if (this.exception
!= null)
890 throw this.exception
;
897 class WaitQueueWaiter
: IQueueWaiter
900 ManualResetEvent waitEvent
;
901 object thisLock
= new object();
903 public WaitQueueWaiter()
905 waitEvent
= new ManualResetEvent(false);
912 return this.thisLock
;
916 public void Set(bool itemAvailable
)
920 this.itemAvailable
= itemAvailable
;
925 public bool Wait(TimeSpan timeout
)
927 if (timeout
== TimeSpan
.MaxValue
)
931 else if (!waitEvent
.WaitOne(timeout
, false))
936 return this.itemAvailable
;
940 class AsyncQueueWaiter
: AsyncResult
, IQueueWaiter
942 static TimerCallback timerCallback
= new TimerCallback(AsyncQueueWaiter
.TimerCallback
);
945 object thisLock
= new object();
947 public AsyncQueueWaiter(TimeSpan timeout
, AsyncCallback callback
, object state
) : base(callback
, state
)
949 if (timeout
!= TimeSpan
.MaxValue
)
951 this.timer
= new Timer(timerCallback
, this, timeout
, TimeSpan
.FromMilliseconds(-1));
959 return this.thisLock
;
963 public static bool End(IAsyncResult result
)
965 AsyncQueueWaiter waiterResult
= AsyncResult
.End
<AsyncQueueWaiter
>(result
);
966 return waiterResult
.itemAvailable
;
969 static void TimerCallback(object state
)
971 AsyncQueueWaiter thisPtr
= (AsyncQueueWaiter
)state
;
972 thisPtr
.Complete(false);
975 public void Set(bool itemAvailable
)
981 timely
= (this.timer
== null) || this.timer
.Change(-1, -1);
982 this.itemAvailable
= itemAvailable
;
1001 items
= new Item
[1];
1004 public Item
DequeueAvailableItem()
1006 if (totalCount
== pendingCount
)
1008 Debug
.Assert(false, "ItemQueue does not contain any available items");
1009 throw new Exception("Internal Error");
1011 return DequeueItemCore();
1014 public Item
DequeueAnyItem()
1016 if (pendingCount
== totalCount
)
1018 return DequeueItemCore();
1021 void EnqueueItemCore(Item item
)
1023 if (totalCount
== items
.Length
)
1025 Item
[] newItems
= new Item
[items
.Length
* 2];
1026 for (int i
= 0; i
< totalCount
; i
++)
1027 newItems
[i
] = items
[(head
+ i
) % items
.Length
];
1031 int tail
= (head
+ totalCount
) % items
.Length
;
1036 Item
DequeueItemCore()
1038 if (totalCount
== 0)
1040 Debug
.Assert(false, "ItemQueue does not contain any items");
1041 throw new Exception("Internal Error");
1043 Item item
= items
[head
];
1044 items
[head
] = new Item();
1046 head
= (head
+ 1) % items
.Length
;
1050 public void EnqueuePendingItem(Item item
)
1052 EnqueueItemCore(item
);
1056 public void EnqueueAvailableItem(Item item
)
1058 EnqueueItemCore(item
);
1061 public void MakePendingItemAvailable()
1063 if (pendingCount
== 0)
1065 Debug
.Assert(false, "ItemQueue does not contain any pending items");
1066 throw new Exception("Internal Error");
1071 public bool HasAvailableItem
1073 get { return totalCount > pendingCount; }
1076 public bool HasAnyItem
1078 get { return totalCount > 0; }
1081 public int ItemCount
1083 get { return totalCount; }