3

I have a sequence of Images (IObservable<ImageSource>) that goes through this "pipeline".

  1. Each image is recognized using OCR
    • If the results have valid values, they are uploaded to a service that can register only one set of results at a time (not concurrently).
    • If the results have any invalid value, they are presented to the user in order to fix them. After they are fixed, the process continues.
  2. During the process, the UI should stay responsive.

The problem is that I don't know how to handle the case when the user has to interact. I cannot simply do this:

        subscription = images                
            .Do(source => source.Freeze())
            .Select(image => OcrService.Recognize(image))                
            .Subscribe(ocrResults => Upload(ocrResults));

...because when ocrResults have to be fixed by the user, the flow should be kept on hold until the valid values are accepted (e.g. the user could execute a Command by clicking a Button).

How do I say: if the results are NOT valid, wait until the user fixes them?

SuperJMN
  • I assume there is no way you can shove the imageSource data into an array of byte[] before you go through the subscription process? Do the methods rely on that ui element? – bri Jan 19 '16 at 13:26
  • "Since I use ImageSource, I might have problems with it in a thread that isn't the UI Thread". Not if you `Freeze()` it. – Clemens Jan 19 '16 at 13:39
  • It works with Freeze! Thank you! – SuperJMN Jan 19 '16 at 13:47
  • I'm very sorry. I phrased the question in a way that didn't represent my problem faithfully. I've edited it and now it fits much better. – SuperJMN Jan 19 '16 at 18:47
  • It sounds like you need to _pause_ the `Observable`. There are ways to do it, for example [this one](http://stackoverflow.com/questions/7620182/pause-and-resume-subscription-on-cold-iobservable#answer-7642198) (if it works out, don't forget to give credit to the author) – supertopi Jan 19 '16 at 20:53
  • Thank you! I have read the solution and tried it, but it seems to have some problems. Please, read the comments. It loses values. – SuperJMN Jan 20 '16 at 08:34
  • @supertopi Have you tried the solution that he proposes? I've posted an example to the question you link to. To be honest, I would prefer doing it with the Pausable extension method, which is really elegant and readable. Unfortunately, there are some doubts about whether it works as expected. Before marking any answer I would like to wait for the author to review his answer and reply. – SuperJMN Jan 21 '16 at 07:40

3 Answers

2

I'm assuming your UploadAsync method returns a Task so that you can wait for it to finish? If so, there are overloads of SelectMany that handle tasks.

images.Select(originalImage => ImageOperations.Resize(originalImage))
    .SelectMany(resizedImg => imageUploader.UploadAsync(resizedImg))
    .Subscribe();
Charles Mager
  • I'm very sorry. I phrased the question in a way that didn't represent my problem faithfully. I've edited it and now it fits much better. – SuperJMN Jan 19 '16 at 18:47
2

Assuming you've got an async method which implements the "user fix process":

/* shows the image to the user, who fixes it; returns true if fixed, false if it should be skipped */
async Task<bool> UserFixesTheOcrResults(ocrResults);

Then your observable becomes:

subscription = images                
        .Do(source => source.Freeze())
        .Select(image => OcrService.Recognize(image))
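        .Select(ocrResults => {
            if (ocrResults.IsValid)
                return Observable.Return(ocrResults);
            else
                // ToObservable() on a Task needs: using System.Reactive.Threading.Tasks;
                return UserFixesTheOcrResults(ocrResults).ToObservable().Select(_ => ocrResults);
        })
        // Concat() subscribes to one inner sequence at a time, so results keep the source order
        .Concat()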
        .Subscribe(ocrResults => Upload(ocrResults));
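
One way to get a `Task` that completes when the user clicks a button is a `TaskCompletionSource`. The following is only a minimal sketch: `FixOcrViewModel`, `Accept`, `Skip`, `UserFix` and the `showWindow` delegate are hypothetical names, not something from the question, and the actual window and bindings are left out.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical view model behind the "fix the OCR results" window.
public sealed class FixOcrViewModel
{
    private readonly TaskCompletionSource<bool> _done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Completes with true when the user accepts, false when the user skips.
    public Task<bool> Completion => _done.Task;

    // Call from the Accept button's command or Click handler.
    public void Accept() => _done.TrySetResult(true);

    // Call from a Skip/Cancel button. Has no effect once a result is set.
    public void Skip() => _done.TrySetResult(false);
}

public static class UserFix
{
    // showWindow is a placeholder for whatever displays the window
    // bound to the view model (assumption, not part of the question).
    public static Task<bool> UserFixesTheOcrResults(Action<FixOcrViewModel> showWindow)
    {
        var viewModel = new FixOcrViewModel();
        showWindow(viewModel);
        return viewModel.Completion;
    }
}
```

The returned task stays pending until `Accept()` or `Skip()` is called, so the inner observable built from it does not produce a value (and `Concat()` does not move on to the next one) before the user has finished.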
Gluck
  • Looks good, but does this put the flow _on hold_? I assume if `images` emits new values after user input is required, the following values pass through the monad normally – supertopi Jan 20 '16 at 09:27
  • It looks really elegant and readable, but the thing is that I don't know how to transform the user interaction. In UserFixesTheOcrResults, the results are shown in the Window and, when everything is OK, the user clicks a button to submit the correct results. How do I wait on that? :( Sorry, but I'm not very experienced with this scenario. Definitely an Rx rookie here. – SuperJMN Jan 20 '16 at 09:28
  • Also, the user MUST fix the ocrResults, so it shouldn't be a `Task<bool>`, but only a `Task` :) – SuperJMN Jan 20 '16 at 09:32
  • @supertopi indeed it should use `Concat()` instead to guarantee the ordering – Gluck Jan 20 '16 at 09:49
  • @SuperJMN implementing the `UserFixesTheOcrResults` method is more of a WPF question than a Rx one. You could use a `ReactiveCommand` from [reactiveui](https://github.com/reactiveui/ReactiveUI) library and do: `var viewModel = /*some VM*/;ShowFixWindowFor(viewModel);await viewModel.ValidateCommand;` (if you go for `ReactiveCommand`, you can directly use it as an observable, and avoid the `Task` entirely) – Gluck Jan 20 '16 at 09:54
  • @Gluck in addition to order, there is still the issue of _on hold_. We don't know the actual requirements, but I assume the whole monad should be put on hold. For example: your solution allows running `source.Freeze()` and `OcrService.Recognize(image)` concurrently during `UserFixesTheOcrResults`, right? Hence _pausing_ the source `Observable` or ignoring emitted values during fixing is required – supertopi Jan 20 '16 at 12:44
  • @supertopi, could be, but AFAICT the requirements are to upload in order and not concurrently, but OCR can run in parallel to UI (e.g. because the UI needs to stay responsive during it). If that's not the case, it becomes much more difficult because of the lack of backpressure in Rx.Net, and a complex Switch (your above link) seems the only option in pure Rx. – Gluck Jan 20 '16 at 13:17
  • @Gluck Sorry, I tried to deal with my problem over the weekend, but I still don't know how to handle it. The exact part I have problems with is the two-way branch (the Select). If the results are valid, they are returned, BUT when they're invalid, we wait for something to happen. In this case, we wait for the user to fix them, but how? Using a ReactiveCommand, it seems, but I'm lost there :( Could you please post a sample? Thank you so much! – SuperJMN Jan 24 '16 at 20:59
2

This seems to be a mix of UX, WPF and Rx all wrapped up in one problem. Trying to solve it with only Rx is probably going to send you into a tailspin. I am sure you could solve it with just Rx and think no more about it, but would you want to? Would it be testable, loosely coupled and easy to maintain?

In my understanding of the problem, you have the following steps

  1. User Uploads/Selects some images
  2. The system performs OCR on each image
  3. If the OCR tool deems the image source to be valid, the result of the processing is uploaded
  4. If the OCR tool deems the image source to be invalid, the user "fixes" the result and the result is uploaded

But this may be better described as

  1. User Uploads/Selects some images
  2. The system performs OCR on each image
  3. The result of the OCR is placed in a validation queue
  4. While the result is invalid, a user is required to manually update it to a valid state.
  5. The valid result is uploaded

Sequence diagram

So it seems to me that you need a task/queue-based UI so that a user can see the invalid OCR results they need to work on. This also tells me that, if a person is involved, that step should probably sit outside of the Rx query.

Step 1 - Perform OCR

subscription = images                
        .Subscribe(image=>
        {
          //image.Freeze() --probably should be done by the source sequence
          var result = _ocrService.Recognize(image);
          _validator.Enqueue(result);
        });

Step 2 - Validate Result

//In the Enqueue method, if queue is empty, ProcessHead();
//Else add to queue.
//When Head item is updated, ProcessHead();
//ProcessHead method checks if the head item is valid, and if it is uploads it and remove from queue. Once removed from queue, if(!IsEmpty) {ProcessHead();}


//Display Head of the Queue (and probably queue depth) to user so they can interact with it.
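
The comments above could be sketched roughly as follows. This is only an illustration under assumptions: `ValidationQueue<T>`, `HeadUpdated` and `HeadNeedsFixing` are made-up names, `T` stands for whatever type the OCR service returns, and the class is not thread-safe, so all calls are assumed to be marshalled onto one thread (for example the UI dispatcher).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical validation queue: valid heads are uploaded in order,
// an invalid head blocks the queue until the user has fixed it.
public sealed class ValidationQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly Func<T, bool> _isValid;
    private readonly Action<T> _upload;

    public ValidationQueue(Func<T, bool> isValid, Action<T> upload)
    {
        _isValid = isValid;
        _upload = upload;
    }

    // Queue depth, e.g. for display next to the head item.
    public int Count => _items.Count;

    // Raised when the head item is invalid and needs the user's attention.
    public event Action<T> HeadNeedsFixing;

    public void Enqueue(T item)
    {
        _items.Enqueue(item);
        // Only start processing if the queue was empty before this item.
        if (_items.Count == 1) ProcessHead();
    }

    // Call after the user has edited the head item.
    public void HeadUpdated() => ProcessHead();

    private void ProcessHead()
    {
        while (_items.Count > 0)
        {
            var head = _items.Peek();
            if (!_isValid(head))
            {
                HeadNeedsFixing?.Invoke(head);
                return; // wait for HeadUpdated()
            }
            _upload(head);
            _items.Dequeue();
        }
    }
}
```

Items queued behind an invalid head simply wait, which makes the "on hold" behaviour explicit instead of hiding it inside an operator chain.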

Step 3 - Upload result

Upload(ocrResults)
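
If the upload service accepts only one upload at a time and the UI must not block, one option is to chain each upload onto the previous one. The sketch below assumes an asynchronous upload delegate; `SerialUploader<T>` and `Enqueue` are hypothetical names, and a blocking `Upload` could be wrapped as `item => Task.Run(() => Upload(item))`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs uploads strictly one after another, in call order.
public sealed class SerialUploader<T>
{
    private readonly object _lock = new object();
    private readonly Func<T, Task> _uploadAsync;
    private Task _tail = Task.CompletedTask;

    public SerialUploader(Func<T, Task> uploadAsync)
    {
        _uploadAsync = uploadAsync;
    }

    // Returns a task that completes when this item's upload has finished.
    public Task Enqueue(T item)
    {
        lock (_lock)
        {
            // The next upload starts only after the previous one has ended,
            // whether it succeeded or failed.
            _tail = _tail
                .ContinueWith(_ => _uploadAsync(item), TaskScheduler.Default)
                .Unwrap();
            return _tail;
        }
    }
}
```

A failed upload faults only its own returned task; later uploads still run, so error handling (retry, skip, report to the user) remains the caller's decision.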

So here Rx is just a tool in our arsenal, not the one hammer that needs to solve all problems. I have found that in most "Rx" problems that grow in size, Rx just acts as the entry and exit points for various queue structures. This allows us to make the queuing in our system explicit instead of implicit (i.e. hidden inside of Rx operators).

Lee Campbell
  • Thank you, Lee! I get your idea. However, right now my main problem is that the Upload phase takes more than 2-3 seconds and the UI hangs. My first reaction was to make the upload method async, but then I get more than one concurrent call to Upload. The upload service only supports one upload at a time. How could I achieve this? – SuperJMN Jan 25 '16 at 21:06
  • This was my point. If you break up the work into individual pieces, you have more control. If you had a queue of items for upload, why would it be processed from the UI thread? The UI thread can trigger/start some background process to drain the queue, but the actual uploading of the file would be done on a background thread. You would only update the `state` of the upload ("Pending Upload"->"Uploading"->"Uploaded") from the UI thread. – Lee Campbell Jan 27 '16 at 01:13
  • It also appears that you have not considered error flows in this example, e.g. if the OCR was performed on a bogus file, or if the upload service is down or won't accept the upload (too large, wrong version etc.) – Lee Campbell Jan 27 '16 at 01:15