1

I have been trying to implement a distributed depth-first search in C#. I have been successful up to a certain point, but I am now getting a synchronisation error that I am not able to fix. What I am trying to do is make each node communicate with the others using TPL Dataflow (the Task Parallel Library's dataflow blocks), and thereby attain parallelism in the DFS. Below is my code:

/// <summary>
/// Experimental "distributed" depth-first search.
/// The graph is read from a text file in which every line holds a node label followed by
/// its neighbours, separated by single spaces. Every edge is echoed to the console and to
/// a text file, and then a worker thread is started for each line index from 1 upwards.
/// The workers exchange traversal state through one shared <see cref="BufferBlock{T}"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the threading protocol below is reproduced unchanged and still has known
/// problems; fixing them requires redesigning how messages are addressed to workers:
/// <list type="bullet">
/// <item>Main calls Start() and then Join() on each worker in turn, so workers never run
/// concurrently.</item>
/// <item>Main holds the lock on the worker's Thread object while the worker calls
/// Monitor.Wait / Monitor.PulseAll on that same object. Monitor requires the calling thread
/// to own the lock, so these calls throw SynchronizationLockException, which the worker's
/// catch block prints.</item>
/// <item>A logical message is posted as four separate items on an untyped queue, so nothing
/// ties the four items together or to a particular recipient.</item>
/// </list>
/// </remarks>
public class DFS
{
    // Labels of nodes already visited; read and written by dfs() and by the worker threads.
    static List<string> traversedList = new List<string>();

    // Row index of the parent recorded by dfs() for each label it appends to traversedList.
    static List<string> parentList = new List<string>();

    // Worker threads, indexed by line number; slot 0 is never assigned (the loop starts at 1).
    static Thread[] thread_array;

    // Shared queue carrying (parent, adjacency array, point, id) as four consecutive items.
    static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

