2

I have a piece of code that does calculations on assets. There are many millions of those, so I want to compute everything in streams. My current 'pipeline' looks like this:

I have a query whose results are exposed as a DataReader.

Then my Asset class has a constructor that accepts an IDataReader:

public Asset(IDataReader rdr) {
  // logic that initializes fields
}

and a method that converts the IDataReader to an IEnumerable<Asset>

public static IEnumerable<Asset> ToAssets(IDataReader rdr) {

    // make sure the reader is in the right format
    CheckReaderFormat(rdr);

    // project reader into IEnumerable<Asset>
    while (rdr.Read()) yield return new Asset(rdr);

}

That then gets passed into a function that does the actual calculations and projects the results into an IEnumerable<Answer>.

That in turn gets a wrapper that exposes the Answers as an IDataReader, which is passed to an OracleBulkCopy, and the stream is written to the DB.

So far it works like a charm. Because of the setup I can swap the DataReader for an IEnumerable that reads from a file, or have the results written to a file, etc. It all depends on how I string the classes and functions together.

Now: there are several things I can compute. For instance, besides the normal Answer I could have a DebugAnswer class that also outputs some intermediate numbers for debugging. So what I would like to do is project the IEnumerable into several output streams so I can put 'listeners' on those. That way I won't have to go over the data multiple times. How can I do that? It would be a bit like having several events and only running certain code if there is a listener attached.

Also, sometimes I write to the DB but also to a zip file just to keep a backup of the results. So then I would like to have 2 'listeners' on the IEnumerable: one that projects it as an IDataReader and another one that writes straight to the file.

How do I produce multiple output streams, and how can I put multiple listeners on one output stream? What lets me compose streams of data like that?

Edit

Some pseudocode of what I would like to do:

foreach(Asset asset in Assets){
   if(DebugListener != null){
     // compute 
     DebugAnswer da = new DebugAnswer {result = 100};
     yield da to DebugListener;  // so instead of yield return yield to that stream

   }

   if(AnswerListener != null){
     // compute basic stuff 
     Answer a = new Answer { bla = 200 };
     yield a to AnswerListener;
   }
}

Thanks in advance,

Gert-Jan

Chris Snow
gjvdkamp

5 Answers

5

What you're describing sounds sort of like what the Reactive framework provides via the IObservable interface, but I don't know for sure whether it allows multiple subscribers to a single subscription stream.

Update

If you take a look at the documentation for IObservable, it has a pretty good example of how to do the sort of thing you're doing, with multiple subscribers to a single object.
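To make the "multiple subscribers to a single object" idea concrete, here is a minimal sketch that uses only the IObservable<T> and IObserver<T> interfaces from the base class library, without the Rx package. Broadcaster and CollectingObserver are hypothetical names made up for this illustration; they are not part of Rx or of the documentation example, and a real implementation would also need error handling and thread safety.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: pushes each value to every subscribed observer.
public sealed class Broadcaster<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Walks the source exactly once and forwards every item to all observers.
    public void Run(IEnumerable<T> source)
    {
        foreach (T item in source)
        {
            // Snapshot the list so an observer may unsubscribe while being notified.
            foreach (IObserver<T> observer in observers.ToArray())
                observer.OnNext(item);
        }

        foreach (IObserver<T> observer in observers.ToArray())
            observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that simply collects what it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { /* ignored in this sketch */ }
    public void OnCompleted() { Completed = true; }
}
```

Subscribing two CollectingObserver instances to one Broadcaster and then calling Run on the source hands every item to both observers while the source itself is enumerated only once.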

StriplingWarrior
  • Yes, you can subscribe multiple observers to a single observable. – dtb Aug 05 '11 at 14:20
  • Hi, thanks for pointing me to Rx. I had heard of it but never jumped in. I updated the question; my main issue is yielding to multiple streams. Would you happen to have an example of that? I haven't seen it in the 101 yet... – gjvdkamp Aug 05 '11 at 14:31
  • Looked at the docs, and it seems to hit the spot indeed. I have my reading and studying cut out for me. – gjvdkamp Aug 05 '11 at 14:42
4

Your example rewritten using Rx:

// The stream of assets
IObservable<Asset> assets = ...

// The stream of each asset projected to a DebugAnswer
IObservable<DebugAnswer> debugAnswers = from asset in assets
                                        select new DebugAnswer { result = 100 };

// Subscribe the DebugListener to receive the debugAnswers
debugAnswers.Subscribe(DebugListener);

// The stream of each asset projected to an Answer
IObservable<Answer> answers = from asset in assets
                              select new Answer { bla = 200 };

// Subscribe the AnswerListener to receive the answers
answers.Subscribe(AnswerListener);
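One caveat if the assets observable is built from an IEnumerable with ToObservable(): such an observable is cold, so each Subscribe call would enumerate the source again, which does not fit a forward-only reader. The sketch below is one possible way around that, assuming the System.Reactive package is referenced; it uses Publish() and Connect() so both projections share a single pass. SharedStreamSketch, the int stand-in for Asset, and the two listener delegates are hypothetical names for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedStreamSketch
{
    // assetIds stands in for the real IEnumerable<Asset> (assumption for this sketch).
    public static void Run(IEnumerable<int> assetIds,
                           Action<string> debugListener,
                           Action<int> answerListener)
    {
        // Publish() shares one subscription to the source among all observers.
        IConnectableObservable<int> assets = assetIds.ToObservable().Publish();

        // Two independent projections over the same shared stream.
        assets.Select(id => "debug " + id).Subscribe(debugListener);
        assets.Select(id => id * 2).Subscribe(answerListener);

        // Nothing flows until Connect() is called; the source is then
        // enumerated once and each item is pushed to both projections.
        assets.Connect();
    }
}
```

Attach all listeners before calling Connect(); anything subscribed afterwards would miss items that have already been pushed.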
dtb
  • Hi, thanks for this, not quite grokking it yet though... I will look at Rx very hard. – gjvdkamp Aug 05 '11 at 14:44
  • Think I'm starting to get it. If this actually works like this it would be pretty awesome. I see how LINQ's deferred execution would let one do it this way... WOW. Gotta go now, will check back later. – gjvdkamp Aug 05 '11 at 14:47
1

This is exactly the job for Reactive Extensions (the IObservable<T> and IObserver<T> interfaces have been part of .NET since 4.0; Rx itself is a separate library, which is also available for 3.5).

Dan Abramov
1

You don't need multiple "listeners"; you just need pipeline components that aren't destructive, and that don't necessarily transform the data at all.

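static IEnumerable<T> PassThroughEnumerable<T>(IEnumerable<T> source, Action<T> action) {
    foreach (T t in source) {
       action(t);        // run the side effect (the 'action' parameter, not the Action type)
       yield return t;   // then hand the item on unchanged
    }
}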
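As a rough illustration of how such a pass-through step composes, here is a small sketch that chains two side effects onto one sequence so the data is still walked only once. The names PassThroughSketch, Tap and Run, and the two log lists standing in for a backup writer and a debug listener, are hypothetical and chosen only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PassThroughSketch
{
    // Same idea as the pass-through method above, written as an extension method.
    public static IEnumerable<T> Tap<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (T item in source)
        {
            action(item);       // side effect, e.g. write to a backup file
            yield return item;  // item continues down the pipeline unchanged
        }
    }

    // Chains two taps; ToList() stands in for the final consumer that pulls the data.
    public static List<int> Run(List<string> backupLog, List<string> debugLog)
    {
        return Enumerable.Range(1, 3)
            .Tap(x => backupLog.Add("backup " + x))
            .Tap(x => debugLog.Add("debug " + x))
            .ToList();
    }
}
```

Because the iterators are lazy, nothing happens until the final consumer starts pulling; each item then passes through both taps in turn before the next item is read.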

Or, while you're processing in the pipeline, just raise some events to be consumed. You can make the handlers asynchronous if you want:

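static IEnumerable<Asset> ToAssets(IDataReader rdr) {
   CheckReaderFormat(rdr);
   // copy the delegate once so the null check and the call use the same handlers
   var h = DebugAsset;
   while (rdr.Read()) {
      var a = new Asset(rdr);
      if (h != null) h(a);
      yield return a;
   }
}

// static, because ToAssets is static and has no 'this'; Action<Asset> matches the h(a) call
public static event Action<Asset> DebugAsset;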
Mark Brackett
  • Hi, thanks. I thought of that, but the OracleBulkCopy, for instance, does not return anything, and that class is outside my control. So the stream would stop there, limiting my options for composing these streams. My real-life problem is a bit more complicated and would turn into a jigsaw pretty quickly. If possible I would prefer a more generic framework for doing this, and Rx seems to be just that. – gjvdkamp Aug 05 '11 at 14:33
0

If I got you right, it should be possible to replace or decorate the wrapper. The WrapperDecorator may forward calls to the normal OracleBulkCopy (or whatever you're using) and add some custom debug code.

Does that help you?

Matthias

Matthias Meid
  • Hi Mude, thanks. This sounds like what Mark is suggesting. It would solve this problem, but it would become complicated when I get more streams to compose. Rx sounds better since it seems to 'convert' the control from pull to push and back. – gjvdkamp Aug 05 '11 at 14:38