关键词

caffe里的blocking_queue.hpp与.cpp干了点什么呢???

我看的一下午才明白的,因为吧,我之前都是不知道与boost::thread相关的任何知识,然后开始看各种资料啊。。。

妈的,我就是一个小白,没一点基础的。。

 

总的来说:blocking_queue实现一个阻塞队列,它利用了生成者与消费者的设计模式,怎么说呢?、

首先吧,你要有一个queue(队列,c++里的一种容器),对它的操作有push与pop。 push即向队列里压入数据,相当于一个生产者,然后呢,pop把数据弹出队列,相当于一个消费者。。但是呢,生产者与消费者的速度可能不一样(即push与pop的速度)啊,那怎么办呢??所以呢,要想办法让它们同步啊,方法即把这样queue变为一个阻塞队列啊。。。

 

下面看一下怎么实现的:

它的构造函数 :

template<typename T>
BlockingQueue<T>::BlockingQueue()        
    : sync_(new sync()) {
}

 

在blockingqueue的头文件中包括:成员变量有:queue_(这是一个队列,类型为std::queue)

                                                               sync_(这是一个sync类,里面只有有两个成员变量:mutex_与condition_)

blockingqueue的成员函数:

void push(参数为一个要push进去的数据)

bool try_pop(参数为一个用于存放要pop出来的数据的指针),如果有数据可以pop出来,则返回true,否则为false

type pop(参数为一个用于存放要pop出来的数据的指针),它与上面的区别在于,如果queue为空时,它会等待。

bool try_peek(参数为一个用于存放队列最前端的数据的指针),它的作用就是试着返回一下queue最前端的数据;有数据写入,则true.

type peek(参数为一个用于存放队列最前端的数据的指针),与上面的区别在于没有数据,它会等待。

size_t size() ,它干的事情就是返回队列中数据的个数;

 

另外,对于blocking_queueg来说,它只会在pop与peek的时候进行相应的等待(如果队列为空就等啊),在push的时候不用等待的(应该队列不会满吧,它可以自动增加吧,应有可能取的速度较快吧,不会造成队列不断增加吧);

补充一下sync的类:

// 这个类是在BlockingQueue类中定义的
template<typename T>                                             
class BlockingQueue<T>::sync {                                       
 public:                                                                   
  mutable boost::mutex mutex_;                                      
  boost::condition_variable condition_;                                   
};

 

还是在这里写一下实现代码吧:

#include <boost/thread.hpp>                                                                                                                                                                    
#include <string>

#include "caffe/data_reader.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/parallel.hpp"
#include "caffe/util/blocking_queue.hpp"

namespace caffe {

template<typename T>
class BlockingQueue<T>::sync {
 public:
  mutable boost::mutex mutex_;             //实现了一个mutex对象;
  boost::condition_variable condition_;   //也是实现了一个对象;
};

template<typename T>
BlockingQueue<T>::BlockingQueue()        
    : sync_(new sync()) {
}

template<typename T>
void BlockingQueue<T>::push(const T& t) {
  // 在push操作过程中,创建一个scoped_lock的对象lock,利用它的构造函数来对
  // mutex_进行加锁;
  boost::mutex::scoped_lock lock(sync_->mutex_);  
  queue_.push(t);
  lock.unlock();   //对互斥体解锁;
  sync_->condition_.notify_one(); //给相应的wait中的线程发出通知;
}

template<typename T>
bool BlockingQueue<T>::try_pop(T* t) {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  if (queue_.empty()) {
    return false;
  }

  *t = queue_.front();
  queue_.pop();
  return true;
}

template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait) {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  while (queue_.empty()) {
    if (!log_on_wait.empty()) {
      LOG_EVERY_N(INFO, 1000)<< log_on_wait; //当空的时候,输入相应的等待信息;
    }
    sync_->condition_.wait(lock);           //线程进入wait的过程;等相应的通知;
}

  T t = queue_.front();
  queue_.pop();
  return t;
}

template<typename T>
bool BlockingQueue<T>::try_peek(T* t) {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  if (queue_.empty()) {
    return false;
  }

  *t = queue_.front();
  return true;
}

template<typename T>
T BlockingQueue<T>::peek() {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  while (queue_.empty()) {
    sync_->condition_.wait(lock);
  }

  return queue_.front();
}

template<typename T>
size_t BlockingQueue<T>::size() const {
  boost::mutex::scoped_lock lock(sync_->mutex_);
  return queue_.size();
}

template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
template class BlockingQueue<P2PSync<float>*>;
template class BlockingQueue<P2PSync<double>*>;

}  // namespace caffe

本文链接:http://task.lmcjl.com/news/12543.html

展开阅读全文