My program uses an iterator to traverse a map and spawns a number of worker threads to process the points from the read iterator, which all works fine. Now I'd like to write the output for each point, and for that I'm using a memory buffer to ensure that data from the threads is collected in the correct order before it is written to the file (via another iterator for writing):

public class MapMain
{
    // Multiple threads used here, each thread starts in Run() 
    // requests and processes map points

    public void Run()
    {
        // Get point from somewhere and process point
        int pointIndex = ...

        bufferWriter.StartPoint(pointIndex);

        // Perform a number of computations.
        // For simplicity, numberOfComputations = 1 in this example   
        bufferWriter.BufferValue(pointIndex, value);

        bufferWriter.EndPoint(pointIndex); 
    }
}

My attempt at implementing a buffer:

public class BufferWriter
{
  private const int BufferSize = 4;

  private readonly IIterator iterator;
  private readonly float?[] bufferArray;
  private readonly bool[] bufferingCompleted;
  private readonly SortedDictionary<long, int> pointIndexToBufferIndexMap;
  private readonly object syncObject = new object();  

  private int bufferCount = 0;
  private int endBufferCount = 0;

  public BufferWriter(....)
  {
      iterator = ...
      bufferArray = new float?[BufferSize];
      bufferingCompleted = new bool[BufferSize];
      pointIndexToBufferIndexMap = new SortedDictionary<long, int>();
  }

  public void StartPoint(long pointIndex)
  {
    lock (syncObject)
    {
        if (bufferCount == BufferSize)
        {
            Monitor.Wait(syncObject);
        }

        pointIndexToBufferIndexMap.Add(pointIndex, bufferCount);   
        bufferCount++;
    }
  }

  public void BufferValue(long pointIndex, float value)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferArray[bufferIndex] = value;          
      }
  }

  public void EndPoint(long pointIndex)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferingCompleted[bufferIndex] = true;

          endBufferCount++;
          if (endBufferCount == BufferSize)
          {
              FlushBuffer();
              Monitor.PulseAll(syncObject);
          }
      }
  }

  private void FlushBuffer()
  {
      // Iterate in order of points
      foreach (long pointIndex in pointIndexToBufferIndexMap.Keys)
      {
          // Move iterator 
          iterator.MoveNext();

          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];

          if (bufferArray[bufferIndex].HasValue)
          {                  
              iterator.Current = bufferArray[bufferIndex];

              // Clear to null
              bufferArray[bufferIndex] = null;                  
          }
      }

      bufferCount = 0;
      endBufferCount = 0;
      pointIndexToBufferIndexMap.Clear();
  }        
}

I'm looking for feedback to fix the bugs in my code and resolve any performance issues:

[1] In short: I have a fixed-size buffer that collects data from multiple threads processing points in somewhat random order. When the buffer is completely filled with data, it has to be flushed. But what if I collected points 0 to 9 but point 8 was missing? My buffer is already full, and any point trying to use the buffer will block until a flush is performed, which needs point 8.

[2] The order of values in the buffer does not correspond to the order of the map points the values refer to. If it did, I think flushing would be easier (is array access faster than a SortedDictionary lookup?). In addition, this might allow us to reuse the flushed slots for incoming data (a circular buffer?).

But I can't think of a working model to achieve this.

[3] The buffer waits until it is completely filled before flushing. There are many instances where a thread invokes EndPoint() and iterator.Current happens to refer to that point. It might make more sense to "write" that point immediately (i.e. set 'iterator.Current' and advance the iterator once), but how can this be done?

Just to be clear, the writing iterator in BufferWriter has its own buffer that caches values assigned to its Current property before writing them to the output, but I don't have to worry about that.

I feel like the whole thing needs to be rewritten from scratch!

Any help is appreciated. Thank you.

alhazen

2 Answers


I wouldn't do parallelism "by hand"; farm it out to TPL or PLINQ. Since you are talking about a map, you have a fixed set of points that you can enumerate by coordinates, and you can let PLINQ worry about the parallelism.

Example:

// first get your map points, could be just a lazy iterator over every map point
IEnumerable<MapPoint> mapPoints = ...
//Now use PLINQ to compute in parallel, maintain order
var computedMapPoints = mapPoints.AsParallel()
                        .AsOrdered()
                        .Select(mappoint => ComputeMapPoint(mappoint)).ToList();
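For a self-contained picture of what AsOrdered() does here, this is a minimal sketch. OrderedMapDemo, Compute and the integer point indices are hypothetical stand-ins (they are not from the question); the only real API calls are AsParallel(), AsOrdered(), Select() and ToList().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrderedMapDemo
{
    // Hypothetical stand-in for the real per-point computation.
    public static float Compute(int pointIndex)
    {
        return pointIndex * 0.5f;
    }

    // Points are computed on several threads, but the results come back
    // in the same order as the source sequence because of AsOrdered().
    public static List<float> ComputeAll(IEnumerable<int> pointIndices)
    {
        return pointIndices
            .AsParallel()
            .AsOrdered()
            .Select(p => Compute(p))
            .ToList();
    }
}
```

If collecting everything with ToList() is too much memory for a large map, iterating the ordered query with foreach and writing each value as it arrives should also preserve the order, so no hand-written buffer is needed.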
BrokenGlass
  • TPL and PLINQ have only been available since .NET 4.0. Not many developers have (for various reasons) switched to it yet. – dzendras Jan 12 '11 at 22:03

Here is a solution that should work, although I haven't tested it. Add a new field:

private readonly Queue<AutoResetEvent> waitHandles = new Queue<AutoResetEvent>();

The two if blocks (in StartPoint and EndPoint) need to change to:

Start:

if (bufferCount == BufferSize)
{
    AutoResetEvent ev = new AutoResetEvent( false );
    waitHandles.Enqueue( ev );
    ev.WaitOne();
}

End:

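if (endBufferCount == BufferSize)
{
   FlushBuffer();
   // Capture the count first: Dequeue() shrinks waitHandles.Count on every
   // pass, so re-evaluating it in the loop condition would stop too early.
   int toRelease = Math.Min( waitHandles.Count, BufferSize );
   for ( int i = 0; i < toRelease; ++i )
   {
      waitHandles.Dequeue().Set();
   }
}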
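One caveat, assuming these snippets stay inside the existing lock (syncObject) blocks: unlike Monitor.Wait, AutoResetEvent.WaitOne() does not release the lock, so a thread blocked in StartPoint would keep holding it and no other thread could reach EndPoint to flush. Below is a minimal, untested-in-production sketch of one way around that: enqueue the handle under the lock, but wait on it after leaving the lock. SlotGate, Acquire and ReleaseAll are hypothetical names used only for illustration; they are not part of the question's BufferWriter.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: hands out buffer slots 0..capacity-1 and parks
// extra threads on their own AutoResetEvent until slots are reset.
public class SlotGate
{
    private readonly object syncObject = new object();
    private readonly Queue<AutoResetEvent> waitHandles = new Queue<AutoResetEvent>();
    private readonly int capacity;
    private int bufferCount = 0;

    public SlotGate(int capacity)
    {
        this.capacity = capacity;
    }

    // Returns the claimed slot index, blocking while all slots are taken.
    public int Acquire()
    {
        while (true)
        {
            AutoResetEvent ev;
            lock (syncObject)
            {
                if (bufferCount < capacity)
                {
                    return bufferCount++;
                }
                ev = new AutoResetEvent(false);
                waitHandles.Enqueue(ev);
            }

            // Wait OUTSIDE the lock so other threads can still get in.
            ev.WaitOne();
            ev.Close();
            // Loop and re-check: another thread may have taken the slot.
        }
    }

    // Call after a flush: frees all slots and wakes at most 'capacity' waiters.
    public void ReleaseAll()
    {
        lock (syncObject)
        {
            bufferCount = 0;
            int toRelease = Math.Min(waitHandles.Count, capacity);
            for (int i = 0; i < toRelease; ++i)
            {
                waitHandles.Dequeue().Set();
            }
        }
    }
}
```

The re-check loop matters because a newly arriving thread can claim a slot between Set() and the woken thread re-entering the lock; in that case the woken thread simply queues up again and waits for the next flush.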
dzendras
  • My current implementation limits access to the buffer only using syncObj. If a thread attempts to use the buffer, by calling StartPoint(), and it happens to be full, the thread will have to wait until the buffer is flushed. – alhazen Jan 12 '11 at 21:05
  • In short: I have a fixed-size buffer that collects data from multiple threads processing points in somewhat random order. When the buffer gets completely filled with data, it has to be flushed, right? But what if I collected points 0 to 9 but point 8 was missing? My buffer is already full and any point trying to use the buffer will block until a flush is performed, which needs point 8. – alhazen Jan 12 '11 at 21:10
  • I can see some flaws in the code. 1. If you want to invite another 4 threads to compute, you need to invite ONLY 4 threads, but PulseAll releases ALL waiting threads (i.e. 1000). When released they use THE SAME bufferCount value == 0 (zeroed in FlushBuffer method): pointIndexToBufferIndexMap.Add(pointIndex, bufferCount);//bufferCount == 0 for ALL threads bufferCount++;//here is the mess I think EDIT: Don't know how to format comment properly. Sorry. – dzendras Jan 12 '11 at 21:15
  • Did you check my updated solution? I don't get any feedback from you. – dzendras Jan 14 '11 at 07:43