我想实现一个遵循大致接口的生产者/消费者场景:
class Consumer { private: vectorread(size_t n) { // If the internal buffer has `n` elements, then dequeue them // Otherwise wait for more data and try again } public: void run() { read(10); read(4839); // etc } void feed(const vector &more) { // Safely queue the data // Notify `read` that there is now more data } };
在这种情况下,feed
和run
将运行在独立的线程和read
应该是一个阻挡读出(如recv
和fread
).显然,我需要在我的双端队列中进行某种互斥,我需要某种通知系统来通知read
再试一次.
我听说条件变量是要走的路,但我所有的多线程经验都在于Windows,我很难绕过它们.
谢谢你的帮助!
(是的,我知道返回向量是没有效率的.让我们不要进入那个.)
此代码未准备好生产.没有对任何库调用的结果进行错误检查.
我在LockThread中包装了互斥锁的锁定/解锁,因此它是异常安全的.但那是关于它的.
另外,如果我认真地这样做,我会将互斥锁和条件变量包装在对象中,这样它们就可以在Consumer的其他方法中被滥用.但只要你注意到在使用条件变量之前必须获取锁(以任何方式),那么这个简单的情况可以保持原样.
出于兴趣,你检查了增强线程库吗?
#include#include #include class LockThread { public: LockThread(pthread_mutex_t& m) :mutex(m) { pthread_mutex_lock(&mutex); } ~LockThread() { pthread_mutex_unlock(&mutex); } private: pthread_mutex_t& mutex; }; class Consumer { pthread_mutex_t lock; pthread_cond_t cond; std::vector unreadData; public: Consumer() { pthread_mutex_init(&lock,NULL); pthread_cond_init(&cond,NULL); } ~Consumer() { pthread_cond_destroy(&cond); pthread_mutex_destroy(&lock); } private: std::vector read(size_t n) { LockThread locker(lock); while (unreadData.size() < n) { // Must wait until we have n char. // This is a while loop because feed may not put enough in. // pthread_cond() releases the lock. // Thread will not be allowed to continue until // signal is called and this thread reacquires the lock. pthread_cond_wait(&cond,&lock); // Once released from the condition you will have re-aquired the lock. // Thus feed() must have exited and released the lock first. } /* * Not sure if this is exactly what you wanted. * But the data is copied out of the thread safe buffer * into something that can be returned. */ std::vector result(n); // init result with size n std::copy(&unreadData[0], &unreadData[n], &result[0]); unreadData.erase(unreadData.begin(), unreadData.begin() + n); return (result); } public: void run() { read(10); read(4839); // etc } void feed(const std::vector &more) { LockThread locker(lock); // Once we acquire the lock we can safely modify the buffer. std::copy(more.begin(),more.end(),std::back_inserter(unreadData)); // Only signal the thread if you have the lock // Otherwise race conditions happen. pthread_cond_signal(&cond); // destructor releases the lock and thus allows read thread to continue. } }; int main() { Consumer c; }