c++multithreading

C++ user input, during threaded file finding


I have a small console app I'm writing, it does some process in the background with threads, and has another thread that dumps results to console.

Now I want to enable the user to be able to input commands such as exit and it to terminate the program.

I have that working the only issue is during the loop if there isn't any input and the processing has completed the program waits for input so then as long as I type anything the loop continues and the condition is now false and the process exits.

My question is, is there a way to make it so I don't get stuck on the std::cin >> command in my while loop, so it can continue running and check the other conditions that might break the loop?

I've tried having a thread dedicated to input too but that seems to hand when waiting for it to end calling inputThread.join().

I'm using condition variables and setting an atomic<bool> running = false when other conditions are met. But I cannot get the program to exit if no user input is every provided.

Here's a skeleton to try demonstrate my issue.

int main(int argc, char *argv[])
{

    // create thread pool doing processes, that mark as completed when done
    // and dump thread that periodically works based on a run condtion
    // cv.wait_for(lock, std::chrono::seconds(10), []{ return !running.load(); }

    while(running) 
    {
        if(completedThreads.load() == threadPool.size())
        {
            running = false;
            results.clear();
            cv.notify_all();
            break;
        }

        std::cin >> command;
        if(command == "dump")
        {
            std::cout << "Dumping Results: " << std::endl;

            std::lock_guard<std::mutex> lock(mtx);
            //dump results here
        }
        if(command == "exit")
        {
            running = false;
            results.clear();
            cv.notify_all();
            break;
        }
    }

    for(auto& thread : threadPool)
    {
        if(thread.joinable())
        {
            thread.join();
        }
    }

    if(dumpThread.joinable())
    {
        dumpThread.join();
    }

    return 0;
}

edit: If possible I want to keep my solution as portable as possible but If its really difficult/not ideal I'd go windows specific. I wanted to try make the program as clean as possible so I'm not sure how bad having the OS reap a thread would be.