    /// <summary>
    /// Entry point. The second-to-last argument is the path of the adjacency-list file and
    /// the last argument is the start node number.
    /// </summary>
    /// <param name="args">Command-line arguments; at least two are required.</param>
    public static void Main(string[] args)
    {
        // Guard clause: the original indexed args[args.Length - 2] unconditionally.
        if (args.Length < 2)
        {
            Console.Error.WriteLine("Usage: DFS <graph-file> <start-node>");
            return;
        }

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        // Dispose the reader as soon as the text has been read.
        string text;
        using (StreamReader file = new StreamReader(args[args.Length - 2]))
        {
            text = file.ReadToEnd();
        }

        string[] lines = text.Split('\n');

        // array1[i][0] is the label on line i; array1[i][1..] are its neighbours.
        string[][] array1 = new string[lines.Length][];
        for (int i = 0; i < lines.Length; i++)
        {
            lines[i] = lines[i].Trim();
            array1[i] = lines[i].Split(' ');
        }

        // The writer is disposed here so that the buffered edge list is actually flushed to
        // disk before the workers start (previously it was never closed).
        using (StreamWriter sr = new StreamWriter("E:\\Newtext1.txt"))
        {
            for (int i = 0; i < array1.Length; i++)
            {
                for (int j = 1; j < array1[i].Length; j++)
                {
                    string edge = array1[i][0] + ":" + array1[i][j];
                    sr.Write(edge);
                    Console.WriteLine(edge);
                    sr.Write(sr.NewLine);
                }
            }
        }

        int start_no = Convert.ToInt32(args[args.Length - 1]);
        thread_array = new Thread[lines.Length];

        // Seed the queue with the initial message: parent, graph, start node, id.
        string first_message = "root";
        buffer1.Post(first_message);
        buffer1.Post(array1);
        buffer1.Post(start_no);
        buffer1.Post(1);

        for (int t = 1; t < lines.Length; t++)
        {
            Console.WriteLine("thread" + t);
            thread_array[t] = new Thread(new ThreadStart(thread_run));
            thread_array[t].Name = t.ToString();

            // NOTE(review): this lock is held by the main thread for the whole lifetime of
            // the worker (Start + Join), see the class remarks.
            lock (thread_array[t])
            {
                Console.WriteLine("working");
                thread_array[t].Start();
                thread_array[t].Join();
            }
        }

        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed);
        Console.ReadLine();
    }

    /// <summary>
    /// Sequential recursive depth-first search over the adjacency array. Each neighbour
    /// label not yet in <c>traversedList</c> is appended to it, the current row index is
    /// appended to <c>parentList</c>, and the search recurses into that neighbour.
    /// Not called from anywhere inside this class.
    /// </summary>
    /// <param name="array">Adjacency array; row <paramref name="point"/> lists that node's neighbours from index 1.</param>
    /// <param name="point">Row index of the node to expand.</param>
    private static void dfs(string[][] array, int point)
    {
        for (int z = 1; z < array[point].Length; z++)
        {
            string neighbour = array[point][z];
            if (traversedList.Contains(neighbour))
            {
                continue;
            }

            traversedList.Add(neighbour);
            parentList.Add(point.ToString());
            dfs(array, int.Parse(neighbour));
        }
    }

    /// <summary>
    /// Worker body. Takes one four-item message from the shared queue and, when the current
    /// thread's name equals the message's point, marks that point as visited and posts one
    /// follow-up message per neighbour slot. Any exception is caught and its message is
    /// written to the console.
    /// </summary>
    public static void thread_run()
    {
        try
        {
            // The four Receive calls block until an item is available.
            string parent = (string)buffer1.Receive();
            string[][] array1 = (string[][])buffer1.Receive();
            int point = (int)buffer1.Receive();
            int id = (int)buffer1.Receive();
            object value;
            Console.WriteLine("times");

            if (Thread.CurrentThread.Name.Equals(point.ToString()))
            {
                if (!traversedList.Contains(point.ToString()))
                {
                    Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id);
                    traversedList.Add(point.ToString());
                    parent = point.ToString();
                    for (int x = 1; x < array1[point].Length; x++)
                    {
                        Console.WriteLine("times");
                        if (buffer1.TryReceive(out value))
                        {
                            array1 = (string[][])value;
                        }
                        // NOTE(review): the item obtained by this TryReceive is discarded and
                        // a further blocking Receive is issued; "(int)value" may have been
                        // intended - confirm.
                        if (buffer1.TryReceive(out value))
                        {
                            id = (int)buffer1.Receive();
                        }
                        id++;
                        buffer1.Post(parent);
                        buffer1.Post(array1);
                        // NOTE(review): posts the slot index x, not the neighbour stored in
                        // array1[point][x] - confirm which was intended.
                        buffer1.Post(x);
                        buffer1.Post(id);
                        Console.WriteLine("times");
                        // NOTE(review): this thread does not own the lock on its Thread
                        // object, so this throws SynchronizationLockException and the loop
                        // ends after its first iteration.
                        Monitor.PulseAll(Thread.CurrentThread);
                    }

                    //return;
                }
                else
                {
                    // Already visited: put the message back unchanged.
                    buffer1.Post(parent);
                    buffer1.Post(array1);
                    buffer1.Post(point);
                    buffer1.Post(id);
                    Console.WriteLine("working 1");
                    Monitor.PulseAll(Thread.CurrentThread);
                }
            }
            else
            {
                // Message is for another worker. NOTE(review): the four items received above
                // are not re-posted on this path.
                Console.WriteLine("working 2");
                Monitor.Wait(Thread.CurrentThread);
            }
            //Console.WriteLine(parent);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

enter image description here

VMAtm
  • 27,943
  • 17
  • 79
  • 125
  • 1
    how is this any different from the question you asked 3 days ago @ http://stackoverflow.com/questions/10852317/depth-first-search-in-a-distributed-way ? – James Manning Jun 05 '12 at 03:30
  • @JamesManning : Please note that I did a sequential implementation in the last question, but I have since figured out a way to do a distributed implementation (using threads), and here I am stuck with this error. In Java it's easier because we basically use the synchronized keyword, but in C# I don't find anything that easy. – Rigorous implementation Jun 05 '12 at 03:40
  • What line of code is this error coming from? – Faraday Jun 05 '12 at 04:15
  • @Vijay:on the Monitor.Wait(Thread.currentThread).. when i try to make the thread started inside main wait... – Rigorous implementation Jun 05 '12 at 04:24
  • Your code is NOT using dataflow. The whole point of dataflow is to use ActionBlocks instead of threads to simplify processing. Instead, you are using BufferBlock in a way similar to ConcurrentQueue or any of the other concurrent collections. – Panagiotis Kanavos Jun 05 '12 at 06:45

3 Answers3

3

There are various issues with your code.

Incorrect use of locking and "touching" the traversedList from multiple threads is the most obvious problem.

More importantly, your code doesn't really use Dataflow, it uses BufferBlock in a manner similar to ConcurrentQueue or any other concurrent collection. The whole point of dataflow is to use ActionBlocks instead of threads to simplify processing. By default an action block will use only a single thread for processing but you can specify as many threads as you want through the DataflowBlockOptions class.

ActionBlocks have their own input and output buffers so you don't have to add additional BufferBlocks just for buffering.

Passing multiple related values to a block is another problem, as it can lead to errors and makes the code confusing. Creating a data structure to hold all the values doesn't cost anything.

Assuming you use this class to hold a processing message:

    /// <summary>
    /// Single message object that bundles the related values a processing step needs,
    /// so that they travel together instead of being posted one by one.
    /// </summary>
    public class PointMessage
    {
        /// <summary>Text payload of the message.</summary>
        public string Message { get; set; }

        /// <summary>Jagged string array carried along with the message.</summary>
        public string[][] Lines { get; set; }

        /// <summary>Integer point value carried by the message.</summary>
        public int Point { get; set; }

        /// <summary>Integer identifier; settable so that a handler can update it.</summary>
        public int ID { get; set; }
    }

You can create an ActionBlock to process these messages like this:

// Block that receives every PointMessage and runs ProcessMessage on it.
static ActionBlock<PointMessage> _block;
...
// Remove the limit on how many messages the block may process at the same time.
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded };
// Each posted message is handed to ProcessMessage.
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options);

