


Multithreaded local server














$begingroup$


I'm writing several local servers which have almost the same code in main.cpp. I'd appreciate comments, improvement suggestions and especially notes on potential memory leaks. Thanks!



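#include <algorithm> //std::count
#include <condition_variable>
#include <fstream> //std::ifstream
#include <iostream>
#include <iterator> //std::istream_iterator
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" //MS::log()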



void pullRequests();


const std::string pathToSocket = "/var/run/SomeServer.sock";
const std::string SERVICE_NAME = "SomeServer";

RequestManager service; //Does the actual processing of the request


std::unordered_map<int, std::string> requestQueue;
std::mutex requestQueue_mutex;
std::condition_variable processorsThreadSwitch;
bool gotNewRequests = false;



int main()
{
    UdsServer app; //Server listening on a Unix Domain Socket

    try
    {
        app.createServer(pathToSocket);
    }
    catch (const std::string & err)
    {
        MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);

        return -1;
    }

    unsigned n_concThreads = std::thread::hardware_concurrency();

    if (!n_concThreads) //if the query failed...
    {
        std::ifstream cpuinfo("/proc/cpuinfo");

        n_concThreads = std::count(std::istream_iterator<std::string>(cpuinfo),
                                   std::istream_iterator<std::string>(),
                                   std::string("processor"));

        if (!n_concThreads)
            n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
    }

    for (unsigned i = 0; i < n_concThreads; ++i)
    {
        std::thread t (pullRequests);
        t.detach();
    }

    int clientConnection; //Declared up front: a declaration is not allowed inside a parenthesized condition

    while ((clientConnection = app.newConnectionEstablished()) > -1) //Uses accept() internally
    {
        std::string command = app.getMsg (clientConnection); //Uses read() internally

        if (command.empty())
            app.closeConnection(clientConnection);
        else if (command == "SHUTDOWN")
        {
            app.closeConnection(clientConnection);

            return 0;
        }
        else
        {
            { //Anonymous scope just to get rid of the lock before notifying a thread
                std::lock_guard<std::mutex> writeLock(requestQueue_mutex);

                requestQueue[clientConnection] = std::move(command);

                gotNewRequests = true;
            }

            processorsThreadSwitch.notify_one(); //nothing happens here if all threads are busy
        }
    }
}



void pullRequests()
{
    UnixDomainSocket uds;

    std::unique_lock<std::mutex> writeLock(requestQueue_mutex);

    while (true) //Let the thread run "forever"
    {
        while (!gotNewRequests)
            processorsThreadSwitch.wait(writeLock);

        std::unordered_map<int, std::string> queueCopy (std::move(requestQueue));
        requestQueue.clear();

        gotNewRequests = false;

        writeLock.unlock(); //Don't let the other threads wait when this thread doesn't need to access the shared data any more

        if (queueCopy.empty())
        {
            writeLock.lock(); //wait() requires the lock to be held, so re-acquire it before looping

            continue;
        }
        else if (queueCopy.size() == 1)
        {
            std::string response = service.pullRequests(queueCopy.cbegin()->second);

            if (response.length())
            {
                auto sendResult = uds.sendMsg(queueCopy.cbegin()->first, response);

                if (!sendResult.isValid())
                    MS::log(SERVICE_NAME, "Could not send the response for request: " + queueCopy.begin()->second, MS::MessageType::Error);
            }

            if (!uds.closeConnection(queueCopy.begin()->first))
                MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
        }
        else //Multiplex: answer identical requests once and send the response to every waiting socket
        {
            std::unordered_map<std::string, std::vector<int>> multiplexedRequests;

            for (auto & request : queueCopy)
                multiplexedRequests[std::move(request.second)].push_back(request.first);

            for (const auto & request : multiplexedRequests)
            {
                std::string response = service.pullRequests(request.first);

                for (auto socket : request.second)
                {
                    if (response.length())
                    {
                        auto sendResult = uds.sendMsg(socket, response);

                        if (!sendResult.isValid())
                            MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);
                    }

                    //Close the connection even when the response is empty, as the single-request branch does
                    if (!uds.closeConnection(socket))
                        MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
                }
            }
        }

        writeLock.lock();
    }
}
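
For reference, here is the queue hand-off reduced to standard-library types only, so it can be compiled and run without UdsServer or RequestManager. It is only a sketch under assumptions: RequestQueue, drain, stop and processAll are hypothetical names that do not appear in my code, the predicate overload of wait() stands in for the gotNewRequests flag, and a stopping flag with joined threads stands in for the detached workers, so that the example terminates.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for the shared state (requestQueue, its mutex and the condition variable).
struct RequestQueue
{
    std::unordered_map<int, std::string> pending;
    std::mutex mutex;
    std::condition_variable wakeUp;
    bool stopping = false; // assumption: not part of the original code

    void push(int connection, std::string command)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            pending[connection] = std::move(command);
        }
        wakeUp.notify_one(); // notify after releasing the lock
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            stopping = true;
        }
        wakeUp.notify_all(); // wake every worker so each one can exit
    }

    // Blocks until there is work or stop() was called.
    // Returns false once the queue is empty and stopping is set.
    bool drain(std::unordered_map<int, std::string> & batch)
    {
        std::unique_lock<std::mutex> lock(mutex);
        wakeUp.wait(lock, [this] { return stopping || !pending.empty(); });

        if (pending.empty())
            return false;

        batch = std::move(pending);
        pending.clear(); // a moved-from map is valid but unspecified, so reset it explicitly
        return true;
    }
};

// Starts workerCount workers, feeds them the commands and returns how many were handled.
std::size_t processAll(const std::vector<std::string> & commands, unsigned workerCount)
{
    RequestQueue queue;
    std::mutex handledMutex;
    std::size_t handled = 0;
    std::vector<std::thread> workers;

    for (unsigned i = 0; i < workerCount; ++i)
        workers.emplace_back([&] {
            std::unordered_map<int, std::string> batch;

            while (queue.drain(batch))
            {
                std::lock_guard<std::mutex> lock(handledMutex);
                handled += batch.size(); // placeholder for the real request processing
            }
        });

    for (std::size_t i = 0; i < commands.size(); ++i)
        queue.push(static_cast<int>(i), commands[i]);

    queue.stop();

    for (auto & worker : workers)
        worker.join(); // joined instead of detached, so nothing outlives the shared state

    return handled;
}
```

Calling processAll({"a", "b", "a"}, 2) should report three handled requests, because requests still pending when stop() is called are drained before the workers exit.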















$endgroup$























      c++ multithreading linux server














      asked 7 mins ago









      Sceptical Jule



