我有一个场景,我有多个线程添加到队列和多个线程从同一队列读取.如果队列达到特定大小,则填充队列的所有线程将在添加时被阻止,直到从队列中删除项目为止.
下面的解决方案就是我现在正在使用的问题,我的问题是:如何改进?是否有一个对象已经在我应该使用的BCL中启用此行为?
internal class BlockingCollection: CollectionBase, IEnumerable { //todo: might be worth changing this into a proper QUEUE private AutoResetEvent _FullEvent = new AutoResetEvent(false); internal T this[int i] { get { return (T) List[i]; } } private int _MaxSize; internal int MaxSize { get { return _MaxSize; } set { _MaxSize = value; checkSize(); } } internal BlockingCollection(int maxSize) { MaxSize = maxSize; } internal void Add(T item) { Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.WaitOne(); List.Add(item); Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId)); checkSize(); } internal void Remove(T item) { lock (List) { List.Remove(item); } Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId)); } protected override void OnRemoveComplete(int index, object value) { checkSize(); base.OnRemoveComplete(index, value); } internal new IEnumerator GetEnumerator() { return List.GetEnumerator(); } private void checkSize() { if (Count < MaxSize) { Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Set(); } else { Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Reset(); } } }
Marc Gravell.. 198
这看起来非常不安全(很少同步); 怎么样的东西:
class SizeQueue{ private readonly Queue queue = new Queue (); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } }
(编辑)
实际上,你想要一种方法来关闭队列,以便读者开始干净利落 - 可能像bool标志 - 如果设置,空队列只返回(而不是阻塞):
bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } }
@ user260197 - 因为命名的东西很棘手;-p随意重命名它... (10认同)
@Lasse - 它在`Wait`期间释放锁,因此其他线程可以获取它.它在唤醒时收回锁. (4认同)
为什么SizeQueue,为什么不是FixedSizeQueue? (3认同)
xhafan.. 55
使用.net 4 BlockingCollection,排队使用Add(),出列使用Take().它在内部使用非阻塞的ConcurrentQueue.更多信息此处快速和最佳生产者/消费者队列技术BlockingCollection与并发队列
这看起来非常不安全(很少同步); 怎么样的东西:
class SizeQueue{ private readonly Queue queue = new Queue (); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } }
(编辑)
实际上,你想要一种方法来关闭队列,以便读者开始干净利落 - 可能像bool标志 - 如果设置,空队列只返回(而不是阻塞):
bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } }
使用.net 4 BlockingCollection,排队使用Add(),出列使用Take().它在内部使用非阻塞的ConcurrentQueue.更多信息此处快速和最佳生产者/消费者队列技术BlockingCollection与并发队列
"怎么能改善这个?"
好吧,您需要查看类中的每个方法,并考虑如果另一个线程同时调用该方法或任何其他方法会发生什么.例如,您在Remove方法中放置了一个锁,但在Add方法中没有.如果一个线程与另一个线程同时添加删除会发生什么?坏事.
还要考虑一个方法可以返回第二个对象,该对象提供对第一个对象的内部数据的访问 - 例如,GetEnumerator.想象一个线程正在通过该枚举器,另一个线程正在同时修改列表.不好.
一个好的经验法则是让这个简单的获得通过削减的方法的数量在类中的绝对最小的权利.
特别是,不继承另一个容器类,因为你会暴露所有这些类的方法,来电话者腐败提供一种方式内部数据,或者看到的数据(同样糟糕部分完成的变化,因为数据那一刻似乎已经腐败了).隐藏所有细节,并对您如何允许访问它们完全无情.
我强烈建议您使用现成的解决方案 - 获取有关线程或使用第三方库的书籍.否则,根据您的尝试,您将长时间调试代码.
另外,是不是更有意义删除返回一个项目(比方说,这是第一次添加的一个,因为它是一个队列),而不是调用者选择的具体项目?当队列为空时,也许Remove也应该阻塞.
更新:Marc的答案实际上实现了所有这些建议!:)但我会留在这里,因为它可能有助于理解为什么他的版本是这样的改进.
您可以在System.Collections.Concurrent命名空间中使用BlockingCollection和ConcurrentQueue
public class ProducerConsumerQueue: BlockingCollection { /// /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality /// public ProducerConsumerQueue() : base(new ConcurrentQueue()) { } /// /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality /// /// public ProducerConsumerQueue(int maxSize) : base(new ConcurrentQueue(), maxSize) { } }
我刚刚使用Reactive Extensions打破了这个问题并记住了这个问题:
public class BlockingQueue{ private readonly Subject _queue; private readonly IEnumerator _enumerator; private readonly object _sync = new object(); public BlockingQueue() { _queue = new Subject (); _enumerator = _queue.GetEnumerator(); } public void Enqueue(T item) { lock (_sync) { _queue.OnNext(item); } } public T Dequeue() { _enumerator.MoveNext(); return _enumerator.Current; } }
不一定完全安全,但非常简单.
这就是我为一个线程安全的有界阻塞队列而来的操作.
using System; using System.Collections.Generic; using System.Text; using System.Threading; public class BlockingBuffer{ private Object t_lock; private Semaphore sema_NotEmpty; private Semaphore sema_NotFull; private T[] buf; private int getFromIndex; private int putToIndex; private int size; private int numItems; public BlockingBuffer(int Capacity) { if (Capacity <= 0) throw new ArgumentOutOfRangeException("Capacity must be larger than 0"); t_lock = new Object(); buf = new T[Capacity]; sema_NotEmpty = new Semaphore(0, Capacity); sema_NotFull = new Semaphore(Capacity, Capacity); getFromIndex = 0; putToIndex = 0; size = Capacity; numItems = 0; } public void put(T item) { sema_NotFull.WaitOne(); lock (t_lock) { while (numItems == size) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } buf[putToIndex++] = item; if (putToIndex == size) putToIndex = 0; numItems++; Monitor.Pulse(t_lock); } sema_NotEmpty.Release(); } public T take() { T item; sema_NotEmpty.WaitOne(); lock (t_lock) { while (numItems == 0) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } item = buf[getFromIndex++]; if (getFromIndex == size) getFromIndex = 0; numItems--; Monitor.Pulse(t_lock); } sema_NotFull.Release(); return item; } }