C++ thread_pool with heterogeneous work-queueType erasure and deferred function calls for any...

Writing rule stating superpower from different root cause is bad writing

In a 5 years PhD, is it possible to get a postdoc position with one publication? I can really use some advice

Why "Having chlorophyll without photosynthesis is actually very dangerous" and "like living with a bomb"?

Why Is Death Allowed In the Matrix?

Modeling an IP Address

How to add double frame in tcolorbox?

How much RAM could one put in a typical 80386 setup?

Why do I get two different answers for this counting problem?

In Japanese, what’s the difference between “Tonari ni” (となりに) and “Tsugi” (つぎ)? When would you use one over the other?

How can I prevent hyper evolved versions of regular creatures from wiping out their cousins?

Can an x86 CPU running in real mode be considered to be basically an 8086 CPU?

Can I make popcorn with any corn?

Why is 150k or 200k jobs considered good when there's 300k+ births a month?

How to calculate partition Start End Sector?

What defenses are there against being summoned by the Gate spell?

Schoenfled Residua test shows proportionality hazard assumptions holds but Kaplan-Meier plots intersect

Email Account under attack (really) - anything I can do?

Did Shadowfax go to Valinor?

What's the output of a record cartridge playing an out-of-speed record

Fully-Firstable Anagram Sets

Hiring someone is unethical to Kantians because you're treating them as a means?

Why not use SQL instead of GraphQL?

What is the word for reserving something for yourself before others do?

Languages that we cannot (dis)prove to be Context-Free



C++ thread_pool with heterogeneous work-queue


Type erasure and deferred function calls for any functionParallelizing an algorithm with OpenMP using a dynamic work queueRun tasks in parallel using limited number of threadsJob queue with threadingLightweight asynchronous event library in C - Threadpool moduleNaive lock free work stealing queueWork stealing queueMinimalist dynamic task schedulerA simple lock-free queue for work stealingthread safe queue with interrupt function






.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty{ margin-bottom:0;
}







2












$begingroup$


I have read the book "C++ concurrency in action" and understood the thread_pool implementation. I have changed a few things according to my project requirements.



I have used std::variant to support heterogeneous work-queue to store different task arriving on a epoll-event loop. Currently In my project I have only two different types of task arrive on the epoll loop. Those are TaskECB and TaskRPCB. I have created classes for both of them and overloaded the operator()



#define THREAD_POOL_SIZE 100
std::map<std::string,std::string> tidToTname;

template<typename T>
class threadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> taskQueue; /* task as pushed here and
task are processed in FIFO
style */
std::condition_variable dataCond; /* used to protect the queue */
public:
threadSafeQueue(){}
void waitAndPop(T& value); /* wait untill task is not available in
the queue */
std::shared_ptr<T> waitAndPop();/* same but returns a shared pointer */
bool tryPop(T& value); /* does not block */
std::shared_ptr<T> tryPop(); /* does not block and returns a pointer*/
void Push(T newData);
bool Empty() const; /* check if queue is empty or not */
void notifyAllThreads(); /* notify all the waiting threads
used in Thpool decallocation */
};

template<typename T>
void threadSafeQueue<T>::notifyAllThreads() {
dataCond.notify_all();
}
template<typename T>
void threadSafeQueue<T>::waitAndPop(T& value) {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
value = std::move(*taskQueue.front());
taskQueue.pop();
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::waitAndPop() {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}

template<typename T>
bool threadSafeQueue<T>::tryPop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return false;
value = std::move(*taskQueue.front());
taskQueue.pop();
return true;
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::tryPop() {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return std::shared_ptr<T>(); /* return nullptr */
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}
template<typename T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
/* construct the object before lock*/
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(data);
dataCond.notify_one();
}
template<typename T>
bool threadSafeQueue<T>::Empty() const {
std::lock_guard<std::mutex> lk(mut);
return taskQueue.empty();
}

class TaskRPCB { /* Task RecvAndProcessCallbacks */
private:
int len;
uint fdId;
int streamId;
void* msgBlob;
std::function<void(void*,int,uint,int)> func;
public:
TaskRPCB(std::function<void(void*,int,uint,int)>&f , void* msgBlob,int len,
uint fdId, int streamId) {
this->func = f;
this->msgBlob = msgBlob;
this->len = len;
this->fdId = fdId;
this->streamId = streamId;
}
void operator()() {
higLog("%s","TaskRPCB function is executing...");
func(msgBlob,len,fdId,streamId);
}
};

class TaskECB { /* Task eventCallBack */
private:
std::function<void(void)> func;
public:
TaskECB(std::function<void(void)>&f) : func(f) {}
void operator()() {
higLog("%s","TaskECB function is executing...");
func();
}
};

typedef variant<TaskECB,TaskRPCB> taskTypes;

class Thpool {
std::atomic_bool done;
threadSafeQueue<taskTypes> workQ;
std::vector<std::thread> threads;
void workerThread() {
auto tid = std::this_thread::get_id();
std::stringstream ss;
ss << tid;
std::string s = ss.str();
while(!done) {
auto task = workQ.waitAndPop();
if(task != nullptr and !done) {
printf("%s is executing now : ",tidToTname[s].c_str());
if((*task).index() == 0) { // TODO: change 0 and 1 to enums
auto func = get<TaskECB>(*task);
func();
}else if((*task).index() == 1) {
auto func = get<TaskRPCB>(*task);
func();
}
}
}
}
public:
Thpool(): done(false) {
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
printf("ThreadPool Size = %d",maxThreadCount);
/* save thread names for logging purpose */
std::vector<std::string> tnames;
for(unsigned int i = 0;i<maxThreadCount;i++) {
tnames.push_back("Thread_" + std::to_string(i+1));
}

try { /* exception might arise due to thread creation */
for(unsigned int i = 0;i<maxThreadCount;i++) {
threads.push_back(std::thread(&Thpool::workerThread,this));
/*map this ith threadID to a name Thread_i*/
auto tid = threads[i].get_id();
std::stringstream ss;
ss << tid;
tidToTname[ss.str()] = tnames[i];
}
}catch(...) {
done = true;
throw;
}
}
~Thpool() {
// done = true;
}
template<typename taskType>
void submit(taskType task) {
workQ.Push(task);
}
void deAllocatePool() {
done = true;
workQ.notifyAllThreads();
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
for(unsigned int i = 0;i<maxThreadCount;) {
if(threads[i].joinable()) {
threads[i].join();
i++; /* go for the next thread in the pool */
}else {
workQ.notifyAllThreads();
}
}
}
};
/*============== Thread Pool code ends ============== */


How I use this thread_pool ?



Thpool pool;


Following code will be inserted in required functions. I am showing for the TaskRPCB type task only.



std::function<void(void*,int,uint,int)> func = NFVInstance->CallBackTable[channel];
/* create a task object : members of it :
- the callback function
- msgBlob
- len of the msgBlob
- some other ID
- stream ID on which this message was received
*/
TaskRPCB task(func,msg,rc,fdd.id,streamId);
/* submit this task to the pool.
one of the waiting threads will pick this task
*/
pool.submit(task);









share|improve this question











$endgroup$












  • $begingroup$
    sorry for mixing tags. I would like to have suggestions regarding thread pool implementation overall and the use of` std:: variant` ? is it ok ? or does it have some performance penalty?
    $endgroup$
    – Debashish
    Mar 29 at 12:43






  • 1




    $begingroup$
    I've edited tags to C++17 (since std::variant requires that). I hope you get good reviews!
    $endgroup$
    – Toby Speight
    Mar 29 at 14:04






  • 1




    $begingroup$
    THREAD_POOL_SIZE 100 I hope you are running on some large hardware. The point os a thread pool is that you create a thread for each available processor (or very close to that) then each processor gets new work from the pool without having to switch threads (as that is a relatively expensive operation).
    $endgroup$
    – Martin York
    Apr 1 at 18:40


















