added samples
[windows-sources.git] / sdk / samples / WCFSamples / TechnologySamples / Extensibility / Channels / HttpCookieSession / CS / extensions / InputQueue.cs
blob2c63d2480cb3545b58a4decb26561345043efa05
1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace Microsoft.ServiceModel.Samples
7 using System;
8 using System.Collections.Generic;
9 using System.Diagnostics;
10 using System.ServiceModel;
11 using System.ServiceModel.Diagnostics;
13 using System.Threading;
/// <summary>
/// Invoked as an item is dequeued from the InputQueue. The InputQueue lock is
/// not held during the callback, but user code is not notified that the item
/// is available until the callback returns. If the callback might block for a
/// long time, first call IOThreadScheduler.ScheduleCallback to get to a
/// "safe" thread.
/// </summary>
internal delegate void ItemDequeuedCallback();
22 /// <summary>
23 /// Handles asynchronous interactions between producers and consumers.
24 /// Producers can dispatch available data to the input queue,
25 /// where it is dispatched to a waiting consumer or stored until a
26 /// consumer becomes available. Consumers can synchronously or asynchronously
27 /// request data from the queue, which is returned when data becomes
28 /// available.
29 /// </summary>
30 /// <typeparam name="T">The type of the items that producers enqueue and consumers dequeue.</typeparam>
31 class InputQueue<T> : IDisposable where T : class
// Items that have been enqueued and not yet handed to a consumer.
// This object also serves as the queue-wide lock (see ThisLock).
private ItemQueue itemQueue;

// Consumers that are blocked waiting for an item. Held in a queue so
// that they are serviced in FIFO order.
private Queue<IQueueReader> readerQueue;

// Consumers that only want to be told when an item shows up. When any
// item appears, every waiter in the list is signaled.
private List<IQueueWaiter> waiterList;

// Lazily created delegates used when work is handed to the thread pool.
private static WaitCallback onInvokeDequeuedCallback;
private static WaitCallback onDispatchCallback;
private static WaitCallback completeOutstandingReadersCallback;
private static WaitCallback completeWaitersFalseCallback;
private static WaitCallback completeWaitersTrueCallback;

// Current state of the InputQueue as it moves through its lifecycle.
private QueueState queueState;
/// <summary>Lifecycle states of the queue.</summary>
private enum QueueState
{
    // Items are accepted and delivered.
    Open,

    // New items are rejected; items already queued can still be dequeued.
    Shutdown,

