C++ thread_pool with heterogeneous work-queue
$begingroup$
I have read the book "C++ Concurrency in Action" and understood the thread_pool implementation. I have changed a few things according to my project requirements.
I have used `std::variant` to support a heterogeneous work queue that stores the different tasks arriving on an epoll event loop. Currently, only two types of task arrive on the epoll loop in my project: `TaskECB` and `TaskRPCB`. I have created classes for both of them and overloaded `operator()`.
#define THREAD_POOL_SIZE 100
std::map<std::string,std::string> tidToTname;
template<typename T>
class threadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> taskQueue; /* task as pushed here and
task are processed in FIFO
style */
std::condition_variable dataCond; /* used to protect the queue */
public:
threadSafeQueue(){}
void waitAndPop(T& value); /* wait untill task is not available in
the queue */
std::shared_ptr<T> waitAndPop();/* same but returns a shared pointer */
bool tryPop(T& value); /* does not block */
std::shared_ptr<T> tryPop(); /* does not block and returns a pointer*/
void Push(T newData);
bool Empty() const; /* check if queue is empty or not */
void notifyAllThreads(); /* notify all the waiting threads
used in Thpool decallocation */
};
template<typename T>
void threadSafeQueue<T>::notifyAllThreads() {
dataCond.notify_all();
}
template<typename T>
void threadSafeQueue<T>::waitAndPop(T& value) {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
value = std::move(*taskQueue.front());
taskQueue.pop();
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::waitAndPop() {
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk,[this](){return !taskQueue.empty();});
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}
template<typename T>
bool threadSafeQueue<T>::tryPop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return false;
value = std::move(*taskQueue.front());
taskQueue.pop();
return true;
}
template<typename T>
std::shared_ptr<T> threadSafeQueue<T>::tryPop() {
std::lock_guard<std::mutex> lk(mut);
if(taskQueue.empty())
return std::shared_ptr<T>(); /* return nullptr */
std::shared_ptr<T> res = taskQueue.front();
taskQueue.pop();
return res;
}
template<typename T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
/* construct the object before lock*/
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(data);
dataCond.notify_one();
}
template<typename T>
bool threadSafeQueue<T>::Empty() const {
std::lock_guard<std::mutex> lk(mut);
return taskQueue.empty();
}
class TaskRPCB { /* Task RecvAndProcessCallbacks */
private:
int len;
uint fdId;
int streamId;
void* msgBlob;
std::function<void(void*,int,uint,int)> func;
public:
TaskRPCB(std::function<void(void*,int,uint,int)>&f , void* msgBlob,int len,
uint fdId, int streamId) {
this->func = f;
this->msgBlob = msgBlob;
this->len = len;
this->fdId = fdId;
this->streamId = streamId;
}
void operator()() {
higLog("%s","TaskRPCB function is executing...");
func(msgBlob,len,fdId,streamId);
}
};
class TaskECB { /* Task eventCallBack */
private:
std::function<void(void)> func;
public:
TaskECB(std::function<void(void)>&f) : func(f) {}
void operator()() {
higLog("%s","TaskECB function is executing...");
func();
}
};
typedef variant<TaskECB,TaskRPCB> taskTypes;
class Thpool {
std::atomic_bool done;
threadSafeQueue<taskTypes> workQ;
std::vector<std::thread> threads;
void workerThread() {
auto tid = std::this_thread::get_id();
std::stringstream ss;
ss << tid;
std::string s = ss.str();
while(!done) {
auto task = workQ.waitAndPop();
if(task != nullptr and !done) {
printf("%s is executing now : ",tidToTname[s].c_str());
if((*task).index() == 0) { // TODO: change 0 and 1 to enums
auto func = get<TaskECB>(*task);
func();
}else if((*task).index() == 1) {
auto func = get<TaskRPCB>(*task);
func();
}
}
}
}
public:
Thpool(): done(false) {
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
printf("ThreadPool Size = %d",maxThreadCount);
/* save thread names for logging purpose */
std::vector<std::string> tnames;
for(unsigned int i = 0;i<maxThreadCount;i++) {
tnames.push_back("Thread_" + std::to_string(i+1));
}
try { /* exception might arise due to thread creation */
for(unsigned int i = 0;i<maxThreadCount;i++) {
threads.push_back(std::thread(&Thpool::workerThread,this));
/*map this ith threadID to a name Thread_i*/
auto tid = threads[i].get_id();
std::stringstream ss;
ss << tid;
tidToTname[ss.str()] = tnames[i];
}
}catch(...) {
done = true;
throw;
}
}
~Thpool() {
// done = true;
}
template<typename taskType>
void submit(taskType task) {
workQ.Push(task);
}
void deAllocatePool() {
done = true;
workQ.notifyAllThreads();
// unsigned const maxThreadCount = std::thread::hardware_concurrency();
unsigned const maxThreadCount = THREAD_POOL_SIZE;
for(unsigned int i = 0;i<maxThreadCount;) {
if(threads[i].joinable()) {
threads[i].join();
i++; /* go for the next thread in the pool */
}else {
workQ.notifyAllThreads();
}
}
}
};
/*============== Thread Pool code ends ============== */
How do I use this thread_pool?
Thpool pool;
The following code is inserted in the required functions. I show it for the `TaskRPCB` task type only.
std::function<void(void*,int,uint,int)> func = NFVInstance->CallBackTable[channel];
/* create a task object : members of it :
- the callback function
- msgBlob
- len of the msgBlob
- some other ID
- stream ID on which this message was received
*/
TaskRPCB task(func,msg,rc,fdd.id,streamId);
/* submit this task to the pool.
one of the waiting threads will pick this task
*/
pool.submit(task);
c++ multithreading c++17
$endgroup$
$begingroup$
Sorry for mixing tags. I would like suggestions on the thread pool implementation overall and on the use of `std::variant`. Is it OK, or does it carry a performance penalty?
$endgroup$
– Debashish
Mar 29 at 12:43
$begingroup$
I've edited tags to C++17 (since `std::variant` requires that). I hope you get good reviews!
$endgroup$
– Toby Speight
Mar 29 at 14:04
$begingroup$
`THREAD_POOL_SIZE 100`: I hope you are running on some large hardware. The point of a thread pool is that you create a thread for each available processor (or very close to that), so that each processor gets new work from the pool without having to switch threads (a relatively expensive operation).
$endgroup$
– Martin York
Apr 1 at 18:40
asked Mar 29 at 10:30 by Debashish; edited Mar 29 at 14:44
2 Answers
$begingroup$
Naming nits:
- Why `threadSafeQueue` instead of `ThreadSafeQueue`? You CamelCaps all your other class names. Why not this one?
- `class TaskECB { /* Task eventCallBack */` is a very verbose way of writing `class TaskEventCallback {`. If you have to spell it out in a comment anyway, just spell it out in the code. Your readers will thank you.
- `bool threadSafeQueue<T>::Empty() const`: Since you're diverging from the traditional STL naming convention anyway (`Empty` is not `empty`), I recommend prefixing boolean accessors with `Is`, as in, `myQueue.IsEmpty()`. This way you don't confuse it with the verb "to empty," as in, "this function empties the queue." Orthogonally, you might mark this function `[[nodiscard]]` just to emphasize that it has no side effects.
- `deAllocatePool()` would more traditionally be spelled `deallocatePool()`. "Deallocate" is a single word in English.
template<typename T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check before pushing */
std::shared_ptr<T> data(std::make_shared<T>(std::move(newData)));
/* construct the object before lock*/
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(data);
dataCond.notify_one();
}
Personally, I would simplify this to
template<class T>
void threadSafeQueue<T>::Push(T newData) { /* TODO: size check */
auto data = std::make_shared<T>(std::move(newData));
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(std::move(data));
dataCond.notify_one();
}
Notice the use of `=` for initialization: it helps distinguish `bool foo(int)` from `bool foo(true)`, and helps readability in general. I also put a `std::move` on the push, so that we're not unnecessarily copying the `shared_ptr` and incurring an extra atomic increment and decrement of the refcount. (No big deal.) Notice that we are still incurring an extra call to `T`'s move-constructor; we might want to take `T` by reference here, in which case we might want two versions: one that takes `const T&` and one that takes `T&&`.
In fact, we might want to cut out the middleman entirely:
template<class T>
template<class... Args>
void threadSafeQueue<T>::Emplace(Args&&... args) { /* TODO: size check */
auto data = std::make_shared<T>(std::forward<Args>(args)...);
std::lock_guard<std::mutex> lk(mut);
taskQueue.push(std::move(data));
dataCond.notify_one();
}
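For the out-of-line definition above to compile, `Emplace` also has to be declared as a member template inside the class. The following is a minimal sketch of that arrangement, not the asker's actual queue: `MiniQueue`, `Job`, and `emplaceAndPop` are hypothetical names introduced only for illustration, and the queue is cut down to `Emplace` plus a non-blocking `tryPop`.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Cut-down stand-in for the question's threadSafeQueue (hypothetical name).
template<class T>
class MiniQueue {
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> taskQueue;
    std::condition_variable dataCond;
public:
    // Member template: it must be declared here before it can be
    // defined out of line with two template headers.
    template<class... Args>
    void Emplace(Args&&... args);

    // Non-blocking pop; returns nullptr when the queue is empty.
    std::shared_ptr<T> tryPop() {
        std::lock_guard<std::mutex> lk(mut);
        if (taskQueue.empty())
            return nullptr;
        auto res = std::move(taskQueue.front());
        taskQueue.pop();
        return res;
    }
};

template<class T>
template<class... Args>
void MiniQueue<T>::Emplace(Args&&... args) {
    // Construct the element directly from the arguments, before locking.
    auto data = std::make_shared<T>(std::forward<Args>(args)...);
    std::lock_guard<std::mutex> lk(mut);
    taskQueue.push(std::move(data));
    dataCond.notify_one();
}

// Hypothetical element type with a two-argument constructor.
struct Job {
    std::string name;
    int priority;
    Job(std::string n, int p) : name(std::move(n)), priority(p) {}
};

// Emplaces one Job from raw constructor arguments and pops it back.
inline std::shared_ptr<Job> emplaceAndPop() {
    MiniQueue<Job> q;
    q.Emplace("parse", 3);  // the caller never builds a temporary Job
    return q.tryPop();
}
```

The caller passes constructor arguments rather than a finished object, so the only `Job` ever constructed is the one owned by the `shared_ptr`.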
typedef variant<TaskECB,TaskRPCB> taskTypes;
You're missing a `std::` there. Also, isn't it weird to have a single type named `taskTypes` (plural)? When I see the name "taskTypes", I expect to see multiple types, like a parameter pack or something. Here I think this type alias wants to be just a `taskType` or `taskVariant`, singular.
auto tid = std::this_thread::get_id();
std::stringstream ss;
ss << tid;
std::string s = ss.str();
Yuck. In an ideal world, std::to_string(std::this_thread::get_id()) would Do The Right Thing; but we don't live in an ideal world.
However, why are you using std::string as your map key, anyway? Why not just define
std::map<std::thread::id, std::string> tidToTname;
and skip the expensive stringification?
Then, instead of
printf("%s is executing now : ",tidToTname[s].c_str());
you would write simply
printf("%s is executing now : ",tidToTname.at(tid).c_str());
(notice that we no longer risk modifying tidToTname accidentally: operator[] inserts an empty string when the key is missing, whereas at() throws std::out_of_range), and instead of
tidToTname[ss.str()] = tnames[i];
you'd write simply
tidToTname.insert_or_assign(tid, tnames[i]);
(Still beware: using emplace instead of insert_or_assign will still compile, but it will do the wrong thing if the key is already present, because it leaves the old value in place! The STL's map is a very tricky beast. You have to be careful with it.)
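To make the emplace pitfall concrete, here is a small sketch using the std::thread::id-keyed map suggested above. MapDemoResult, demoMapUpdate, and the "worker-N" names are made up for illustration:

```cpp
#include <map>
#include <string>
#include <thread>

// Hypothetical result type: the name stored under the same key after each step.
struct MapDemoResult {
    std::string afterEmplace;
    std::string afterInsertOrAssign;
};

inline MapDemoResult demoMapUpdate() {
    std::map<std::thread::id, std::string> tidToTname;
    const std::thread::id tid = std::this_thread::get_id();

    tidToTname.insert_or_assign(tid, "worker-0");  // key absent: inserts
    tidToTname.emplace(tid, "worker-1");           // key present: does nothing

    MapDemoResult result;
    result.afterEmplace = tidToTname.at(tid);      // still the old value

    tidToTname.insert_or_assign(tid, "worker-2");  // key present: overwrites
    result.afterInsertOrAssign = tidToTname.at(tid);
    return result;
}
```

Note that insert_or_assign requires C++17, which the code under review already needs for std::variant.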
if((*task).index() == 0) { // TODO: change 0 and 1 to enums
auto func = get<TaskECB>(*task);
func();
}else if((*task).index() == 1) {
auto func = get<TaskRPCB>(*task);
func();
}
First of all, (*task).index() is traditionally spelled task->index(), and I strongly recommend that you do so. Nested parentheses make things hard to read. That's why the -> operator was added to C back in the '70s! (Probably late '60s, actually. Maybe earlier.)
Second, this is not a typical way to interact with std::variant. The library really intends you to interact with it like this:
std::visit([](auto& callback) {
callback();
}, *task);
If you want to preserve your inefficient copying, just change auto& to auto.
Really, IMO, it should be const auto&; but in order to make that work, you'll have to make your callback types const-callable. Right now their operator()s are non-const member functions:
void operator()() /* NO CONST HERE -- INTENTIONAL? */ {
higLog("%s","TaskECB function is executing...");
func();
}
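For illustration only, here is roughly what the const auto& version could look like once the call operators are const. EventTask, RpcTask, TaskVariant, and runTask are hypothetical stand-ins for TaskECB and TaskRPCB, whose real members are not shown here:

```cpp
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-ins for the two task types; each records its call in a log.
struct EventTask {
    std::vector<std::string>* log;
    void operator()() const { log->push_back("event"); }  // const-callable
};

struct RpcTask {
    std::vector<std::string>* log;
    void operator()() const { log->push_back("rpc"); }    // const-callable
};

using TaskVariant = std::variant<EventTask, RpcTask>;

// Dispatches to whichever alternative is active, without copying it
// and without hard-coding index() values.
inline void runTask(const TaskVariant& task) {
    std::visit([](const auto& callback) { callback(); }, task);
}
```

Adding a third task type to the variant then needs no change in runTask, as long as the new type is also callable with no arguments.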
If you're allergic to visit (which you shouldn't be!), then a slightly more idiomatic way to write your chain of ifs would be
if (auto *func = std::get_if<TaskECB>(task.get())) {
(*func)();
} else if (auto *func = std::get_if<TaskRPCB>(task.get())) {
(*func)();
}
Having to use task.get() to get a raw pointer, instead of just task or *task, definitely isn't ideal API design on the STL's part. But again, the ideal solution is to just use std::visit! You should use std::visit.
I didn't check the multithreading parts. Odds are, there are bugs. Multithreaded code always has at least one bug. :)
$endgroup$
$begingroup$
Some additions to the review from @Quuxplusone.
With regard to thread safety, I'd be concerned that you are not calling deAllocatePool in the destructor. It's undefined behavior to destroy a std::mutex while it is locked. It is also a bad thing to destroy a std::thread while it is still joinable: its destructor calls std::terminate. Both of these may happen if you destruct your threadpool without clearing out the work queue.
Looking further at your deAllocatePool, I think that you could get into cases where you can't end a thread. You are setting done, but if the queue is empty, the thread could be waiting in waitAndPop(). While notifyAllThreads will wake the thread, the lambda expression that you are using in your wait statement
dataCond.wait(lk,[this](){return !taskQueue.empty();});
still returns false for an empty queue, so the thread would immediately go back into a wait state after being woken. If you then call join() on such a thread, your deAllocatePool would hang. I think that checking for an abort condition inside the wait predicate would mitigate that. If this hasn't happened yet, there could be a variety of reasons: I could be wrong, your usage pattern has prevented it, or it is just pure coincidence ...
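A minimal sketch of that mitigation, under the assumption that the stop flag lives inside the queue and is guarded by the same mutex (StoppableQueue, Stop, and demoStopWakesWorker are hypothetical names; the original keeps done in the pool):

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

template<class T>
class StoppableQueue {
    std::mutex mut;
    std::condition_variable dataCond;
    std::queue<std::shared_ptr<T>> taskQueue;
    bool done = false;  // guarded by mut

public:
    void Push(T value) {
        auto data = std::make_shared<T>(std::move(value));
        std::lock_guard<std::mutex> lk(mut);
        taskQueue.push(std::move(data));
        dataCond.notify_one();
    }

    // Blocks until an item is available or Stop() has been called.
    // Returns nullptr once the queue is stopped and drained.
    std::shared_ptr<T> WaitAndPop() {
        std::unique_lock<std::mutex> lk(mut);
        dataCond.wait(lk, [this] { return done || !taskQueue.empty(); });
        if (taskQueue.empty()) {
            return nullptr;
        }
        auto front = std::move(taskQueue.front());
        taskQueue.pop();
        return front;
    }

    // Sets the flag under the lock, then wakes every waiter.
    void Stop() {
        {
            std::lock_guard<std::mutex> lk(mut);
            done = true;
        }
        dataCond.notify_all();
    }
};

// Starts a worker on an empty queue, stops the queue, and joins the worker.
// With a predicate that only checked !taskQueue.empty(), the join would hang.
inline bool demoStopWakesWorker() {
    StoppableQueue<int> queue;
    bool sawStop = false;
    std::thread worker([&] {
        while (auto item = queue.WaitAndPop()) {
            // A real worker would run *item here.
        }
        sawStop = true;
    });
    queue.Stop();
    worker.join();  // also makes the write to sawStop visible here
    return sawStop;
}
```

Setting the flag while holding the mutex matters: otherwise a worker could evaluate the predicate, miss the notification, and then sleep forever.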
Both of your task types could be reduced to a functor with the type std::function<void(void)>. You could use either lambdas or std::bind to enclose the state information; this would reduce the complexity inside your threadpool.
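As a sketch of that suggestion, with made-up callbacks (onEvent, onRpc) standing in for whatever the two task classes actually wrap, both a lambda and a std::bind expression fit the same std::function<void()> slot:

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using Task = std::function<void()>;

// Hypothetical callbacks with different signatures.
inline void onEvent(std::vector<std::string>& log, int eventId) {
    log.push_back("event " + std::to_string(eventId));
}

inline void onRpc(std::vector<std::string>& log, const std::string& method) {
    log.push_back("rpc " + method);
}

// Enqueues one task of each kind, runs them in order, and returns the log.
inline std::vector<std::string> demoUniformTasks() {
    std::vector<std::string> log;
    std::queue<Task> tasks;

    // State captured by a lambda...
    tasks.push([&log] { onEvent(log, 42); });
    // ...or bound with std::bind; both erase to the same Task type.
    tasks.push(std::bind(onRpc, std::ref(log), std::string("ping")));

    while (!tasks.empty()) {
        Task task = std::move(tasks.front());
        tasks.pop();
        task();  // the queue no longer needs to know which kind it is
    }
    return log;
}
```

The trade-off against std::variant is that std::function may allocate and always dispatches indirectly, but the pool then needs neither index() checks nor std::visit.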
$endgroup$
answered Apr 1 at 19:53 by Quuxplusone
answered Apr 1 at 21:00
Harald Scheirich
$begingroup$
Sorry for mixing tags. I would like suggestions regarding the thread pool implementation overall and the use of `std::variant`. Is it OK, or does it have some performance penalty?
$endgroup$
– Debashish
Mar 29 at 12:43
$begingroup$
I've edited tags to C++17 (since `std::variant` requires that). I hope you get good reviews!
$endgroup$
– Toby Speight
Mar 29 at 14:04
$begingroup$
`THREAD_POOL_SIZE 100`: I hope you are running on some large hardware. The point of a thread pool is that you create a thread for each available processor (or very close to that); each processor then gets new work from the pool without having to switch threads (as that is a relatively expensive operation).
$endgroup$
– Martin York
Apr 1 at 18:40
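A small sketch of sizing the pool from the hardware rather than a fixed constant. `defaultPoolSize` and its `fallback` parameter are hypothetical names; `std::thread::hardware_concurrency()` is the standard library call, and it is only a hint that may return 0 when the value cannot be determined.

```cpp
#include <thread>

// Hypothetical helper: picks a worker count from the hardware hint,
// falling back to a caller-supplied value when the hint is unavailable.
unsigned defaultPoolSize(unsigned fallback = 2) {
    const unsigned hint = std::thread::hardware_concurrency();
    return hint != 0 ? hint : fallback;
}
```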