/// <summary>
/// Runs a batch of work items on the thread pool and blocks until all of them have finished.
/// </summary>
public static class ThreadManager
{
    // WaitHandle.WaitAll is limited to 64 handles per call, so waits are issued in chunks of this size.
    private const int NUM_WAIT_HANDLES = 64;

    /// <summary>
    /// Executes a series of operations in parallel, waits for every operation to finish, and then
    /// returns the collected results. This implements a "scatter-gather" pattern.
    /// </summary>
    /// <typeparam name="TThreadBase">The worker type; one instance is created per input item.</typeparam>
    /// <typeparam name="TInput">The type of the input.</typeparam>
    /// <typeparam name="TOutput">The type of the output.</typeparam>
    /// <param name="inputList">The items to process. May be null or empty.</param>
    /// <returns>
    /// The non-null results, in input order. Workers whose <c>Result</c> is null are skipped, so the
    /// list can be shorter than <paramref name="inputList"/>. An empty list is returned when
    /// <paramref name="inputList"/> is null or empty.
    /// </returns>
    public static List<TOutput> ExecuteParallel<TThreadBase, TInput, TOutput>(List<TInput> inputList) where TThreadBase : ThreadBase<TInput, TOutput>, new()
    {
        // Check for null BEFORE touching inputList.Count; nothing was sent, so nothing to process.
        if (inputList == null || inputList.Count == 0)
        {
            return new List<TOutput>();
        }

        int totalItems = inputList.Count;
        List<TOutput> results = new List<TOutput>(totalItems);
        List<ManualResetEvent> doneEvents = new List<ManualResetEvent>(totalItems);
        List<TThreadBase> workers = new List<TThreadBase>(totalItems);

        // Scatter: queue one work item per input, each with its own completion event.
        foreach (TInput item in inputList)
        {
            ManualResetEvent doneEvent = new ManualResetEvent(false);
            TThreadBase worker = new TThreadBase();
            doneEvents.Add(doneEvent);
            workers.Add(worker);
            worker.SetData(item, doneEvent);
            ThreadPool.QueueUserWorkItem(worker.ThreadPoolCallback, null);
        }

        // Gather: wait for completion in chunks of at most NUM_WAIT_HANDLES handles.
        for (int start = 0; start < doneEvents.Count; start += NUM_WAIT_HANDLES)
        {
            int remaining = doneEvents.Count - start;
            int chunkSize = remaining < NUM_WAIT_HANDLES ? remaining : NUM_WAIT_HANDLES;

            WaitHandle[] chunk = new WaitHandle[chunkSize];
            for (int i = 0; i < chunkSize; i++)
            {
                chunk[i] = doneEvents[start + i];
            }
            WaitAll(chunk);
        }

        // Every event has been signaled, so no worker will touch its event again; release the OS handles.
        // This is deliberately NOT done in a finally block: if queuing or waiting fails part-way,
        // workers that are still running would call Set() on a closed handle.
        foreach (ManualResetEvent doneEvent in doneEvents)
        {
            doneEvent.Close();
        }

        // Spool up all the results, skipping workers that produced none.
        foreach (ThreadBase<TInput, TOutput> worker in workers)
        {
            if (worker.Result != null)
            {
                results.Add(worker.Result);
            }
        }
        return results;
    }

    /// <summary>
    /// Blocks until every handle in <paramref name="waitHandles"/> is signaled.
    /// </summary>
    /// <param name="waitHandles">The handles to wait on.</param>
    private static void WaitAll(WaitHandle[] waitHandles)
    {
        if (Thread.CurrentThread.GetApartmentState() == ApartmentState.STA)
        {
            // WaitAll for multiple handles on an STA thread is not supported,
            // so wait on each handle individually.
            foreach (WaitHandle waitHandle in waitHandles)
            {
                waitHandle.WaitOne();
            }
        }
        else
        {
            WaitHandle.WaitAll(waitHandles);
        }
    }
}
93
/// <summary>
/// Base class for a single unit of work that runs as a thread-pool callback and signals an event
/// when it has finished.
/// </summary>
/// <typeparam name="TInput">The type of the input.</typeparam>
/// <typeparam name="TOutput">The type of the output.</typeparam>
public abstract class ThreadBase<TInput, TOutput>
{
    private ManualResetEvent _doneEvent;
    private TInput _input;
    private TOutput _output;

    /// <summary>
    /// Gets the result.
    /// </summary>
    /// <value>
    /// The value returned by <see cref="Execute"/>, or the default value of
    /// <typeparamref name="TOutput"/> if the callback has not run or <see cref="Execute"/> threw.
    /// </value>
    public TOutput Result
    {
        get
        {
            return _output;
        }
    }

    /// <summary>
    /// Sets the data.
    /// </summary>
    /// <param name="input">The input handed to <see cref="Execute"/>.</param>
    /// <param name="doneEvent">The event that is set once the callback has finished.</param>
    public void SetData(TInput input, ManualResetEvent doneEvent)
    {
        _doneEvent = doneEvent;
        _input = input;
    }

    /// <summary>
    /// Thread-pool entry point: runs <see cref="Execute"/> on the stored input, stores the result,
    /// and signals the done event.
    /// </summary>
    /// <param name="threadContext">The thread context (unused).</param>
    public void ThreadPoolCallback(Object threadContext)
    {
        try
        {
            _output = Execute(_input);
        }
        catch (Exception ex)
        {
            // Do something with the errors if you want.
            // The input may be null, so do not call ToString() on it directly.
            string inputText = _input == null ? string.Empty : _input.ToString();
            Console.WriteLine("{0} {1}", inputText, ex.ToString());
        }
        finally
        {
            // Always signal completion, even if error reporting itself fails;
            // otherwise whoever waits on the event would block forever.
            if (_doneEvent != null)
            {
                _doneEvent.Set();
            }
        }
    }

    /// <summary>
    /// Executes the specified input.
    /// </summary>
    /// <param name="input">The input.</param>
    /// <returns>The output produced for <paramref name="input"/>.</returns>
    public abstract TOutput Execute(TInput input);
}
151
/// <summary>
/// Worker that requests a URL and returns the response body as text.
/// </summary>
public class HitWebpage : ThreadBase<string, string>
{
    /// <summary>
    /// Requests <paramref name="input"/> and reads the whole response stream.
    /// </summary>
    /// <param name="input">The URI to request.</param>
    /// <returns>The full response body.</returns>
    public override string Execute(string input)
    {
        WebRequest request = WebRequest.Create(input);

        // Dispose the response and reader so the underlying connection and stream are
        // released even if reading fails.
        using (WebResponse response = request.GetResponse())
        using (System.IO.StreamReader reader = new System.IO.StreamReader(response.GetResponseStream()))
        {
            return reader.ReadToEnd();
        }
    }
}