当前位置:  开发笔记 > 编程语言 > 正文

等待池化线程完成

如何解决《等待池化线程完成》经验,为你挑选了4个好方法。

我很抱歉有一个多余的问题.但是,我找到了很多解决我问题的方法,但没有一个解释得很清楚.我希望在这里说清楚.

我的C#应用​​程序的主线程使用ThreadPool生成1..n后台工作者.我希望原始线程锁定,直到所有工人都完成.我特别研究了ManualResetEvent,但我不清楚它的用途.

在伪:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

如果有必要,我会知道即将排队的工人数量.



1> JaredPar..:

试试这个.该函数接受一个Action委托列表.它将为列表中的每个项添加一个ThreadPool worker条目.在返回之前,它将等待每个动作完成.

public static void SpawnAndWait(IEnumerable actions)
{
    var list = actions.ToList();
    var handles = new ManualResetEvent[actions.Count()];
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    WaitHandle.WaitAll(handles);
}


如果句柄数大于系统允许的数量,WaitHandle.WaitAll将失败.在我的Win2k3服务器上,这个数字是64,所以当我尝试产生超过64个项目时,我得到一个例外...
此代码中存在严重错误.由于包装器动作是懒惰地评估的,因此第一个线程可能执行包装器动作,获得第二个,第三个等句柄而不是第一个.

2> Marc Gravell..:

这是一种不同的方法 - 封装; 所以你的代码可以像下面这样简单:

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

Forker下面给出了这个类(我在火车上感到无聊;-p)......再次,这避免了操作系统对象,但是整齐地包装起来(IMO):

using System;
using System.Threading;

/// Event arguments representing the completion of a parallel action.
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// The opaque state object that identifies the action (null otherwise).
    public object State { get { return state; } }

    /// The exception thrown by the parallel action, or null if it completed without exception.
    public Exception Exception { get { return exception; } }
}

/// Provides a caller-friendly wrapper around parallel actions.
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// Raised when all operations have completed.
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// Raised when each operation completes.
    public event EventHandler ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// Adds a callback to invoke when each operation completes.
    /// Current instance (for fluent API).
    public Forker OnItemComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// Adds a callback to invoke when all operations are complete.
    /// Current instance (for fluent API).
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// Waits for all operations to complete.
    public void Join()
    {
        Join(-1);
    }

    /// Waits (with timeout) for all operations to complete.
    /// Whether all operations had completed before the timeout.
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// Indicates the number of incomplete operations.
    /// The number of incomplete operations.
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// Enqueues an operation.
    /// The operation to perform.
    /// The current instance (for fluent API).
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// Enqueues an operation.
    /// The operation to perform.
    /// An opaque object, allowing the caller to identify operations.
    /// The current instance (for fluent API).
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}


你需要了解代码`delegate {DoSomeWork(tmp); }**捕获*变量`tmp`.每次调用此代码时,每次循环都会捕获一个*不同的*变量,因为`tmp`的范围仅限于循环体.但是,每次循环时,`foreach`变量都是*same*变量,因此所有调用`delegate {DoSomeWork(tmp); 抓住同样的事情.这真的不需要像这样; 限制foreach变量的范围会使许多代码"正常工作"而人们甚至无法意识到这种情况的棘手.

3> Marc Gravell..:

首先,工人执行多长时间?池线程通常应该用于短期任务 - 如果它们要运行一段时间,请考虑手动线程.

重新解决问题; 你真的需要阻止主线程吗?你可以使用回调吗?如果是这样的话,比如:

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

这是一种相当轻量级(无操作系统原语)跟踪工作者的方式.

如果需要阻止,可以使用Monitor(同样,避免OS对象)执行相同的操作:

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        endOfThread();
        Monitor.Wait(syncLock);
    }
    Console.WriteLine("all done");


如果其中一个委托抛出异常,您的代码将无限期等待.
如果其中一个委托抛出一个异常,我将失去整个过程,所以这是相当随意的...我假设它不会抛出,但我会明确的;-p

4> 小智..:

我一直在使用CTP新的并行任务库在这里:

       Parallel.ForEach(collection, o =>
            {
                DoSomeWork(o);
            });

推荐阅读
小白也坚强_177
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有