|
11 | 11 | #ifndef THREAD_POOL_H |
12 | 12 | #define THREAD_POOL_H |
13 | 13 |
|
14 | | -#include <vector> // std::vector |
15 | | -#include <queue> // std::queue |
16 | | -#include <memory> // std::make_shared |
| 14 | +#include <memory> // std::make_shared |
| 15 | +#include <queue> // std::queue |
| 16 | +#include <vector> // std::vector |
17 | 17 |
|
18 | | -#include <thread> // std::thread |
19 | | -#include <mutex> // std::mutex, std::unique_lock |
20 | | -#include <condition_variable> // std::condition_variable |
21 | | -#include <future> // std::future, std::packaged_task |
| 18 | +#include <condition_variable> // std::condition_variable |
| 19 | +#include <future> // std::future, std::packaged_task |
| 20 | +#include <mutex> // std::mutex, std::unique_lock |
| 21 | +#include <thread> // std::thread |
22 | 22 |
|
23 | | -#include <functional> // std::function, std::bind |
24 | | -#include <stdexcept> // std::runtime_error |
25 | | -#include <utility> // std::move, std::forward |
| 23 | +#include <functional> // std::function, std::bind |
| 24 | +#include <stdexcept> // std::runtime_error |
| 25 | +#include <utility> // std::move, std::forward |
26 | 26 |
|
27 | 27 | class ThreadPool { |
28 | 28 | public: |
29 | | - |
30 | | - // initialize the number of concurrency threads |
31 | | - ThreadPool(size_t); |
32 | | - |
33 | | - // enqueue new thread task |
34 | | - template<class F, class... Args> |
35 | | - decltype(auto) enqueue(F&& f, Args&&... args); |
36 | | - |
37 | | - // destroy thread pool and all created threads |
38 | | - ~ThreadPool(); |
| 29 | + // initialize the number of concurrency threads |
| 30 | + ThreadPool(size_t); |
| 31 | + |
| 32 | + // enqueue new thread task |
| 33 | + template <class F, class... Args> |
| 34 | + decltype(auto) enqueue(F &&f, Args &&...args); |
| 35 | + |
| 36 | + // destroy thread pool and all created threads |
| 37 | + ~ThreadPool(); |
| 38 | + |
39 | 39 | private: |
40 | | - |
41 | | - // thread list, stores all threads |
42 | | - std::vector< std::thread > workers; |
43 | | - // queue task, the type of queue elements are functions with void return type |
44 | | - std::queue< std::function<void()> > tasks; |
45 | | - |
46 | | - // for synchonization |
47 | | - std::mutex queue_mutex; |
48 | | - // std::condition_variable is a new feature from c++11, |
49 | | - // it's a synchronization primitives. it can be used |
50 | | - // to block a thread or threads at the same time until |
51 | | - // all of them modified condition_variable. |
52 | | - std::condition_variable condition; |
53 | | - bool stop; |
| 40 | + // thread list, stores all threads |
| 41 | + std::vector<std::thread> workers; |
| 42 | + // queue task, the type of queue elements are functions with void return type |
| 43 | + std::queue<std::function<void()>> tasks; |
| 44 | + |
| 45 | + // for synchonization |
| 46 | + std::mutex queue_mutex; |
| 47 | + // std::condition_variable is a new feature from c++11, |
| 48 | + // it's a synchronization primitives. it can be used |
| 49 | + // to block a thread or threads at the same time until |
| 50 | + // all of them modified condition_variable. |
| 51 | + std::condition_variable condition; |
| 52 | + bool stop; |
54 | 53 | }; |
55 | | - |
| 54 | + |
56 | 55 | // constructor initialize a fixed size of worker |
57 | | -inline ThreadPool::ThreadPool(size_t threads): stop(false) { |
58 | | - // initialize worker |
59 | | - for(size_t i = 0;i<threads;++i) |
60 | | - // std::vector::emplace_back : |
61 | | - // append to the end of vector container |
62 | | - // this element will be constructed at the end of container, without copy and move behavior |
63 | | - workers.emplace_back([this] { // the lambda express capture this, i.e. the instance of thread pool |
64 | | - // avoid fake awake |
65 | | - for(;;) { |
66 | | - // define function task container, return type is void |
67 | | - std::function<void()> task; |
68 | | - |
69 | | - // critical section |
70 | | - { |
71 | | - // get mutex |
72 | | - std::unique_lock<std::mutex> lock(this->queue_mutex); |
73 | | - |
74 | | - // block current thread |
75 | | - this->condition.wait(lock, |
76 | | - [this]{ return this->stop || !this->tasks.empty(); }); |
77 | | - |
78 | | - // return if queue empty and task finished |
79 | | - if(this->stop && this->tasks.empty()) |
80 | | - return; |
81 | | - |
82 | | - // otherwise execute the first element of queue |
83 | | - task = std::move(this->tasks.front()); |
84 | | - this->tasks.pop(); |
85 | | - } |
86 | | - |
87 | | - // execution |
88 | | - task(); |
89 | | - } |
90 | | - } |
91 | | - ); |
| 56 | +inline ThreadPool::ThreadPool(size_t threads) : stop(false) { |
| 57 | + // initialize worker |
| 58 | + while (threads--) |
| 59 | + // std::vector::emplace_back : |
| 60 | + // append to the end of vector container |
| 61 | + // this element will be constructed at the end of container, without copy |
| 62 | + // and move behavior |
| 63 | + workers.emplace_back([this] { // the lambda express capture this, i.e. the |
| 64 | + // instance of thread pool |
| 65 | + // avoid fake awake |
| 66 | + for (;;) { |
| 67 | + // define function task container, return type is void |
| 68 | + std::function<void()> task; |
| 69 | + |
| 70 | + // critical section |
| 71 | + { |
| 72 | + // get mutex |
| 73 | + std::unique_lock<std::mutex> lock(this->queue_mutex); |
| 74 | + |
| 75 | + // block current thread |
| 76 | + this->condition.wait( |
| 77 | + lock, [this] { return this->stop || !this->tasks.empty(); }); |
| 78 | + |
| 79 | + // return if queue empty and task finished |
| 80 | + if (this->stop && this->tasks.empty()) |
| 81 | + return; |
| 82 | + |
| 83 | + // otherwise execute the first element of queue |
| 84 | + task = std::move(this->tasks.front()); |
| 85 | + this->tasks.pop(); |
| 86 | + } |
| 87 | + |
| 88 | + // execution |
| 89 | + task(); |
| 90 | + } |
| 91 | + }); |
92 | 92 | } |
93 | 93 |
|
94 | 94 | // Enqueue a new thread |
95 | | -// use variadic templates and tail return type |
96 | | -template<class F, class... Args> |
97 | | -decltype(auto) ThreadPool::enqueue(F&& f, Args&&... args) { |
98 | | - // deduce return type |
99 | | - using return_type = typename std::result_of<F(Args...)>::type; |
100 | | - |
101 | | - // fetch task |
102 | | - auto task = std::make_shared<std::packaged_task<return_type()>>( |
103 | | - std::bind(std::forward<F>(f), std::forward<Args>(args)...) |
104 | | - ); |
105 | | - |
106 | | - std::future<return_type> res = task->get_future(); |
107 | | - |
108 | | - // critical section |
109 | | - { |
110 | | - std::unique_lock<std::mutex> lock(queue_mutex); |
111 | | - |
112 | | - // avoid add new thread if theadpool is destroyed |
113 | | - if(stop) |
114 | | - throw std::runtime_error("enqueue on stopped ThreadPool"); |
115 | | - |
116 | | - // add thread to queue |
117 | | - tasks.emplace([task]{ (*task)(); }); |
118 | | - } |
119 | | - |
120 | | - // notify a wait thread |
121 | | - condition.notify_one(); |
122 | | - return res; |
| 95 | +// use variadic templates and tail return type |
| 96 | +template <class F, class... Args> |
| 97 | +decltype(auto) ThreadPool::enqueue(F &&f, Args &&...args) { |
| 98 | + // deduce return type |
| 99 | + using return_type = typename std::invoke_result<F, Args...>::type; |
| 100 | + |
| 101 | + // fetch task |
| 102 | + auto task = std::make_shared<std::packaged_task<return_type()>>( |
| 103 | + std::bind(std::forward<F>(f), std::forward<Args>(args)...)); |
| 104 | + |
| 105 | + std::future<return_type> res = task->get_future(); |
| 106 | + |
| 107 | + // critical section |
| 108 | + { |
| 109 | + std::unique_lock<std::mutex> lock(queue_mutex); |
| 110 | + |
| 111 | + // avoid add new thread if theadpool is destroyed |
| 112 | + if (stop) |
| 113 | + throw std::runtime_error("enqueue on stopped ThreadPool"); |
| 114 | + |
| 115 | + // add thread to queue |
| 116 | + tasks.emplace([task] { (*task)(); }); |
| 117 | + } |
| 118 | + |
| 119 | + // notify a wait thread |
| 120 | + condition.notify_one(); |
| 121 | + return res; |
123 | 122 | } |
124 | 123 |
|
125 | 124 | // destroy everything |
126 | | -inline ThreadPool::~ThreadPool() |
127 | | -{ |
128 | | - // critical section |
129 | | - { |
130 | | - std::unique_lock<std::mutex> lock(queue_mutex); |
131 | | - stop = true; |
132 | | - } |
133 | | - |
134 | | - // wake up all threads |
135 | | - condition.notify_all(); |
136 | | - |
137 | | - // let all processes into synchronous execution, use c++11 new for-loop: for(value:values) |
138 | | - for(std::thread &worker: workers) |
139 | | - worker.join(); |
| 125 | +inline ThreadPool::~ThreadPool() { |
| 126 | + // critical section |
| 127 | + std::unique_lock<std::mutex> lock(queue_mutex); |
| 128 | + stop = true; |
| 129 | + |
| 130 | + // wake up all threads |
| 131 | + condition.notify_all(); |
| 132 | + |
| 133 | + // let all processes into synchronous execution, use c++11 new for-loop: |
| 134 | + // for(value:values) |
| 135 | + for (std::thread &worker : workers) |
| 136 | + worker.join(); |
140 | 137 | } |
141 | 138 |
|
142 | 139 | #endif |
0 commit comments