0

I am designing a shared memory, which support multi-producer multi-consumer.

I have a header with conditional_variable and pthread_mutex_t.

// header.hpp
#pragma once
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/sysinfo.h>
#include <atomic>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <iostream>
using namespace std;

struct Header {
  const size_t size_;
  std::atomic_int32_t tail_;
  std::mutex cv_mut_;
  std::condition_variable cv_;
  pthread_mutex_t mut_;
  Header(size_t size, int32_t tail) : size_(size), tail_(tail) {
    pthread_mutexattr_t mutexattr;
    pthread_mutexattr_init(&mutexattr);
    pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
    pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST);
    pthread_mutex_init(&mut_, &mutexattr); 
  }
};

I also have a shm_worker which used to init and allocate memory.

#pragma once
#include "./header.hpp"

class ShmWorker {
 public:
  ShmWorker() : is_init(false) {}

  virtual ~ShmWorker() {
    shmdt(m_data);
    if (create_new) shmctl(shmid, IPC_RMID, 0); 
  }

 protected:
  key_t get_keyid(const std::string & name) {
    const std::string & s = "./" + name;
    if (access(s.c_str(), F_OK) == -1) { const std::string & s1 = "touch " + s; system(s1.c_str()); }
    key_t semkey = ftok(name.c_str(), 1); 
    if (semkey == -1) { printf("shm_file:%s not existed\n", s.c_str()); exit(1); }
    return semkey;
  }

  template <typename T>
  void init(const std::string& name, int size) {  // one word to conclude: if not existed, create, else connect
    if (is_init) { printf("this shmworker has beed inited!\n"); return; }
    m_key = get_keyid(name);
    shmid = shmget(m_key, 0, 0); 
    if (shmid == -1) {
      if (errno == ENOENT || errno == EINVAL) {
        shmid = shmget(m_key, sizeof(Header) + sizeof(T) * size, 0666 | IPC_CREAT | O_EXCL);
        if (shmid == -1) { printf("both connet and create are failed for shm\n"); exit(1); }
        printf("creating new shm %s\n", name.c_str());
        create_new = true;
        m_data = (char*)shmat(shmid, NULL, 0); 
        Header* header = new Header(size, 0); 
        memcpy(m_data, header, sizeof(Header));
      } else {
        exit(1);
      }   
    } else {
      m_data = (char*)shmat(shmid, 0, 0); 
      // m_size = reinterpret_cast<std::atomic_int*>(m_data)->load();
    }   

    if (m_data == (char*)(-1)) { perror("shmat"); exit(1); }
    is_init = true;
  }

  int m_key;
  int shmid;
  char* m_data;
  bool is_init;
  bool create_new = false;
};

the producer code is:

// shm_worker.hpp
#pragma once
#include <mutex>
#include <fcntl.h>
#include <fstream>
#include "./shm_worker.hpp"

template <typename T>
class ShmSender: public ShmWorker {
 public:
  ShmSender(const std::string& key, int size = 4096) {
    init <T> (key, size);
    header_ = (Header*)m_data;
  }

  virtual ~ShmSender() = default;

  void Send(const T& shot) {
    // pthread_mutex_lock(&header_->mut_);
    {   
      std::lock_guard<std::mutex> lk(header_->cv_mut_);
      memcpy(m_data + sizeof(Header) + header_->tail_.load() % header_->size_ * sizeof(T), &shot, sizeof(T));
      header_->tail_.fetch_add(1);
    }   
    // pthread_mutex_unlock(&header_->mut_);
    header_->cv_.notify_all();
  }

 private:
  Header * header_;
};

the consumer code is:

#pragma once
#include <unistd.h>
#include "./shm_worker.hpp"

template <typename T>
class ShmRecver : public ShmWorker {
 public:
  ShmRecver(const std::string & key, int size = 4096) {
    init <T> (key, size);
    header_ = (Header*)m_data;
    read_index = header_->tail_.load();
  }

  virtual ~ShmRecver() {}

  void Recv(T& t) {
    std::unique_lock<std::mutex> lk(header_->cv_mut_);
    header_->cv_.wait(lk, [&] { return read_index != header_->tail_; });  // wait tail changed by Sender
    t = *(T*)(m_data + sizeof(*header_) + (read_index ++));
  }

 private:
  int read_index;
  Header* header_;
};

but, when i call sender.Send(), the recver.Recv() not response.

seems conditional_variable notify_all didn't wake up Recver process's wait

could you help on this?

nothingisme
  • 119
  • 5
  • 1
    Not that I have the ready answer, but I'd start with the following: `static_assert(std::is_trivially_copyable_v
    , "should be serializable by memcpy");` For me it fails, which is not a good sign.
    – alagner Mar 08 '23 at 07:03

0 Answers0