    // The queue has been disposed.
    Closed
}
/// <summary>
/// Creates an empty queue in the Open state.
/// </summary>
public InputQueue()
{
    queueState = QueueState.Open;
    itemQueue = new ItemQueue();
    readerQueue = new Queue<IQueueReader>();
    waiterList = new List<IQueueWaiter>();
}
/// <summary>
/// Gets the total number of items currently stored in the queue,
/// read under the queue lock.
/// </summary>
public int PendingCount
{
    get
    {
        int storedItems;

        lock (ThisLock)
        {
            storedItems = itemQueue.ItemCount;
        }

        return storedItems;
    }
}
// The item queue object doubles as the monitor that guards all queue state.
private object ThisLock
{
    get
    {
        return this.itemQueue;
    }
}
/// <summary>
/// Starts an asynchronous dequeue.
/// </summary>
/// <param name="timeout">How long the reader may wait; TimeSpan.MaxValue disables the timer.</param>
/// <param name="callback">Callback invoked when the operation completes.</param>
/// <param name="state">Caller state carried by the returned result.</param>
/// <returns>
/// A pending reader when the caller has to wait, otherwise an already
/// completed result carrying the dequeued value (or the default value
/// when the queue can no longer produce items).
/// </returns>
public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
{
    Item dequeued = default(Item);

    lock (ThisLock)
    {
        if (queueState != QueueState.Closed)
        {
            if (itemQueue.HasAvailableItem)
            {
                dequeued = itemQueue.DequeueAvailableItem();
            }
            else if (queueState == QueueState.Open || itemQueue.HasAnyItem)
            {
                // Open: always wait. Shutdown: wait only while a pending
                // item can still become available.
                AsyncQueueReader pendingReader = new AsyncQueueReader(this, timeout, callback, state);
                readerQueue.Enqueue(pendingReader);
                return pendingReader;
            }
        }
    }

    // Completed synchronously. GetValue() rethrows an enqueued exception.
    InvokeDequeuedCallback(dequeued.DequeuedCallback);
    return new TypedCompletedAsyncResult<T>(dequeued.GetValue(), callback, state);
}
/// <summary>
/// Starts an asynchronous wait for an item to become available.
/// </summary>
/// <param name="timeout">How long the waiter may wait; TimeSpan.MaxValue disables the timer.</param>
/// <param name="callback">Callback invoked when the wait completes.</param>
/// <param name="state">Caller state carried by the returned result.</param>
/// <returns>
/// A pending waiter when the caller has to wait, otherwise a result that
/// is already completed with <c>true</c>.
/// </returns>
public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
{
    lock (ThisLock)
    {
        bool mustWait;

        switch (queueState)
        {
            case QueueState.Open:
                mustWait = !itemQueue.HasAvailableItem;
                break;

            case QueueState.Shutdown:
                // Only worth waiting while a pending item may still show up.
                mustWait = !itemQueue.HasAvailableItem && itemQueue.HasAnyItem;
                break;

            default:
                mustWait = false;
                break;
        }

        if (mustWait)
        {
            AsyncQueueWaiter pendingWaiter = new AsyncQueueWaiter(timeout, callback, state);
            waiterList.Add(pendingWaiter);
            return pendingWaiter;
        }
    }

    return new TypedCompletedAsyncResult<bool>(true, callback, state);
}
// Thread-pool entry point: completes every reader in the array passed as
// state with an empty item.
private static void CompleteOutstandingReadersCallback(object state)
{
    foreach (IQueueReader abandoned in (IQueueReader[])state)
    {
        abandoned.Set(default(Item));
    }
}
// Thread-pool entry point: signals the waiters passed as state with "no item available".
private static void CompleteWaitersFalseCallback(object state)
{
    IQueueWaiter[] pendingWaiters = (IQueueWaiter[])state;
    CompleteWaiters(false, pendingWaiters);
}
// Thread-pool entry point: signals the waiters passed as state with "item available".
private static void CompleteWaitersTrueCallback(object state)
{
    IQueueWaiter[] pendingWaiters = (IQueueWaiter[])state;
    CompleteWaiters(true, pendingWaiters);
}
// Signals each waiter, in array order, with the given availability flag.
private static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
{
    foreach (IQueueWaiter waiter in waiters)
    {
        waiter.Set(itemAvailable);
    }
}
// Signals the waiters from a thread-pool thread instead of the calling thread.
// The delegate for each flag value is created on first use and then reused.
private static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
{
    WaitCallback work;

    if (itemAvailable)
    {
        if (completeWaitersTrueCallback == null)
        {
            completeWaitersTrueCallback = new WaitCallback(CompleteWaitersTrueCallback);
        }

        work = completeWaitersTrueCallback;
    }
    else
    {
        if (completeWaitersFalseCallback == null)
        {
            completeWaitersFalseCallback = new WaitCallback(CompleteWaitersFalseCallback);
        }

        work = completeWaitersFalseCallback;
    }

    ThreadPool.QueueUserWorkItem(work, waiters);
}
// Hands the caller a snapshot of all registered waiters and empties the
// list; yields null when nobody is waiting. Every call site in this class
// invokes it while holding ThisLock.
private void GetWaiters(out IQueueWaiter[] waiters)
{
    waiters = null;

    if (waiterList.Count == 0)
    {
        return;
    }

    waiters = waiterList.ToArray();
    waiterList.Clear();
}
/// <summary>
/// Closes the queue; equivalent to calling Dispose through IDisposable.
/// </summary>
public void Close()
{
    IDisposable disposable = this;
    disposable.Dispose();
}
/// <summary>
/// Moves an Open queue to the Shutdown state. If no items are stored,
/// every blocked reader is released with an empty item. Calling it on a
/// queue that is already Shutdown or Closed has no effect.
/// </summary>
public void Shutdown()
{
    IQueueReader[] releasedReaders = null;

    lock (ThisLock)
    {
        // Open is the only state from which a shutdown is possible.
        if (queueState != QueueState.Open)
        {
            return;
        }

        queueState = QueueState.Shutdown;

        if (readerQueue.Count > 0 && itemQueue.ItemCount == 0)
        {
            releasedReaders = readerQueue.ToArray();
            readerQueue.Clear();
        }
    }

    // Complete the readers outside the lock.
    if (releasedReaders != null)
    {
        foreach (IQueueReader released in releasedReaders)
        {
            released.Set(new Item((Exception)null, null));
        }
    }
}
/// <summary>
/// Dequeues an item, blocking for at most <paramref name="timeout"/>.
/// </summary>
/// <returns>The dequeued value.</returns>
/// <exception cref="TimeoutException">No item arrived within the timeout.</exception>
public T Dequeue(TimeSpan timeout)
{
    T dequeued;

    if (this.Dequeue(timeout, out dequeued))
    {
        return dequeued;
    }

    throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout));
}
/// <summary>
/// Dequeues an item, blocking for at most <paramref name="timeout"/>.
/// </summary>
/// <param name="timeout">Maximum time to wait for an item.</param>
/// <param name="value">
/// The dequeued value, or the default value when the queue is closed or
/// is shut down and empty.
/// </param>
/// <returns><c>false</c> when the wait timed out; otherwise <c>true</c>.</returns>
public bool Dequeue(TimeSpan timeout, out T value)
{
    WaitQueueReader blockedReader = null;
    Item dequeued = new Item();

    lock (ThisLock)
    {
        if (queueState == QueueState.Closed)
        {
            value = default(T);
            return true;
        }

        if (itemQueue.HasAvailableItem)
        {
            dequeued = itemQueue.DequeueAvailableItem();
        }
        else if (queueState == QueueState.Open || itemQueue.HasAnyItem)
        {
            // Open: always wait. Shutdown: wait only while a pending item
            // can still become available.
            blockedReader = new WaitQueueReader(this);
            readerQueue.Enqueue(blockedReader);
        }
        else
        {
            // Shutdown and fully drained.
            value = default(T);
            return true;
        }
    }

    if (blockedReader != null)
    {
        return blockedReader.Wait(timeout, out value);
    }

    InvokeDequeuedCallback(dequeued.DequeuedCallback);
    value = dequeued.GetValue();
    return true;
}
/// <summary>
/// Closes the queue and suppresses finalization of this instance.
/// </summary>
public void Dispose()
{
    this.Dispose(true);

    GC.SuppressFinalize(this);
}
/// <summary>
/// Moves the queue to the Closed state. The call that performs the
/// transition releases every blocked reader with an empty item, then
/// disposes every stored item and invokes its dequeued callback.
/// </summary>
/// <param name="disposing">When <c>false</c> nothing is done.</param>
protected void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    bool closedByThisCall = false;

    lock (ThisLock)
    {
        if (queueState != QueueState.Closed)
        {
            queueState = QueueState.Closed;
            closedByThisCall = true;
        }
    }

    if (!closedByThisCall)
    {
        return;
    }

    // NOTE(review): entries in waiterList are not signaled here; confirm
    // that callers never close the queue with waiters outstanding.
    while (readerQueue.Count > 0)
    {
        IQueueReader blockedReader = readerQueue.Dequeue();
        blockedReader.Set(default(Item));
    }

    while (itemQueue.HasAnyItem)
    {
        Item discarded = itemQueue.DequeueAnyItem();
        discarded.Dispose();
        InvokeDequeuedCallback(discarded.DequeuedCallback);
    }
}
/// <summary>
/// Makes one pending item available and, if a reader is blocked, hands
/// the item to that reader. Registered waiters are signaled from the
/// thread pool.
/// </summary>
public void Dispatch()
{
    IQueueReader servedReader = null;
    Item dispatched = new Item();
    IQueueReader[] abandonedReaders = null;
    IQueueWaiter[] waiters;
    bool itemAvailable;

    lock (ThisLock)
    {
        itemAvailable = (queueState == QueueState.Open);
        GetWaiters(out waiters);

        if (queueState != QueueState.Closed)
        {
            itemQueue.MakePendingItemAvailable();

            if (readerQueue.Count > 0)
            {
                dispatched = itemQueue.DequeueAvailableItem();
                servedReader = readerQueue.Dequeue();

                // After a shutdown, once the last item is gone the remaining
                // readers can never be served, so they are released.
                bool drained = queueState == QueueState.Shutdown
                    && readerQueue.Count > 0
                    && itemQueue.ItemCount == 0;

                if (drained)
                {
                    abandonedReaders = readerQueue.ToArray();
                    readerQueue.Clear();

                    itemAvailable = false;
                }
            }
        }
    }

    if (abandonedReaders != null)
    {
        if (completeOutstandingReadersCallback == null)
        {
            completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback);
        }

        ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, abandonedReaders);
    }

    if (waiters != null)
    {
        CompleteWaitersLater(itemAvailable, waiters);
    }

    if (servedReader != null)
    {
        InvokeDequeuedCallback(dispatched.DequeuedCallback);
        servedReader.Set(dispatched);
    }
}
/// <summary>
/// Ends an asynchronous Dequeue operation.
/// </summary>
/// <returns>The dequeued value.</returns>
/// <exception cref="TimeoutException">The operation timed out.</exception>
public T EndDequeue(IAsyncResult result)
{
    T dequeued;

    if (this.EndDequeue(result, out dequeued))
    {
        return dequeued;
    }

    throw new TimeoutException("Asynchronous Dequeue operation timed out.");
}
/// <summary>
/// Ends an asynchronous Dequeue operation without throwing on timeout.
/// </summary>
/// <param name="result">The result returned by BeginDequeue.</param>
/// <param name="value">Receives the dequeued value.</param>
/// <returns><c>false</c> when the operation timed out; otherwise <c>true</c>.</returns>
public bool EndDequeue(IAsyncResult result, out T value)
{
    // BeginDequeue hands back this type when it completed synchronously.
    if (result is TypedCompletedAsyncResult<T>)
    {
        value = TypedCompletedAsyncResult<T>.End(result);
        return true;
    }

    return AsyncQueueReader.End(result, out value);
}
/// <summary>
/// Ends an asynchronous wait started by BeginWaitForItem.
/// </summary>
/// <returns>The availability flag recorded by the completed result.</returns>
public bool EndWaitForItem(IAsyncResult result)
{
    // BeginWaitForItem hands back this type when it completed synchronously.
    if (result is TypedCompletedAsyncResult<bool>)
    {
        return TypedCompletedAsyncResult<bool>.End(result);
    }

    return AsyncQueueWaiter.End(result);
}
/// <summary>
/// Enqueues an item without a dequeued callback and dispatches it.
/// </summary>
public void EnqueueAndDispatch(T item)
{
    ItemDequeuedCallback noCallback = null;
    EnqueueAndDispatch(item, noCallback);
}
/// <summary>
/// Enqueues an item and dispatches it, allowing the dispatch to run on
/// the calling thread.
/// </summary>
public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback)
{
    const bool canDispatchOnThisThread = true;
    EnqueueAndDispatch(item, dequeuedCallback, canDispatchOnThisThread);
}
/// <summary>
/// Enqueues an exception, to be surfaced to a consumer, and dispatches it.
/// </summary>
public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
{
    Debug.Assert(exception != null, "exception parameter should not be null");

    Item wrapped = new Item(exception, dequeuedCallback);
    EnqueueAndDispatch(wrapped, canDispatchOnThisThread);
}
/// <summary>
/// Enqueues an item and dispatches it.
/// </summary>
/// <param name="item">The item to enqueue; expected to be non-null.</param>
/// <param name="dequeuedCallback">Invoked when the item leaves the queue; may be null.</param>
/// <param name="canDispatchOnThisThread">Whether consumers may be completed on the calling thread.</param>
public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
{
    Debug.Assert(item != null, "item parameter should not be null");

    Item wrapped = new Item(item, dequeuedCallback);
    EnqueueAndDispatch(wrapped, canDispatchOnThisThread);
}
// Core enqueue path. Under the lock it decides whether the item is stored,
// handed to a blocked reader, parked as pending for a later Dispatch(), or
// rejected because the queue is not Open. Callbacks and completions then
// run outside the lock.
private void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
{
    bool rejected = false;
    IQueueReader servedReader = null;
    bool dispatchLater = false;
    IQueueWaiter[] waiters;
    bool itemAvailable;

    lock (ThisLock)
    {
        itemAvailable = (queueState == QueueState.Open);
        GetWaiters(out waiters);

        if (queueState != QueueState.Open)
        {
            // Closed or Shutdown: the item is not accepted.
            rejected = true;
        }
        else if (readerQueue.Count == 0)
        {
            itemQueue.EnqueueAvailableItem(item);
        }
        else if (canDispatchOnThisThread)
        {
            servedReader = readerQueue.Dequeue();
        }
        else
        {
            // A reader is waiting but must not be completed on this thread.
            itemQueue.EnqueuePendingItem(item);
            dispatchLater = true;
        }
    }

    if (waiters != null)
    {
        if (canDispatchOnThisThread)
        {
            CompleteWaiters(itemAvailable, waiters);
        }
        else
        {
            CompleteWaitersLater(itemAvailable, waiters);
        }
    }

    if (servedReader != null)
    {
        InvokeDequeuedCallback(item.DequeuedCallback);
        servedReader.Set(item);
    }

    if (dispatchLater)
    {
        if (onDispatchCallback == null)
        {
            onDispatchCallback = new WaitCallback(OnDispatchCallback);
        }

        ThreadPool.QueueUserWorkItem(onDispatchCallback, this);
    }
    else if (rejected)
    {
        InvokeDequeuedCallback(item.DequeuedCallback);
        item.Dispose();
    }
}
/// <summary>
/// Enqueues an item without dispatching it.
/// </summary>
/// <returns><c>true</c> when Dispatch() must be called later; otherwise <c>false</c>.</returns>
public bool EnqueueWithoutDispatch(T item, ItemDequeuedCallback dequeuedCallback)
{
    Debug.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");

    Item wrapped = new Item(item, dequeuedCallback);
    return EnqueueWithoutDispatch(wrapped);
}
/// <summary>
/// Enqueues an exception without dispatching it.
/// </summary>
/// <returns><c>true</c> when Dispatch() must be called later; otherwise <c>false</c>.</returns>
public bool EnqueueWithoutDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback)
{
    Debug.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");

    Item wrapped = new Item(exception, dequeuedCallback);
    return EnqueueWithoutDispatch(wrapped);
}
// Stores the item without completing any reader. This does not block;
// however, Dispatch() must be called later if this function returns true.
// When the queue is not Open the item is disposed and its dequeued
// callback is invoked from the thread pool.
private bool EnqueueWithoutDispatch(Item item)
{
    lock (ThisLock)
    {
        if (queueState == QueueState.Open)
        {
            bool readerWaiting = readerQueue.Count != 0;

            if (readerWaiting)
            {
                itemQueue.EnqueuePendingItem(item);
            }
            else
            {
                itemQueue.EnqueueAvailableItem(item);
            }

            return readerWaiting;
        }
    }

    item.Dispose();
    InvokeDequeuedCallbackLater(item.DequeuedCallback);
    return false;
}
// Thread-pool entry point: runs Dispatch() on the queue passed as state.
private static void OnDispatchCallback(object state)
{
    InputQueue<T> queue = (InputQueue<T>)state;
    queue.Dispatch();
}
// Queues the dequeued callback to the thread pool; does nothing for null.
private static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback)
{
    if (dequeuedCallback == null)
    {
        return;
    }

    if (onInvokeDequeuedCallback == null)
    {
        onInvokeDequeuedCallback = new WaitCallback(OnInvokeDequeuedCallback);
    }

    ThreadPool.QueueUserWorkItem(onInvokeDequeuedCallback, dequeuedCallback);
}
// Invokes the dequeued callback on the calling thread; does nothing for null.
private static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback)
{
    if (dequeuedCallback == null)
    {
        return;
    }

    dequeuedCallback();
}
// Thread-pool entry point: state is the ItemDequeuedCallback to invoke.
// Only InvokeDequeuedCallbackLater queues it, and never with a null callback.
private static void OnInvokeDequeuedCallback(object state)
{
    ((ItemDequeuedCallback)state)();
}
// Removes the given reader from the reader queue, preserving the order of
// the others. Returns true if it was found, or false if it was not queued
// or the queue is Closed.
private bool RemoveReader(IQueueReader reader)
{
    lock (ThisLock)
    {
        if (queueState == QueueState.Closed)
        {
            return false;
        }

        bool found = false;

        // Cycle through the queue exactly once, re-enqueueing every reader
        // except the one being removed.
        int remaining = readerQueue.Count;
        while (remaining > 0)
        {
            IQueueReader candidate = readerQueue.Dequeue();

            if (Object.ReferenceEquals(candidate, reader))
            {
                found = true;
            }
            else
            {
                readerQueue.Enqueue(candidate);
            }

            remaining--;
        }

        return found;
    }
}
/// <summary>
/// Blocks until an item is available or the timeout elapses.
/// </summary>
/// <param name="timeout">Maximum time to wait.</param>
/// <returns>
/// <c>true</c> when an item is already available or the queue is Closed;
/// <c>false</c> when the queue is shut down and empty; otherwise the
/// result of waiting.
/// </returns>
public bool WaitForItem(TimeSpan timeout)
{
    WaitQueueWaiter blockedWaiter;

    lock (ThisLock)
    {
        if (queueState == QueueState.Closed)
        {
            return true;
        }

        if (itemQueue.HasAvailableItem)
        {
            return true;
        }

        if (queueState == QueueState.Shutdown && !itemQueue.HasAnyItem)
        {
            return false;
        }

        // Open, or Shutdown with a pending item still outstanding.
        blockedWaiter = new WaitQueueWaiter();
        waiterList.Add(blockedWaiter);
    }

    return blockedWaiter.Wait(timeout);
}
// A consumer that is waiting to receive an item from the queue.
private interface IQueueReader
{
    // Delivers the item (possibly an empty one) to the consumer.
    void Set(Item item);
}
// A consumer that is waiting to learn whether an item is available.
private interface IQueueWaiter
{
    // Signals the consumer with the availability flag.
    void Set(bool itemAvailable);
}
// A reader that blocks the calling thread on an event until an item is
// delivered through Set() or the timeout expires.
private class WaitQueueReader : IQueueReader
{
    Exception exception;
    InputQueue<T> inputQueue;
    T item;
    ManualResetEvent waitEvent;
    object thisLock = new object();

