我看的一下午才明白的,因为吧,我之前都是不知道与boost::thread相关的任何知识,然后开始看各种资料啊。。。
妈的,我就是一个小白,没一点基础的。。
总的来说:blocking_queue实现一个阻塞队列,它利用了生成者与消费者的设计模式,怎么说呢?、
首先吧,你要有一个queue(队列,c++里的一种容器),对它的操作有push与pop。 push即向队列里压入数据,相当于一个生产者,然后呢,pop把数据弹出队列,相当于一个消费者。。但是呢,生产者与消费者的速度可能不一样(即push与pop的速度)啊,那怎么办呢??所以呢,要想办法让它们同步啊,方法即把这样queue变为一个阻塞队列啊。。。
下面看一下怎么实现的:
它的构造函数 :
template<typename T> BlockingQueue<T>::BlockingQueue() : sync_(new sync()) { }
在blockingqueue的头文件中包括:成员变量有:queue_(这是一个队列,类型为std::queue)
sync_(这是一个sync类,里面只有有两个成员变量:mutex_与condition_)
blockingqueue的成员函数:
void push(参数为一个要push进去的数据)
bool try_pop(参数为一个用于存放要pop出来的数据的指针),如果有数据可以pop出来,则返回true,否则为false
type pop(参数为一个用于存放要pop出来的数据的指针),它与上面的区别在于,如果queue为空时,它会等待。
bool try_peek(参数为一个用于存放队列最前端的数据的指针),它的作用就是试着返回一下queue最前端的数据;有数据写入,则true.
type peek(参数为一个用于存放队列最前端的数据的指针),与上面的区别在于没有数据,它会等待。
size_t size() ,它干的事情就是返回队列中数据的个数;
另外,对于blocking_queueg来说,它只会在pop与peek的时候进行相应的等待(如果队列为空就等啊),在push的时候不用等待的(应该队列不会满吧,它可以自动增加吧,应有可能取的速度较快吧,不会造成队列不断增加吧);
补充一下sync的类:
// 这个类是在BlockingQueue类中定义的 template<typename T> class BlockingQueue<T>::sync { public: mutable boost::mutex mutex_; boost::condition_variable condition_; };
还是在这里写一下实现代码吧:
#include <boost/thread.hpp> #include <string> #include "caffe/data_reader.hpp" #include "caffe/layers/base_data_layer.hpp" #include "caffe/parallel.hpp" #include "caffe/util/blocking_queue.hpp" namespace caffe { template<typename T> class BlockingQueue<T>::sync { public: mutable boost::mutex mutex_; //实现了一个mutex对象; boost::condition_variable condition_; //也是实现了一个对象; }; template<typename T> BlockingQueue<T>::BlockingQueue() : sync_(new sync()) { } template<typename T> void BlockingQueue<T>::push(const T& t) { // 在push操作过程中,创建一个scoped_lock的对象lock,利用它的构造函数来对 // mutex_进行加锁; boost::mutex::scoped_lock lock(sync_->mutex_); queue_.push(t); lock.unlock(); //对互斥体解锁; sync_->condition_.notify_one(); //给相应的wait中的线程发出通知; } template<typename T> bool BlockingQueue<T>::try_pop(T* t) { boost::mutex::scoped_lock lock(sync_->mutex_); if (queue_.empty()) { return false; } *t = queue_.front(); queue_.pop(); return true; } template<typename T> T BlockingQueue<T>::pop(const string& log_on_wait) { boost::mutex::scoped_lock lock(sync_->mutex_); while (queue_.empty()) { if (!log_on_wait.empty()) { LOG_EVERY_N(INFO, 1000)<< log_on_wait; //当空的时候,输入相应的等待信息; } sync_->condition_.wait(lock); //线程进入wait的过程;等相应的通知; } T t = queue_.front(); queue_.pop(); return t; } template<typename T> bool BlockingQueue<T>::try_peek(T* t) { boost::mutex::scoped_lock lock(sync_->mutex_); if (queue_.empty()) { return false; } *t = queue_.front(); return true; } template<typename T> T BlockingQueue<T>::peek() { boost::mutex::scoped_lock lock(sync_->mutex_); while (queue_.empty()) { sync_->condition_.wait(lock); } return queue_.front(); } template<typename T> size_t BlockingQueue<T>::size() const { boost::mutex::scoped_lock lock(sync_->mutex_); return queue_.size(); } template class BlockingQueue<Batch<float>*>; template class BlockingQueue<Batch<double>*>; template class BlockingQueue<Datum*>; template class BlockingQueue<shared_ptr<DataReader::QueuePair> >; template class BlockingQueue<P2PSync<float>*>; template class BlockingQueue<P2PSync<double>*>; } // namespace caffe
本文链接:http://task.lmcjl.com/news/12543.html