I'm reading a piece of code of a graph database and in the constructor I find the following code snippet:
CommitManager(std::string path, std::atomic<timestamp_t> &_global_epoch_id)
: fd(EMPTY_FD),
seq_front{0, 0},
seq_rear{0, 0},
mutex(),
client_mutex(),
cv_server(),
cv_client(),
global_client_mutex(0),
used_size(0),
file_size(0),
global_epoch_id(_global_epoch_id),
writing_epoch_id(global_epoch_id),
unfinished_epoch_id(),
queue(),
closed(false),
server_thread([&] { server_loop(); })
And later it has
void server_loop()
{
while (true)
{
check_unfinished_epoch_id();
int local_client_mutex = global_client_mutex.load();
std::unique_lock<std::mutex> lock(mutex[local_client_mutex]);
auto &local_queue = queue[local_client_mutex];
while (local_queue.empty() && !closed.load())
{
cv_server.wait_for(lock, SERVER_SPIN_INTERVAL, [&]() {
check_unfinished_epoch_id();
return !local_queue.empty() || closed.load();
});
check_unfinished_epoch_id();
cv_client[local_client_mutex ^ 1].notify_all();
}
std::unique_lock<std::mutex> client_lock(client_mutex[local_client_mutex]);
global_client_mutex ^= 1;
size_t num_txns = local_queue.size();
if (!num_txns)
break;
++writing_epoch_id;
unfinished_epoch_id.emplace(writing_epoch_id, 0);
auto &num_unfinished = unfinished_epoch_id.back().second;
std::string group_wal;
group_wal.append(reinterpret_cast<char *>(&writing_epoch_id), sizeof(writing_epoch_id));
group_wal.append(reinterpret_cast<char *>(&num_txns), sizeof(num_txns));
for (size_t i = 0; i < num_txns; i++)
{
auto &[wal, ret_epoch_id, ret_num] = local_queue.front();
group_wal.append(wal);
*ret_epoch_id = writing_epoch_id;
*ret_num = &num_unfinished;
++num_unfinished;
local_queue.pop();
}
auto expected_size = used_size + group_wal.size();
if (expected_size > file_size)
{
size_t new_file_size = (expected_size / FILE_TRUNC_SIZE + 1) * FILE_TRUNC_SIZE;
if (fd != EMPTY_FD)
{
if (ftruncate(fd, new_file_size) != 0)
throw std::runtime_error("ftruncate wal file error.");
}
file_size = new_file_size;
}
used_size += group_wal.size();
if (fd != EMPTY_FD)
{
if ((size_t)write(fd, group_wal.c_str(), group_wal.size()) != group_wal.size())
std::runtime_error("write wal file error.");
}
if (fd != EMPTY_FD)
{
if (fdatasync(fd) != 0)
std::runtime_error("fdatasync wal file error.");
}
++num_unfinished;
lock.unlock();
seq_front[local_client_mutex] += num_txns;
cv_client[local_client_mutex].notify_all();
client_lock.unlock();
--num_unfinished;
}
}
private:
std::thread server_thread;
I'm particularly confused about server_thread([&] { server_loop(); })
I assumed it was creating a thread to run the server_loop()
function but I'm confused about the syntax of [&]
and why it has to use "{}" to enclose the function. I recall in C's thread we pass the function and arguments as parameters when we create a new thread.