2












$begingroup$


I have read the book "C++ concurrency in action" and understood the thread_pool implementation. I have changed a few things according to my project requirements.



I have used std::variant to support heterogeneous work-queue to store different task arriving on a epoll-event loop. Currently In my project I have only two different types of task arrive on the epoll loop. Those are TaskECB and TaskRPCB. I have created classes for both of them and overloaded the operator()



#define THREAD_POOL_SIZE 100
std::map<std::string,std::string> tidToTname;

template<typename T>
class threadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> taskQueue; /* task as pushed here and
task are processed in FIFO
style */
std::condition_variable dataCond; /* used to protect the queue */
public:
threadSafeQueue(){}
void waitAndPop(T& value); /* wait untill task is not available in
the queue */
std::shared_ptr<T> waitAndPop();/* same but returns a shared pointer */
bool tryPop(T& value); /* does not block */
std::shared_ptr<T> tryPop(); /* does not block and returns a pointer*/
void Push(T newData);
bool Empty() const; /* check if queue is empty or not */
void notifyAllThreads(); /* notify all the waiting threads
used in Thpool decallocation */
};

template<typename T>
void threadSafeQueue<T>::notifyAllThreads() {
dataCond.notify_all();
}
template<typename T>
void threadSafeQueue<T>::waitAndPop(T& value) {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
value = std::move(*taskQueue.front());
taskQueue.pop();
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::waitAndPop() {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}

template<typename T>
bool threadSafeQueue<T>::tryPop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return false;
value = std::move(*taskQueue.front());
taskQueue.pop();
return true;
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::tryPop() {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return std::shared_ptr<T>(); /* return nullptr */
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}
template<typename T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
/* construct the object before lock*/
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(data);
dataCond.notify_one();
}
template<typename T>
bool threadSafeQueue<T>::Empty() const {
std::lock_guard<std::mutex> lk(mut);
return taskQueue.empty();
}

class TaskRPCB { /* Task RecvAndProcessCallbacks */
private:
int len;
uint fdId;
int streamId;
void* msgBlob;
std::function<void(void*,int,uint,int)> func;
public:
TaskRPCB(std::function<void(void*,int,uint,int)>&f , void* msgBlob,int len,
uint fdId, int streamId) {
this->func = f;
this->msgBlob = msgBlob;
this->len = len;
this->fdId = fdId;
this->streamId = streamId;
}
void operator()() {
higLog("%s","TaskRPCB function is executing...");
func(msgBlob,len,fdId,streamId);
}
};

class TaskECB { /* Task eventCallBack */
private:
std::function<void(void)> func;
public:
TaskECB(std::function<void(void)>&f) : func(f) {}
void operator()() {
higLog("%s","TaskECB function is executing...");
func();
}
};

typedef variant<TaskECB,TaskRPCB> taskTypes;

class Thpool {
std::atomic_bool done;
threadSafeQueue<taskTypes> workQ;
std::vector<std::thread> threads;
void workerThread() {
auto tid = std::this_thread::get_id();
std::stringstream ss;
ss << tid;
std::string s = ss.str();
while(!done) {
auto task = workQ.waitAndPop();
if(task != nullptr and !done) {
printf("%s is executing now : ",tidToTname[s].c_str());
if((*task).index() == 0) { // TODO: change 0 and 1 to enums
auto func = get<TaskECB>(*task);
func();
}else if((*task).index() == 1) {
auto func = get<TaskRPCB>(*task);
func();
}
}
}
}
public:
Thpool(): done(false) {
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
printf("ThreadPool Size = %d",maxThreadCount);
/* save thread names for logging purpose */
std::vector<std::string> tnames;
for(unsigned int i = 0;i<maxThreadCount;i++) {
tnames.push_back("Thread_" + std::to_string(i+1));
}

try { /* exception might arise due to thread creation */
for(unsigned int i = 0;i<maxThreadCount;i++) {
threads.push_back(std::thread(&Thpool::workerThread,this));
/*map this ith threadID to a name Thread_i*/
auto tid = threads[i].get_id();
std::stringstream ss;
ss << tid;
tidToTname[ss.str()] = tnames[i];
}
}catch(...) {
done = true;
throw;
}
}
~Thpool() {
// done = true;
}
template<typename taskType>
void submit(taskType task) {
workQ.Push(task);
}
void deAllocatePool() {
done = true;
workQ.notifyAllThreads();
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
for(unsigned int i = 0;i<maxThreadCount;) {
if(threads[i].joinable()) {
threads[i].join();
i++; /* go for the next thread in the pool */
}else {
workQ.notifyAllThreads();
}
}
}
};
/*============== Thread Pool code ends ============== */


How I use this thread_pool ?



Thpool pool;


Following code will be inserted in required functions. I am showing for the TaskRPCB type task only.



std::function<void(void*,int,uint,int)> func = NFVInstance->CallBackTable[channel];
/* create a task object : members of it :
- the callback function
- msgBlob
- len of the msgBlob
- some other ID
- stream ID on which this message was received
*/
TaskRPCB task(func,msg,rc,fdd.id,streamId);
/* submit this task to the pool.
one of the waiting threads will pick this task
*/
pool.submit(task);









share|improve this question











$endgroup$












  • $begingroup$
    sorry for mixing tags. I would like to have suggestions regarding thread pool implementation overall and the use of` std:: variant` ? is it ok ? or does it have some performance penalty?
    $endgroup$
    – Debashish
    Mar 29 at 12:43






  • 1




    $begingroup$
    I've edited tags to C++17 (since std::variant requires that). I hope you get good reviews!
    $endgroup$
    – Toby Speight
    Mar 29 at 14:04






  • 1




    $begingroup$
    THREAD_POOL_SIZE 100 I hope you are running on some large hardware. The point os a thread pool is that you create a thread for each available processor (or very close to that) then each processor gets new work from the pool without having to switch threads (as that is a relatively expensive operation).
    $endgroup$
    – Martin York
    Apr 1 at 18:40














2












2








2


0



$begingroup$


I have read the book "C++ concurrency in action" and understood the thread_pool implementation. I have changed a few things according to my project requirements.



I have used std::variant to support heterogeneous work-queue to store different task arriving on a epoll-event loop. Currently In my project I have only two different types of task arrive on the epoll loop. Those are TaskECB and TaskRPCB. I have created classes for both of them and overloaded the operator()



#define THREAD_POOL_SIZE 100
std::map<std::string,std::string> tidToTname;

template<typename T>
class threadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> taskQueue; /* task as pushed here and
task are processed in FIFO
style */
std::condition_variable dataCond; /* used to protect the queue */
public:
threadSafeQueue(){}
void waitAndPop(T& value); /* wait untill task is not available in
the queue */
std::shared_ptr<T> waitAndPop();/* same but returns a shared pointer */
bool tryPop(T& value); /* does not block */
std::shared_ptr<T> tryPop(); /* does not block and returns a pointer*/
void Push(T newData);
bool Empty() const; /* check if queue is empty or not */
void notifyAllThreads(); /* notify all the waiting threads
used in Thpool decallocation */
};