    public WaitQueueReader(InputQueue<T> inputQueue)
    {
        this.inputQueue = inputQueue;
        waitEvent = new ManualResetEvent(false);
    }

    object ThisLock
    {
        get
        {
            return this.thisLock;
        }
    }

    // Records the delivered value/exception and wakes the blocked thread.
    public void Set(Item item)
    {
        lock (ThisLock)
        {
            Debug.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
            Debug.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");

            this.exception = item.Exception;
            this.item = item.Value;
            waitEvent.Set();
        }
    }

    // Blocks until Set() is called or the timeout elapses.
    // Returns false on timeout; otherwise true with the delivered value.
    // Throws the delivered exception when the item carried one.
    public bool Wait(TimeSpan timeout, out T value)
    {
        bool isSafeToClose = false;
        try
        {
            if (timeout == TimeSpan.MaxValue)
            {
                waitEvent.WaitOne();
            }
            else if (!waitEvent.WaitOne(timeout, false))
            {
                if (this.inputQueue.RemoveReader(this))
                {
                    // Still queued, so nobody will call Set(): a real timeout.
                    value = default(T);
                    isSafeToClose = true;
                    return false;
                }
                else
                {
                    // Already taken off the reader queue: a Set() call is
                    // on its way, so wait for it.
                    waitEvent.WaitOne();
                }
            }

            isSafeToClose = true;
        }
        finally
        {
            if (isSafeToClose)
            {
                waitEvent.Close();
            }
        }

        // Surface an enqueued exception to the caller, matching what
        // Item.GetValue() does on the non-blocking dequeue path.
        if (this.exception != null)
        {
            throw this.exception;
        }

        value = item;
        return true;
    }
}
// An asynchronous reader: completes its AsyncResult when an item is
// delivered through Set() or when its timeout timer removes it from the
// reader queue.
private class AsyncQueueReader : AsyncResult, IQueueReader
{
    static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback);

    bool expired;
    InputQueue<T> inputQueue;
    T item;
    Timer timer;

    public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.inputQueue = inputQueue;
        if (timeout != TimeSpan.MaxValue)
        {
            // One-shot timer: the -1 ms period disables periodic signaling.
            this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
        }
    }

    // Ends the operation. Returns false if it timed out; otherwise true
    // with the delivered value.
    public static bool End(IAsyncResult result, out T value)
    {
        AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);

        if (readerResult.expired)
        {
            value = default(T);
            return false;
        }
        else
        {
            value = readerResult.item;
            return true;
        }
    }

    static void TimerCallback(object state)
    {
        AsyncQueueReader thisPtr = (AsyncQueueReader)state;

        // RemoveReader succeeds only while the reader is still queued, so
        // this path and Set() never both complete the result.
        if (thisPtr.inputQueue.RemoveReader(thisPtr))
        {
            thisPtr.expired = true;
            thisPtr.ReleaseTimer();
            thisPtr.Complete(false);
        }
    }

    public void Set(Item item)
    {
        this.item = item.Value;

        // The timeout is no longer needed; release the timer rather than
        // merely disabling it.
        ReleaseTimer();

        Complete(false, item.Exception);
    }

    // Disposes the timeout timer, if one was created. Disposing stops any
    // future callback and may safely be called more than once.
    void ReleaseTimer()
    {
        Timer pendingTimer = this.timer;
        if (pendingTimer != null)
        {
            pendingTimer.Dispose();
        }
    }
}
// One queue entry: either a value or an exception, plus the optional
// callback to invoke when the entry leaves the queue.
private struct Item
{
    T value;
    Exception exception;
    ItemDequeuedCallback dequeuedCallback;

