任务队列和线程池队列(C++11)
任务队列可以认为是执行同一个方法来处理数据的队列,指定回调函数。
线程池就是先开辟好多个线程,然后将要执行的方法+参数丢到线程池中,支持返回值获取。
均依赖了无锁队列库 https://github.com/cameron314/concurrentqueue
本来不想依赖三方组件的,但是测试发现这个库效率是真的高,比带锁的队列快非常多。
而且vs中的std::queue是有BUG的,pop后不释放内存,也有一定的隐患(急死强迫症)
任务队列库TaskQueue:
#pragma once #include <functional> #include <vector> #include <thread> #include <future> #include <atomic> #include <chrono> #include <mutex> #include "queue/blockingconcurrentqueue.h" //for tsBlockQuque template <class T, class P = void*> class CTaskQueue { private: typedef std::function<void(P ¶m)> functionTaskInit; typedef std::function<void(P ¶m)> functionTaskExit; typedef std::function<bool(T &data)> functionTaskCallback; typedef std::function<bool(T &data, P ¶m)> functionTaskExCallback; //退出的时候用的回调 typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback; //退出标记 bool m_bExit; //检测队列的等待时间,毫秒 int m_nWaitTime; //延时模式开关 bool m_bDelayMode; //延时模式下,启动新工作线程需要加锁 std::mutex m_mtStartWorkThread; //最大线程数 size_t m_nMaxThreadCount; //添加的总数量的数量 std::atomic<uint64_t> m_nTotalCount; //已处理完毕的数量 std::atomic<uint64_t> m_nProcessedCount; //任务队列 tsBlockQuque <T> m_taskQueue; //单参数的回调 functionTaskCallback m_TaskCallBack; //带扩展参数的回调 functionTaskInit m_TaskInitCallBack; functionTaskExit m_TaskExitCallBack; functionTaskExCallback m_TaskExCallBack; //工作线程 std::vector <std::future<int>> m_arrWorkThreads; //启动工作线程 void _StartWorkThread(size_t nThreadCount) { std::lock_guard<std::mutex> lock(m_mtStartWorkThread); if (m_arrWorkThreads.size() >= m_nMaxThreadCount) return; for (size_t i = 0; i < nThreadCount; i++) { m_arrWorkThreads.emplace_back(std::async(std::launch::async, [this]()->int{ P param; if (m_TaskInitCallBack) m_TaskInitCallBack(param); while (true) { T data; if (m_taskQueue.wait_dequeue_timed(data, std::chrono::milliseconds(m_nWaitTime))) { //找到一组数据 回调 if (m_TaskCallBack) { if (!m_TaskCallBack(data)) { //处理失败了 重新丢回队列里面去 m_taskQueue.enqueue(data); continue; } } else if (m_TaskExCallBack) { if (!m_TaskExCallBack(data, param)) { //处理失败了 重新丢回队列里面去 m_taskQueue.enqueue(data); continue; } } m_nProcessedCount++; continue; } //结束标记并且队列没有数据了 返回 if (m_bExit) break; } if (m_TaskExitCallBack) m_TaskExitCallBack(param); return 0; })); } //创建完毕工作线程后,重置一下标记 if (m_bDelayMode && m_arrWorkThreads.size() >= m_nMaxThreadCount) m_bDelayMode = false; } public: //禁止拷贝和移动 CTaskQueue(const CTaskQueue&) = delete; CTaskQueue& operator=(const CTaskQueue&) = delete; /* 构造函数: bDelayMode: 延时模式 true:延时启动工作线程,有需要的时候启动 false:立即启动工作线程 nWaitTime:等待队列的超时时间,毫秒 */ CTaskQueue(bool bDelayMode = false, int nWaitTime = 200) { m_bExit = false; m_nTotalCount = 0; m_nProcessedCount = 0; m_bDelayMode = bDelayMode; m_nWaitTime = nWaitTime; } ~CTaskQueue() { m_bExit = true; for (auto& fu : m_arrWorkThreads) { try { fu.get(); } catch (...){} } } /* 简单的回调方法的初始化 参数: 线程数 回调方法 */ void InitTaskQueue(size_t nMaxThreadCount, functionTaskCallback fnTaskCallback) { m_TaskCallBack = fnTaskCallback; m_TaskExCallBack = nullptr; m_TaskInitCallBack = nullptr; m_TaskExitCallBack = nullptr; m_nMaxThreadCount = nMaxThreadCount; if (!m_bDelayMode) _StartWorkThread(nMaxThreadCount); } /* 带队列线程初始化和销毁回调方法的初始化 参数: 线程数 数据处理回调方法 队列线程初始化回调 队列线程销毁回调 */ void InitTaskQueue(int nMaxThreadCount, functionTaskExCallback fnTaskExCallback, functionTaskInit fnTaskInitCallback, functionTaskExit fnTaskExitCallback) { m_TaskCallBack = nullptr; m_TaskExCallBack = fnTaskExCallback; m_TaskInitCallBack = fnTaskInitCallback; m_TaskExitCallBack = fnTaskExitCallback; m_nMaxThreadCount = nMaxThreadCount; if (!m_bDelayMode) _StartWorkThread(nMaxThreadCount); } //添加数据 void AddData(const T &data) { if (m_bDelayMode) { //延时模式下,如果任务还没有处理完毕,就增加一个工作线程 if (!IsFinished() || m_arrWorkThreads.size() == 0) { //内部有工作线程数检测,达到最大线程数后自动关闭延时模式 _StartWorkThread(1); } } assert(m_arrWorkThreads.size()); m_taskQueue.enqueue(data); m_nTotalCount++; } //获取当前工作线程数 size_t GetWorkThreadCount() { return m_arrWorkThreads.size(); } //读取当前队列中待处理的数量 uint64_t GetQueueSize() { return m_taskQueue.size_approx(); } //读取已经处理过的数量 uint64_t GetProcessedCount() { return m_nProcessedCount; } //读取所有加入过队列的数量 uint64_t GetTotalCount() { return m_nTotalCount; } //查询队列是否处理完毕 立即返回 bool IsFinished() { if (m_nTotalCount == m_nProcessedCount && GetQueueSize() == 0) return true; return false; } //等待队列结束 阻塞执行 void WaitForFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr) { printf("Wait for finish...\n"); while (!IsFinished()) { if (fnWaitCallback) fnWaitCallback(m_nProcessedCount, m_nTotalCount); std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms)); } if (fnWaitCallback) fnWaitCallback(m_nProcessedCount, m_nTotalCount); printf("All finished. \n"); } };
示例代码:
// CTaskQueue<int, int> taskTester; //第二个int可忽略 //单一回调 taskTester.InitTaskQueue(8, [](int data)->bool{ printf("task:%d\n", data); return true; //返回false会重新插回队列中 } //多个回调的初始化方法 taskTester.InitTaskQueue(8, [](int data, int ¶m)->bool{ printf("task:%d, param:%d\n", data, param); return true; }, [](int ¶m){ param = GetCurrentThreadId(); printf("task work thread init:%d \n", param); }, [](int ¶m){ printf("task work thread exit:%d \n", param); }); for (int i = 0; i < 10000; i++) { taskTester.AddData(i); } taskTester.WaitforFinish(100, [](uint64_t processed, uint64_t total){ uint64_t ret = (processed * 100) / total; printf("processed:%I64d, total:%I64d, %d%% \n", processed, total, (int)ret); });
线程池是用之前的帖子中的线程池修改的,增加了几个方法,修正了vs下的BUG 。。。
#pragma once #include <vector> #include <memory> #include <thread> #include <future> #include <functional> #include <stdexcept> #include <atomic> #include <chrono> #include <mutex> #include "queue/blockingconcurrentqueue.h" //for tsBlockQuque class ThreadPool { private: //退出时回调方法声明 typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback; //工作线程 std::vector<std::future<int>> m_arrWorkThreads; //任务队列 tsBlockQuque < std::function<void()> > m_TaskQueue; //退出标记 bool m_bExit; //检测队列的等待时间,毫秒 int m_nWaitTime; //延时模式开关 bool m_bDelayMode; //延时模式下,启动新工作线程需要加锁 std::mutex m_mtStartWorkThread; //最大线程数 size_t m_nMaxThreadCount; //添加的总数量的数量 std::atomic<uint64_t> m_nTotalCount; //已处理完毕的数量 std::atomic<uint64_t> m_nProcessedCount; //启动工作线程 void _StartWorkThread(size_t nThreadCount) { std::lock_guard<std::mutex> lock(m_mtStartWorkThread); if (m_arrWorkThreads.size() >= m_nMaxThreadCount) return; for (size_t i = 0; i < nThreadCount; i++) { m_arrWorkThreads.emplace_back(std::async(std::launch::async, [this]()->int{ printf("thread pool thread start...\n"); while (true) { std::function<void()> task; if (m_TaskQueue.wait_dequeue_timed(task, std::chrono::milliseconds(m_nWaitTime))) { task(); m_nProcessedCount++; continue; } //结束标记并且队列没有数据了 返回 if (m_bExit) break; } printf("thread pool thread exit...\n"); return 0; })); } //创建完毕工作线程后,重置一下标记 if (m_bDelayMode && m_arrWorkThreads.size() >= m_nMaxThreadCount) m_bDelayMode = false; } public: //禁止拷贝和移动 ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; /* 构造函数 nMaxThreadCount:最大工作线程数 bDelayMode:延时模式 true:延时启动工作线程,有需要的时候启动 false:立即启动工作线程 nWaitTime:等待队列的超时时间,毫秒 */ ThreadPool(size_t nMaxThreadCount, bool bDelayMode = false, int nWaitTime = 200) { m_nTotalCount = 0; m_nProcessedCount = 0; m_bExit = false; InitThreadPool(nMaxThreadCount, bDelayMode, nWaitTime); } ThreadPool() { m_nTotalCount = 0; m_nProcessedCount = 0; m_bExit = false; } ~ThreadPool() { m_bExit = true; for (auto& fu : m_arrWorkThreads) { try { fu.get(); } catch (...){} } } /* 初始化线程池,如果使用带参数的构造函数初始化,则无需调用此方法 nMaxThreadCount:最大工作线程数 bDelayMode:延时模式 true:延时启动工作线程,有需要的时候启动 false:立即启动工作线程 nWaitTime:等待队列的超时时间,毫秒 */ void InitThreadPool(size_t nMaxThreadCount, bool bDelayMode = false, int nWaitTime = 200) { m_nMaxThreadCount = nMaxThreadCount; m_bDelayMode = bDelayMode; m_nWaitTime = nWaitTime; if (!m_bDelayMode) _StartWorkThread(nMaxThreadCount); } //添加任务 template<class F, class... Args> auto AddTask(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { if (m_bDelayMode) { //延时模式下,如果任务还没有处理完毕,就增加一个工作线程 if (!IsFinished() || m_arrWorkThreads.size() == 0) { //内部有工作线程数检测,达到最大线程数后自动关闭延时模式 _StartWorkThread(1); } } assert(m_arrWorkThreads.size()); using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> res = task->get_future(); m_TaskQueue.enqueue([task](){ (*task)(); }); m_nTotalCount++; return res; } //获取当前工作线程数 size_t GetWorkThreadCount() { return m_arrWorkThreads.size(); } //获取总共插入过的任务数 uint64_t GetTotalCount() { return m_nTotalCount; } //获取已经处理完毕的任务数 uint64_t GetProcessedCount() { return m_nProcessedCount; } //读取当前队列中待处理的数量 uint64_t GetQueueSize() { return m_TaskQueue.size_approx(); } //判断是否已经执行完毕已经插入的任务 bool IsFinished() { if (m_nProcessedCount == m_nTotalCount && m_TaskQueue.size_approx() == 0) return true; return false; } //阻塞等待队列执行完毕 void WaitForFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr) { printf("Wait for finish...\n"); while (!IsFinished()) { if (fnWaitCallback) fnWaitCallback(m_nProcessedCount, m_nTotalCount); std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms)); } if (fnWaitCallback) fnWaitCallback(m_nProcessedCount, m_nTotalCount); printf("All finished. \n"); } };
最后说说vs神奇的BUG。。。
#include <iostream> #include <string> #include <vector> #include <thread> class MyClass { private: std::vector<std::thread> arrthreads; bool bexit; public: MyClass() { bexit = false; }; ~MyClass() { bexit = true; for (auto &x : arrthreads) { x.join(); //vs编译的这里会卡死,gcc编译的正常执行 } }; void init() { for (int i = 0; i < 4; i++) { arrthreads.emplace_back([this](){ printf("trhead start...\n"); while (true) { if (bexit) break; } printf("trhead exit...\n"); }); } }; }; MyClass testthread; int main() { printf("main2\n"); testthread.init(); return 0; }
MyClass析构函数中让工作线程退出,毫无问题,但是vs编译的,join会卡死阻塞或者崩溃跑飞,gcc编译的无问题。
各位大师有何看法。。。