I am designing a shared memory, which support multi-producer multi-consumer.
I have a header with conditional_variable
and pthread_mutex_t
.
// header.hpp
#pragma once
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/sysinfo.h>
#include <atomic>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <iostream>
using namespace std;
struct Header {
const size_t size_;
std::atomic_int32_t tail_;
std::mutex cv_mut_;
std::condition_variable cv_;
pthread_mutex_t mut_;
Header(size_t size, int32_t tail) : size_(size), tail_(tail) {
pthread_mutexattr_t mutexattr;
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST);
pthread_mutex_init(&mut_, &mutexattr);
}
};
I also have a shm_worker which used to init and allocate memory.
#pragma once
#include "./header.hpp"
class ShmWorker {
public:
ShmWorker() : is_init(false) {}
virtual ~ShmWorker() {
shmdt(m_data);
if (create_new) shmctl(shmid, IPC_RMID, 0);
}
protected:
key_t get_keyid(const std::string & name) {
const std::string & s = "./" + name;
if (access(s.c_str(), F_OK) == -1) { const std::string & s1 = "touch " + s; system(s1.c_str()); }
key_t semkey = ftok(name.c_str(), 1);
if (semkey == -1) { printf("shm_file:%s not existed\n", s.c_str()); exit(1); }
return semkey;
}
template <typename T>
void init(const std::string& name, int size) { // one word to conclude: if not existed, create, else connect
if (is_init) { printf("this shmworker has beed inited!\n"); return; }
m_key = get_keyid(name);
shmid = shmget(m_key, 0, 0);
if (shmid == -1) {
if (errno == ENOENT || errno == EINVAL) {
shmid = shmget(m_key, sizeof(Header) + sizeof(T) * size, 0666 | IPC_CREAT | O_EXCL);
if (shmid == -1) { printf("both connet and create are failed for shm\n"); exit(1); }
printf("creating new shm %s\n", name.c_str());
create_new = true;
m_data = (char*)shmat(shmid, NULL, 0);
Header* header = new Header(size, 0);
memcpy(m_data, header, sizeof(Header));
} else {
exit(1);
}
} else {
m_data = (char*)shmat(shmid, 0, 0);
// m_size = reinterpret_cast<std::atomic_int*>(m_data)->load();
}
if (m_data == (char*)(-1)) { perror("shmat"); exit(1); }
is_init = true;
}
int m_key;
int shmid;
char* m_data;
bool is_init;
bool create_new = false;
};
the producer code is:
// shm_worker.hpp
#pragma once
#include <mutex>
#include <fcntl.h>
#include <fstream>
#include "./shm_worker.hpp"
template <typename T>
class ShmSender: public ShmWorker {
public:
ShmSender(const std::string& key, int size = 4096) {
init <T> (key, size);
header_ = (Header*)m_data;
}
virtual ~ShmSender() = default;
void Send(const T& shot) {
// pthread_mutex_lock(&header_->mut_);
{
std::lock_guard<std::mutex> lk(header_->cv_mut_);
memcpy(m_data + sizeof(Header) + header_->tail_.load() % header_->size_ * sizeof(T), &shot, sizeof(T));
header_->tail_.fetch_add(1);
}
// pthread_mutex_unlock(&header_->mut_);
header_->cv_.notify_all();
}
private:
Header * header_;
};
the consumer code is:
#pragma once
#include <unistd.h>
#include "./shm_worker.hpp"
template <typename T>
class ShmRecver : public ShmWorker {
public:
ShmRecver(const std::string & key, int size = 4096) {
init <T> (key, size);
header_ = (Header*)m_data;
read_index = header_->tail_.load();
}
virtual ~ShmRecver() {}
void Recv(T& t) {
std::unique_lock<std::mutex> lk(header_->cv_mut_);
header_->cv_.wait(lk, [&] { return read_index != header_->tail_; }); // wait tail changed by Sender
t = *(T*)(m_data + sizeof(*header_) + (read_index ++));
}
private:
int read_index;
Header* header_;
};
but, when i call sender.Send()
, the recver.Recv()
not response.
seems conditional_variable notify_all
didn't wake up Recver process's wait
could you help on this?