And process each message like this:

    // Skeleton handler for one PointMessage; the "..." placeholders stand for logic that
    // is not shown. As written, both branches post the same message instance back to
    // _block, so the elided code presumably decides when re-posting should stop.
    private static void ProcessMessage(PointMessage arg)
    {
        if (...)
        {
            ...
            // Bump the id on the message itself before sending it on.
            arg.ID++;
            _block.Post(arg);
        }
        else
        {
             ...
            // Re-queue the message without changing its id.
            _block.Post(arg);
        }
    }

If your function returns a value, you can use a TransformBlock instead of an ActionBlock.

I don't understand what your code does so I won't try to rewrite it using DataFlow. If you clean it up a bit, it will be easier to help.

Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
  • I just want to send a notification and receive confirmations from each of the nodes present in the graph. That's the reason i m using task parallel library as actor model. – Rigorous implementation Jun 05 '12 at 09:07
  • The point is, you are NOT using any actors. BufferBlock doesn't handle messages, ActionBlock or TransformBlock do. Besides, what do you mean by actors? Are you trying to convert each node to an actor? That's a huge waste of resources. It would be easier to evaluate a Contains function for each node in a Parallel.For loop to allow the runtime to select an acceptable number of threads. – Panagiotis Kanavos Jun 05 '12 at 09:17
  • But bufferblock does have send & receive functionalities where i am making use of it...Yeah! the basic idea is each node will act like a independent system and will communicate with one another. – Rigorous implementation Jun 05 '12 at 09:21
  • So does ConcurrentQueue, ConcurrentBag, BlockingCollection and any other class used in a Producer/Consumer scenarios but they arent't called actors. If you want to implement your own actor class, BufferBlock will not help at all. Actors are implemented as ActionBlocks or TransformBlocks. – Panagiotis Kanavos Jun 05 '12 at 09:26
  • Small correction: `ActionBlock` doesn't have an output queue, because it doesn't produce anything, unlike most other blocks. – svick Jun 05 '12 at 09:58
  • I think I mentioned that in the main answer, to return a value you need TransformBlock – Panagiotis Kanavos Jun 05 '12 at 09:59