template<typename T>
void threadSafeQueue<T>::notifyAllThreads() {
dataCond.notify_all();
}
template<typename T>
void threadSafeQueue<T>::waitAndPop(T& value) {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
value = std::move(*taskQueue.front());
taskQueue.pop();
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::waitAndPop() {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}

template<typename T>
bool threadSafeQueue<T>::tryPop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return false;
value = std::move(*taskQueue.front());
taskQueue.pop();
return true;
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::tryPop() {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return std::shared_ptr<T>(); /* return nullptr */
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}
template<typename T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
/* construct the object before lock*/
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(data);
dataCond.notify_one();
}
template<typename T>
bool threadSafeQueue<T>::Empty() const {
std::lock_guard<std::mutex> lk(mut);
return taskQueue.empty();
}

class TaskRPCB { /* Task RecvAndProcessCallbacks */
private:
int len;
uint fdId;
int streamId;
void* msgBlob;
std::function<void(void*,int,uint,int)> func;
public:
TaskRPCB(std::function<void(void*,int,uint,int)>&f , void* msgBlob,int len,
uint fdId, int streamId) {
this->func = f;
this->msgBlob = msgBlob;
this->len = len;
this->fdId = fdId;
this->streamId = streamId;
}
void operator()() {
higLog("%s","TaskRPCB function is executing...");
func(msgBlob,len,fdId,streamId);
}
};

class TaskECB { /* Task eventCallBack */
private:
std::function<void(void)> func;
public:
TaskECB(std::function<void(void)>&f) : func(f) {}
void operator()() {
higLog("%s","TaskECB function is executing...");
func();
}
};

typedef variant<TaskECB,TaskRPCB> taskTypes;

class Thpool {
std::atomic_bool done;
threadSafeQueue<taskTypes> workQ;
std::vector<std::thread> threads;
void workerThread() {
auto tid = std::this_thread::get_id();
std::stringstream ss;
ss << tid;
std::string s = ss.str();
while(!done) {
auto task = workQ.waitAndPop();
if(task != nullptr and !done) {
printf("%s is executing now : ",tidToTname[s].c_str());
if((*task).index() == 0) { // TODO: change 0 and 1 to enums
auto func = get<TaskECB>(*task);
func();
}else if((*task).index() == 1) {
auto func = get<TaskRPCB>(*task);
func();
}
}
}
}
public:
Thpool(): done(false) {
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
printf("ThreadPool Size = %d",maxThreadCount);
/* save thread names for logging purpose */
std::vector<std::string> tnames;
for(unsigned int i = 0;i<maxThreadCount;i++) {
tnames.push_back("Thread_" + std::to_string(i+1));
}

try { /* exception might arise due to thread creation */
for(unsigned int i = 0;i<maxThreadCount;i++) {
threads.push_back(std::thread(&Thpool::workerThread,this));
/*map this ith threadID to a name Thread_i*/
auto tid = threads[i].get_id();
std::stringstream ss;
ss << tid;
tidToTname[ss.str()] = tnames[i];
}
}catch(...) {
done = true;
throw;
}
}
~Thpool() {
// done = true;
}
template<typename taskType>
void submit(taskType task) {
workQ.Push(task);
}
void deAllocatePool() {
done = true;
workQ.notifyAllThreads();
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
for(unsigned int i = 0;i<maxThreadCount;) {
if(threads[i].joinable()) {
threads[i].join();
i++; /* go for the next thread in the pool */
}else {
workQ.notifyAllThreads();
}
}
}
};
/*============== Thread Pool code ends ============== */


How I use this thread_pool ?



Thpool pool;


Following code will be inserted in required functions. I am showing for the TaskRPCB type task only.



std::function<void(void*,int,uint,int)> func = NFVInstance->CallBackTable[channel];
/* create a task object : members of it :
- the callback function
- msgBlob
- len of the msgBlob
- some other ID
- stream ID on which this message was received
*/
TaskRPCB task(func,msg,rc,fdd.id,streamId);
/* submit this task to the pool.
one of the waiting threads will pick this task
*/
pool.submit(task);









share|improve this question











$endgroup$




I have read the book "C++ concurrency in action" and understood the thread_pool implementation. I have changed a few things according to my project requirements.



I have used std::variant to support heterogeneous work-queue to store different task arriving on a epoll-event loop. Currently In my project I have only two different types of task arrive on the epoll loop. Those are TaskECB and TaskRPCB. I have created classes for both of them and overloaded the operator()



#define THREAD_POOL_SIZE 100
std::map<std::string,std::string> tidToTname;

template<typename T>
class threadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> taskQueue; /* task as pushed here and
task are processed in FIFO
style */
std::condition_variable dataCond; /* used to protect the queue */
public:
threadSafeQueue(){}
void waitAndPop(T& value); /* wait untill task is not available in
the queue */
std::shared_ptr<T> waitAndPop();/* same but returns a shared pointer */
bool tryPop(T& value); /* does not block */
std::shared_ptr<T> tryPop(); /* does not block and returns a pointer*/
void Push(T newData);
bool Empty() const; /* check if queue is empty or not */
void notifyAllThreads(); /* notify all the waiting threads
used in Thpool decallocation */
};

template<typename T>
void threadSafeQueue<T>::notifyAllThreads() {
dataCond.notify_all();
}
template<typename T>
void threadSafeQueue<T>::waitAndPop(T& value) {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
value = std::move(*taskQueue.front());
taskQueue.pop();
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::waitAndPop() {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}

template<typename T>
bool threadSafeQueue<T>::tryPop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return false;
value = std::move(*taskQueue.front());
taskQueue.pop();
return true;
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::tryPop() {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return std::shared_ptr<T>(); /* return nullptr */
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}
template<typename T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
/* construct the object before lock*/
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(data);
dataCond.notify_one();
}
template<typename T>
bool threadSafeQueue<T>::Empty() const {
std::lock_guard<std::mutex> lk(mut);
return taskQueue.empty();
}

class TaskRPCB { /* Task RecvAndProcessCallbacks */
private:
int len;
uint fdId;
int streamId;
void* msgBlob;
std::function<void(void*,int,uint,int)> func;
public:
TaskRPCB(std::function<void(void*,int,uint,int)>&f , void* msgBlob,int len,
uint fdId, int streamId) {
this->func = f;
this->msgBlob = msgBlob;
this->len = len;
this->fdId = fdId;
this->streamId = streamId;
}
void operator()() {
higLog("%s","TaskRPCB function is executing...");
func(msgBlob,len,fdId,streamId);
}
};

class TaskECB { /* Task eventCallBack */
private:
std::function<void(void)> func;
public:
TaskECB(std::function<void(void)>&f) : func(f) {}
void operator()() {
higLog("%s","TaskECB function is executing...");
func();
}
};

typedef variant<TaskECB,TaskRPCB> taskTypes;

class Thpool {
std::atomic_bool done;
threadSafeQueue<taskTypes> workQ;
std::vector<std::thread> threads;
void workerThread() {
auto tid = std::this_thread::get_id();
std::stringstream ss;
ss << tid;
std::string s = ss.str();
while(!done) {
auto task = workQ.waitAndPop();
if(task != nullptr and !done) {
printf("%s is executing now : ",tidToTname[s].c_str());
if((*task).index() == 0) { // TODO: change 0 and 1 to enums
auto func = get<TaskECB>(*task);
func();
}else if((*task).index() == 1) {
auto func = get<TaskRPCB>(*task);
func();
}
}
}
}
public:
Thpool(): done(false) {
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
printf("ThreadPool Size = %d",maxThreadCount);
/* save thread names for logging purpose */
std::vector<std::string> tnames;
for(unsigned int i = 0;i<maxThreadCount;i++) {
tnames.push_back("Thread_" + std::to_string(i+1));
}

try { /* exception might arise due to thread creation */
for(unsigned int i = 0;i<maxThreadCount;i++) {
threads.push_back(std::thread(&Thpool::workerThread,this));
/*map this ith threadID to a name Thread_i*/
auto tid = threads[i].get_id();
std::stringstream ss;
ss << tid;
tidToTname[ss.str()] = tnames[i];
}
}catch(...) {
done = true;
throw;
}
}
~Thpool() {
// done = true;
}
template<typename taskType>
void submit(taskType task) {
workQ.Push(task);
}
void deAllocatePool() {
done = true;
workQ.notifyAllThreads();
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
for(unsigned int i = 0;i<maxThreadCount;) {
if(threads[i].joinable()) {
threads[i].join();
i++; /* go for the next thread in the pool */
}else {
workQ.notifyAllThreads();
}
}
}
};
/*============== Thread Pool code ends ============== */


