我很抱歉有一个多余的问题.但是,我找到了很多解决我问题的方法,但没有一个解释得很清楚.我希望在这里说清楚.
我的C#应用程序的主线程使用ThreadPool生成1..n后台工作者.我希望原始线程锁定,直到所有工人都完成.我特别研究了ManualResetEvent,但我不清楚它的用途.
在伪:
foreach( var o in collection ) { queue new worker(o); } while( workers not completed ) { continue; }
如果有必要,我会知道即将排队的工人数量.
试试这个.该函数接受一个Action委托列表.它将为列表中的每个项添加一个ThreadPool worker条目.在返回之前,它将等待每个动作完成.
public static void SpawnAndWait(IEnumerableactions) { var list = actions.ToList(); var handles = new ManualResetEvent[actions.Count()]; for (var i = 0; i < list.Count; i++) { handles[i] = new ManualResetEvent(false); var currentAction = list[i]; var currentHandle = handles[i]; Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } }; ThreadPool.QueueUserWorkItem(x => wrappedAction()); } WaitHandle.WaitAll(handles); }
这是一种不同的方法 - 封装; 所以你的代码可以像下面这样简单:
Forker p = new Forker(); foreach (var obj in collection) { var tmp = obj; p.Fork(delegate { DoSomeWork(tmp); }); } p.Join();
Forker
下面给出了这个类(我在火车上感到无聊;-p)......再次,这避免了操作系统对象,但是整齐地包装起来(IMO):
using System; using System.Threading; ///Event arguments representing the completion of a parallel action. public class ParallelEventArgs : EventArgs { private readonly object state; private readonly Exception exception; internal ParallelEventArgs(object state, Exception exception) { this.state = state; this.exception = exception; } ///The opaque state object that identifies the action (null otherwise). public object State { get { return state; } } ///The exception thrown by the parallel action, or null if it completed without exception. public Exception Exception { get { return exception; } } } ///Provides a caller-friendly wrapper around parallel actions. public sealed class Forker { int running; private readonly object joinLock = new object(), eventLock = new object(); ///Raised when all operations have completed. public event EventHandler AllComplete { add { lock (eventLock) { allComplete += value; } } remove { lock (eventLock) { allComplete -= value; } } } private EventHandler allComplete; ///Raised when each operation completes. public event EventHandlerItemComplete { add { lock (eventLock) { itemComplete += value; } } remove { lock (eventLock) { itemComplete -= value; } } } private EventHandler itemComplete; private void OnItemComplete(object state, Exception exception) { EventHandler itemHandler = itemComplete; // don't need to lock if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception)); if (Interlocked.Decrement(ref running) == 0) { EventHandler allHandler = allComplete; // don't need to lock if (allHandler != null) allHandler(this, EventArgs.Empty); lock (joinLock) { Monitor.PulseAll(joinLock); } } } /// Adds a callback to invoke when each operation completes. ///Current instance (for fluent API). public Forker OnItemComplete(EventHandlerhandler) { if (handler == null) throw new ArgumentNullException("handler"); ItemComplete += handler; return this; } /// Adds a callback to invoke when all operations are complete. ///Current instance (for fluent API). public Forker OnAllComplete(EventHandler handler) { if (handler == null) throw new ArgumentNullException("handler"); AllComplete += handler; return this; } ///Waits for all operations to complete. public void Join() { Join(-1); } ///Waits (with timeout) for all operations to complete. ///Whether all operations had completed before the timeout. public bool Join(int millisecondsTimeout) { lock (joinLock) { if (CountRunning() == 0) return true; Thread.SpinWait(1); // try our luck... return (CountRunning() == 0) || Monitor.Wait(joinLock, millisecondsTimeout); } } ///Indicates the number of incomplete operations. ///The number of incomplete operations. public int CountRunning() { return Interlocked.CompareExchange(ref running, 0, 0); } ///Enqueues an operation. /// The operation to perform. ///The current instance (for fluent API). public Forker Fork(ThreadStart action) { return Fork(action, null); } ///Enqueues an operation. /// The operation to perform. /// An opaque object, allowing the caller to identify operations. ///The current instance (for fluent API). public Forker Fork(ThreadStart action, object state) { if (action == null) throw new ArgumentNullException("action"); Interlocked.Increment(ref running); ThreadPool.QueueUserWorkItem(delegate { Exception exception = null; try { action(); } catch (Exception ex) { exception = ex;} OnItemComplete(state, exception); }); return this; } }
首先,工人执行多长时间?池线程通常应该用于短期任务 - 如果它们要运行一段时间,请考虑手动线程.
重新解决问题; 你真的需要阻止主线程吗?你可以使用回调吗?如果是这样的话,比如:
int running = 1; // start at 1 to prevent multiple callbacks if // tasks finish faster than they are started Action endOfThread = delegate { if(Interlocked.Decrement(ref running) == 0) { // ****run callback method**** } }; foreach(var o in collection) { var tmp = o; // avoid "capture" issue Interlocked.Increment(ref running); ThreadPool.QueueUserWorkItem(delegate { DoSomeWork(tmp); // [A] should handle exceptions internally endOfThread(); }); } endOfThread(); // opposite of "start at 1"
这是一种相当轻量级(无操作系统原语)跟踪工作者的方式.
如果需要阻止,可以使用Monitor
(同样,避免OS对象)执行相同的操作:
object syncLock = new object(); int running = 1; Action endOfThread = delegate { if (Interlocked.Decrement(ref running) == 0) { lock (syncLock) { Monitor.Pulse(syncLock); } } }; lock (syncLock) { foreach (var o in collection) { var tmp = o; // avoid "capture" issue ThreadPool.QueueUserWorkItem(delegate { DoSomeWork(tmp); // [A] should handle exceptions internally endOfThread(); }); } endOfThread(); Monitor.Wait(syncLock); } Console.WriteLine("all done");
我一直在使用CTP新的并行任务库在这里:
Parallel.ForEach(collection, o => { DoSomeWork(o); });