IObservable<Match> IObservableArray = new Regex("(.*):(.*)").Matches(file).OfType<Match>().ToList().ToObservable();
var query = IObservableArray.SelectMany(s => Observable.Start(() => {
    //do stuff
}));

Explanation of the working code above: it uses Reactive Extensions (Rx) observables to process the matches concurrently on multiple threads while keeping s typed as a Match.


My issue is that it seems to load everything into memory before it even starts on //do stuff. Because IObservableArray is a big array of Matches, this takes up a lot of memory and eventually throws an OutOfMemoryException.

I have been researching this for more than a month, and all I can find is .Buffer(). If I put it before .SelectMany() and then loop with foreach over the Matches in s, I can load 1000 Matches into memory at a time, which makes overall memory usage much better.

However, since I have to use a foreach to go through all 1000 items in each buffer, the processing inside a buffer isn't concurrent: I'm basically checking one item after the other.

Is there a way to write something similar to the code below but have it run concurrently on multiple threads? (I want at least 150 items running concurrently without loading everything into memory; I'm using batches of 1000 at the moment.)

Yes, I tried using Thread.Start etc., but that makes the finish code run much too early: technically the work is "finished" as soon as it has done what it was told, which is to start all the new threads.

IObservable<Match> IObservableArray = new Regex("(.*):(.*)").Matches(file).OfType<Match>().ToList().ToObservable();
var query = IObservableArray.Buffer(1000).SelectMany(s => Observable.Start(() => {
    //do stuff
}));
query.ObserveOn(ActiveForm).Subscribe(x =>
{
    //do finish stuff
});
user7842865
  • What is the idea behind using observables anyway? Why not use a TPL class like Parallel for this? – Peter Bons Jul 02 '17 at 12:07
  • None of those are actually multithreaded. .ToList() is probably redundant with .ToObservable(), and that could be causing you to consume 2x memory – user6144226 Jul 02 '17 at 12:08
  • @PeterBons To be honest, I don't have an answer for that. I just found that Observables have a proper way to detect when they're finished, so I went with that. – user7842865 Jul 02 '17 at 12:10
  • @user6144226 Yeah, you can remove that and it still works exactly the same; I'm going to edit it out. (P.S.: It doesn't seem to save much memory, although I believe it saves some once it has finished iterating through the array.) – user7842865 Jul 02 '17 at 12:11
  • The key might be what you are doing in `// do stuff`. That will help to find out what could be used, like Tasks or parallelization or both or something else. Using TPL or Tasks will still give you detection when the work is done. – Peter Bons Jul 02 '17 at 12:15
  • @PeterBons What's being done is some basic method calls and basic if checks, nothing really special. Mainly incrementing and storing the data in a list. – user7842865 Jul 02 '17 at 12:20
  • @user7842865 - Where do you get the `OutOfMemory` exception? In the `IObservableArray` declaration? In the `query`? Or in the `Subscribe`? – Enigmativity Jul 02 '17 at 12:48
  • And what is `file`? Can you provide that in your question? And what are you doing in `//do stuff` and `//do finish stuff`? – Enigmativity Jul 02 '17 at 12:51
  • @Enigmativity I get the exception in the whole query var, including the Subscribe. file is simply a big string of lines like `Hi:Email`. In //do stuff I'm doing basic things, like some ifs and tests, and then I call an incrementUI method. – user7842865 Jul 02 '17 at 22:27
  • @user7842865 - Can you please post a Minimal, Complete, and Verifiable example? We need to be able to replicate your issue to solve it. – Enigmativity Jul 03 '17 at 00:29
  • @Enigmativity Realistically, the code at the bottom is the full code with my issue: if you set file to the contents of a file loaded with File.ReadAllText and have the regex find a lot of matches, memory spikes heavily, to more than 1.1 GB in my case. The //do stuff and //do finish stuff parts are pretty basic, the kind of thing you'd often find in a foreach. I gave Parallel.ForEach a shot and it seems to be working better than the Observable. I'll keep an eye on it. – user7842865 Jul 03 '17 at 03:55
  • @user7842865 - Could you pretty please post a Minimal, Complete, and Verifiable example? We need to be able to run your code and replicate your issue. The lack of answers is because you haven't given us one. – Enigmativity Jul 03 '17 at 04:02

2 Answers


You're not actually telling Start() which scheduler to use, which may be the reason you're not getting the concurrency you want. You can specify the desired scheduler as the second argument:

var query = IObservableArray.Buffer(1000).SelectMany(s => Observable.Start(() => {
    //do stuff
}, TaskPoolScheduler.Default));

If you know that //do stuff is going to take more than 500 ms, I'd consider using the ThreadPoolScheduler instead. The task pool won't spawn a new thread until a Task has blocked a thread for at least 500 ms, so if you know you're going to do a lot of heavy work and need a lot of threads, you can use ThreadPoolScheduler.Instance instead of TaskPoolScheduler.Default.
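A minimal, self-contained sketch of the suggestion above, passing ThreadPoolScheduler.Instance as the scheduler argument of Observable.Start. It assumes the System.Reactive NuGet package and a made-up input string in the `Hi:Email` shape from the comments; the class and method names (SchedulerDemo, Run) are hypothetical. Note that with Buffer(1000) the unit of concurrency is a whole batch: different batches can run at the same time, but the foreach inside one batch is still sequential.

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text.RegularExpressions;
using System.Threading;

static class SchedulerDemo
{
    public static (int batchTotal, int processed) Run(string file)
    {
        int processed = 0;

        int batchTotal = new Regex("(.*):(.*)").Matches(file).Cast<Match>().ToObservable()
            .Buffer(1000)
            // The second argument of Observable.Start picks the scheduler each batch runs on.
            // Different batches can overlap, but the foreach inside one batch is sequential.
            .SelectMany(batch => Observable.Start(() =>
            {
                foreach (Match match in batch)
                {
                    // Placeholder for "//do stuff".
                    Interlocked.Increment(ref processed);
                }
                return batch.Count;
            }, ThreadPoolScheduler.Instance))
            .Sum()   // emits once, after every batch has completed
            .Wait(); // blocks the caller: fine in a console demo, not on a UI thread

        return (batchTotal, processed);
    }

    static void Main()
    {
        // Hypothetical input in the "Hi:Email" shape described in the comments.
        string file = string.Join("\n", Enumerable.Range(0, 10_000).Select(i => $"user{i}:mail{i}"));
        var (batchTotal, processed) = Run(file);
        Console.WriteLine($"{batchTotal} {processed}"); // prints "10000 10000"
    }
}
```

In the question's WinForms code, the blocking Wait() would be replaced by the existing ObserveOn(ActiveForm).Subscribe(...) call.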

Jon G Stødle

For this kind of work an IEnumerable<T> is a better match than an IObservable<T>. An enumerable is something that you can unwind on demand, taking its values when you are ready to process them. By contrast, an observable forcefully pushes its values onto you, whether you are able to handle the load or not.

There are numerous ways to process an IEnumerable<T> in parallel with a specific degree of parallelism. Before suggesting anything, the first question to ask is whether the work you have to do with each Match is synchronous or asynchronous. For synchronous work the most commonly used tools are the Parallel class, PLINQ, and the TPL Dataflow library. Below is a PLINQ example:

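using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

IEnumerable<Match> matches = RegexFindAllMatches(file, "(.*):(.*)");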
Partitioner
    .Create(matches, EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(match =>
    {
        // Do stuff
    });
/// <summary>
/// Provides an enumerable whose elements are the successful matches found by
/// iteratively applying a regular expression pattern to the input string.
/// </summary>
public static IEnumerable<Match> RegexFindAllMatches(
    string input, string pattern, RegexOptions options = RegexOptions.None,
    TimeSpan matchTimeout = default)
{
    if (matchTimeout == default) matchTimeout = Regex.InfiniteMatchTimeout;
    var match = Regex.Match(input, pattern, options, matchTimeout);
    while (match.Success)
    {
        yield return match;
        match = match.NextMatch();
    }
}

The above implementation avoids the Regex.Matches method, and consequently the MatchCollection class, because although this class evaluates each next Match lazily during enumeration, it also stores every Match it finds in an internal ArrayList (source code). This can cause massive memory allocation, proportional to the total number of matches.
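For comparison with the PLINQ example, the Parallel class can consume the same lazy enumerable. This is a minimal sketch, not a drop-in replacement: the class and method names (ParallelDemo, ProcessWithParallelForEach) are hypothetical, the helper is a trimmed copy of RegexFindAllMatches without the options and timeout parameters, and an Interlocked counter stands in for the real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

static class ParallelDemo
{
    // Trimmed copy of RegexFindAllMatches: yields one Match at a time via NextMatch(),
    // so no MatchCollection (and no internal list of all matches) is created.
    public static IEnumerable<Match> RegexFindAllMatches(string input, string pattern)
    {
        var match = Regex.Match(input, pattern);
        while (match.Success)
        {
            yield return match;
            match = match.NextMatch();
        }
    }

    public static int ProcessWithParallelForEach(string file)
    {
        int processed = 0;
        var options = new ParallelOptions
        {
            // For CPU-bound work, more workers than cores rarely helps.
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };
        // NoBuffering makes each worker pull a single Match at a time from the
        // enumerable instead of grabbing chunks of them.
        var source = Partitioner.Create(
            RegexFindAllMatches(file, "(.*):(.*)"),
            EnumerablePartitionerOptions.NoBuffering);

        // Parallel.ForEach returns only after every Match has been processed,
        // so code placed after it acts as the "finished" step.
        Parallel.ForEach(source, options, match =>
        {
            // Placeholder for "// Do stuff": the groups are available as usual.
            if (match.Groups[2].Value.Length > 0)
                Interlocked.Increment(ref processed);
        });
        return processed;
    }

    static void Main()
    {
        // Hypothetical input in the "Hi:Email" shape described in the question's comments.
        string file = string.Join("\n", Enumerable.Range(0, 10_000).Select(i => $"user{i}:mail{i}"));
        Console.WriteLine(ProcessWithParallelForEach(file)); // prints 10000
    }
}
```

Because Parallel.ForEach blocks until all items are done, the question's concern about the "finished" code running too early (as with Thread.Start) does not apply here.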

For asynchronous work, the synchronous Parallel methods and PLINQ are not good options (unless you can target .NET 6 or later, which adds Parallel.ForEachAsync), but you can still use the TPL Dataflow library. You can also find plenty of custom implementations in related Stack Overflow questions; searching for C# ForEachAsync should reveal even more options.
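A minimal sketch of the asynchronous case, assuming .NET 6 or later for Parallel.ForEachAsync. The class and method names (AsyncDemo, ProcessAsync) are hypothetical, Task.Delay stands in for real asynchronous work, and the limit of 150 is taken from the question's "at least 150 running concurrently" requirement. Parallel.ForEachAsync pulls items from the enumerable one at a time as workers become free, so the lazy iterator is never materialized into a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

static class AsyncDemo
{
    // Same lazy enumeration idea as RegexFindAllMatches (without options/timeout).
    public static IEnumerable<Match> RegexFindAllMatches(string input, string pattern)
    {
        var match = Regex.Match(input, pattern);
        while (match.Success)
        {
            yield return match;
            match = match.NextMatch();
        }
    }

    public static async Task<int> ProcessAsync(string file)
    {
        int processed = 0;
        // 150 mirrors the concurrency level asked for in the question; it only makes
        // sense when the work is mostly waiting (I/O), not CPU-bound.
        var options = new ParallelOptions { MaxDegreeOfParallelism = 150 };
        await Parallel.ForEachAsync(RegexFindAllMatches(file, "(.*):(.*)"), options,
            async (match, cancellationToken) =>
            {
                // Placeholder for real asynchronous work, e.g. a network call.
                await Task.Delay(1, cancellationToken);
                Interlocked.Increment(ref processed);
            });
        // Reached only after every Match has been processed.
        return processed;
    }

    static async Task Main()
    {
        // Hypothetical input in the "Hi:Email" shape described in the question's comments.
        string file = string.Join("\n", Enumerable.Range(0, 1_000).Select(i => $"user{i}:mail{i}"));
        Console.WriteLine(await ProcessAsync(file)); // prints 1000
    }
}
```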

Theodor Zoulias