2

I check and tried this example: How to write Asynchronous LINQ query?

This works well and it asynchronously executes both procedures but my target is to create a query that when you get the first result of the query you can already start to use this value while the query is still executing and looking for more values.

I did this in while I was programming for android and the result was amazing and really fast but I have no clue how to make it in C# and using linQ

Community
  • 1
  • 1
Iban Arriola
  • 2,526
  • 9
  • 41
  • 88
  • You may be able to do what you want with PLinq and aggregate functions... http://msdn.microsoft.com/en-us/library/dd460697(v=vs.110).aspx – Jakob Olsen Jan 20 '15 at 10:42
  • There might be a work around for this, You start your work with the use of FirstOrDefault() result and make the Where query again to complete all rest operations (escaping the first one). It might not be exactly what you want, but this could drop you an idea. – Rohit Prakash Jan 20 '15 at 10:42
  • @JakobOlsen, "PLINQ performs best when the processing of each element in a source collection is independent, with no shared state involved among the individual delegates" from [MSDN](http://msdn.microsoft.com/en-us/library/dd997399(v=vs.110).aspx). Also you are not guaranteed that your first item will be processed first always with the PLINQ. – Rohit Prakash Jan 20 '15 at 10:46
  • 1
    @RohitPrakash if I understood well what you propose is to make a first linQ query to get only one value, am I following you well? The problem is that I don´t want to make it with only the first value but with all the values while linq query is finding them. – Iban Arriola Jan 20 '15 at 11:03
  • @IbanArriola My answer will do this, as each answer is available the processing you want to do can be performed. You can optionally skip the AsParallel call to do the processing one at a time which is the same as saying "get one and do logic, then get another and repeat". – War Jan 21 '15 at 21:32

4 Answers4

0

Asynchronous sequences are modeled in .NET as IObservables. Reactive Extensions is the asynchronous section of LINQ.

You can generate an observable sequence in any number of ways, such as through a call to Generate:

var sequence = Observable.Generate(0,
    i => i < 10,
    i => i + 1,
    i => SomeExpensiveGeneratorFunction());

And then query it using all of the regular LINQ functions (which as a result allows for the use of query syntax) along with a number of additional operations that make sense specifically for asynchronous sequence (and also a lot of different ways of creating observable sequences) such as:

var query = from item in sequence
            where ConditionIsTrue(item)
            select item.ToString();

The short description of what's going on here is to just say that it does exactly what you want. The generator function simply notifies its subscribers whenever it successfully generates a value (or when it's done) and continues generating values without waiting for the subscribers to finish, the Where method will subscribe to sequence, and notify its subscribers whenever it observes a value that passes the condition, Select will subscribe to the sequence returned by Where and perform its transformation (asynchronously) whenever it gets a value and will then push it to all of its subscribers.

Servy
  • 202,030
  • 26
  • 332
  • 449
-1

Try again ...

If I understand you the logic you want is something like ...

var query = getData.Where( ... );
query.AsParallel().ForEach(r => {
   //other stuff
});

What will happen here ...

Well in short, the compiler will evaluate this to something like: Whilst iterating through query results in parallel perform the logic in the area where the comment is.

This is async and makes use of an optimal thread pool managed by .net to ensure the results are acquired as fast as possible.

This is an automatically managed async parallel operation.

It's also worth noting that I if I do this ...

var query = getData.Where( ... );

... no actual code is run until I begin iterating the IQueryable and by declaring the operation a parallel one the framework is able to operate on more than one of the results at any point in time by threading the code for you.

The ForEach is essentially just a normal foreach loop where each iteration is asynchronously handled.

The logic you put in there could call some sort of callback if you wanted but that's down to how you wrap this code ...

Might I suggest something like this:

void DoAsync<T>(IQueryable<T> items, Func<T> operation, Func<T> callback)
{
   items.AsParallel().ForEach(x => {
      operation(x);
      callback(x);
   });
}
War
  • 8,539
  • 4
  • 46
  • 98
-1

I have modified TheSoftwareJedi answer from your given link. You can raise the first startup event from the Asynchronous class, and use it to start-up your work. Here's the class,

