Home
Manage Your Code
Snippet: Multi Threading (C#)
Title: Multi Threading Language: C#
Description: Take a list of inputs, returns a list of outputs, and you just have to inherit from ThreadBase Views: 3098
Author: kevin upchurch Date Added: 12/10/2008
Copy Code  
1    public static class ThreadManager
2    {
3
4        private const int NUM_WAIT_HANDLES = 64;
5
6        /// <summary>

7        /// Executes a series of operations in parallel, waiting for the complete collection of results, and then returns the results.

8        /// This implements a "scatter-gather" pattern.

9        /// </summary>

10        /// <typeparam name="TThreadBase">The type of the thread base.</typeparam>

11        /// <typeparam name="TInput">The type of the input.</typeparam>

12        /// <typeparam name="TOutput">The type of the output.</typeparam>

13        /// <param name="inputList">The input list.</param>

14        /// <returns></returns>

15        public static List<TOutput> ExecuteParallel<TThreadBase, TInput, TOutput>(List<TInput> inputList) where TThreadBase : ThreadBase<TInput, TOutput>, new()
16        {
17
18            int totalItems = inputList.Count;
19
20            List<TOutput> results = new List<TOutput>(totalItems);
21
22            if (inputList == null || inputList.Count == 0)
23            {//no need to process anything thing, because we weren't sent anything

24                return results;
25            }
26
27            Queue<ManualResetEvent> events = new Queue<ManualResetEvent>(totalItems);
28            List<TThreadBase> threadBases = new List<TThreadBase>();
29
30            // Queue all work items in the threadpool

31            foreach (TInput item in inputList)
32            {
33                ManualResetEvent manualResetEvent = new ManualResetEvent(false);
34                TThreadBase p = new TThreadBase();
35                events.Enqueue(manualResetEvent);
36                threadBases.Add(p);
37                p.SetData(item, manualResetEvent);
38                ThreadPool.QueueUserWorkItem(p.ThreadPoolCallback, null);
39            }
40
41            // Wait for completion

42            // Due to limitations on WaitAll, must be "chunked" in sets of 64 or fewer waithandles

43            List<WaitHandle> handles = new List<WaitHandle>(NUM_WAIT_HANDLES);
44
45            //calculate the number of times that we need to wait

46            int loops = ((inputList.Count - 1) / NUM_WAIT_HANDLES) + 1;
47
48            //chuncking the waiting

49            for (int currentLoop = 0; currentLoop < loops; currentLoop++)
50            {
51                for (int handleCount = 0; handleCount < NUM_WAIT_HANDLES; handleCount++)
52                {
53                    if (events.Count == 0)
54                    {
55                        break;
56                    }
57                    handles.Add(events.Dequeue());
58                }
59                WaitAll(handles.ToArray());
60                handles.Clear();
61            }
62
63            //spool up all the results

64            foreach (ThreadBase<TInput, TOutput> b in threadBases)
65            {
66                if (b.Result != null)
67                {
68                    results.Add(b.Result);
69                }
70            }
71            return results;
72
73        }
74
75        private static void WaitAll(WaitHandle[] waitHandles)
76        {
77            if (Thread.CurrentThread.GetApartmentState() == ApartmentState.STA)
78            {
79                // WaitAll for multiple handles on an STA thread is not supported.

80                // ...so wait on each handle individually.

81                foreach (WaitHandle myWaitHandle in waitHandles)
82                {
83                    myWaitHandle.WaitOne();
84                }
85            }
86            else
87            {
88                WaitHandle.WaitAll(waitHandles);
89            }
90        }
91
92    }
93
94public abstract class ThreadBase<TInput, TOutput>
95    {
96
97        private ManualResetEvent _doneEvent;
98        private TInput _input;
99        private TOutput _ouput;
100
101        /// <summary>

102        /// Gets the result.

103        /// </summary>

104        /// <value>The result.</value>

105        public TOutput Result
106        {
107            get
108            {
109                return _ouput;
110            }
111        }
112
113
114        /// <summary>

115        /// Sets the data.

116        /// </summary>

117        /// <param name="input">The input.</param>

118        /// <param name="doneEvent">The done event.</param>

119        public void SetData(TInput input, ManualResetEvent doneEvent)
120        {
121            _doneEvent = doneEvent;
122            _input = input;
123        }
124
125
126        /// <summary>

127        /// Threads the pool callback.

128        /// </summary>

129        /// <param name="threadContext">The thread context.</param>

130        public void ThreadPoolCallback(Object threadContext)
131        {
132            try
133            {
134                _ouput = Execute(_input);
135            }
136            catch (Exception ex)
137            {//do something with the errors if you want

138                Console.WriteLine("{0} {1}", _input.ToString(), ex.ToString());
139            }
140            _doneEvent.Set();
141        }
142
143        /// <summary>

144        /// Executes the specified input.

145        /// </summary>

146        /// <param name="input">The input.</param>

147        /// <returns></returns>

148        public abstract TOutput Execute(TInput input);
149
150    }
151
152 public class HitWebpage : ThreadBase<string, string>
153    {
154        public override string Execute(string input)
155        {
156            WebRequest request = WebRequest.Create(input);
157            WebResponse response = request.GetResponse();
158            System.IO.StreamReader reader = new System.IO.StreamReader(response.GetResponseStream());
159            return reader.ReadToEnd();
160        }
161    }
Usage
List inputUrls = new List();
inputUrls.Add("http://google.com/");
inputUrls.Add("http://code.google.com/");
List returned = ThreadManager.ExecuteParallel(inputUrls);