How I use this thread_pool ?



Thpool pool;


Following code will be inserted in required functions. I am showing for the TaskRPCB type task only.



std::function<void(void*,int,uint,int)> func = NFVInstance->CallBackTable[channel];
/* create a task object : members of it :
- the callback function
- msgBlob
- len of the msgBlob
- some other ID
- stream ID on which this message was received
*/
TaskRPCB task(func,msg,rc,fdd.id,streamId);
/* submit this task to the pool.
one of the waiting threads will pick this task
*/
pool.submit(task);






c++ multithreading c++17






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Mar 29 at 14:44







Debashish

















asked Mar 29 at 10:30









DebashishDebashish

1113




1113












  • $begingroup$
    sorry for mixing tags. I would like to have suggestions regarding thread pool implementation overall and the use of` std:: variant` ? is it ok ? or does it have some performance penalty?
    $endgroup$
    – Debashish
    Mar 29 at 12:43






  • 1




    $begingroup$
    I've edited tags to C++17 (since std::variant requires that). I hope you get good reviews!
    $endgroup$
    – Toby Speight
    Mar 29 at 14:04






  • 1




    $begingroup$
    THREAD_POOL_SIZE 100 I hope you are running on some large hardware. The point os a thread pool is that you create a thread for each available processor (or very close to that) then each processor gets new work from the pool without having to switch threads (as that is a relatively expensive operation).
    $endgroup$
    – Martin York
    Apr 1 at 18:40


















  • $begingroup$
    sorry for mixing tags. I would like to have suggestions regarding thread pool implementation overall and the use of` std:: variant` ? is it ok ? or does it have some performance penalty?
    $endgroup$
    – Debashish
    Mar 29 at 12:43






  • 1




    $begingroup$
    I've edited tags to C++17 (since std::variant requires that). I hope you get good reviews!
    $endgroup$
    – Toby Speight
    Mar 29 at 14:04






  • 1




    $begingroup$
    THREAD_POOL_SIZE 100 I hope you are running on some large hardware. The point os a thread pool is that you create a thread for each available processor (or very close to that) then each processor gets new work from the pool without having to switch threads (as that is a relatively expensive operation).
    $endgroup$
    – Martin York
    Apr 1 at 18:40
















$begingroup$
sorry for mixing tags. I would like to have suggestions regarding thread pool implementation overall and the use of` std:: variant` ? is it ok ? or does it have some performance penalty?
$endgroup$
– Debashish
Mar 29 at 12:43




$begingroup$
sorry for mixing tags. I would like to have suggestions regarding thread pool implementation overall and the use of` std:: variant` ? is it ok ? or does it have some performance penalty?
$endgroup$
– Debashish
Mar 29 at 12:43




1




1




$begingroup$
I've edited tags to C++17 (since std::variant requires that). I hope you get good reviews!
$endgroup$
– Toby Speight
Mar 29 at 14:04




$begingroup$
I've edited tags to C++17 (since std::variant requires that). I hope you get good reviews!
$endgroup$
– Toby Speight
Mar 29 at 14:04




1




1




$begingroup$
THREAD_POOL_SIZE 100 I hope you are running on some large hardware. The point os a thread pool is that you create a thread for each available processor (or very close to that) then each processor gets new work from the pool without having to switch threads (as that is a relatively expensive operation).
$endgroup$
– Martin York
Apr 1 at 18:40




$begingroup$
THREAD_POOL_SIZE 100 I hope you are running on some large hardware. The point os a thread pool is that you create a thread for each available processor (or very close to that) then each processor gets new work from the pool without having to switch threads (as that is a relatively expensive operation).
$endgroup$
– Martin York
Apr 1 at 18:40










2 Answers
2






active

oldest

votes


















4












$begingroup$

Naming nits:




  • Why threadSafeQueue instead of ThreadSafeQueue? You CamelCaps all your other class names. Why not this one?


  • class TaskECB { /* Task eventCallBack */ is a very verbose way of writing class TaskEventCallback {. If you have to spell it out in a comment anyway, just spell it out in the code. Your readers will thank you.


  • bool threadSafeQueue<T>::Empty() const: Since you're diverging from the traditional STL naming convention anyway (Empty is not empty), I recommend prefixing boolean accessors with Is, as in, myQueue.IsEmpty(). This way you don't confuse it with the verb "to empty," as in, "this function empties the queue." Orthogonally, you might mark this function [[nodiscard]] just to emphasize that it has no side effects.


  • deAllocatePool() would more traditionally be spelled deallocatePool(). "Deallocate" is a single word in English.





template<typename T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
/* construct the object before lock*/
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(data);
dataCond.notify_one();
}


Personally, I would simplify this to



template<class T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check */
auto data = std::make_shared<T>(std::move(newData));
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(std::move(data));
dataCond.notify_one();
}


Notice the use of = for initialization — it helps distinguish bool foo(int) from bool foo(true), and helps readability in general. I also put a std::move on the push, so that we're not unnecessarily copying the shared_ptr and incurring an extra atomic increment and decrement of the refcount. (No big deal.) Notice that we are still incurring an extra call to T's move-constructor; we might want to take T by reference here, in which case we might want two versions — one that takes const T& and one that takes T&&.



In fact, we might want to cut out the middleman entirely:



template<class T>
template<class... Args>
void threadSafeQueue<T>::Emplace(Args&&... args) { /* TODO: size check */
auto data = std::make_shared<T>(std::forward<Args>(args)...);
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(std::move(data));
dataCond.notify_one();
}




typedef variant<TaskECB,TaskRPCB> taskTypes;


You're missing a std:: there. Also, isn't it weird to have a single type named taskTypes (plural)? When I see the name "taskTypes", I expect to see multiple types — like a parameter pack or something. Here I think this type alias wants to be just a taskType or taskVariant, singular.





    auto tid = std::this_thread::get_id();
std::stringstream ss;
ss << tid;
std::string s = ss.str();


Yuck. In an ideal world, std::to_string(std::this_thread::get_id()) would Do The Right Thing; but we don't live in an ideal world.



However, why are you using std::string as your map key, anyway? Why not just define



std::map<std::thread::id, std::string> tidToTname;


and skip the expensive stringification?



Then, instead of



printf("%s is executing now : ",tidToTname[s].c_str());


you would write simply



printf("%s is executing now : ",tidToTname.at(tid).c_str());


(notice that we no longer risk modifying tidToTname accidentally!), and instead of



tidToTname[ss.str()] = tnames[i];


you'd write simply



tidToTname.insert_or_assign(tid, tnames[i]);


