基于 C++11 的线程池(thread pool):实现简洁,且任务可以带任意多个参数
源代码来自 GitHub 上作者 progschj 的项目 “A simple C++11 Thread Pool implementation”(https://github.com/progschj/ThreadPool);详细讲解可以参见作者的博客 Jakob’s Devlog 中的文章 “A Thread Pool with C++11”。
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

/// A fixed-size pool of worker threads that execute queued tasks in FIFO
/// order.
///
/// Tasks are submitted with enqueue(), which returns a std::future for the
/// task's result. The destructor lets the workers finish every task that is
/// still queued and then joins them.
///
/// The class holds a std::mutex, so it is neither copyable nor movable.
class ThreadPool {
public:
    /// Starts `threads` worker threads. With 0 threads, queued tasks are
    /// never executed.
    ThreadPool(size_t);

    /// Queues the call f(args...) and returns a future for its result.
    ///
    /// The callable and its arguments are stored through std::bind, so they
    /// are copied/moved into the task; wrap an argument in std::ref to pass
    /// it by reference.
    ///
    /// @throws std::runtime_error if the pool is already being destroyed.
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;

    /// Finishes all queued tasks, then joins every worker thread.
    ~ThreadPool();

private:
    // Worker threads; kept so they can be joined on shutdown.
    std::vector<std::thread> workers;
    // Pending tasks, type-erased to a nullary callable.
    std::queue<std::function<void()>> tasks;

    // Synchronization: queue_mutex guards `tasks` and `stop`; `condition`
    // wakes workers when a task arrives or shutdown is requested.
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// The constructor launches the requested number of workers.
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    try {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                for (;;) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        // Sleep until there is work to do or the pool is
                        // shutting down.
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        // Exit only once the queue has been drained, so
                        // tasks queued before shutdown still run.
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    // Run the task without holding the lock so the other
                    // workers can dequeue concurrently.
                    task();
                }
            });
        }
    } catch (...) {
        // Starting a thread failed. Destroying a joinable std::thread calls
        // std::terminate, so stop and join the workers that did start before
        // letting the exception propagate.
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers)
            worker.join();
        throw;
    }
}

// Add a new work item to the pool.
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    // std::function requires a copyable callable but std::packaged_task is
    // move-only, so the task is held through a shared_ptr and the queue
    // stores a small copyable lambda that invokes it.
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // Don't allow enqueueing after stopping the pool.
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

// The destructor requests shutdown and joins all threads.
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    // Wake every worker so each one can observe `stop`.
    condition.notify_all();
    for (std::thread& worker : workers)
        worker.join();
}

#endif