1

Even if _write is thread safe, it does not guarantee a full write, partial writes may happen.

If two threads write to the same file descriptor, is there any way of blocking just the file descriptor, instead of the whole function with a global mutex?

So if two threads are trying to write to fd 1, one will have to wait for the other to finish; if one is trying to write to fd 1 while the other is trying to fd 2, then they'll both be executed at the same time.

I'm looking for a C++ solution.

#include <io.h>

struct IOError {};

void
write(int const fd, char const * buffer, int unsigned size) {
    int result;
    while (size != 0) {
        result = ::_write(fd, buffer, size);
        if (result < 0) {
            throw IOError();
        }
        buffer += result;
        size -= result;
    }
}

int
main() {
    write(1, "Hello, world!\n", 14);
    return 0;
}
João Pires
  • 927
  • 1
  • 5
  • 16

1 Answers1

1

A simple solution would be to use one mutex for each file descriptor.

You would only need a global mutex in order to create one unique mutex for a given descriptor and store it into a map, as long as you can compile with C++11 at least ( e.g local static thread safety )

But you need to store the result of the map creation / map search into something ( as STL containers are not themselves threadsafe ). I used shared pointers here to serve this. they give automatic deletion.

If you want to use exception the std::lock_guard< std::mutex > RAII helps you to release the mutex if something bad occurs ( see QA like Unlock mutex on exception )

Here the (linux oriented) code you can copy/paste. Just adjust NB_ELEM to something above your system's pipe size.

#include <unistd.h>
#include <mutex>
#include <map>
#include <memory>

#include <future> // For async and testing
#include <vector> // For testing
#include <iostream> // here for testing std::cout 
#include <fcntl.h> // For testing fcntl to display pipe (std out) size

void my_write(int const fd, char const * buffer, ssize_t size) 
{

    static std::map<int,std::shared_ptr<std::mutex>> MM;
    static std::mutex global_mutex;
    
    ssize_t result;
    std::shared_ptr<std::mutex> msptr;
    
    {
        std::lock_guard<std::mutex> lock(global_mutex);
        if ( MM.cend() == MM.find(fd) ) {
            msptr = std::make_shared<std::mutex>();
            MM[fd] = msptr;
        }   
        else {
            msptr = MM[fd];
        }
    }
    
    std::lock_guard<std::mutex> lock(*msptr);

    while (size != 0) {
        result = write(fd, buffer, size);
        if (result < 0) {
            //throw if you want
        }
        buffer += result;
        size -= result;
    }
    
}

const size_t NB_ELEM = 100000u;

std::vector<char> va(NB_ELEM,'a');

std::vector<char> vb(NB_ELEM,'b');


int
main() 
{
    va.push_back('\n');
    vb.push_back('\n');
    
    std::cout << "stdout pipe size is : " << fcntl( 1, F_GETPIPE_SZ ) << "\n" << std::flush;
    
    {
    #if 1
        auto fut2 = std::async([](){my_write(1, vb.data(), vb.size());});
        auto fut1 = std::async([](){my_write(1, va.data(), va.size());});
    #else
        auto fut2 = std::async([](){write(1, vb.data(), vb.size());});
        auto fut1 = std::async([](){write(1, va.data(), va.size());});
    #endif
    }
    
    std::cout << "Bye ! \n" << std::flush;
    
    return 0;
}

On the coliru system

stdout pipe size is : 65536

With my_write(...) you will get such a result

bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb...

aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...

Bye !

And with normal write(...) you may sometimes get

bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb...bbbaaaaaaaaaaa...aaaaaaaaaaabbb...

bbbbbbbbbbbbbbbbbbbbbaaaaaaaa....aaa

Bye !

Community
  • 1
  • 1
NGI
  • 852
  • 1
  • 12
  • 31