(Still beware: using emplace instead of insert_or_assign will still compile, but it will do the wrong thing if the key is already present! The STL's map is a very tricky beast. You have to be careful with it.)





if((*task).index() == 0) {      // TODO: change 0 and 1 to enums
auto func = get<TaskECB>(*task);
func();
}else if((*task).index() == 1) {
auto func = get<TaskRPCB>(*task);
func();
}


First of all, (*task).index() is traditionally spelled task->index(), and I strongly recommend that you do so. Nested parentheses make things hard to read. That's why the -> operator was added to C back in the '70s! (Probably late '60s, actually. Maybe earlier.)



Second, this is not a typical way to interact with std::variant. The library really intends you to interact with it like this:



std::visit([](auto& callback) {
callback();
}, *task);


If you want to preserve your inefficient copying, just change auto& to auto.



Really, IMO, it should be const auto&; but in order to make that work, you'll have to make your callback types const-callable. Right now their operator()s are non-const member functions:



void operator()() /* NO CONST HERE -- INTENTIONAL? */ {
higLog("%s","TaskECB function is executing...");
func();
}


If you're allergic to visit — which you shouldn't be! — but if you are, then a slightly more idiomatic way to write your chain of ifs would be



if (auto *func = std::get_if<TaskECB>(task.get())) {
(*func)();
} else if (auto *func = std::get_if<TaskRPCB>(task.get())) {
(*func)();
}


Having to use task.get() to get a raw pointer, instead of just task or *task, definitely isn't ideal API design on the STL's part. But again, the ideal solution is to just use std::visit! You should use std::visit.





I didn't check the multithreading parts. Odds are, there are bugs. Multithreaded code always has at least one bug. :)






share|improve this answer









$endgroup$





















    2












    $begingroup$

    Some additions to the review from @Quuxpluson.



    With regard to threadsafety, i'd be concerned that you are not calling deAllocatePool in the destructor. It's undefined behavior to call a destructor on std::mutex when the mutex is locked. It is also a bad thing to call the destructor on std::thread when it is considered joinable. Both of these may happen if you destruct your threadpool without clearing out the work queue.



    Looking further at your deAllocatePool, I think that you could get into cases where you can't end a thread. You are setting done but if the queue is empty, the thread could be waiting in waitAndPop(). While the notifyAllThreads will cause it to wake the lambda expression that you are using in your wait statement



    dataCond.wait(lk,[this](){return !taskQueue.empty();});


    would immediately go back into a wait state after awoken. If you manage to call join() on this kind of thread your deallocatePool would hang. I think is that checking for an abort condition inside the wait would mitigate that. If this hasn't happened yet there could be a variety of issues, i could be wrong, your use pattern has prevented this, or just pure coincidence ...



    Both of your task types could be reduced to a functor with the type std::function<void(void)>. You could use either lambdas or std::bind to enclose the state information, this would reduce the complexity inside your threadpool.






    share|improve this answer









    $endgroup$














      Your Answer





      StackExchange.ifUsing("editor", function () {
      return StackExchange.using("mathjaxEditing", function () {
      StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
      StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
      });
      });
      }, "mathjax-editing");

      StackExchange.ifUsing("editor", function () {
      StackExchange.using("externalEditor", function () {
      StackExchange.using("snippets", function () {
      StackExchange.snippets.init();
      });
      });
      }, "code-snippets");

      StackExchange.ready(function() {
      var channelOptions = {
      tags: "".split(" "),
      id: "196"
      };
      initTagRenderer("".split(" "), "".split(" "), channelOptions);

      StackExchange.using("externalEditor", function() {
      // Have to fire editor after snippets, if snippets enabled
      if (StackExchange.settings.snippets.snippetsEnabled) {
      StackExchange.using("snippets", function() {
      createEditor();
      });
      }
      else {
      createEditor();
      }
      });

      function createEditor() {
      StackExchange.prepareEditor({
      heartbeatType: 'answer',
      autoActivateHeartbeat: false,
      convertImagesToLinks: false,
      noModals: true,
      showLowRepImageUploadWarning: true,
      reputationToPostImages: null,
      bindNavPrevention: true,
      postfix: "",
      imageUploader: {
      brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
      contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
      allowUrls: true
      },
      onDemand: true,
      discardSelector: ".discard-answer"
      ,immediatelyShowMarkdownHelp:true
      });


      }
      });














      draft saved

      draft discarded


















      StackExchange.ready(
      function () {
      StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f216467%2fc-thread-pool-with-heterogeneous-work-queue%23new-answer', 'question_page');
      }
      );

      Post as a guest















      Required, but never shown

























      2 Answers
      2






      active

      oldest

      votes








      2 Answers
      2






      active

      oldest

      votes









      active

      oldest

      votes






      active

      oldest

      votes









      4












      $begingroup$

      Naming nits:




      • Why threadSafeQueue instead of ThreadSafeQueue? You CamelCaps all your other class names. Why not this one?


      • class TaskECB { /* Task eventCallBack */ is a very verbose way of writing class TaskEventCallback {. If you have to spell it out in a comment anyway, just spell it out in the code. Your readers will thank you.


      • bool threadSafeQueue<T>::Empty() const: Since you're diverging from the traditional STL naming convention anyway (Empty is not empty), I recommend prefixing boolean accessors with Is, as in, myQueue.IsEmpty(). This way you don't confuse it with the verb "to empty," as in, "this function empties the queue." Orthogonally, you might mark this function [[nodiscard]] just to emphasize that it has no side effects.


      • deAllocatePool() would more traditionally be spelled deallocatePool(). "Deallocate" is a single word in English.





      template<typename T>
      void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
      std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
      /* construct the object before lock*/
      std::lock_guard<std::mutex> lk(mut);
      taskQueue.push(data);
      dataCond.notify_one();
      }


      Personally, I would simplify this to



      template<class T>
      void threadSafeQueue<T>::Push(T newData) { /* TODO: size check */
      auto data = std::make_shared<T>(std::move(newData));
      std::lock_guard<std::mutex> lk(mut);
      taskQueue.push(std::move(data));
      dataCond.notify_one();
      }


      Notice the use of = for initialization — it helps distinguish bool foo(int) from bool foo(true), and helps readability in general. I also put a std::move on the push, so that we're not unnecessarily copying the shared_ptr and incurring an extra atomic increment and decrement of the refcount. (No big deal.) Notice that we are still incurring an extra call to T's move-constructor; we might want to take T by reference here, in which case we might want two versions — one that takes const T& and one that takes T&&.



      In fact, we might want to cut out the middleman entirely:



      template<class T>
      template<class... Args>
      void threadSafeQueue<T>::Emplace(Args&&... args) { /* TODO: size check */
      auto data = std::make_shared<T>(std::forward<Args>(args)...);
      std::lock_guard<std::mutex> lk(mut);
      taskQueue.push(std::move(data));
      dataCond.notify_one();
      }




      typedef variant<TaskECB,TaskRPCB> taskTypes;


      You're missing a std:: there. Also, isn't it weird to have a single type named taskTypes (plural)? When I see the name "taskTypes", I expect to see multiple types — like a parameter pack or something. Here I think this type alias wants to be just a taskType or taskVariant, singular.





          auto tid = std::this_thread::get_id();
      std::stringstream ss;
      ss << tid;
      std::string s = ss.str();


      Yuck. In an ideal world, std::to_string(std::this_thread::get_id()) would Do The Right Thing; but we don't live in an ideal world.



      However, why are you using std::string as your map key, anyway? Why not just define



      std::map<std::thread::id, std::string> tidToTname;


      and skip the expensive stringification?



      Then, instead of



      printf("%s is executing now : ",tidToTname[s].c_str());


      you would write simply



      printf("%s is executing now : ",tidToTname.at(tid).c_str());


      (notice that we no longer risk modifying tidToTname accidentally!), and instead of



      tidToTname[ss.str()] = tnames[i];


      you'd write simply



      tidToTname.insert_or_assign(tid, tnames[i]);


      (Still beware: using emplace instead of insert_or_assign will still compile, but it will do the wrong thing if the key is already present! The STL's map is a very tricky beast. You have to be careful with it.)





      if((*task).index() == 0) {      // TODO: change 0 and 1 to enums
      auto func = get<TaskECB>(*task);
      func();
      }else if((*task).index() == 1) {
      auto func = get<TaskRPCB>(*task);
      func();
      }


      First of all, (*task).index() is traditionally spelled task->index(), and I strongly recommend that you do so. Nested parentheses make things hard to read. That's why the -> operator was added to C back in the '70s! (Probably late '60s, actually. Maybe earlier.)



      Second, this is not a typical way to interact with std::variant. The library really intends you to interact with it like this:



      std::visit([](auto& callback) {
      callback();
      }, *task);


      If you want to preserve your inefficient copying, just change auto& to auto.



      Really, IMO, it should be const auto&; but in order to make that work, you'll have to make your callback types const-callable. Right now their operator()s are non-const member functions:



      void operator()() /* NO CONST HERE -- INTENTIONAL? */ {
      higLog("%s","TaskECB function is executing...");
      func();
      }


      If you're allergic to visit — which you shouldn't be! — but if you are, then a slightly more idiomatic way to write your chain of ifs would be



      if (auto *func = std::get_if<TaskECB>(task.get())) {
      (*func)();
      } else if (auto *func = std::get_if<TaskRPCB>(task.get())) {
      (*func)();
      }


      Having to use task.get() to get a raw pointer, instead of just task or *task, definitely isn't ideal API design on the STL's part. But again, the ideal solution is to just use std::visit! You should use std::visit.





      I didn't check the multithreading parts. Odds are, there are bugs. Multithreaded code always has at least one bug. :)






      share|improve this answer









      $endgroup$


















        4












        $begingroup$

        Naming nits:




        • Why threadSafeQueue instead of ThreadSafeQueue? You CamelCaps all your other class names. Why not this one?


        • class TaskECB { /* Task eventCallBack */ is a very verbose way of writing class TaskEventCallback {. If you have to spell it out in a comment anyway, just spell it out in the code. Your readers will thank you.


        • bool threadSafeQueue<T>::Empty() const: Since you're diverging from the traditional STL naming convention anyway (Empty is not empty), I recommend prefixing boolean accessors with Is, as in, myQueue.IsEmpty(). This way you don't confuse it with the verb "to empty," as in, "this function empties the queue." Orthogonally, you might mark this function [[nodiscard]] just to emphasize that it has no side effects.


        • deAllocatePool() would more traditionally be spelled deallocatePool(). "Deallocate" is a single word in English.





        template<typename T>
        void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
        std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
        /* construct the object before lock*/
        std::lock_guard<std::mutex> lk(mut);
        taskQueue.push(data);
        dataCond.notify_one();
        }


        Personally, I would simplify this to



        template<class T>
        void threadSafeQueue<T>::Push(T newData) { /* TODO: size check */
        auto data = std::make_shared<T>(std::move(newData));
        std::lock_guard<std::mutex> lk(mut);
        taskQueue.push(std::move(data));
        dataCond.notify_one();
        }


        Notice the use of = for initialization — it helps distinguish bool foo(int) from bool foo(true), and helps readability in general. I also put a std::move on the push, so that we're not unnecessarily copying the shared_ptr and incurring an extra atomic increment and decrement of the refcount. (No big deal.) Notice that we are still incurring an extra call to T's move-constructor; we might want to take T by reference here, in which case we might want two versions — one that takes const T& and one that takes T&&.



        In fact, we might want to cut out the middleman entirely:



        template<class T>
        template<class... Args>
        void threadSafeQueue<T>::Emplace(Args&&... args) { /* TODO: size check */
        auto data = std::make_shared<T>(std::forward<Args>(args)...);
        std::lock_guard<std::mutex> lk(mut);
        taskQueue.push(std::move(data));
        dataCond.notify_one();
        }




        typedef variant<TaskECB,TaskRPCB> taskTypes;


        You're missing a std:: there. Also, isn't it weird to have a single type named taskTypes (plural)? When I see the name "taskTypes", I expect to see multiple types — like a parameter pack or something. Here I think this type alias wants to be just a taskType or taskVariant, singular.





            auto tid = std::this_thread::get_id();
        std::stringstream ss;
        ss << tid;
        std::string s = ss.str();


        Yuck. In an ideal world, std::to_string(std::this_thread::get_id()) would Do The Right Thing; but we don't live in an ideal world.



        However, why are you using std::string as your map key, anyway? Why not just define



        std::map<std::thread::id, std::string> tidToTname;


        and skip the expensive stringification?



        Then, instead of



        printf("%s is executing now : ",tidToTname[s].c_str());


        you would write simply



        printf("%s is executing now : ",tidToTname.at(tid).c_str());


        (notice that we no longer risk modifying tidToTname accidentally!), and instead of



        tidToTname[ss.str()] = tnames[i];


        you'd write simply



        tidToTname.insert_or_assign(tid, tnames[i]);


        (Still beware: using emplace instead of insert_or_assign will still compile, but it will do the wrong thing if the key is already present! The STL's map is a very tricky beast. You have to be careful with it.)





        if((*task).index() == 0) {      // TODO: change 0 and 1 to enums
        auto func = get<TaskECB>(*task);
        func();
        }else if((*task).index() == 1) {
        auto func = get<TaskRPCB>(*task);
        func();
        }


        First of all, (*task).index() is traditionally spelled task->index(), and I strongly recommend that you do so. Nested parentheses make things hard to read. That's why the -> operator was added to C back in the '70s! (Probably late '60s, actually. Maybe earlier.)



        Second, this is not a typical way to interact with std::variant. The library really intends you to interact with it like this:



        std::visit([](auto& callback) {
        callback();
        }, *task);


        If you want to preserve your inefficient copying, just change auto& to auto.



        Really, IMO, it should be const auto&; but in order to make that work, you'll have to make your callback types const-callable. Right now their operator()s are non-const member functions:



        void operator()() /* NO CONST HERE -- INTENTIONAL? */ {
        higLog("%s","TaskECB function is executing...");
        func();
        }


        If you're allergic to visit — which you shouldn't be! — but if you are, then a slightly more idiomatic way to write your chain of ifs would be



        if (auto *func = std::get_if<TaskECB>(task.get())) {
        (*func)();
        } else if (auto *func = std::get_if<TaskRPCB>(task.get())) {
        (*func)();
        }


        Having to use task.get() to get a raw pointer, instead of just task or *task, definitely isn't ideal API design on the STL's part. But again, the ideal solution is to just use std::visit! You should use std::visit.





        I didn't check the multithreading parts. Odds are, there are bugs. Multithreaded code always has at least one bug. :)






        share|improve this answer









        $endgroup$
















          4












          4








          4





          $begingroup$

          Naming nits:




          • Why threadSafeQueue instead of ThreadSafeQueue? You CamelCaps all your other class names. Why not this one?


          • class TaskECB { /* Task eventCallBack */ is a very verbose way of writing class TaskEventCallback {. If you have to spell it out in a comment anyway, just spell it out in the code. Your readers will thank you.


          • bool threadSafeQueue<T>::Empty() const: Since you're diverging from the traditional STL naming convention anyway (Empty is not empty), I recommend prefixing boolean accessors with Is, as in, myQueue.IsEmpty(). This way you don't confuse it with the verb "to empty," as in, "this function empties the queue." Orthogonally, you might mark this function [[nodiscard]] just to emphasize that it has no side effects.


          • deAllocatePool() would more traditionally be spelled deallocatePool(). "Deallocate" is a single word in English.





          template<typename T>
          void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
          std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
          /* construct the object before lock*/
          std::lock_guard<std::mutex> lk(mut);
          taskQueue.push(data);
          dataCond.notify_one();
          }


          Personally, I would simplify this to



          template<class T>
          void threadSafeQueue<T>::Push(T newData) { /* TODO: size check */
          auto data = std::make_shared<T>(std::move(newData));
          std::lock_guard<std::mutex> lk(mut);
          taskQueue.push(std::move(data));
          dataCond.notify_one();
          }


          Notice the use of = for initialization — it helps distinguish bool foo(int) from bool foo(true), and helps readability in general. I also put a std::move on the push, so that we're not unnecessarily copying the shared_ptr and incurring an extra atomic increment and decrement of the refcount. (No big deal.) Notice that we are still incurring an extra call to T's move-constructor; we might want to take T by reference here, in which case we might want two versions — one that takes const T& and one that takes T&&.



          In fact, we might want to cut out the middleman entirely:



          template<class T>
          template<class... Args>
          void threadSafeQueue<T>::Emplace(Args&&... args) { /* TODO: size check */
          auto data = std::make_shared<T>(std::forward<Args>(args)...);
          std::lock_guard<std::mutex> lk(mut);
          taskQueue.push(std::move(data));
          dataCond.notify_one();
          }




          typedef variant<TaskECB,TaskRPCB> taskTypes;


          You're missing a std:: there. Also, isn't it weird to have a single type named taskTypes (plural)? When I see the name "taskTypes", I expect to see multiple types — like a parameter pack or something. Here I think this type alias wants to be just a taskType or taskVariant, singular.





              auto tid = std::this_thread::get_id();
          std::stringstream ss;
          ss << tid;
          std::string s = ss.str();


          Yuck. In an ideal world, std::to_string(std::this_thread::get_id()) would Do The Right Thing; but we don't live in an ideal world.



          However, why are you using std::string as your map key, anyway? Why not just define



          std::map<std::thread::id, std::string> tidToTname;


          and skip the expensive stringification?



          Then, instead of



          printf("%s is executing now : ",tidToTname[s].c_str());


          you would write simply



          printf("%s is executing now : ",tidToTname.at(tid).c_str());


          (notice that we no longer risk modifying tidToTname accidentally!), and instead of



          tidToTname[ss.str()] = tnames[i];


          you'd write simply



          tidToTname.insert_or_assign(tid, tnames[i]);


          (Still beware: using emplace instead of insert_or_assign will still compile, but it will do the wrong thing if the key is already present! The STL's map is a very tricky beast. You have to be careful with it.)





          if((*task).index() == 0) {      // TODO: change 0 and 1 to enums
          auto func = get<TaskECB>(*task);
          func();
          }else if((*task).index() == 1) {
          auto func = get<TaskRPCB>(*task);
          func();
          }


          First of all, (*task).index() is traditionally spelled task->index(), and I strongly recommend that you do so. Nested parentheses make things hard to read. That's why the -> operator was added to C back in the '70s! (Probably late '60s, actually. Maybe earlier.)



          Second, this is not a typical way to interact with std::variant. The library really intends you to interact with it like this:



          std::visit([](auto& callback) {
          callback();
          }, *task);


          If you want to preserve your inefficient copying, just change auto& to auto.



          Really, IMO, it should be const auto&; but in order to make that work, you'll have to make your callback types const-callable. Right now their operator()s are non-const member functions:



          void operator()() /* NO CONST HERE -- INTENTIONAL? */ {
          higLog("%s","TaskECB function is executing...");
          func();
          }


          If you're allergic to visit — which you shouldn't be! — but if you are, then a slightly more idiomatic way to write your chain of ifs would be



          if (auto *func = std::get_if<TaskECB>(task.get())) {
          (*func)();
          } else if (auto *func = std::get_if<TaskRPCB>(task.get())) {
          (*func)();
          }


          Having to use task.get() to get a raw pointer, instead of just task or *task, definitely isn't ideal API design on the STL's part. But again, the ideal solution is to just use std::visit! You should use std::visit.





          I didn't check the multithreading parts. Odds are, there are bugs. Multithreaded code always has at least one bug. :)






          share|improve this answer









          $endgroup$



          Naming nits:




          • Why threadSafeQueue instead of ThreadSafeQueue? You CamelCaps all your other class names. Why not this one?


          • class TaskECB { /* Task eventCallBack */ is a very verbose way of writing class TaskEventCallback {. If you have to spell it out in a comment anyway, just spell it out in the code. Your readers will thank you.


          • bool threadSafeQueue<T>::Empty() const: Since you're diverging from the traditional STL naming convention anyway (Empty is not empty), I recommend prefixing boolean accessors with Is, as in, myQueue.IsEmpty(). This way you don't confuse it with the verb "to empty," as in, "this function empties the queue." Orthogonally, you might mark this function [[nodiscard]] just to emphasize that it has no side effects.


          • deAllocatePool() would more traditionally be spelled deallocatePool(). "Deallocate" is a single word in English.





          template<typename T>
          void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
          std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
          /* construct the object before lock*/
          std::lock_guard<std::mutex> lk(mut);
          taskQueue.push(data);
          dataCond.notify_one();
          }


          Personally, I would simplify this to



          template<class T>
          void threadSafeQueue<T>::Push(T newData) { /* TODO: size check */
          auto data = std::make_shared<T>(std::move(newData));
          std::lock_guard<std::mutex> lk(mut);
          taskQueue.push(std::move(data));
          dataCond.notify_one();
          }


          Notice the use of = for initialization — it helps distinguish bool foo(int) from bool foo(true), and helps readability in general. I also put a std::move on the push, so that we're not unnecessarily copying the shared_ptr and incurring an extra atomic increment and decrement of the refcount. (No big deal.) Notice that we are still incurring an extra call to T's move-constructor; we might want to take T by reference here, in which case we might want two versions — one that takes const T& and one that takes T&&.



          In fact, we might want to cut out the middleman entirely:



          template<class T>
          template<class... Args>
          void threadSafeQueue<T>::Emplace(Args&&... args) { /* TODO: size check */
          auto data = std::make_shared<T>(std::forward<Args>(args)...);
          std::lock_guard<std::mutex> lk(mut);
          taskQueue.push(std::move(data));
          dataCond.notify_one();
          }




          typedef variant<TaskECB,TaskRPCB> taskTypes;


          You're missing a std:: there. Also, isn't it weird to have a single type named taskTypes (plural)? When I see the name "taskTypes", I expect to see multiple types — like a parameter pack or something. Here I think this type alias wants to be just a taskType or taskVariant, singular.





              auto tid = std::this_thread::get_id();
          std::stringstream ss;
          ss << tid;
          std::string s = ss.str();


          Yuck. In an ideal world, std::to_string(std::this_thread::get_id()) would Do The Right Thing; but we don't live in an ideal world.



          However, why are you using std::string as your map key, anyway? Why not just define



          std::map<std::thread::id, std::string> tidToTname;


          and skip the expensive stringification?



          Then, instead of



          printf("%s is executing now : ",tidToTname[s].c_str());


          you would write simply



          printf("%s is executing now : ",tidToTname.at(tid).c_str());


          (notice that we no longer risk modifying tidToTname accidentally!), and instead of



          tidToTname[ss.str()] = tnames[i];


          you'd write simply



          tidToTname.insert_or_assign(tid, tnames[i]);


          (Still beware: using emplace instead of insert_or_assign will still compile, but it will do the wrong thing if the key is already present! The STL's map is a very tricky beast. You have to be careful with it.)





          if((*task).index() == 0) {      // TODO: change 0 and 1 to enums
          auto func = get<TaskECB>(*task);
          func();
          }else if((*task).index() == 1) {
          auto func = get<TaskRPCB>(*task);
          func();
          }


          First of all, (*task).index() is traditionally spelled task->index(), and I strongly recommend that you do so. Nested parentheses make things hard to read. That's why the -> operator was added to C back in the '70s! (Probably late '60s, actually. Maybe earlier.)



          Second, this is not a typical way to interact with std::variant. The library really intends you to interact with it like this:



          std::visit([](auto& callback) {
          callback();
          }, *task);


          If you want to preserve your inefficient copying, just change auto& to auto.



          Really, IMO, it should be const auto&; but in order to make that work, you'll have to make your callback types const-callable. Right now their operator()s are non-const member functions:



          void operator()() /* NO CONST HERE -- INTENTIONAL? */ {
          higLog("%s","TaskECB function is executing...");
          func();
          }


          If you're allergic to visit — which you shouldn't be! — but if you are, then a slightly more idiomatic way to write your chain of ifs would be



          if (auto *func = std::get_if<TaskECB>(task.get())) {
          (*func)();
          } else if (auto *func = std::get_if<TaskRPCB>(task.get())) {
          (*func)();
          }


          Having to use task.get() to get a raw pointer, instead of just task or *task, definitely isn't ideal API design on the STL's part. But again, the ideal solution is to just use std::visit! You should use std::visit.





          I didn't check the multithreading parts. Odds are, there are bugs. Multithreaded code always has at least one bug. :)







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Apr 1 at 19:53









          QuuxplusoneQuuxplusone

          13.3k12165




          13.3k12165

























              2












              $begingroup$

              Some additions to the review from @Quuxpluson.



              With regard to threadsafety, i'd be concerned that you are not calling deAllocatePool in the destructor. It's undefined behavior to call a destructor on std::mutex when the mutex is locked. It is also a bad thing to call the destructor on std::thread when it is considered joinable. Both of these may happen if you destruct your threadpool without clearing out the work queue.



              Looking further at your deAllocatePool, I think that you could get into cases where you can't end a thread. You are setting done but if the queue is empty, the thread could be waiting in waitAndPop(). While the notifyAllThreads will cause it to wake the lambda expression that you are using in your wait statement



              dataCond.wait(lk,[this](){return !taskQueue.empty();});


              would immediately go back into a wait state after awoken. If you manage to call join() on this kind of thread your deallocatePool would hang. I think is that checking for an abort condition inside the wait would mitigate that. If this hasn't happened yet there could be a variety of issues, i could be wrong, your use pattern has prevented this, or just pure coincidence ...



              Both of your task types could be reduced to a functor with the type std::function<void(void)>. You could use either lambdas or std::bind to enclose the state information, this would reduce the complexity inside your threadpool.






              share|improve this answer









              $endgroup$


















                2












                $begingroup$

                Some additions to the review from @Quuxpluson.



                With regard to threadsafety, i'd be concerned that you are not calling deAllocatePool in the destructor. It's undefined behavior to call a destructor on std::mutex when the mutex is locked. It is also a bad thing to call the destructor on std::thread when it is considered joinable. Both of these may happen if you destruct your threadpool without clearing out the work queue.



                Looking further at your deAllocatePool, I think that you could get into cases where you can't end a thread. You are setting done but if the queue is empty, the thread could be waiting in waitAndPop(). While the notifyAllThreads will cause it to wake the lambda expression that you are using in your wait statement



                dataCond.wait(lk,[this](){return !taskQueue.empty();});


                would immediately go back into a wait state after awoken. If you manage to call join() on this kind of thread your deallocatePool would hang. I think is that checking for an abort condition inside the wait would mitigate that. If this hasn't happened yet there could be a variety of issues, i could be wrong, your use pattern has prevented this, or just pure coincidence ...



                Both of your task types could be reduced to a functor with the type std::function<void(void)>. You could use either lambdas or std::bind to enclose the state information, this would reduce the complexity inside your threadpool.






                share|improve this answer









                $endgroup$
















                  2












                  2








                  2





                  $begingroup$

                  Some additions to the review from @Quuxpluson.



                  With regard to threadsafety, i'd be concerned that you are not calling deAllocatePool in the destructor. It's undefined behavior to call a destructor on std::mutex when the mutex is locked. It is also a bad thing to call the destructor on std::thread when it is considered joinable. Both of these may happen if you destruct your threadpool without clearing out the work queue.



                  Looking further at your deAllocatePool, I think that you could get into cases where you can't end a thread. You are setting done but if the queue is empty, the thread could be waiting in waitAndPop(). While the notifyAllThreads will cause it to wake the lambda expression that you are using in your wait statement



                  dataCond.wait(lk,[this](){return !taskQueue.empty();});


                  would immediately go back into a wait state after awoken. If you manage to call join() on this kind of thread your deallocatePool would hang. I think is that checking for an abort condition inside the wait would mitigate that. If this hasn't happened yet there could be a variety of issues, i could be wrong, your use pattern has prevented this, or just pure coincidence ...



                  Both of your task types could be reduced to a functor with the type std::function<void(void)>. You could use either lambdas or std::bind to enclose the state information, this would reduce the complexity inside your threadpool.






                  share|improve this answer









                  $endgroup$



                  Some additions to the review from @Quuxpluson.



                  With regard to threadsafety, i'd be concerned that you are not calling deAllocatePool in the destructor. It's undefined behavior to call a destructor on std::mutex when the mutex is locked. It is also a bad thing to call the destructor on std::thread when it is considered joinable. Both of these may happen if you destruct your threadpool without clearing out the work queue.



                  Looking further at your deAllocatePool, I think that you could get into cases where you can't end a thread. You are setting done but if the queue is empty, the thread could be waiting in waitAndPop(). While the notifyAllThreads will cause it to wake the lambda expression that you are using in your wait statement



                  dataCond.wait(lk,[this](){return !taskQueue.empty();});


                  would immediately go back into a wait state after awoken. If you manage to call join() on this kind of thread your deallocatePool would hang. I think is that checking for an abort condition inside the wait would mitigate that. If this hasn't happened yet there could be a variety of issues, i could be wrong, your use pattern has prevented this, or just pure coincidence ...



                  Both of your task types could be reduced to a functor with the type std::function<void(void)>. You could use either lambdas or std::bind to enclose the state information, this would reduce the complexity inside your threadpool.







                  share|improve this answer












                  share|improve this answer



                  share|improve this answer










                  answered Apr 1 at 21:00









                  Harald ScheirichHarald Scheirich

                  1,401518




                  1,401518






























                      draft saved

                      draft discarded




















































                      Thanks for contributing an answer to Code Review Stack Exchange!


                      • Please be sure to answer the question. Provide details and share your research!

                      But avoid



                      • Asking for help, clarification, or responding to other answers.

                      • Making statements based on opinion; back them up with references or personal experience.


                      Use MathJax to format equations. MathJax reference.


                      To learn more, see our tips on writing great answers.




                      draft saved


                      draft discarded














                      StackExchange.ready(
                      function () {
                      StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f216467%2fc-thread-pool-with-heterogeneous-work-queue%23new-answer', 'question_page');
                      }
                      );

                      Post as a guest















                      Required, but never shown





















































                      Required, but never shown














                      Required, but never shown












                      Required, but never shown







                      Required, but never shown

































                      Required, but never shown














                      Required, but never shown












                      Required, but never shown







                      Required, but never shown







                      Popular posts from this blog

                      is 'sed' thread safeWhat should someone know about using Python scripts in the shell?Nexenta bash script uses...

                      How do i solve the “ No module named 'mlxtend' ” issue on Jupyter?

                      Pilgersdorf Inhaltsverzeichnis Geografie | Geschichte | Bevölkerungsentwicklung | Politik | Kultur...