    // Creates an entry that carries a value.
    public Item(T value, ItemDequeuedCallback dequeuedCallback)
        : this(value, null, dequeuedCallback)
    {
    }

    // Creates an entry that carries an exception.
    public Item(Exception exception, ItemDequeuedCallback dequeuedCallback)
        : this(null, exception, dequeuedCallback)
    {
    }

    private Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
    {
        this.dequeuedCallback = dequeuedCallback;
        this.exception = exception;
        this.value = value;
    }

    public Exception Exception
    {
        get { return this.exception; }
    }

    public T Value
    {
        get { return this.value; }
    }

    public ItemDequeuedCallback DequeuedCallback
    {
        get { return this.dequeuedCallback; }
    }

    // Releases the carried value: disposes it when it is IDisposable,
    // otherwise aborts it when it is an ICommunicationObject.
    public void Dispose()
    {
        IDisposable disposable = this.value as IDisposable;
        if (disposable != null)
        {
            disposable.Dispose();
            return;
        }

        ICommunicationObject communicationObject = this.value as ICommunicationObject;
        if (communicationObject != null)
        {
            communicationObject.Abort();
        }
    }

    // Returns the carried value, or throws the carried exception.
    public T GetValue()
    {
        if (this.exception == null)
        {
            return this.value;
        }

        throw this.exception;
    }
}
// A waiter that blocks the calling thread on an event until the queue
// signals it through Set() or the timeout expires.
private class WaitQueueWaiter : IQueueWaiter
{
    bool itemAvailable;
    ManualResetEvent waitEvent;
    object thisLock = new object();

