0

I have to process large CSV files (up to tens of GB), that looks like this:

Key,CompletedA,CompletedB
1,true,NULL
2,true,NULL
3,false,NULL
1,NULL,true
2,NULL,true  

I have a parser that yields parsed lines as IEnumerable<Record>, so that I reads only one line at a time into memory.

Now I have to group records by Key and check whether columns CompletedA and CompletedB have value within the group. On the output I need records, that does not have both CompletedA,CompletedB within the group.

In this case it is record with key 3.

However, there is many similar processings going on the same dataset and I don't wont to iterate over it multiple times.

I think I can convert IEnumerable into IObservable and use Reactive Extentions to find the records.

Is it possible to do it in memory efficient way with simple Linq expression over the IObservable collection?

Liero
  • 25,216
  • 29
  • 151
  • 297
  • 1
    Sure, you could also use a pipeline processor like dataflow, orrrr Reactive Extensions, however, this is all overkill, you can do it efficiently in a foreach loop and you would be doing yourself a favor to try this first – TheGeneral Nov 22 '18 at 08:17
  • `records.CountBy(z => new { Key = z.Key, Value = z.CompletedA ?? z.CompletedB}).Where(z => z.Value == 1).Select(z => z.Key)` might get you started. You'll need https://www.nuget.org/packages/morelinq/ for this. – mjwills Nov 22 '18 at 08:44
  • How many distinct `Key`s do you have? – Dmitry Bychenko Nov 22 '18 at 08:56
  • @TheGeneral: This is just one of many such analytics and I would have to do all of them in single foreach. There are also other reasons why foreach is not suitable – Liero Nov 22 '18 at 09:18
  • @DmitryBychenko: _"How many distinct Keys do you have?"_: half the number of records or more. Nor sure how many lines will there be in production, but given the file size, a lot. – Liero Nov 22 '18 at 09:22
  • @Liero ahh ok, yeah its always good to cancel out the simplest approaches first – TheGeneral Nov 22 '18 at 09:23
  • Related: [How to check an IEnumerable for multiple conditions with a single enumeration without buffering?](https://stackoverflow.com/questions/58578480/how-to-check-an-ienumerable-for-multiple-conditions-with-a-single-enumeration-wi) – Theodor Zoulias Nov 26 '20 at 00:57

3 Answers3

2

Providing that Key is an integer we can try using a Dictionary and one scan:

 // value: 0b00 - neither A nor B
 //        0b01 - A only
 //        0b10 - B only
 //        0b11 - Both A and B    
 Dictionary<int, byte> Status = new Dictionary<int, byte>();

 var query = File
   .ReadLines(@"c:\MyFile.csv")
   .Where(line => !string.IsNullOrWhiteSpace(line))
   .Skip(1) // skip header 
   .Select(line => YourParserHere(line));

 foreach (var record in query) {
   int mask = (record.CompletedA != null ? 1 : 0) |
              (record.CompletedB != null ? 2 : 0); 

   if (Status.TryGetValue(record.Key, out var value))
     Status[record.Key] = (byte) (value | mask);
   else
     Status.Add(record.Key, (byte) mask);
 }

 // All keys that don't have 3 == 0b11 value (both A and B)  
 var bothAandB = Status
   .Where(pair => pair.Value != 3)
   .Select(pair => pair.Key); 
Dmitry Bychenko
  • 180,369
  • 20
  • 160
  • 215
  • The reason I asked about RX solution is that there would be too many stuff in single foreach, so I need to split it somehow without enumerating the records multiple times, so I thought s push collection with multiple subscribers would work. Moreover, I have different scenarios with different set of "analytics". RX solution would make it nice reusable peace of single "analytics". – Liero Nov 22 '18 at 09:27
  • @Liero - You would need to ensure that the `IEnumerable` is lazy to make Rx efficient, but if it is then a simple loop will be too. – Enigmativity Nov 22 '18 at 11:09
0

I think this will do what you need:

var result =
    source
        .GroupBy(x => x.Key)
        .SelectMany(xs =>
            (xs.Select(x => x.CompletedA).Any(x => x != null && x == true) && xs.Select(x => x.CompletedA).Any(x => x != null && x == true))
            ? new List<Record>()
            : xs.ToList());

Using Rx doesn't help here.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
0

Yes, the Rx library is well suited for this kind of synchronous enumerate-once/calculate-many operation. You can use a Subject<Record> as the one-to-many propagator, then you should attach various Rx operators to it, then you should feed it with the records from the source enumerable, and finally you'll collect the results from the attached operators that will now be completed. Here is the basic pattern:

IEnumerable<Record> source = GetRecords();
var subject = new Subject<Record>();
var task1 = SomeRxTransformation1(subject);
var task2 = SomeRxTransformation2(subject);
var task3 = SomeRxTransformation3(subject);
source.ToObservable().Subscribe(subject); // This line does all the work
var result1 = task1.Result;
var result2 = task2.Result;
var result3 = task3.Result;

The SomeRxTransformation1, SomeRxTransformation2 etc are methods that accept an IObservable<Record>, and return some generic Task. Their signature should look like this:

Task<TResult> SomeRxTransformation1(IObservable<Record> source);

For example the special grouping you want to do will require a transformation like the following:

Task<Record[][]> GroupByKeyExcludingSomeGroups(IObservable<Record> source)
{
    return source
        .GroupBy(record => record.Key)
        .Select(grouped => grouped.ToArray())
        .Merge()
        .Where(array => array.All(r => !r.CompletedA && !r.CompletedB))
        .ToArray()
        .ToTask();
}

When you incorporate it into the pattern, it will look like this:

Task<Record[][]> task1 = GroupByKeyExcludingSomeGroups(subject);
source.ToObservable().Subscribe(subject); // This line does all the work
Record[][] result1 = task1.Result;
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Btw feeding the subject with `.ToObservable().Subscribe(subject)` is simple but not very efficient. For better performing alternatives you can look [here](https://stackoverflow.com/questions/60987491/why-is-ienumerable-toobservable-so-slow "Why is IEnumerable.ToObservable so slow?"). – Theodor Zoulias Nov 26 '20 at 01:03
  • Another optimization could be to implement a synchronous `ISubject`, because the built-in [`Subject`](https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs) has embedded synchronization features (`Volatile.Read`). These are not needed for synchronous processing, and will just add overhead. Implementing an `ISubject` is not very difficult. [Here](https://stackoverflow.com/questions/64961330/how-to-fix-the-inconsistency-of-the-publish-refcount-behavior/64991229#64991229) is a starting point. – Theodor Zoulias Nov 28 '20 at 03:05