ThreadPool解析

ThreadPool解析

Thread_Pool 项目解析

简介

ThreadPool 是一个轻量级的 C++ 线程池实现,旨在简化多线程编程。

项目分析

我们首先上github的项目地址:https://github.com/progschj/ThreadPool,然后克隆项目到本地。

点开项目的ThrealPool.h文件,查看源码:

#ifndef THREAD_POOL_H

#define THREAD_POOL_H

#include

#include

#include

#include

#include

#include

#include

#include

#include

class ThreadPool {

public:

ThreadPool(size_t);

template

auto enqueue(F&& f, Args&&... args)

-> std::future::type>;

~ThreadPool();

private:

// need to keep track of threads so we can join them

std::vector< std::thread > workers;

// the task queue

std::queue< std::function > tasks;

// synchronization

std::mutex queue_mutex;

std::condition_variable condition;

bool stop;

};

// the constructor just launches some amount of workers

inline ThreadPool::ThreadPool(size_t threads)

: stop(false)

{

for(size_t i = 0;i

workers.emplace_back(

[this]

{

for(;;)

{

std::function task;

{

std::unique_lock lock(this->queue_mutex);

this->condition.wait(lock,

[this]{ return this->stop || !this->tasks.empty(); });

if(this->stop && this->tasks.empty())

return;

task = std::move(this->tasks.front());

this->tasks.pop();

}

task();

}

}

);

}

// add new work item to the pool

template

auto ThreadPool::enqueue(F&& f, Args&&... args)

-> std::future::type>

{

using return_type = typename std::result_of::type;

auto task = std::make_shared< std::packaged_task >(

std::bind(std::forward(f), std::forward(args)...)

);

std::future res = task->get_future();

{

std::unique_lock lock(queue_mutex);

// don't allow enqueueing after stopping the pool

if(stop)

throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task](){ (*task)(); });

}

condition.notify_one();

return res;

}

// the destructor joins all threads

inline ThreadPool::~ThreadPool()

{

{

std::unique_lock lock(queue_mutex);

stop = true;

}

condition.notify_all();

for(std::thread &worker: workers)

worker.join();

}

#endif

类成员分析

接下来,我们一步一步分析源代码。

在整个文件中只定义一个类ThreadPool,它的类成员有:

std::vector< std::thread > workers;//存储处理任务的线程

std::queue< std::function > tasks;//存储任务的队列

std::mutex queue_mutex; // 互斥锁

std::condition_variable condition; // 条件变量,和上面的互斥锁保证多线程的同步和互斥

bool stop; // 线程池的是否停止的标志

ThreadPool初始化

先上代码:

inline ThreadPool::ThreadPool(size_t threads)

: stop(false)

{

for(size_t i = 0;i

workers.emplace_back(

[this]

{

for(;;)

{

std::function task;

{

std::unique_lock lock(this->queue_mutex);

this->condition.wait(lock,

[this]{ return this->stop || !this->tasks.empty(); });

if(this->stop && this->tasks.empty())

return;

task = std::move(this->tasks.front());

this->tasks.pop();

}

task();

}

}

);

}

ThreadPool 的初始化需传入一个参数threads,且将stop赋值为0.

接着往workers里加入threads个线程,每个线程都执行死循环:

for(;;)

{

std::function task;

{

std::unique_lock lock(this->queue_mutex);

this->condition.wait(lock,

[this]{ return this->stop || !this->tasks.empty(); });

if(this->stop && this->tasks.empty())

return;

task = std::move(this->tasks.front());

this->tasks.pop();

}

task();

}

在循环中,先定义锁,再调用condition.wait()方法,当线程池运行且任务队列为空时,线程堵塞,否则线程继续运行,然后当线程池停止且任务队列为空时,跳出循环,结束线程。否则从取出任务队列的第一个任务,执行任务。

ThreadPool enqueue 加入队列

template

auto ThreadPool::enqueue(F&& f, Args&&... args)

-> std::future::type>

{

using return_type = typename std::result_of::type;

auto task = std::make_shared< std::packaged_task >(

std::bind(std::forward(f), std::forward(args)...)

);

std::future res = task->get_future();

{

std::unique_lock lock(queue_mutex);

// don't allow enqueueing after stopping the pool

if(stop)

throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task](){ (*task)(); });

}

condition.notify_one();

return res;

}

enqueue 方法是模板函数,传入可调用对象F和任意数量的的参数args,,返回一个future对象,返回线程异步操作的结果。

using return_type = typename std::result_of::type;

首先,定义返回类型return_type,表示传入的可调用对象的返回值的类型。

auto task = std::make_shared< std::packaged_task >(

std::bind(std::forward(f), std::forward(args)...)

);

程序创建智能指针task,其指向了一个使用bind绑定的可调用对象(该对象调用f,并传入参数args),再使用packaged_task包装成可调用对象。创建智能指针的目的是为了其他线程的使用。

std::future res = task->get_future();

{

std::unique_lock lock(queue_mutex);

if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task](){ (*task)(); });

}

使用res保存任务线程的异步结果,并作为返回值。

然后在代码块中使用互斥锁加锁,然后将任务加入任务队列中。

最后通知线程池中的一个线程处理任务并返回res。

ThreadPool 析构函数

看注释就可以了:

inline ThreadPool::~ThreadPool()

{

{

std::unique_lock lock(queue_mutex);

stop = true;

//表示线程池停止。

}

condition.notify_all(); // 通知所有线程

for(std::thread &worker: workers)

worker.join(); // 等待所有线程结束

}

总结:

ThreadPool 的运行步骤可以分为以下几步:

创建ThreadPool对象,传入线程池工作线程数量。在线程池中填加工作线程,并堵塞等待任务线程的通知。

调用enqueue方法,传入可调用对象和参数。在该方法中,enqueue先通过一系列操作调整传入的参数,再将其加入任务队列。

以上操作完成后,通知线程池中的一个线程处理任务。在线程池中取出任务队列的当前最先进来的任务处理。

处理完任务将结果保存到enqueue里的异步返回结果的future对象中,并通过enqueue返回。

ThreadPool对象被销毁时,将标志stop设置为true,并会通知所有堵塞线程,等待线程池中的所有线程结束。

ThreadPool 实现简单的线程池,使用简单的先进先出策略调度任务,如果可以使用更加复杂的策略,我们可以自己修改代码。

相关发现

手机取证系统恢复数据,公安局恢复手机数据取证需要几天
贝锐花生壳对比NAT123,内网穿透工具如何选?
365是正规平台吗

贝锐花生壳对比NAT123,内网穿透工具如何选?

🌼 08-07 🌻 7177
墨的成语
bst365app

墨的成语

🌼 07-28 🌻 6607
真宙,又名真优秀!
365是正规平台吗

真宙,又名真优秀!

🌼 08-06 🌻 820