
Are concurrent appends to a file a supported feature?

I tested this with concurrent threads, each writing through its own fstream. The data is not corrupted, but some writes are lost: the file size is smaller than expected after the writes finish, even though no writes overlap.

If each thread instead writes through its own fstream with explicit seeks to coordinated offsets (each thread is assigned the offsets it will write to), no writes are lost.

Here is the sample code:

#include <chrono>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "gtest/gtest.h"

using namespace std;

void append_concurrently(string filename, const int data_in_gb, const int num_threads, const char start_char,
    bool stream_cache = true) {
    const int offset = 1024;
    const long long num_records_each_thread = (data_in_gb * 1024 * ((1024 * 1024) / (num_threads * offset)));

    {
        auto write_file_fn = [&](int index) {
            // each thread has its own handle
            fstream file_handle(filename, fstream::app | fstream::binary);
            if (!stream_cache) {
                file_handle.rdbuf()->pubsetbuf(nullptr, 0); // no buffering in fstream
            }

            vector<char> data(offset, (char)(index + start_char));

            for (long long i = 0; i < num_records_each_thread; ++i) {
                file_handle.write(data.data(), offset);
                if (!file_handle) {
                    std::cout << "File write failed: "
                        << file_handle.fail() << " " << file_handle.bad() << " " << file_handle.eof() << std::endl;
                    break;
                }
            }

            // file_handle.flush();
        };

        auto start_time = chrono::high_resolution_clock::now();
        vector<thread> writer_threads;
        for (int i = 0; i < num_threads; ++i) {
            writer_threads.push_back(std::thread(write_file_fn, i));
        }

        for (int i = 0; i < num_threads; ++i) {
            writer_threads[i].join();
        }

        auto end_time = chrono::high_resolution_clock::now();

        std::cout << filename << " Data written : " << data_in_gb << " GB, " << num_threads << " threads "
            << ", cache " << (stream_cache ? "true " : "false ") << ", size " << offset << " bytes ";
        std::cout << "Time taken: " << (end_time - start_time).count() / 1000 << " micro-secs" << std::endl;
    }

    {
        ifstream file(filename, fstream::in | fstream::binary);
        file.seekg(0, ios_base::end);
        
        // This EXPECT_EQ FAILS as file size is smaller than EXPECTED
        EXPECT_EQ(num_records_each_thread * num_threads * offset, file.tellg());
        file.seekg(0, ios_base::beg);
        EXPECT_TRUE(file);

        char data[offset]{ 0 };
        for (long long i = 0; i < (num_records_each_thread * num_threads); ++i) {
            file.read(data, offset);
            EXPECT_TRUE(file || file.eof()); // should be able to read until eof
            char expected_char = data[0]; // should not have any interleaving of data.

            bool same = true;
            for (auto & c : data) {
                same = same && (c == expected_char) && (c != 0);
            }

            EXPECT_TRUE(same); // THIS PASSES
            if (!same) {
                std::cout << "corruption detected !!!" << std::endl;
                break;
            }

            if (file.eof()) { // THIS FAILS as file size is smaller
                EXPECT_EQ(num_records_each_thread * num_threads, i + 1);
                break;
            }
        }
    }
}

TEST(fstream, file_concurrent_appends) {
    string filename = "file6.log";
    const int data_in_gb = 1;
    
    {
        // trunc file before write threads start.
        {
            fstream file(filename, fstream::in | fstream::out | fstream::trunc | fstream::binary);
        }
        append_concurrently(filename, data_in_gb, 4, 'B', false);
    }
    std::remove(filename.c_str());
}

Edit:

I changed the code so that a single fstream is shared by all threads. Now, with a 512-byte buffer size, I consistently see 8 writes (4 KB in total) being lost.

    const int offset = 512;
    const long long num_records_each_thread = (data_in_gb * 1024 * ((1024 * 1024) / (num_threads * offset)));

    fstream file_handle(filename, fstream::app | fstream::binary);
    if (!stream_cache) {
        file_handle.rdbuf()->pubsetbuf(nullptr, 0); // no buffering in fstream
    }

The problem does not reproduce with a 4 KB buffer size.

Running main() from gtest_main.cc
Note: Google Test filter = *file_conc*_*append*
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from fstream
[ RUN      ] fstream.file_concurrent_appends
file6.log Data written : 1 GB, 1 threads , cache true , size 512 bytes Time taken: 38069289 micro-secs
d:\projs\logpoc\tests\test.cpp(279): error:       Expected: num_records_each_thread * num_threads * offset
      Which is: 1073741824
To be equal to: file.tellg()
      Which is: 1073737728
d:\projs\logpoc\tests\test.cpp(301): error:       Expected: num_records_each_thread * num_threads
      Which is: 2097152
To be equal to: i + 1
      Which is: 2097145

Edit 2: Closing file_handle after joining all threads flushes the data from the internal buffer. This resolved the issue above.

Ashish Negi
  • @Andreas Wenzel: multi-threading using the same fstream is not supported, as it has an offset position inside it. I opened the file in append mode, so I hoped that the OS would take care of the offsets – Ashish Negi Oct 10 '20 at 02:29

1 Answer


According to §29.4.2 ¶7 of the official ISO C++20 standard, the functions provided by std::fstream are generally thread-safe.

However, if every thread has its own std::fstream object, then, as far as the C++ standard library is concerned, these are distinct streams and no synchronization will take place. Only the operating system's kernel will be aware that all file handles point to the same file. Therefore, any synchronization will have to be done by the kernel. But the kernel possibly isn't even aware that a write is supposed to go to the end of the file. Depending on your platform, it is possible that the kernel only receives write requests for certain file positions. If the end of file has meanwhile been moved by an append from another thread, then the position for a thread's previous write request may no longer be to the end of the file.

According to the documentation on std::fstream::open, opening a file in append mode will cause the stream to seek to the end of the file before every write. This behavior seems to be exactly what you want. But, for the reasons stated above, this will probably only work if all threads share the same std::fstream object. In that case, the std::fstream object should be able to synchronize all writes. In particular, it should be able to perform the seeks to the end of file and the subsequent writes atomically.

Andreas Wenzel
  • thanks.. i tried sharing the fstream. with a 512-byte buffer size, i see 8 writes totalling 4 KB getting lost consistently. – Ashish Negi Oct 10 '20 at 04:32
    @AshishNegi: I suspect that your function call `file_handle.rdbuf()->pubsetbuf(nullptr, 0);` may be failing. According to [this answer to another question](https://stackoverflow.com/a/40317135/12149471), you must call that function **before** opening the file. Since you don't seem to be flushing the output buffer, I suspect that the missing 4096 bytes are still in the buffer. – Andreas Wenzel Oct 10 '20 at 05:11
  • @AshishNegi: After doing some further research, I found out that according to [this page](https://en.cppreference.com/w/cpp/io/basic_filebuf/setbuf), the gcc compiler requires that the function be called before opening the file, but other compilers allow you to call the function also after opening the file. I'm not sure if that page is still up to date, though. – Andreas Wenzel Oct 10 '20 at 05:32
  • It seems that the situation is unchanged. With libstdc++ (which is used by gcc), `setbuf` or `pubsetbuf` must still be called before opening any file. Otherwise, the function call has no effect. See [this page from the official documentation](https://gcc.gnu.org/onlinedocs/libstdc++/manual/streambufs.html#io.streambuf.buffering) for further information. – Andreas Wenzel Oct 10 '20 at 06:38
  • you are right about not flushing/closing the earlier file handle used for writes. thanks.. this solves the mystery.. – Ashish Negi Oct 10 '20 at 09:00