public static class AsynchronousQueryExecutor
    {
        private static Action<object> m_OnFirstItemProcessed;

        public static void Call<T>(IEnumerable<T> query, Action<IEnumerable<T>> callback, Action<Exception> errorCallback, Action<object> OnFirstItemProcessed)
        {
            m_OnFirstItemProcessed = OnFirstItemProcessed;
            Func<IEnumerable<T>, IEnumerable<T>> func =
                new Func<IEnumerable<T>, IEnumerable<T>>(InnerEnumerate<T>);
            IEnumerable<T> result = null;
            IAsyncResult ar = func.BeginInvoke(
                                query,
                                new AsyncCallback(delegate(IAsyncResult arr)
                                {
                                    try
                                    {
                                        result = ((Func<IEnumerable<T>, IEnumerable<T>>)((AsyncResult)arr).AsyncDelegate).EndInvoke(arr);
                                    }
                                    catch (Exception ex)
                                    {
                                        if (errorCallback != null)
                                        {
                                            errorCallback(ex);
                                        }
                                        return;
                                    }
                                    //errors from inside here are the callbacks problem
                                    //I think it would be confusing to report them
                                    callback(result);
                                }),
                                null);
        }
        private static IEnumerable<T> InnerEnumerate<T>(IEnumerable<T> query)
        {
            int iCount = 0;
            foreach (var item in query) //the method hangs here while the query executes
            {
                if (iCount == 0)
                {
                    iCount++;
                    m_OnFirstItemProcessed(item);

                }
                yield return item;
            }
        }
    }

here's the associations,

private void OnFirstItem(object value) // Your first items is proecessed here.
    {
        //You can start your work here.
    }

    public void HandleResults(IEnumerable<int> results)
    {
        foreach (var item in results)
        {
        }
    }

    public void HandleError(Exception ex)
    {
    }

and here's how you should call the function.

private void buttonclick(object sender, EventArgs e)
    {
        IEnumerable<int> range = Enumerable.Range(1,10000);

        var qry = TestSlowLoadingEnumerable(range);

        //We begin the call and give it our callback delegate
        //and a delegate to an error handler
        AsynchronousQueryExecutor.Call(qry, HandleResults, HandleError, OnFirstItem);
   }

If this meets your expectation, you can use this to start your work with the first item processed.

Community
  • 1
  • 1
Rohit Prakash
  • 1,975
  • 1
  • 14
  • 24
  • This doesn't allow you to work on a value returned by the query at the same time as the query is looking for the next value. The work "get first item", "handle first item", "get second item", "handle second item", etc. still all occur synchronously; this just adds an extra "handle first item" call that also occurs synchronously. – Rawling Jan 20 '15 at 12:15
  • @Rawling, lets say I have a list of integer in main thread. and I want to add the first item in the list that is processed asynchronously, private void OnFirstItem(object value) // Your first items is proecessed here. { listofInts.Add((int)value); //this adds successfully. //You can start your work here. } List listofInts = new List(); – Rohit Prakash Jan 20 '15 at 12:32
-2

This is pretty simple with the TPL.

Here's a dummy "slow" enumerator that has to do a bit of work between getting items:

static IEnumerable<int> SlowEnumerator()
{
    for (int i = 0; i < 10; i++)
    {
        Thread.Sleep(1000);
        yield return i;
    }
}

Here's a dummy bit of work to do with each item in the sequence:

private static void DoWork(int i)
{
    Thread.Sleep(1000);
    Console.WriteLine("{0} at {1}", i, DateTime.Now);
}

And here's how you can simultenously run the "bit of work" on one item that the enumerator has returned and ask the enumerator for the next item:

foreach (var i in SlowEnumerator())
{
    Task.Run(() => DoWork(i));
}

You should get work done every second - not every 2 seconds as you would expect if you had to interleave the two types of work:

0 at 20/01/2015 10:56:52
1 at 20/01/2015 10:56:53
2 at 20/01/2015 10:56:54
3 at 20/01/2015 10:56:55
4 at 20/01/2015 10:56:56
5 at 20/01/2015 10:56:57
6 at 20/01/2015 10:56:58
7 at 20/01/2015 10:56:59
8 at 20/01/2015 10:57:00
9 at 20/01/2015 10:57:01
Rawling
  • 49,248
  • 7
  • 89
  • 127
  • But how can I use this example with linq? how can I tell to linq to execute a function using the first value that he finds, then with the second and so on? – Iban Arriola Jan 20 '15 at 11:05
  • `foreach` gives you each value that `LINQ` finds, and `Task.Run` executes a function with each value as you find it. `SlowEnumerator` is just an example - this should be your LINQ query that returns an item and then is "still executing and looking for more values". – Rawling Jan 20 '15 at 11:08
  • I am trying to do it but I cannot find the way. The problem is that when I make the query I made something like this var query = from elem in contex.Clientes select elem.Nombre; But with that what I get is a list when all the query finishes... how can I make it to give me the results one by one since he is looking for it? – Iban Arriola Jan 20 '15 at 11:35
  • @Iban Your question needs more information about what the source is. If it is e.g. a database query then you can't do anything "while the query is still executing and looking for more values" because the query doesn't come back from the database until it is finished. – Rawling Jan 20 '15 at 11:43