Solution

  • I've been trying killing /destroying threads with cin, getchar etc.. and always observed cases when the command is wrong.

    So if it's OK to have several exe, I suggest using a dedicated process (and maybe another console window) for input, and an Inter Process Communication approach.

    Here is a (windows only) example using named pipes. Not thourough but still gives the idea. IPCs (namespipes, socket etc..) exist on all platforms, and I'm pretty sure you'll find a portable class. Maybe the system("start ...") will also have to be platform dependent.

    main.cpp

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <string>
    #include <queue>
    #include <chrono>
    #include <windows.h>
    #define BUFSIZE 256
    using std::thread;
    
    bool running = true;
    unsigned long nb_secs = 0;
    std::mutex mtx;
    static std::string command;
    static std::queue<std::string> commands;
    
    // simulates your calculations
    void count_secs(){
        while(nb_secs < 100){
            std::this_thread::sleep_for(std::chrono::seconds(1));
            nb_secs++;
        }
    }
    // pipe name
    LPCTSTR lpszPipename = TEXT((char*) "\\\\.\\pipe\\MethodiclesCmdPipe");
    HANDLE hPipe;
    
    
    void get_cmd(HANDLE cltPipe){
        DWORD bytesRead;
        BOOL ret_code;
        std::vector<char> buffer(BUFSIZE);
        do {
            ret_code = ReadFile(cltPipe, buffer.data(), buffer.size(), &bytesRead, NULL);
            if (!ret_code || bytesRead == 0) {
                if (GetLastError() == ERROR_BROKEN_PIPE) {
                    std::cout<<"**error** BROKEN_PIPE."<<std::endl;
                    break;
                } else {
                    std::cout<<"**error** ReadFile failed, GLE="<<GetLastError()<<std::endl;
                    break;
                }
                // manage errors, more data & cie
            }
            std::lock_guard<std::mutex> lock(mtx);
            commands.push(std::string(buffer.data()));
            
        } while (bytesRead > 0  && running);
        FlushFileBuffers(cltPipe);
        DisconnectNamedPipe(cltPipe);
        CloseHandle(cltPipe);
        running = false;// no commands => exit
    }
    
    
    int main(int argc, char *argv[])
    {
        // create an IPC - a windows namedpipe here but you may find more portable
        HANDLE hPipe = CreateNamedPipe(
                                    lpszPipename, // pipe name
                                    PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, // read/write access
                                    PIPE_TYPE_MESSAGE | // message type pipe
                                    PIPE_READMODE_MESSAGE | // message-read mode
                                    PIPE_WAIT, // blocking mode
                                    PIPE_UNLIMITED_INSTANCES, // max. instances
                                    BUFSIZE, // output buffer size
                                    BUFSIZE, // input buffer size
                                    0, // client time-out
                                    NULL);
        if (hPipe == INVALID_HANDLE_VALUE) {
            std::cout<<"**error** Creating named pipe. GLE="<< GetLastError()<<std::endl;
            return 1;
        }
        // run input process
        system("start MethodiclesInput.exe");// start /B MethodiclesInput.exe to have all in the same console
        
        std::cout<<"Waiting for client to connect..."<<std::endl;
        if (ConnectNamedPipe(hPipe, NULL) ? TRUE : (GetLastError() == ERROR_PIPE_CONNECTED)) {      
            std::cout<<"Client connected!"<<std::endl;
        } else {
            std::cout<<"**error** Error connecting to client. GLE="<< GetLastError()<<std::endl;
            return 1;
        }
        
        // a simple thread that counts 1 sec loop   
        thread(count_secs).detach();
        // the pipe (commands) reader
        thread(get_cmd, hPipe).detach();
        while(running) 
        {
            // commented : not given in your example
            // if(completedThreads.load() == threadPool.size())
            // ...
        
            // that part comes instead of std::cin>>
            {
                // every once in a while, read the commands (could also check if empty everytime, depending on what's in your loop while(running)
                std::this_thread::sleep_for(std::chrono::seconds(2));
                std::lock_guard<std::mutex> lock(mtx);// will use commands now
                if(commands.empty())
                    std::cout<<'.';// purely informative...
                else while(!commands.empty()){
                    command = commands.front();
                    commands.pop();
                    std::cout<<"Got command "<<command<<std::endl;
                    if(command == "dump"){
                        std::cout<<"Dumping Results: " << std::endl;
                        std::cout<<"\n val is " << nb_secs << std::endl;
                    }
                    else if (command == "exit"){
                        running = false;
                        //results.clear();
                        //cv.notify_all();
                        //break;
                        system("taskkill /IM MethodiclesInput.exe");
                        
                    }
                }
            }
            // ...
       }
       return 0;
    }
    

    inputconsole.cpp

    #include <iostream>
    #include <string>
    #include <windows.h>
    using std::cout;
    using std::endl;
    LPCTSTR lpszPipename = TEXT((char*) "\\\\.\\pipe\\MethodiclesCmdPipe");
    
    int main(){
        HANDLE hPipe;
    
        hPipe = CreateFile(
                        lpszPipename, // pipe name
                        GENERIC_WRITE, // write access
                        0, // no sharing
                        NULL, // default security attributes
                        OPEN_EXISTING, // opens existing pipe
                        0, //FILE_FLAG_OVERLAPPED 
                        NULL); // no template file
    
        // Exit if an error other than ERROR_PIPE_BUSY occurs
        int l = GetLastError();
        //.. manage errors!
        if(hPipe != INVALID_HANDLE_VALUE) {
            do {
                LPDWORD cbWritten;
                //WriteFile(hPipe, LPTSTR((char*)"Hello"),12,  cbWritten, NULL);
                //cout<<"Sent hello : "<<cbWritten<<endl;
                cout<<"Enter command."<<endl;
                std::string command;
                std::cin>>command;
                LPTSTR lpvMessage = (char*) command.c_str();
                size_t cbToWrite = (lstrlen(lpvMessage) + 1) * sizeof (TCHAR);
    
                // sends message through the named pipe
                int fSuccess = WriteFile(
                                    hPipe, // pipe handle
                                    lpvMessage, // message
                                    cbToWrite, // message length
                                    cbWritten, // bytes written
                                    NULL); // not overlapped
                if (!fSuccess) {
                    int l = GetLastError();
                    if (l) {
                        if (l != ERROR_PIPE_BUSY) {
                            cout<<"**error** Couldnt write in pipe. GLE="<<l<<endl;
                            Sleep(50);
    
                            if (l == 6) {
                                cout<<"Closing handle "<< hPipe<<endl;
                                CloseHandle(hPipe);
                                // try to reconnect or (cleanup &) exit
                                getchar();
                                return 0;
                            }
                        } else if (!WaitNamedPipe(lpszPipename, 5000)) {
                            cout<<"**error** Could not open pipe: 5 second wait timed out. GLE="<<GetLastError()<<endl;
                        }
                        // there might be other reasons  / errors to manage
                    }
                }
                if (!fSuccess) {
                    break;
                }
            } while (true);
    
            
        }
        getchar();
        return 0;
    }
    

    run

    g++ inputconsole.cpp -o MethodiclesInput
    g++ main.cpp -o test && test
    

    Hope you like it, have fun!