BlackFeather'S Blog

首页 | |

任务队列和线程池队列(C++11)


任务队列可以认为是执行同一个方法来处理数据的队列,指定回调函数。

线程池就是先开辟好多个线程,然后将要执行的方法+参数丢到线程池中,支持返回值获取。


均依赖了无锁队列库 https://github.com/cameron314/concurrentqueue

本来不想依赖三方组件的,但是测试发现这个库效率是真的高,比带锁的队列快非常多。

而且vs中的std::queue是有BUG的,pop后不释放内存,也有一定的隐患(急死强迫症)


任务队列库TaskQueue:

#pragma once

#include <functional>
#include <vector>
#include <thread>
#include <future>
#include <atomic>
#include <chrono>
#include <mutex>

#include "queue/blockingconcurrentqueue.h" //for tsBlockQuque

template <class T, class P = void*>
class CTaskQueue
{
private:
typedef std::function<void(P &param)> functionTaskInit;
typedef std::function<void(P &param)> functionTaskExit;
typedef std::function<bool(T &data)> functionTaskCallback;
typedef std::function<bool(T &data, P &param)> functionTaskExCallback;
//退出的时候用的回调
typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback;

//退出标记
bool m_bExit;
//检测队列的等待时间,毫秒
int m_nWaitTime;
//延时模式开关
bool m_bDelayMode;
//延时模式下,启动新工作线程需要加锁
std::mutex m_mtStartWorkThread;
//最大线程数
size_t m_nMaxThreadCount;
//添加的总数量的数量
std::atomic<uint64_t> m_nTotalCount;
//已处理完毕的数量
std::atomic<uint64_t> m_nProcessedCount;
//任务队列
tsBlockQuque <T> m_taskQueue;

//单参数的回调
functionTaskCallback m_TaskCallBack;

//带扩展参数的回调
functionTaskInit m_TaskInitCallBack;
functionTaskExit m_TaskExitCallBack;
functionTaskExCallback m_TaskExCallBack;

//工作线程
std::vector <std::future<int>> m_arrWorkThreads;

//启动工作线程
void _StartWorkThread(size_t nThreadCount)
{
std::lock_guard<std::mutex> lock(m_mtStartWorkThread);

if (m_arrWorkThreads.size() >= m_nMaxThreadCount)
return;

for (size_t i = 0; i < nThreadCount; i++)
{
m_arrWorkThreads.emplace_back(std::async(std::launch::async, [this]()->int{

P param;
if (m_TaskInitCallBack)
m_TaskInitCallBack(param);

while (true)
{
T data;
if (m_taskQueue.wait_dequeue_timed(data, std::chrono::milliseconds(m_nWaitTime)))
{
//找到一组数据 回调
if (m_TaskCallBack)
{
if (!m_TaskCallBack(data))
{
//处理失败了 重新丢回队列里面去
m_taskQueue.enqueue(data);
continue;
}
}
else if (m_TaskExCallBack)
{
if (!m_TaskExCallBack(data, param))
{
//处理失败了 重新丢回队列里面去
m_taskQueue.enqueue(data);
continue;
}
}

m_nProcessedCount++;
continue;
}

//结束标记并且队列没有数据了 返回
if (m_bExit)
break;
}

if (m_TaskExitCallBack)
m_TaskExitCallBack(param);

return 0;
}));
}

//创建完毕工作线程后,重置一下标记
if (m_bDelayMode && m_arrWorkThreads.size() >= m_nMaxThreadCount)
m_bDelayMode = false;
}
public:

//禁止拷贝和移动
CTaskQueue(const CTaskQueue&) = delete;
CTaskQueue& operator=(const CTaskQueue&) = delete;

/*
构造函数:
bDelayMode: 延时模式
true:延时启动工作线程,有需要的时候启动
false:立即启动工作线程
nWaitTime:等待队列的超时时间,毫秒
*/
CTaskQueue(bool bDelayMode = false, int nWaitTime = 200)
{
m_bExit = false;
m_nTotalCount = 0;
m_nProcessedCount = 0;
m_bDelayMode = bDelayMode;
m_nWaitTime = nWaitTime;
}
~CTaskQueue()
{
m_bExit = true;

for (auto& fu : m_arrWorkThreads)
{
try
{
fu.get();
}
catch (...){}
}
}

/*
简单的回调方法的初始化
参数:
线程数
回调方法
*/
void InitTaskQueue(size_t nMaxThreadCount, functionTaskCallback fnTaskCallback)
{
m_TaskCallBack = fnTaskCallback;
m_TaskExCallBack = nullptr;
m_TaskInitCallBack = nullptr;
m_TaskExitCallBack = nullptr;
m_nMaxThreadCount = nMaxThreadCount;

if (!m_bDelayMode)
_StartWorkThread(nMaxThreadCount);
}
/*
带队列线程初始化和销毁回调方法的初始化
参数:
线程数
数据处理回调方法
队列线程初始化回调
队列线程销毁回调
*/
void InitTaskQueue(int nMaxThreadCount, functionTaskExCallback fnTaskExCallback, functionTaskInit fnTaskInitCallback, functionTaskExit fnTaskExitCallback)
{
m_TaskCallBack = nullptr;
m_TaskExCallBack = fnTaskExCallback;
m_TaskInitCallBack = fnTaskInitCallback;
m_TaskExitCallBack = fnTaskExitCallback;
m_nMaxThreadCount = nMaxThreadCount;

if (!m_bDelayMode)
_StartWorkThread(nMaxThreadCount);
}

//添加数据
void AddData(const T &data)
{
if (m_bDelayMode)
{
//延时模式下,如果任务还没有处理完毕,就增加一个工作线程
if (!IsFinished() || m_arrWorkThreads.size() == 0)
{
//内部有工作线程数检测,达到最大线程数后自动关闭延时模式
_StartWorkThread(1);
}
}

assert(m_arrWorkThreads.size());
m_taskQueue.enqueue(data);
m_nTotalCount++;
}

//获取当前工作线程数
size_t GetWorkThreadCount()
{
return m_arrWorkThreads.size();
}

//读取当前队列中待处理的数量
uint64_t GetQueueSize()
{
return m_taskQueue.size_approx();
}

//读取已经处理过的数量
uint64_t GetProcessedCount()
{
return m_nProcessedCount;
}

//读取所有加入过队列的数量
uint64_t GetTotalCount()
{
return m_nTotalCount;
}

//查询队列是否处理完毕  立即返回
bool IsFinished()
{
if (m_nTotalCount == m_nProcessedCount && GetQueueSize() == 0)
return true;

return false;
}

//等待队列结束  阻塞执行
void WaitForFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr)
{
printf("Wait for finish...\n");

while (!IsFinished())
{
if (fnWaitCallback)
fnWaitCallback(m_nProcessedCount, m_nTotalCount);

std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms));
}

