当前位置: 首页 > >

muduo_base库学*笔记7??无界队列、有界队列及线程池的实现

发布时间:

BlockingQueueBoundedBlockingQueue实质就是一个生产者消费者的模型


一、 BlockingQueue


只用了一个条件变量notEmpty_,不需要notFull_,因为无界嘛不用考虑满的情况,条件变量需要跟一个互斥量一起使用mutex_,队列直接用STL中的deque_

put()生产;
生产中用MutexLockGuard调用Mutex_进行保护,生产了就通知消费者线程
take()消费;
队列空的时候就等待,直到不为空跳出循环,能跳出循环那就断言一定不为空了, front返回第一个元素,然后第一个元素弹出来,这是配套操作!front?>pop


size()队列大小

队列的大小也需要保护,因为可能有多个线程进行访问


二、BoundedBlockingQueue

两个条件变量,队列是直接用的boost::circular_buffer的环形缓冲区,一个保护互斥量


成员函数多了一个capacity表示队列的总容量,
多了empty函数和full函数判断队列是否满了或者空了

生产之前需要判断条件是否满了,如果满了就等待

其余都简单


三、线程池ThreadPool实现

线程池问题本质上也是生产者消费者问题,外部线程可以想线程池中的任务添加任务,相当于“生产者”;一旦任务队列中有任务,就唤醒线程队列中的线程来执行这些任务,这些任务就相当于“消费者”

类图:

互斥量、条件变量、线程池的名称、线程类,内部存放一个ptr_vector的指针、队列,用deque实现,数据类型是Task、running表示线程池是否处于运行状态
构造函数、析构函数、启动线程池(启动个数固定)、关闭线程池、运行任务(往线程池的任务队列添加任务)、线程池中的线程要执行的函数、获取任务(线程池中的线程函数要去获取任务然后执行任务)


具体实现:
.h文件

很清楚的步骤:构造函数(传递一个名字)、析构函数、start()启动线程池,run添加任务或者运行任务、 runInThread()线程池中的线程要执行的函数, take()获取任务


进入.cc文件看看一些详细的实现


1,start()启动线程池,启动的线程是固定个数的
用到了bind函数!


void ThreadPool::start(int numThreads)//numThreads个
{
assert(threads_.empty()); // 断言线程池是空的
running_ = true; // 运行状态标记置为true
threads_.reserve(numThreads); // 为线程池预留指定大小的空间
// 创建线程
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i);
threads_.push_back(new muduo::Thread(
boost::bind(&ThreadPool::runInThread, this), name_+id));//绑定runInThread
threads_[i].start();//启动线程,即启动bind的runInThread
}
}

2,runInThread函数


void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();
}
while (running_)//在start的时候就已经置为true了
{//在这个循环中执行
Task task(take());//take获取任务
if (task)//取出了任务,只要这个任务不空
{
task();//我们就执行任务
}
}
}
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s
", name_.c_str());
fprintf(stderr, "reason: %s
", ex.what());
fprintf(stderr, "stack trace: %s
", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s
", name_.c_str());
fprintf(stderr, "reason: %s
", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s
", name_.c_str());
throw; // rethrow
}
}

3, take()获取任务


// 任务分配函数(获取任务)
// 线程池函数或者线程池里面的函数都可以到这里取出一个任务
// 然后在自己的线程中执行任务,返回一个任务指针
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup(虚假唤醒)
while (queue_.empty() && running_) // 任务队列为空且线程池处于运行状态,需要等待任务的到来
{
cond_.wait();
}
Task task; //任务来了,队列不空了就可以取任务
if(!queue_.empty())
{
// 获取任务并弹出
task = queue_.front();
queue_.pop_front();
}
return task; //返回任务
}

4,run() 执行任务


void ThreadPool::run(const Task& task)
{
// 如果线程池没有线程,那么直接执行任务
// 也就是说假设没有消费者,那么生产者直接消费产品,而不把任务加入任务队列
if (threads_.empty())
{
task();
}
// 如果线程池有线程,则将任务添加到任务队列
else
{
MutexLockGuard lock(mutex_);
queue_.push_back(task);
cond_.notify();
}
}

5,stop() 关闭线程池


void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false; // 运行状态标识置为false
cond_.notifyAll(); // 通知所有线程
}
// 等待所有线程关闭
// boost::bind调用类成员函数时需要传入类成员函数指针、类对象指针...
for_each(threads_.begin(),
threads_.end(),
boost::bind(&muduo::Thread::join, _1));
}


//测试代码:



友情链接: