什么是线程池

线程池是一项程序开发人员以简单和有效的方式去利用现代处理器的并发性来榨取处理器性能的方法。简而言之,线程池通过对线程的有效管理,提高了CPU的并发性。一般的流程是,提交一项任务后,线程池分配线程在不阻塞主线程的情况下完成这项工作。并且,线程池并不是每提交一项任务初始化一次,而是一次初始化,保持非活跃状态直至一些任务完成。这样的话,也减小了系统开销。

线程池的原理示意图如下:

20230115133605

什么情况下需要使用线程池

有这样一个项目:公司需要为某某超市做一套对每天进入超市的顾客做用户画像的系统。基本流程是:对海康摄像头拍摄到的每一帧画面做人脸检测,然后对每个人脸进行年龄、性别和特征点的计算,最后将结果post到服务端进行后续处理。大家都知道人脸相关算法耗时是较高的,如果所有计算任务都放在主线程进行,那么势必会阻塞主线程的处理流程,无法做到实时处理。使用多线程技术是大家自然而然想到的方案,对每一帧都创建一个新的线程来做这系列的处理是否合理呢?相信大家都知道,线程的创建和销毁都是需要时间的,在上述的场景中必然会频繁的创建和销毁线程,这样的开销相信是不能接受的,此时线程池技术便是很好的选择。

另外,在一些高并发的网络应用中,线程池也是常用的技术。陈硕大神推荐的C++多线程服务端编程模式为:one loop per thread + thread pool,通常会有单独的线程负责接受来自客户端的请求,对请求稍作解析后将数据处理的任务提交到专门的计算线程池。

如何实现一个简易的线程池

首先,我们使用一个队列来存储任务序列。这一部分的核心就是怎么实现将一个任务提交到任务序列,也就是入队。

一个任务,一般可以表示为一个函数。那么我们需要传入的参数就有一个函数指针以及这个函数的所有参数。其中一个问题是这个函数的参数往往不是不确定的,类型也是未知的,因此,我们需要使用可变参数函数模板。二者,里面的一个难点在于怎么将各种不同的类型统一。我们使用std::bind+std::funtion+std::packaged_task等层层封装。最后就是同步和互斥操作。

接着,就是线程池的工作过程。我们用一个vector来存储线程。主要的思路就是在没任务的时候保持线程处于非活跃状态,有任务的时候进行处理。其中也利用了lambda表达式的特性,以及同步和互斥操作。

最后,就是线程池的析构实现。用一个bool变量来存储是否要停止使用线程池,当需要析构的时候,唤醒所有线程,并且逐个join。

示例代码

头文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include<thread>
#include<condition_variable>
#include<mutex>
#include<vector>
#include<queue>
#include<future>

class ThreadPool{
private:
bool m_stop;
std::vector<std::thread>m_thread;
std::queue<std::function<void()>>tasks;
std::mutex m_mutex;
std::condition_variable m_cv;

public:
explicit ThreadPool(size_t threadNumber):m_stop(false){
for(size_t i=0;i<threadNumber;++i)
{
m_thread.emplace_back(
[this](){
for(;;)
{
std::function<void()>task;
{
std::unique_lock<std::mutex>lk(m_mutex);
m_cv.wait(lk,[this](){ return m_stop||!tasks.empty();});
if(m_stop&&tasks.empty()) return;
task=std::move(tasks.front());
tasks.pop();
}
task();
}
}
);
}
}

ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;

ThreadPool & operator=(const ThreadPool &) = delete;
ThreadPool & operator=(ThreadPool &&) = delete;

~ThreadPool(){
{
std::unique_lock<std::mutex>lk(m_mutex);
m_stop=true;
}
m_cv.notify_all();
for(auto& threads:m_thread)
{
threads.join();
}
}

template<typename F,typename... Args>
auto submit(F&& f,Args&&... args)->std::future<decltype(f(args...))>{
auto taskPtr=std::make_shared<std::packaged_task<decltype(f(args...))()>>(
std::bind(std::forward<F>(f),std::forward<Args>(args)...)
);
{
std::unique_lock<std::mutex>lk(m_mutex);
if(m_stop) throw std::runtime_error("submit on stopped ThreadPool");
tasks.emplace([taskPtr](){ (*taskPtr)(); });
}
m_cv.notify_one();
return taskPtr->get_future();

}
};

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include<iostream>
#include<vector>
#include<future>
#include<functional>
#include"ThreadPool.h"
#include<random>

std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<int> dist(-1000, 1000);
auto rnd = std::bind(dist, mt);

void simulate_hard_computation() {
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

void multiply(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}


void multiply_output(int & out, const int a, const int b) {
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}


int multiply_return(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}


int main(void)
{
ThreadPool pool(4);

for(int i=0;i<8;i++)
{
pool.submit(multiply,i,i+1);
}

system("pause");
return 0;
}

抄袭资料

看得懂英文强烈建议看一下资料1!

  1. https://github.com/mtrebi/thread-pool
  2. https://zhuanlan.zhihu.com/p/61464921