using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;
namespace LanSpider.Multithreading
{
    /// <summary>
    /// Manages concurrent execution of several threads of a worker method and
    /// pipes the results into a consumer method.
    /// </summary>
    /// <remarks>
    /// The consumer method and <see cref="WorkProgressChanged"/> handlers are invoked
    /// on the worker threads, possibly concurrently, so they must be thread-safe.
    /// An instance keeps the pending input in a field, therefore
    /// <see cref="RunSync"/> must not be called concurrently on the same instance.
    /// </remarks>
    /// <typeparam name="T">Type of the input items handed to the worker method.</typeparam>
    /// <typeparam name="TResult">Type of the value produced by the worker method and passed to the consumer method.</typeparam>
    public class WorkManager<T, TResult>
    {
        private readonly Func<T, TResult> _workerMethod;
        private readonly Action<TResult> _consumerMethod;

        // Items still waiting to be processed; guarded by _inputHandle.
        private Queue<T> _input;

        private readonly object _inputHandle = new object();

        // Reserved for serializing consumer calls; not used yet (see the TODO in ProcessQueue).
        private readonly object _outputHandle = new object();

        /// <summary>
        /// Raised on a worker thread after each item has been processed by the worker method.
        /// The percentage reflects how many items have been taken from the input queue and
        /// <see cref="ProgressChangedEventArgs.UserState"/> carries the processed input item.
        /// </summary>
        public event EventHandler<ProgressChangedEventArgs> WorkProgressChanged;

        // Number of items supplied to the current RunSync call; base of the progress percentage.
        private int _totalItems;

        /// <summary>
        /// Initialize an instance of <see cref="WorkManager{T,TResult}"/> class with
        /// worker and consumer methods.
        /// </summary>
        /// <param name="workerMethod">Method that converts one input item into a result; called concurrently from several threads.</param>
        /// <param name="consumerMethod">Method that receives every result produced by <paramref name="workerMethod"/>; called concurrently from several threads.</param>
        public WorkManager( Func<T, TResult> workerMethod, Action<TResult> consumerMethod )
        {
            _workerMethod = workerMethod;
            _consumerMethod = consumerMethod;
        }

        /// <summary>
        /// Execute concurrent workers on the data provided and return in a synchronous manner,
        /// i.e. only after every worker thread has finished.
        /// </summary>
        /// <param name="shares">Input items to process; the sequence is copied before processing starts.</param>
        /// <exception cref="ArgumentNullException"><paramref name="shares"/> is <c>null</c>.</exception>
        /// <exception cref="InvalidOperationException">
        /// The worker method, the consumer method or a progress handler threw on a worker thread.
        /// The first such exception is available as <see cref="Exception.InnerException"/>.
        /// A worker that fails stops processing; the remaining workers keep draining the queue.
        /// </exception>
        public void RunSync( IEnumerable<T> shares )
        {
            _input = new Queue<T>( shares );
            _totalItems = _input.Count;

            if ( _totalItems == 0 )
            {
                // Nothing to do; avoid spinning up threads that would exit immediately.
                return;
            }

            // Never start more workers than there are items to process.
            int nThreads = Math.Min( Environment.ProcessorCount * 2, _totalItems );

            Exception firstFailure = null;
            object failureGate = new object();

            List<Thread> workers = new List<Thread>( nThreads );

            for ( int i = 0; i < nThreads; ++i )
            {
                // Dedicated threads replace Delegate.BeginInvoke, which is not supported
                // on .NET Core / .NET 5+ and silently discarded worker exceptions.
                Thread worker = new Thread( () =>
                {
                    try
                    {
                        ProcessQueue();
                    }
                    catch ( Exception ex )
                    {
                        // An unhandled exception on this thread would terminate the process,
                        // so remember it and report it from RunSync instead.
                        lock ( failureGate )
                        {
                            if ( firstFailure == null )
                            {
                                firstFailure = ex;
                            }
                        }
                    }
                } );

                // Background threads, like the thread-pool threads used previously.
                worker.IsBackground = true;
                worker.Start();
                workers.Add( worker );
            }

            foreach ( Thread started in workers )
            {
                started.Join();
            }

            if ( firstFailure != null )
            {
                // Wrap rather than rethrow so the original stack trace is preserved.
                throw new InvalidOperationException(
                    "A worker thread failed while processing the input queue; see InnerException for details.",
                    firstFailure );
            }
        }

        /// <summary>
        /// Worker loop: repeatedly takes one item from the input queue, runs the worker
        /// method on it, reports progress and hands the result to the consumer method.
        /// Returns when the queue is empty.
        /// </summary>
        private void ProcessQueue()
        {
            while ( true )
            {
                T argument;

                lock ( _inputHandle )
                {
                    if ( _input.Count == 0 )
                    {
                        return;
                    }

                    argument = _input.Dequeue();
                }

                TResult result = _workerMethod( argument );

                // Copy the delegate so an unsubscribe on another thread cannot
                // null the event between the check and the invocation.
                EventHandler<ProgressChangedEventArgs> handler = WorkProgressChanged;
                if ( handler != null )
                {
                    int remaining;

                    // Queue<T> is not thread-safe; read Count under the same lock as Dequeue.
                    lock ( _inputHandle )
                    {
                        remaining = _input.Count;
                    }

                    // long arithmetic keeps remaining * 100 from overflowing on huge inputs.
                    int percent = (int) ( 100 - remaining * 100L / _totalItems );

                    handler( this, new ProgressChangedEventArgs( percent, argument ) );
                }

                // TODO: consider serializing consumer calls with lock ( _outputHandle );
                // until then the consumer method is called concurrently and must be thread-safe.
                _consumerMethod( result );
            }
        }
    }
}