1

The issue is that a thread must own the monitor's lock on an object (that is, be inside a `lock` on that same object) in order to call Monitor.Wait on it. So you need to hold the lock around Monitor.PulseAll as well as around Monitor.Wait to ensure that you don't get any more errors like this.

If you need me to explain locking to you, open another question and I'll explain it in full! :)

Faraday
  • 2,904
  • 3
  • 23
  • 46
0

EDIT: Ignore my post - Read the post from @PanagiotisKanavos instead...

This won't compile, but will set you in the right direction for using locks:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;

// Sketch of the question's DFS program with the BufferBlock traffic commented out and a
// "busy" flag guarded by lock blocks added to thread_run. It is not a working program:
// see the NOTE(review) comments below for the reasons visible in this code.
public class DFS
{
    // Labels of nodes already visited.
    static List<string> traversedList = new List<string>();

    // Row index of the parent recorded by dfs() for each label added to traversedList.
    static List<string> parentList = new List<string>();
    // Worker threads indexed by line number; slot 0 is never assigned (the loop starts at 1).
    static Thread[] thread_array;
    //static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

    // Reads the graph file named by the second-to-last argument, echoes every edge to the
    // console and to a text file, then starts and joins one thread per line index >= 1.
    public static void Main(string[] args)
    {

        // NOTE(review): N, M, P and global_list are not read anywhere in this method.
        int N = 100;
        int M = N * 4;
        int P = N * 16;

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        List<string> global_list = new List<string>();

        // NOTE(review): this reader is never closed or disposed in this method.
        StreamReader file = new StreamReader(args[args.Length - 2]);


        string text = file.ReadToEnd();

        string[] lines = text.Split('\n');



        // array1[i][0] is the label on line i; array1[i][1..] are its neighbours.
        string[][] array1 = new string[lines.Length][];

        for (int i = 0; i < lines.Length; i++)
        {
            lines[i] = lines[i].Trim();
            string[] words = lines[i].Split(' ');

            array1[i] = new string[words.Length];

            for (int j = 0; j < words.Length; j++)
            {
                array1[i][j] = words[j];
            }
        }

        // NOTE(review): this writer is never flushed, closed or disposed in this method.
        StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

        // Write each edge as "label:neighbour" to the file and to the console.
        for (int i = 0; i < array1.Length; i++)
        {
            for (int j = 0; j < array1[i].Length; j++)
            {
                if (j != 0)
                {
                    sr.Write(array1[i][0] + ":" + array1[i][j]);
                    Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                    sr.Write(sr.NewLine);
                }
            }

        }
        // With the Post calls below commented out, start_no and first_message are unused.
        int start_no = Convert.ToInt32(args[args.Length - 1]);
        thread_array = new Thread[lines.Length];
        string first_message = "root";
        //buffer1.Post(first_message);
        //buffer1.Post(array1);
        //buffer1.Post(start_no);
        //buffer1.Post(1);

        for (int t = 1; t < lines.Length; t++)
        {
            Console.WriteLine("thread" + t);
            thread_array[t] = new Thread(new ThreadStart(thread_run));
            thread_array[t].Name = t.ToString();
            // The main thread holds the lock on the Thread object while it starts the
            // worker and waits for it to finish, so workers run one at a time.
            lock (thread_array[t])
            {
                Console.WriteLine("working");
                thread_array[t].Start();
                thread_array[t].Join();
            }

        }
        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed);
        Console.ReadLine();
    }

    // Sequential recursive depth-first search: every neighbour label of row "point" that
    // is not yet in traversedList is added to it, "point" is recorded in parentList, and
    // the search recurses into that neighbour. Not called from anywhere inside this class.
    private static void dfs(string[][] array, int point)
    {
        for (int z = 1; z < array[point].Length; z++)
        {
            if ((!traversedList.Contains(array[point][z])))
            {
                traversedList.Add(array[point][z]);
                parentList.Add(point.ToString());
                dfs(array, int.Parse(array[point][z]));
            }

        }
        return;


    }

    // NOTE(review): both fields are instance members, but thread_run below is static and
    // uses them without an instance, which the C# compiler rejects. They would have to be
    // declared static for this class to compile.
    bool busy;
    private readonly object syncLock = new object();

    // Worker body. With the buffer code commented out, the locals parent, array1, point,
    // id and value are declared but never used.
    public static void thread_run()
    {
        try
        {
            string parent;
            string[][] array1;
            int point;
            int id;
            //parent = (string)buffer1.Receive();
            //array1 = (string[][])buffer1.Receive();
            //point = (int)buffer1.Receive();
            //id = (int)buffer1.Receive();
            object value;
            Console.WriteLine("times");

            // NOTE(review): this compares against the literal text "Point.ToString()".
            // Main names the threads t.ToString(), so this condition is never true there
            // and execution always takes the final else branch.
            if (Thread.CurrentThread.Name.Equals("Point.ToString()"))
            {
                if (!traversedList.Contains("Point.ToString()"))
                {
                    for (int x = 1; x < 99999; x++)
                    {
                        Console.WriteLine("times");
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    array1 = (string[][])value;
                        //}
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    id = (int)buffer1.Receive();
                        //}
                        //id++;
                        //buffer1.Post(parent);
                        //buffer1.Post(array1);
                        //buffer1.Post(x);
                        //buffer1.Post(id);
                        Console.WriteLine("times");

                        // NOTE(review): the lock is taken on syncLock but PulseAll is
                        // called on Thread.CurrentThread. Monitor requires the caller to
                        // own the lock on the object it pulses, so the two must match.
                        lock (syncLock)
                        {
                            while (busy)
                            {
                                busy = false;
                                Monitor.PulseAll(Thread.CurrentThread);
                            }
                            busy = true; // we've got it!
                        }


                    }

                    //return;
                }
                else
                {
                    //buffer1.Post(parent);
                    //buffer1.Post(array1);
                    //buffer1.Post(point);
                    //buffer1.Post(id);
                    // Same clear-flag-and-pulse sequence as in the loop above, with the
                    // same mismatch between the locked and the pulsed object.
                    lock (syncLock)
                    {
                        while (busy)
                        {
                            busy = false;
                            Monitor.PulseAll(Thread.CurrentThread);
                        }
                        busy = true; // we've got it!
                    }
                }
            }
            else
            {
                Console.WriteLine("working 2");
                // Wait until the flag is clear, then claim it.
                // NOTE(review): Wait is called on Thread.CurrentThread, not on the locked
                // syncLock; and nothing visible in this branch ever sets busy back to
                // false once it has been claimed.
                lock (syncLock)
                {
                    while (busy)
                    {
                        Monitor.Wait(Thread.CurrentThread);
                    }
                    busy = true; // we've got it!
                }

            }
            //Console.WriteLine(parent);
        }
        catch (Exception ex)
        {
            // Any failure, including Monitor errors, is reduced to its message text.
            Console.WriteLine(ex.Message);
        }

    }

}
Faraday
  • 2,904
  • 3
  • 23
  • 46
  • what i am doing in the code is that I am making 10 different threads to call the same function. The above idea doesn't seem to work. – Rigorous implementation Jun 05 '12 at 07:36
  • If you wish to use Monitor, you have to lock properly... Maybe someone else can shed some light on this... – Faraday Jun 05 '12 at 07:38
  • Try changing all of your Monitor.Wait/Pulse/etc to use syncLock instead. The issue you have is that you are locking the thread and unlocking the thread itself... Read up on Threads & Locking patterns. There's too much to fit into this 500 char box, but try what I suggested and it'll work better than you think it will... Read up here: http://www.albahari.com/threading/part4.aspx – Faraday Jun 05 '12 at 08:02