
I have a listener, which receives work in the form of IPayload. The listener should push this work to observers who actually do the work. This is my first crude attempt to achieve this:

public interface IObserver
{
    void DoWork(IPayload payload);
}

public interface IObservable
{
    void RegisterObserver(IObserver observer);
    void RemoveObserver(IObserver observer);
    void NotifyObservers(IPayload payload);
}

public class Observer : IObserver
{
    public void DoWork(IPayload payload)
    {
        // do some heavy lifting
    }
}

public class Listener : IObservable
{
    private readonly List<IObserver> _observers = new List<IObserver>();

    public void PushIncomingPayLoad(IPayload payload)
    {
        NotifyObservers(payload);
    }

    public void RegisterObserver(IObserver observer)
    {
        _observers.Add(observer);
    }

    public void RemoveObserver(IObserver observer)
    {
        _observers.Remove(observer);
    }

    public void NotifyObservers(IPayload payload)
    {
        Parallel.ForEach(_observers, observer =>
        {
            observer.DoWork(payload);
        });
    }
}

Is this a valid approach that follows the observer/observable pattern (i.e. pub/sub)? My understanding is that NotifyObservers also spins up a thread for each payload. Is this correct? Any suggestions for improvement are very welcome.

Please note that all observers have to finish their work before new work in the form of a payload is passed on to them; the order of 'observation' does not matter. Basically, the listener has to act as a master whilst exploiting the cores of the host as much as possible using the TPL. IMHO this requires the explicit registration of observers with the listener/observable.

PS:

I think Parallel.ForEach does not create a thread for each observer (see "Why isn't Parallel.ForEach running multiple threads?"). If this is true, how can I ensure that a thread is created for each observer?

An alternative I have in mind is this:

public async void NotifyObservers(IPayload payload)
{
    foreach (var observer in _observers)
    {
        var observer1 = observer;
        await Task.Run(() => observer1.DoWork(payload));
    }
    await Task.WhenAll();
}
cs0815

1 Answer


Of course you can do it this way, but in .NET it is not necessary unless you want to reinvent the wheel :-) In C# this can be done using events.

A short example:

  using System;

  //Your Listener, which exposes a public event that others can subscribe to
  public class Listener{
      //the event to which others "add" themselves as listeners
      public event EventHandler<PayloadEventArgs> IncomingPayload;

      //a method where you process new data and want to notify the others
      public void PushIncomingPayLoad(IPayload payload)
      {
          //copy the delegate first so that a concurrent unsubscribe cannot null it between check and call
          var handler = IncomingPayload;
          //check if there are any listeners
          if(handler != null)
              //if so, then notify them with the data in the PayloadEventArgs
              handler(this, new PayloadEventArgs(payload));
      }
  }

  //Your EventArgs class to hold the data
  public class PayloadEventArgs : EventArgs{

      //public so that subscribers can read the payload
      public IPayload Payload { get; private set; }

      public PayloadEventArgs(IPayload payload){
          Payload = payload;
      }
  }

  public class Worker{
      private readonly Listener _listener;

      public Worker(Listener listener){
          _listener = listener;
          //add this instance as an observer
          _listener.IncomingPayload += DoWork;
      }

      //remove this instance
      public void Unsubscribe(){
          _listener.IncomingPayload -= DoWork;
      }

      //This method gets called when the Listener notifies the IncomingPayload listeners
      void DoWork(Object sender, PayloadEventArgs e){
          Console.WriteLine(e.Payload);
      }
  }
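One thing to keep in mind with events: raising an event calls the subscribed handlers one after another, synchronously, on the thread that raised it. So events alone give you no parallelism. A minimal sketch to illustrate this (EventOrderDemo and its Incoming event are hypothetical names used only here, not part of the code above):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, only meant to show how handlers are invoked.
public class EventOrderDemo
{
    public event EventHandler<EventArgs> Incoming;

    public List<string> Run()
    {
        var log = new List<string>();
        int callerThread = Thread.CurrentThread.ManagedThreadId;

        // Each handler records whether it ran on the thread that raised the event.
        Incoming += (s, e) => log.Add("first:" + (Thread.CurrentThread.ManagedThreadId == callerThread));
        Incoming += (s, e) => log.Add("second:" + (Thread.CurrentThread.ManagedThreadId == callerThread));

        var handler = Incoming;
        if (handler != null)
            handler(this, EventArgs.Empty);

        // Reached only after both handlers have returned.
        log.Add("raised");
        return log;
    }
}
```

Calling new EventOrderDemo().Run() returns the entries "first:True", "second:True", "raised" in that order: both handlers ran on the caller's thread, in subscription order, before the raise returned.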

EDIT: As the question asks for parallel execution, how about starting the new thread on the subscriber side? I think this is the easiest way to achieve this.

//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () => 
{
      //Do your work here
});

//TaskCreationOptions.LongRunning is a hint to the scheduler that the task should get its own thread instead of a thread-pool thread
Task.Factory.StartNew( () => 
{
      //Do your work here
}, TaskCreationOptions.LongRunning);
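If the listener has to wait until every observer is done before it accepts the next payload, the tasks can be started on the listener side instead and waited on together. The following is only a rough sketch under some assumptions: it goes back to the explicit IObserver registration from the question, BlockingListener and CountingObserver are made-up names, and the empty IPayload is a stand-in for the real interface. Note that Parallel.ForEach in the question also blocks until all iterations are finished, it just does not promise one thread per observer, and that awaiting each Task.Run inside the loop would run the observers one after another.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public interface IPayload { }   // stand-in for the question's interface

public interface IObserver
{
    void DoWork(IPayload payload);
}

// Hypothetical listener that blocks until all observers have finished.
public class BlockingListener
{
    private readonly object _gate = new object();
    private readonly List<IObserver> _observers = new List<IObserver>();

    public void RegisterObserver(IObserver observer)
    {
        lock (_gate) { _observers.Add(observer); }
    }

    public void RemoveObserver(IObserver observer)
    {
        lock (_gate) { _observers.Remove(observer); }
    }

    // Returns only after every observer has finished with this payload.
    public void PushIncomingPayLoad(IPayload payload)
    {
        IObserver[] snapshot;
        lock (_gate) { snapshot = _observers.ToArray(); }

        // One task per observer; LongRunning hints that each should get its own thread.
        Task[] tasks = snapshot
            .Select(o => Task.Factory.StartNew(
                () => o.DoWork(payload), TaskCreationOptions.LongRunning))
            .ToArray();

        // Blocks the caller; throws AggregateException if any observer threw.
        Task.WaitAll(tasks);
    }
}

// Hypothetical observer that only counts how often it was called.
public class CountingObserver : IObserver
{
    private int _count;
    public int Count { get { return _count; } }

    public void DoWork(IPayload payload)
    {
        Interlocked.Increment(ref _count);
    }
}
```

Because PushIncomingPayLoad does not return before Task.WaitAll completes, calling it from a single listener thread means a new payload is never handed out while an observer is still busy with the previous one. The observers themselves still need to be safe to run in parallel with each other.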

Hopefully this answers your question. If not, please leave a comment.

sebhaub
  • Thanks, I am aware of this approach as well. How do you ensure that each worker has its own thread here? – cs0815 Feb 13 '16 at 14:20
  • Sorry, I missed that part of your question; I have edited my answer. In my opinion, starting a new thread in the subscriber method is the easiest approach. What do you think about that? – sebhaub Feb 13 '16 at 14:30
  • Actually, I think creating a new thread in the observer would not work in my scenario. Let us say I have 3 observers subscribed to the listener: O1, O2 and O3. I need to ensure that all 3 observers have finished their work before new work is passed to them again. I think this is not guaranteed by creating new threads in the worker. Basically, the listener has to be a master whilst exploiting all cores to do the work in the observers. – cs0815 Feb 13 '16 at 18:51
  • Do you want to block the listener until the work is done by the observers? – sebhaub Feb 16 '16 at 13:12
  • Yes, see also http://stackoverflow.com/questions/35394672/automapper-structuremap-issues-with-tpl-version-of-synchronous-code – cs0815 Feb 16 '16 at 17:29