    public WaitQueueWaiter()
    {
        waitEvent = new ManualResetEvent(false);
    }

    object ThisLock
    {
        get
        {
            return this.thisLock;
        }
    }

    // Records the availability flag and wakes the blocked thread.
    public void Set(bool itemAvailable)
    {
        lock (ThisLock)
        {
            this.itemAvailable = itemAvailable;
            waitEvent.Set();
        }
    }

    // Blocks until Set() is called or the timeout elapses.
    // Returns false on timeout; otherwise the flag passed to Set().
    public bool Wait(TimeSpan timeout)
    {
        if (timeout == TimeSpan.MaxValue)
        {
            waitEvent.WaitOne();
        }
        else if (!waitEvent.WaitOne(timeout, false))
        {
            // Timed out. The waiter is still in the queue's waiter list and
            // may be signaled later, so the event must stay open here.
            return false;
        }

        // Signaled: the queue removed this waiter from its list before
        // calling Set(), so the event is no longer needed and can be
        // released instead of waiting for finalization.
        waitEvent.Close();

        return this.itemAvailable;
    }
}
// An asynchronous waiter: completes its AsyncResult either when the queue
// signals it through Set() or when its timeout timer fires, whichever
// happens first.
private class AsyncQueueWaiter : AsyncResult, IQueueWaiter
{
    static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback);
    Timer timer;
    bool itemAvailable;
    bool completed;
    object thisLock = new object();

    public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state)
    {
        if (timeout != TimeSpan.MaxValue)
        {
            // Assign under the lock so a callback that fires immediately
            // cannot observe a half-initialized timer field.
            lock (ThisLock)
            {
                // One-shot timer: the -1 ms period disables periodic signaling.
                this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
            }
        }
    }

    object ThisLock
    {
        get
        {
            return this.thisLock;
        }
    }

    // Ends the operation and returns the recorded availability flag
    // (false when the wait timed out).
    public static bool End(IAsyncResult result)
    {
        AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
        return waiterResult.itemAvailable;
    }

    static void TimerCallback(object state)
    {
        AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;

        // Complete only if Set() has not already done so.
        if (thisPtr.TryClaimCompletion())
        {
            thisPtr.Complete(false);
        }
    }

    public void Set(bool itemAvailable)
    {
        bool timely;

        lock (ThisLock)
        {
            // Timer.Change cannot tell us whether the timer already fired,
            // so the completed flag decides who completes the result. A
            // late signal must not overwrite the timed-out outcome.
            timely = TryClaimCompletion();
            if (timely)
            {
                this.itemAvailable = itemAvailable;
            }
        }

        if (timely)
        {
            Complete(false);
        }
    }

    // Returns true for exactly one caller (the timer or Set), which then
    // owns the right to complete the result. Also releases the timer.
    bool TryClaimCompletion()
    {
        lock (ThisLock)
        {
            if (this.completed)
            {
                return false;
            }

            this.completed = true;

            if (this.timer != null)
            {
                this.timer.Dispose();
            }

            return true;
        }
    }
}
// A growable circular buffer of items. Items are either "available"
// (ready to be dequeued) or "pending" (stored until made available by
// MakePendingItemAvailable). pendingCount tracks how many of the
// totalCount stored items are pending.
private class ItemQueue
{
    Item[] items;
    int head;
    int pendingCount;
    int totalCount;

