4

I have 4 threads. One reads data from the network, writes it to a variable, and should signal after each piece. The other 3 read this variable, and each should read it exactly once. In my current solution, the writer sets an event after it writes and then waits for the readers' events. The readers wait for the event, then read and set their own events (meaning they have read). The problem is that readers can read more than once, so I get duplicates. How can I enforce that each reader reads every value exactly once?

syned
  • I would prefer to start from any working solution and then improve it – syned Mar 25 '13 at 21:13
  • 1
    I think the reader should handle the duplicate write message. I don't think you should leave it up to the clients to assume that a change event will only fire once. There are network scenarios that could cause this behavior, for example. – neontapir Mar 25 '13 at 21:15

5 Answers

2

One way to implement this is the following:

The data is shared amongst the threads as a singly linked list. Every node in the list is either a marker or a data node. The list starts out as a single marker node. When data is read, a new list is formed that consists of a series of data nodes followed by a marker. This list is appended to the most recent marker added to the list.

Every reader thread starts out with a reference to the original marker node and its own AutoResetEvent. Whenever a new piece of data comes in, the writer signals the AutoResetEvent of every reader thread. The reader thread then simply walks the list until it finds a marker with no Next node.

This scheme ensures that every reader sees each piece of data only once. The biggest complication is constructing the list so that it can be written to and read from in a lock-free fashion. This is pretty straightforward with Interlocked.CompareExchange, though.

Linked List type

class Node<T> { 
  public bool IsMarker;
  public T Data;
  public Node<T> Next;
}

Sample writer type

class Writer<T> {
  private List<AutoResetEvent> m_list; 
  private Node<T> m_lastMarker;

  public Writer(List<AutoResetEvent> list, Node<T> marker) { 
    m_lastMarker = marker;
    m_list = list;
  }

  // Assumes calls to this method never overlap.  If they can overlap then
  // you will need synchronization in this method around the writing of 
  // m_lastMarker      
  void OnDataRead(T[] items) {
    if (items.Length == 0) {
      return;
    }

    // Build up a linked list of the new data followed by a 
    // Marker to signify the end of the data.  
    var head = new Node<T>() { Data = items[0] };
    var current = head;
    for (int i = 1; i < items.Length; i++) {
      current.Next = new Node<T>{ Data = items[i] };
      current = current.Next;
    }
    var marker = new Node<T> { IsMarker = true };
    current.Next = marker;

    // Append the list to the end of the last marker node the writer
    // created 
    m_lastMarker.Next = head;
    m_lastMarker = marker;

    // Tell each of the readers that there is new data 
    foreach (var e in m_list) { 
      e.Set();
    }
  }
}

Sample reader type

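class Reader<T> { 
  private AutoResetEvent m_event;
  private Node<T> m_marker;

  // Each reader gets its own event and starts at the original marker node
  public Reader(AutoResetEvent e, Node<T> marker) { 
    m_event = e;
    m_marker = marker;
  }

  // Placeholder: the application-specific handling of one item goes here
  void ProcessData(T data) { 
  }

  void Go() {
    while(true) { 
      // Block until the writer signals that new data was appended
      m_event.WaitOne();
      var current = m_marker.Next;
      while (current != null) { 
        if (current.IsMarker) { 
          // Found a new marker.  Always record the marker because it may 
          // be the last marker in the chain 
          m_marker = current;
        } else { 
          // Actually process the data 
          ProcessData(current.Data);
        }
        current = current.Next;
      }
    }
  }
}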
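
The comment in OnDataRead assumes that calls never overlap. If several writers could append at the same time, the append step might be done with Interlocked.CompareExchange roughly as follows. This is only a sketch under that assumption: LockFreeAppend, Append, BuildChain and CountData are hypothetical names introduced here, not part of the code above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Node<T>
{
    public bool IsMarker;
    public T Data;
    public Node<T> Next;
}

static class LockFreeAppend
{
    // Appends an already linked chain (data nodes followed by a marker) to the end
    // of the list. 'start' may be any node that is already part of the list.
    public static void Append<T>(Node<T> start, Node<T> head)
    {
        var tail = start;
        while (true)
        {
            // Store 'head' only if tail.Next is still null; the previous value is returned.
            var next = Interlocked.CompareExchange(ref tail.Next, head, null);
            if (next == null) return;  // this writer won, the chain is now published
            tail = next;               // someone else appended first, keep walking
        }
    }

    // Builds "items followed by a marker", like the loop in OnDataRead.
    public static Node<T> BuildChain<T>(T[] items)
    {
        Node<T> head = new Node<T> { IsMarker = true };
        for (int i = items.Length - 1; i >= 0; i--)
            head = new Node<T> { Data = items[i], Next = head };
        return head;
    }

    // Counts the data (non-marker) nodes reachable from 'start'.
    public static int CountData<T>(Node<T> start)
    {
        int count = 0;
        for (var n = start; n != null; n = n.Next)
            if (!n.IsMarker) count++;
        return count;
    }

    static void Main()
    {
        var root = new Node<int> { IsMarker = true };
        // Four concurrent writers, each appending 250 chains of two items.
        Parallel.For(0, 4, w =>
        {
            for (int i = 0; i < 250; i++)
                Append(root, BuildChain(new[] { w, i }));
        });
        Console.WriteLine(CountData(root)); // 2000: no chain was lost
    }
}
```

Because a chain is fully linked before it is published, the only node whose Next is null is the last marker, so the compare-and-swap can only succeed at the true end of the list. Walking from the root on every append is slow; a real writer would start from the last marker it knows about.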
JaredPar
1

I agree with the comment that said you should code the consumer threads to accept the possibility of getting the same value multiple times. Perhaps the easiest way to do that would be to add a sequential identifier to each update. That way, the thread can compare the sequential id to the last id that it read, and know if it's getting a duplicate.

It would also know if it missed a value.
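
A minimal sketch of that idea, assuming the writer stamps each update with a sequence number that starts at 1 and increases by one. The Update and SequenceFilter types are made up for illustration:

```csharp
using System;

// Hypothetical message type: the payload plus a sequence number set by the writer.
public sealed class Update
{
    public readonly long Sequence;
    public readonly string Payload;

    public Update(long sequence, string payload)
    {
        Sequence = sequence;
        Payload = payload;
    }
}

// Each reader thread owns one filter, so the filter itself needs no locking.
public sealed class SequenceFilter
{
    private long _lastSeen = 0;  // assumes sequence numbers start at 1

    // Number of updates this reader never saw.
    public long Missed { get; private set; }

    // Returns true if the update is new and should be processed.
    public bool Accept(Update u)
    {
        if (u.Sequence <= _lastSeen) return false;  // duplicate or stale value
        Missed += u.Sequence - _lastSeen - 1;       // a gap means skipped updates
        _lastSeen = u.Sequence;
        return true;
    }
}

public static class SequenceDemo
{
    public static void Main()
    {
        var filter = new SequenceFilter();
        var seen = new[]
        {
            new Update(1, "a"), new Update(1, "a"), new Update(2, "b"), new Update(4, "d")
        };
        foreach (var u in seen)
            Console.WriteLine(u.Sequence + " " + u.Payload + " accepted=" + filter.Accept(u));
        Console.WriteLine("missed=" + filter.Missed);
    }
}
```

In the demo, the second copy of update 1 is rejected, and the jump from 2 to 4 is counted as one missed update.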

But if you really need them to be in lock-step and only get the value one time, I would suggest that you use two ManualResetEvent objects, and a CountdownEvent. Here's how to use them.

ManualResetEvent DataReadyEvent = new ManualResetEvent(false);      // initially not signaled
ManualResetEvent WaitForResultEvent = new ManualResetEvent(false);  // initially not signaled
CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads);

The reader threads wait on the DataReadyEvent.

When the other thread reads a value from the network, it does this:

WaitForResultEvent.Reset(); // close the gate left open by the previous round (assumes every reader has already passed it)
Acknowledgement.Reset(NumWaitingThreads);
DataReadyEvent.Set();  // signal waiting threads to process
Acknowledgement.Wait();  // wait for all threads to signal they got it.
DataReadyEvent.Reset(); // block threads' reading
WaitForResultEvent.Set(); // tell threads they can continue

The waiting threads do this:

DataReadyEvent.WaitOne(); // wait for value to be available
// read the value
Acknowledgement.Signal();  // acknowledge receipt
WaitForResultEvent.WaitOne(); // wait for signal to proceed

This has the same effect as having two events per waiting thread, but much more simply.

It does have the drawback, though, that if a thread crashes, this will hang on the countdown event. But then, your method will, too, if the producer thread waits for all of the thread messages.

Jim Mischel
1

This is a good fit for the Barrier class.

You can use two Barriers to flip-flop between the two states.

Here's an example:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            int readerCount = 4;

            Barrier barrier1 = new Barrier(readerCount + 1);
            Barrier barrier2 = new Barrier(readerCount + 1);

            for (int i = 0; i < readerCount; ++i)
            {
                Task.Factory.StartNew(() => reader(barrier1, barrier2));
            }

            while (true)
            {
                barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point.

                if ((value % 10000) == 0)       // Print message every so often.
                    Console.WriteLine(value);

                barrier2.SignalAndWait(); // Wait for the reader threads to read the current value.
                ++value;                  // Produce the next value.
            }
        }

        private static void reader(Barrier barrier1, Barrier barrier2)
        {
            int expected = 0;

            while (true)
            {
                barrier1.SignalAndWait(); // Wait for "new data available".

                if (value != expected)
                {
                    Console.WriteLine("Expected " + expected + ", got " + value);
                }

                ++expected;
                barrier2.SignalAndWait();  // Signal that we've read the data, and wait for all other threads.
            }
        }

        private static volatile int value;
    }
}
Matthew Watson
0

I would recommend ConcurrentQueue - it guarantees that each thread gets a unique instance from the queue. Here is a good explanation of how to use it.

ConcurrentQueue<T>.TryDequeue() is a thread-safe method that checks whether the queue is non-empty and, if so, takes an item from it. Since it performs both operations atomically, programmers don't have to worry about race conditions.
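
Note that a single shared queue hands each item to exactly one consumer, so on its own it does not give every reader a copy. One possible arrangement, sketched here with one queue per reader, has the writer add each item to every queue. The BroadcastDemo class and the use of BlockingCollection<T> (which wraps a ConcurrentQueue<T> by default) are assumptions of this sketch, not something the answer prescribes:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class BroadcastDemo
{
    // Runs one writer and 'readerCount' readers; returns what each reader received.
    public static int[][] Run(int readerCount, int[] data)
    {
        // One queue per reader, so every reader gets its own copy of each item.
        var queues = Enumerable.Range(0, readerCount)
                               .Select(_ => new BlockingCollection<int>())
                               .ToArray();

        // GetConsumingEnumerable blocks until an item arrives and
        // ends once CompleteAdding has been called and the queue is empty.
        var readers = queues
            .Select(q => Task.Run(() => q.GetConsumingEnumerable().ToArray()))
            .ToArray();

        // The writer copies each item into every reader's queue.
        foreach (var item in data)
            foreach (var q in queues)
                q.Add(item);

        // No more data: lets the readers finish.
        foreach (var q in queues)
            q.CompleteAdding();

        return Task.WhenAll(readers).Result;
    }

    static void Main()
    {
        var received = Run(3, new[] { 10, 20, 30 });
        for (int i = 0; i < received.Length; i++)
            Console.WriteLine("reader " + i + ": " + string.Join(",", received[i]));
    }
}
```

With this layout the writer never waits for the readers; a slow reader simply falls behind in its own queue instead of seeing a value twice or missing one.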

alexm
  • So the writer adds new data to the queue, and then the readers read and delete that element? – syned Mar 25 '13 at 21:17
  • But how does the writer know how many elements it should add to this queue? Or does it automatically duplicate elements across the threads? – syned Mar 25 '13 at 21:43
0

I think I've found the way to go. I created 2 arrays of AutoResetEvent, so every reader has 2 events: it waits for its write event and then sets its read event. The writer sets all the write events and then waits for all the read events.

JaredPar, your answer was useful and helped me.
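
A rough sketch of that handshake, assuming one "written" event and one "read" event per reader. HandshakeDemo and all names in it are invented for illustration and are not the actual code:

```csharp
using System;
using System.Threading;

static class HandshakeDemo
{
    // Runs one writer and 'readerCount' readers; returns what each reader saw.
    public static int[][] Run(int readerCount, int[] data)
    {
        int shared = 0;  // the variable the writer publishes
        var written = new AutoResetEvent[readerCount];  // writer -> reader i
        var read = new AutoResetEvent[readerCount];     // reader i -> writer
        var results = new int[readerCount][];
        var threads = new Thread[readerCount];

        for (int i = 0; i < readerCount; i++)
        {
            int id = i;  // private copy for the closure
            written[id] = new AutoResetEvent(false);
            read[id] = new AutoResetEvent(false);
            results[id] = new int[data.Length];
            threads[id] = new Thread(() =>
            {
                for (int n = 0; n < data.Length; n++)
                {
                    written[id].WaitOne();     // wait for the writer
                    results[id][n] = shared;   // read the value exactly once
                    read[id].Set();            // tell the writer we are done
                }
            });
            threads[id].Start();
        }

        foreach (var item in data)
        {
            shared = item;                         // publish the next value
            foreach (var e in written) e.Set();    // wake every reader
            foreach (var e in read) e.WaitOne();   // wait until all have read it
        }

        foreach (var t in threads) t.Join();
        return results;
    }

    static void Main()
    {
        var results = Run(3, new[] { 1, 2, 3, 4 });
        for (int i = 0; i < results.Length; i++)
            Console.WriteLine("reader " + i + ": " + string.Join(",", results[i]));
    }
}
```

Because each AutoResetEvent resets itself when its single waiter is released, a reader cannot pass its "written" event twice for the same value, and the writer cannot overwrite the variable until every reader has set its "read" event.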

syned