if (fnWaitCallback)
fnWaitCallback(m_nProcessedCount, m_nTotalCount);

printf("All finished. \n");
}
};


示例代码:

    
    //
    CTaskQueue<int, int> taskTester;  //第二个int可忽略

//单一回调
taskTester.InitTaskQueue(8, [](int data)->bool{
printf("task:%d\n", data);
return true; //返回false会重新插回队列中
}

//多个回调的初始化方法
taskTester.InitTaskQueue(8, [](int data, int &param)->bool{
printf("task:%d, param:%d\n", data, param);
return true;
},
[](int &param){
param = GetCurrentThreadId();
printf("task work thread init:%d \n", param);
},
[](int &param){
printf("task work thread exit:%d \n", param);
});

for (int i = 0; i < 10000; i++)
{
taskTester.AddData(i);
}

taskTester.WaitforFinish(100, [](uint64_t processed, uint64_t total){
uint64_t ret = (processed * 100) / total;
printf("processed:%I64d, total:%I64d, %d%% \n", processed, total, (int)ret);
});




线程池是用之前的帖子中的线程池修改的,增加了几个方法,修正了vs下的BUG 。。。

#pragma once

#include <vector>
#include <memory>
#include <thread>
#include <future>
#include <functional>
#include <stdexcept>
#include <atomic>
#include <chrono>
#include <mutex>

#include "queue/blockingconcurrentqueue.h" //for tsBlockQuque

class ThreadPool 
{
private:
//退出时回调方法声明
typedef std::function<void(uint64_t, uint64_t)> functionWaitCallback;

//工作线程
std::vector<std::future<int>> m_arrWorkThreads;
//任务队列
tsBlockQuque < std::function<void()> > m_TaskQueue;

//退出标记
bool m_bExit;
//检测队列的等待时间,毫秒
int m_nWaitTime;
//延时模式开关
bool m_bDelayMode;
//延时模式下,启动新工作线程需要加锁
std::mutex m_mtStartWorkThread;
//最大线程数
size_t m_nMaxThreadCount;
//添加的总数量的数量
std::atomic<uint64_t> m_nTotalCount;
//已处理完毕的数量
std::atomic<uint64_t> m_nProcessedCount;

//启动工作线程
void _StartWorkThread(size_t nThreadCount)
{
std::lock_guard<std::mutex> lock(m_mtStartWorkThread);

if (m_arrWorkThreads.size() >= m_nMaxThreadCount)
return;

for (size_t i = 0; i < nThreadCount; i++)
{
m_arrWorkThreads.emplace_back(std::async(std::launch::async, [this]()->int{

printf("thread pool thread start...\n");

while (true)
{
std::function<void()> task;
if (m_TaskQueue.wait_dequeue_timed(task, std::chrono::milliseconds(m_nWaitTime)))
{
task();
m_nProcessedCount++;
continue;
}

//结束标记并且队列没有数据了 返回
if (m_bExit)
break;
}

printf("thread pool thread exit...\n");
return 0;
}));
}

//创建完毕工作线程后,重置一下标记
if (m_bDelayMode && m_arrWorkThreads.size() >= m_nMaxThreadCount)
m_bDelayMode = false;
}

public:

//禁止拷贝和移动
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

/*
构造函数
nMaxThreadCount:最大工作线程数
bDelayMode:延时模式
true:延时启动工作线程,有需要的时候启动
false:立即启动工作线程
nWaitTime:等待队列的超时时间,毫秒
*/
ThreadPool(size_t nMaxThreadCount, bool bDelayMode = false, int nWaitTime = 200)
{
m_nTotalCount = 0;
m_nProcessedCount = 0;
m_bExit = false;

InitThreadPool(nMaxThreadCount, bDelayMode, nWaitTime);
}
ThreadPool()
{
m_nTotalCount = 0;
m_nProcessedCount = 0;
m_bExit = false;
}
~ThreadPool()
{
m_bExit = true;

for (auto& fu : m_arrWorkThreads)
{
try
{
fu.get();
}
catch (...){}
}
}

/*
初始化线程池,如果使用带参数的构造函数初始化,则无需调用此方法
nMaxThreadCount:最大工作线程数
bDelayMode:延时模式
true:延时启动工作线程,有需要的时候启动
false:立即启动工作线程
nWaitTime:等待队列的超时时间,毫秒
*/
void InitThreadPool(size_t nMaxThreadCount, bool bDelayMode = false, int nWaitTime = 200)
{
m_nMaxThreadCount = nMaxThreadCount;
m_bDelayMode = bDelayMode;
m_nWaitTime = nWaitTime;

if (!m_bDelayMode)
_StartWorkThread(nMaxThreadCount);
}

//添加任务
template<class F, class... Args>
auto AddTask(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{
if (m_bDelayMode)
{
//延时模式下,如果任务还没有处理完毕,就增加一个工作线程
if (!IsFinished() || m_arrWorkThreads.size() == 0)
{
//内部有工作线程数检测,达到最大线程数后自动关闭延时模式
_StartWorkThread(1);
}
}

assert(m_arrWorkThreads.size());

using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
m_TaskQueue.enqueue([task](){ (*task)(); });
m_nTotalCount++;

return res;
}

//获取当前工作线程数
size_t GetWorkThreadCount()
{
return m_arrWorkThreads.size();
}

//获取总共插入过的任务数
uint64_t GetTotalCount()
{
return m_nTotalCount;
}

//获取已经处理完毕的任务数
uint64_t GetProcessedCount()
{
return m_nProcessedCount;
}

//读取当前队列中待处理的数量
uint64_t GetQueueSize()
{
return m_TaskQueue.size_approx();
}

//判断是否已经执行完毕已经插入的任务
bool IsFinished()
{
if (m_nProcessedCount == m_nTotalCount && m_TaskQueue.size_approx() == 0)
return true;

return false;
}


//阻塞等待队列执行完毕
void WaitForFinish(int nCheckms = 100, functionWaitCallback fnWaitCallback = nullptr)
{
printf("Wait for finish...\n");

while (!IsFinished())
{
if (fnWaitCallback)
fnWaitCallback(m_nProcessedCount, m_nTotalCount);

std::this_thread::sleep_for(std::chrono::milliseconds(nCheckms));
}

if (fnWaitCallback)
fnWaitCallback(m_nProcessedCount, m_nTotalCount);

printf("All finished. \n");
}
};



最后说说vs神奇的BUG。。。

#include <iostream>
#include <string>
#include <vector>
#include <thread>


class MyClass
{
private:
std::vector<std::thread> arrthreads;
bool bexit;
public:
MyClass()
{
bexit = false;
};
~MyClass()
{
bexit = true;
for (auto &x : arrthreads)
{
x.join();  //vs编译的这里会卡死,gcc编译的正常执行
}
};

void init()
{
for (int i = 0; i < 4; i++)
{
arrthreads.emplace_back([this](){

printf("trhead start...\n");

while (true)
{
if (bexit)
break;

}

printf("trhead exit...\n");
});
}
};
};


MyClass testthread;

int main()
{
    printf("main2\n");
    
    testthread.init();
    
    return 0;
}

MyClass析构函数中让工作线程退出,毫无问题,但是vs编译的,join会卡死阻塞或者崩溃跑飞,gcc编译的无问题。

各位大师有何看法。。。

2021/8/24 | Tags: | C/C++代码 | 查看评论(0)

Powered By Z-Blog  触屏版 | WAP版 | 电脑版