Streamed implementation. Limited input buffer.
[LanSpider.git] / src / LanSpider / Multithreading / WorkManager.cs
blob2da6852153102e0e2adca9748654474befffccb2
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel;
5 namespace LanSpider.Multithreading
7 /// <summary>
8 /// Manages concurent execution of several threads of worker method and
9 /// pipes results into consumer method.
10 /// </summary>
11 /// <typeparam name="T"></typeparam>
12 /// <typeparam name="TResult"></typeparam>
13 public class WorkManager< T, TResult >
15 private readonly Func<T, TResult> _workerMethod;
16 private readonly Action<TResult> _consumerMethod;
17 private Queue<T> _input;
19 private readonly object _inputHandle = new object();
20 private readonly object _outputHandle = new object();
22 public event EventHandler<ProgressChangedEventArgs> WorkProgressChanged;
23 private int _totalItems;
25 /// <summary>
26 /// Initialize an istance of <see cref="WorkManager{T,TResult}"/> class with
27 /// worker and consumer methods.
28 /// </summary>
29 /// <param name="workerMethod"></param>
30 /// <param name="consumerMethod"></param>
31 public WorkManager( Func<T, TResult> workerMethod, Action<TResult> consumerMethod )
33 _workerMethod = workerMethod;
34 _consumerMethod = consumerMethod;
37 /// <summary>
38 /// Execute concurent worker on data provided and return in a synchronous manner.
39 /// </summary>
40 /// <param name="shares"></param>
41 public void RunSync( IEnumerable<T> shares )
43 _input = new Queue<T>( shares );
44 _totalItems = _input.Count;
46 Action yield = Yield;
48 int nThreads = Environment.ProcessorCount * 2;
50 List<IAsyncResult> handles = new List<IAsyncResult>( nThreads );
52 for ( int i = 0; i < nThreads; ++i )
54 IAsyncResult iar = yield.BeginInvoke( null, null );
55 handles.Add( iar );
58 foreach ( IAsyncResult asyncResult in handles )
60 asyncResult.AsyncWaitHandle.WaitOne();
64 private void Yield()
66 while ( true )
68 T argument = default( T );
69 bool gotWork = false;
71 lock ( _inputHandle )
73 if ( _input.Count > 0 )
75 argument = _input.Dequeue();
76 gotWork = true;
80 if ( !gotWork )
82 break;
85 TResult result = _workerMethod( argument );
87 if ( WorkProgressChanged != null )
89 lock ( _inputHandle )
91 WorkProgressChanged( this,new ProgressChangedEventArgs( 100 - _input.Count * 100 / _totalItems, argument ) );
95 // TO DO: lock ( _outputHandle )
97 _consumerMethod( result );