    public ItemQueue()
    {
        items = new Item[1];
    }

    // Removes and returns the item at the head of the queue.
    // Throws InvalidOperationException if there is no available item.
    public Item DequeueAvailableItem()
    {
        if (totalCount == pendingCount)
        {
            Debug.Assert(false, "ItemQueue does not contain any available items");
            throw new InvalidOperationException("Internal Error");
        }

        return DequeueItemCore();
    }

    // Removes and returns the item at the head of the queue, whether it is
    // available or pending. Throws InvalidOperationException when empty.
    public Item DequeueAnyItem()
    {
        // When every stored item is pending, the one being removed is too.
        bool removingPendingItem = (pendingCount == totalCount);

        // Dequeue first: on an empty queue this throws before any counter
        // is touched, so pendingCount can never go negative.
        Item item = DequeueItemCore();

        if (removingPendingItem)
        {
            pendingCount--;
        }

        return item;
    }

    // Appends an item, doubling the buffer when it is full.
    void EnqueueItemCore(Item item)
    {
        if (totalCount == items.Length)
        {
            // Copy in logical order so the new buffer starts at index 0.
            Item[] newItems = new Item[items.Length * 2];
            for (int i = 0; i < totalCount; i++)
            {
                newItems[i] = items[(head + i) % items.Length];
            }

            head = 0;
            items = newItems;
        }

        int tail = (head + totalCount) % items.Length;
        items[tail] = item;
        totalCount++;
    }

    Item DequeueItemCore()
    {
        if (totalCount == 0)
        {
            Debug.Assert(false, "ItemQueue does not contain any items");
            throw new InvalidOperationException("Internal Error");
        }

        Item item = items[head];
        items[head] = new Item();   // drop the references held by the slot
        totalCount--;
        head = (head + 1) % items.Length;
        return item;
    }

    public void EnqueuePendingItem(Item item)
    {
        EnqueueItemCore(item);
        pendingCount++;
    }

    public void EnqueueAvailableItem(Item item)
    {
        EnqueueItemCore(item);
    }

    // Converts one pending item into an available one.
    // Throws InvalidOperationException if nothing is pending.
    public void MakePendingItemAvailable()
    {
        if (pendingCount == 0)
        {
            Debug.Assert(false, "ItemQueue does not contain any pending items");
            throw new InvalidOperationException("Internal Error");
        }

        pendingCount--;
    }

    public bool HasAvailableItem
    {
        get { return totalCount > pendingCount; }
    }

    public bool HasAnyItem
    {
        get { return totalCount > 0; }
    }

    public int ItemCount
    {
        get { return totalCount; }
    }
}