
I am trying to implement instant search on a database table with 10,000+ records.

The search starts when the text in the search box changes. When the search box becomes empty, I want to call a different method that loads all the data.

Also, if the user changes the search string while results for another search are still being loaded, the loading of those results should stop in favor of the new search.

I implemented it as shown in the following code, but I was wondering whether there is a better or cleaner way to do it using Rx (Reactive Extensions) operators. Creating a second observable inside the Subscribe method of the first observable feels more imperative than declarative, and the same goes for the conditional that chooses between the two repository methods.

var searchStream = Observable.FromEventPattern(s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Select(evt =>
        {
            var txtbox = evt.Sender as TextBox;
            return txtbox.Text;
        }
    );

searchStream
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Subscribe(searchTerm =>
        {
            this.parties.Clear();
            this.partyBindingSource.ResetBindings(false);
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAll(out partyCount) : partyRepository.SearchByNameAndNotes(searchTerm);

            foundParties
                .ToObservable(Scheduler.Default)
                .TakeUntil(searchStream)
                .Buffer(500)
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(searchResults =>
                    {
                        this.parties.AddRange(searchResults);
                        this.partyBindingSource.ResetBindings(false);
                    }
                    , innerEx =>
                    {

                    }
                    , () => { }
                );
        }
        , ex =>
        {
        }
        , () =>
        {

        }
    );

The SearchByNameAndNotes method just returns an IEnumerable<Party> using SQLite by reading data from a data reader.

Ibrahim Najjar
  • What exactly is SearchAsync doing? – cwharris Apr 06 '14 at 19:20
  • Why is SearchByNameAndNotes being called from SearchAsync and your Subscribe method? – cwharris Apr 07 '14 at 02:36
  • @ChristopherHarris Thank you for your notes, and my apologies for that second mistake; that `SelectMany` clause was part of another attempt. It is commented out in my original code, and the final version of the question is mistake-free (I hope). – Ibrahim Najjar Apr 07 '14 at 05:27

1 Answer


I think you want something like this. EDIT: From your comments, I see you have a synchronous repository API - I'll leave the asynchronous version in, and add a synchronous version afterwards. Notes inline:

Asynchronous Repository Version

An asynchronous repository interface could be something like this:

public interface IPartyRepository
{
    Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
    Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
}

Then I refactor the query as:

var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text) // better to select on the UI thread
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    // placement of this is important to avoid races updating the UI
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        // I like to use Do to make in-stream side-effects explicit
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })
    // This is "the money" part of the answer:
    // Don't subscribe, just project the search term
    // into the query...
    .Select(searchTerm =>
    {
        long partyCount;
        var foundParties = string.IsNullOrEmpty(searchTerm)
            ? partyRepository.GetAllAsync(out partyCount)
            : partyRepository.SearchByNameAndNotesAsync(searchTerm);

        // I assume the intention of the Buffer was to load
        // the data into the UI in batches. If so, you can use Buffer from nuget
        // package Ix-Main like this to get IEnumerable<T> batched up
        // without splitting it up into unit sized pieces first
        return foundParties
            // this ToObs gets us into the monad
            // and returns IObservable<IEnumerable<Party>>
            // (Task<T>.ToObservable() needs: using System.Reactive.Threading.Tasks;)
            .ToObservable()
            // the ToObs here gets us into the monad from
            // the IEnum<IList<Party>> returned by Buffer
            // and the SelectMany flattens so the output
            // is IObservable<IList<Party>>
            .SelectMany(x => x.Buffer(500).ToObservable())
            // placement of this is again important to avoid races updating the UI
            // erroneously putting it after the Switch is a very common bug
            .ObserveOn(SynchronizationContext.Current); 
    })
    // At this point we have IObservable<IObservable<IList<Party>>>
    // Switch flattens and returns the most recent inner IObservable,
    // cancelling any previous pending set of batched results
    // superseded due to a textbox change
    // i.e. the previous inner IObservable<...> if it was incomplete
    // - it's the equivalent of your TakeUntil, but a bit neater
    .Switch() 
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });
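To see the cancelling behaviour of Switch in isolation, here is a minimal console sketch. It assumes the System.Reactive NuGet package and uses hand-driven `Subject<string>` instances (the `pending` dictionary and the batch strings are made up for the illustration) in place of the textbox and the repository:

```csharp
// Assumption: the System.Reactive NuGet package is referenced.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Stands in for the throttled stream of search terms.
var searchTerms = new Subject<string>();
// Hypothetical in-flight searches, keyed by term, so we can push results by hand.
var pending = new Dictionary<string, Subject<string>>();
var shown = new List<string>();

var subscription = searchTerms
    .Select(term =>
    {
        // Project each term into an inner observable of result batches.
        var results = new Subject<string>();
        pending[term] = results;
        return results.AsObservable();
    })
    // Only the most recent inner observable stays subscribed.
    .Switch()
    .Subscribe(batch => shown.Add(batch));

searchTerms.OnNext("a");
pending["a"].OnNext("batch 1 for a");
searchTerms.OnNext("ab");                 // Switch unsubscribes from the "a" results here
pending["a"].OnNext("batch 2 for a");     // stale, never reaches the subscriber
pending["ab"].OnNext("batch 1 for ab");

Console.WriteLine(string.Join(", ", shown));
// prints "batch 1 for a, batch 1 for ab"

subscription.Dispose();
```

The stale batch is dropped without any explicit TakeUntil, which is the part the nested Subscribe in the question had to handle manually.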

Synchronous Repository Version

A synchronous repository interface could be something like this:

public interface IPartyRepository
{
    IEnumerable<Party> GetAll(out long partyCount);
    IEnumerable<Party> SearchByNameAndNotes(string searchTerm);
}

Personally, I don't recommend a repository interface be synchronous like this. Why? It is typically going to do IO, so you will wastefully block a thread.

You might say the client could call from a background thread, or you could wrap their call in a task - but I don't think this is the right way to go.

  • The client doesn't "know" you are going to block; it's not expressed in the contract.
  • It should be the repository that handles the asynchronous aspect of the implementation - after all, the repository implementer is the one who knows how this is best achieved.
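As a rough sketch of what "the repository handles the asynchronous aspect" could look like, here is a self-contained example. Everything in it is hypothetical: `IAsyncPartyRepository`, `PartyPage` and `InMemoryPartyRepository` are illustration names, the data lives in memory rather than SQLite, and `Task.Run` is just one possible way for an implementer to keep the blocking read off the caller's thread. It returns the count alongside the data instead of using an `out` parameter, because a method implemented with the `async` modifier cannot declare `out` parameters.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var repository = new InMemoryPartyRepository(new[]
{
    new Party("Acme Ltd", "supplier"),
    new Party("Bolt & Co", "acme reseller"),
    new Party("Cobalt", "customer"),
});

var all = await repository.GetAllAsync();
var hits = await repository.SearchByNameAndNotesAsync("acme");
Console.WriteLine($"{all.TotalCount} total, {hits.Count} matching");
// prints "3 total, 2 matching"

public record Party(string Name, string Notes);

// Hypothetical result type that carries the count instead of an out parameter.
public record PartyPage(IReadOnlyList<Party> Parties, long TotalCount);

// Hypothetical contract: the asynchrony is visible to the client.
public interface IAsyncPartyRepository
{
    Task<PartyPage> GetAllAsync();
    Task<IReadOnlyList<Party>> SearchByNameAndNotesAsync(string searchTerm);
}

public sealed class InMemoryPartyRepository : IAsyncPartyRepository
{
    private readonly IReadOnlyList<Party> parties;

    public InMemoryPartyRepository(IEnumerable<Party> parties) =>
        this.parties = parties.ToList();

    // The implementer decides how to avoid blocking the caller;
    // here the (pretend) blocking read is offloaded to the thread pool.
    public Task<PartyPage> GetAllAsync() =>
        Task.Run(() => new PartyPage(parties, parties.Count));

    public Task<IReadOnlyList<Party>> SearchByNameAndNotesAsync(string searchTerm) =>
        Task.Run<IReadOnlyList<Party>>(() => parties
            .Where(p =>
                p.Name.IndexOf(searchTerm, StringComparison.OrdinalIgnoreCase) >= 0 ||
                p.Notes.IndexOf(searchTerm, StringComparison.OrdinalIgnoreCase) >= 0)
            .ToList());
}
```

A real implementation over a database driver with genuinely asynchronous I/O would use that instead of `Task.Run`; the point is only that the choice sits behind the interface.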

Anyway, accepting the above, one way to implement is like this (of course it's mostly similar to the async version so I've only annotated the differences):

var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })       
    .Select(searchTerm =>
        // Here we wrap the synchronous repository into an
        // async call. Note it's simply not enough to call
        // ToObservable(Scheduler.Default) on the enumerable
        // because this can actually still block up to the point that the
        // first result is yielded. Doing as we have here,
        // we guarantee the UI stays responsive
        Observable.Start(() =>
        {
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm)
                ? partyRepository.GetAll(out partyCount)
                : partyRepository.SearchByNameAndNotes(searchTerm);

            return foundParties;
        }) // Note you can supply a scheduler, default is Scheduler.Default
        .SelectMany(x => x.Buffer(500).ToObservable())
        .ObserveOn(SynchronizationContext.Current))
    .Switch()
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });  
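For readers who don't have the Ix package to hand, the enumerable-side batching that `Buffer(500)` performs on `foundParties` can be approximated with plain LINQ-style code. The `Batch` extension below is a hypothetical stand-in written for this illustration, not the Ix implementation; it assumes a positive batch size:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// 1200 fake rows split into batches of at most 500.
var batchSizes = Enumerable.Range(1, 1200)
    .Batch(500)
    .Select(batch => batch.Count)
    .ToList();

Console.WriteLine(string.Join(", ", batchSizes));
// prints "500, 500, 200"

public static class BatchingExtensions
{
    // Hypothetical helper: lazily groups a sequence into lists of up to
    // `size` items, without leaving IEnumerable (assumes size > 0).
    public static IEnumerable<IList<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        var batch = new List<T>(size);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }

        // Emit the final, partially filled batch if there is one.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}
```

Because the batches are produced lazily, each list only becomes an observable element once it is full, so no per-item notifications are created along the way.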
James World
  • It seems like a very good answer. I am at work now so I can check it thoroughly, but I just wanted to say that my interface signature looks exactly like what you said but without the `Task<>`. – Ibrahim Najjar Apr 08 '14 at 08:07
  • OK, I added in a paragraph about that ("If you want to keep..."). My preference would be to tie the asynchrony to the repo though, as it's modelling an inherently asynchronous task. – James World Apr 08 '14 at 09:50
  • Inside the "money" part, you have `return foundParties.ToObservable()` and you say this returns an `IObservable<IEnumerable<Party>>`, but it actually returns an `IObservable<Party>`, so the rest of the code won't compile? – Ibrahim Najjar Apr 08 '14 at 16:12
  • I've updated my answer with a separate explanation of a working synchronous repository version - should be clearer now I hope! – James World Apr 08 '14 at 20:31
  • I really like your answer. After testing it out, though, I found that my original version was a bit more responsive! Anyway, one last question: why is it better to select *(the Text property)* on the UI thread? I am planning to award you the bounty, but just to be safe I am going to leave the door open for more answers (if any). Thank you so much for your thorough explanation. – Ibrahim Najjar Apr 08 '14 at 20:49
  • Cheers. When I run a profiler, I can see 97.4% of the work of my sample happening in that `ResetBindings` call - it kills the UI. I guess this isn't the best way to handle loading lots of items into WinForms controls (I've not done WinForms for a looooong time, I have to say). I can say this code is doing the right thing from the perspective of doing what can be done off the UI - but I suspect you need another data binding approach for handling 1000s of items loading into a control. I read the text on the UI thread to avoid a race where it is updated on the UI thread at the point of being read. – James World Apr 08 '14 at 21:59
  • Adding items to a ListView control with Virtualization enabled maybe? – James World Apr 08 '14 at 22:00
  • I think the reason you're seeing your version being more responsive than James' is your use of Scheduler.Default to run the repository loading. This is what James is talking about with the possibility to do the loading on another thread. As James said, the design would be neater if the repository offered an Async API. – Niall Connaughton Apr 09 '14 at 07:51
  • @NiallConnaughton Thanks for your input, I am considering it. – Ibrahim Najjar Apr 09 '14 at 12:56
  • I am afraid I still have one more question: Why did you use the `Buffer` method from the `Ix-Main` library instead of the `Buffer` in the reactive extensions library itself ? – Ibrahim Najjar Apr 09 '14 at 12:57
  • I will call them IE.Buffer and IO.Buffer respectively. If I use IO.Buffer I must convert the IE stream to individual IO events, then repackage them back into IE buffers, then stream them in IO, i.e. IE -> IO -> IE -> IO. If I use IE.Buffer I go directly to the buffered lists I need without leaving IE, and then transition, i.e. IE -> IE -> IO. The latter has two fewer type transitions and fewer steps, and is more efficient. – James World Apr 09 '14 at 14:13
  • If you had a data-layer that gave you IObservable directly (which makes a lot more sense than IEnumerable), then IO.Buffer would make more sense too. These do exist (e.g. for SignalR, StreamInsight, Terracotta UM). Even better if they buffer natively and return IO<IList<T>> directly. – James World Apr 09 '14 at 14:14
  • To put James' comment another way, your foundParties.ToObservable.TakeUntil.Buffer chain converts the Enumerable parties to an Observable. The parties are fed one at a time to Buffer, which will convert them into a series of Enumerables (buffers). But foundParties was already an Enumerable. So it's lighter weight to just take chunks out of that Enumerable than it is to convert it to an Observable, read one at a time, batch them up, and hand it back as an Enumerable. – Niall Connaughton Apr 10 '14 at 02:05
  • However, if the data API could read partial results asynchronously, it could return an Observable and you could use the IO Buffer instead. – Niall Connaughton Apr 10 '14 at 02:05
  • Thanks for the translation Niall, much better. :) – James World Apr 10 '14 at 06:00
  • @NiallConnaughton Thank you both guys for your explanations, I couldn't ask for more. – Ibrahim Najjar Apr 10 '14 at 12:56
  • I find that foundParties.ToObservable is not possible, because Task cannot be transformed to an observable. – Diana Mar 21 '17 at 10:38
  • If I use it with `WhenAnyValue` do I still need to use Switch to avoid race condition between threads you mentioned? I can't access it. – lolelo Nov 28 '20 at 11:16