I need some help to write two IObservables which perform similar tasks (described below as 'Normal Trigger' and 'Dwell Trigger'). I have working input streams, but I am not sure how to combine them, perform the analysis (e.g. holding on to state variables for the 'Dwell Trigger' to store the 1st interesting coord, as well as our progress percentage) and then produce a new output of a different type, all described within an observable. Let me describe what I am attempting to write:
Normal Trigger:
INPUT = two streams: 1. An unreliable hot stream of Points (x,y coordinates), e.g. the mouse position. The number of points per second could fluctuate slightly, or the stream could produce no values for periods of time. 2. A hot stream of boolean values.
OUTPUT: A combination of the data (a point from stream 1 together with a value derived from stream 2), which will be a double and represent a percentage (only 100% or 0%). An output will only be produced if these conditions are met; A signal is received on the trigger stream and the last signal on the coords stream was within a configured timespan. Another way of saying that would be that we have received a trigger and have a coords which is not old/stale.
Coords stream: -x-x-x-x-x---x-x-x-x------x-x-x-x-x-x-x-
Trigger stream:----------x------------x-------x--------
Result stream: ----------x--------------------x--------
Dwell Trigger:
INPUT = one stream: 1. An unreliable hot stream of Points (x,y coordinates), e.g. the mouse position. The number of points per second could fluctuate slightly, or the stream could produce no values for periods of time.
OUTPUT: Same output type as in the 'Normal Trigger' (a point from stream 1 together with a new derived value). An output will only be produced if these conditions are met; The coords stream has supplied coords over a certain timespan where the coords are all within a small area and there are no big gaps between the values, e.g. I've received consistent, unbroken coords for 0.1 second that all relate to the same small area and there were no gap between any coords of more than 0.01 second. I then wish to produce an output which includes the first coord from the unbroken chain and a double indicating that we are, say, 10% to our goal. If we then receive another 0.1 second of coords in the same area with no big gaps then I wish to output the SAME first coord from the unbroken chain, with another double to show that we are 20% to our goal. If a big gap is encountered, or the coords move away from the initial area then we report 0%, discard our progress and initial coord and continue looking for sufficiently long (0.1 sec) unbroken chains. If we reach 100% of our goal then this is output and start looking again (beginning again at 0%).
Coords stream: -x-x-x-x-x---x-x-x-x------x-x-x-x-x-x-x-
Result stream: ---------x------------------------x-----
This may be difficult to understand so let me explain what these are for. The coordinates are locations on the screen, and the first 'Normal Trigger' is attempting to capture the location when a button is pressed (the button press being the trigger). In the second scenario (the 'Dwell Trigger') there is no button press, so we want to capture the position of the mouse pointer when it stays ('dwells') in the same small area. This is, however, gradual, so we want to recognise when the mouse position has been consistent for a small period of time, register that it has been in the same position for 10% of the time needed, then if it stays in the same area register that we are 20% of the way, etc, until we are happy that the mouse has been kept in the same area for long enough to output 100%, which means the user has 'dwelled' in the same position long enough to register their interest. We then start looking again to see where they dwell again, which may be in the same spot.
The 'Dwell Trigger' seems much more challenging to me, as I presume you would need to do all of the following;
- Retain the last last coord to be able to check the time between the current and last coord (to detect any "big gaps")
- Retain the first coord in any unbroken chain so that we can a) compare current coord position to the position of the first coord in the chain to check that the current coord is within an acceptable distance of the first coord in this chain b) calculate our progress based on how much time has passed between the first coord and the current coord which is acceptable to us c) output the first coord with our new progress percentage
- Have some sort of interval on the stream that allows us to output 0% if a "big gap" is detected in the stream
I have working Rx observables for the coord and trigger streams. Pseudo code/descriptions of where I should be looking would be most appreciated.
Thank you for reading my enormous post! Julius
Solutions:
"Normal Trigger"
Following Christopher's suggestion of using Observable.Create I came up with the below. I do not know whether this is considered the "hacky" way of creating streams and it would be better to describe what I'm doing using another set of Rx methods, but this works. Please note that the functionality is slightly different from what I described;
- I decided it would be better for a trigger to ALWAYS be output, but that it would be accompanied by a Point if that point was fresh enough, i.e. we can always see when the user has clicked the mouse button (or whatever the trigger signal is), but we can only see the point to which this relates if it is not stale.
The output signal is not 100% or 0%, but 1 (for 100%) and -1 (for -100% which occurs when the opposite trigger is encountered). For example pressing the mouse button DOWN might be 1, but releasing it might be -1.
public static IObservable<TriggerSignalWithPoint> CombineWithPointIfRelevent( this IObservable<bool> triggerSource, IObservable<Timestamped<Point>> pointsSource, TimeSpan pointsMaxReleventAge) { return Observable.Create<TriggerSignalWithPoint>(subj => { bool disposed = false; Timestamped<Point>? latestPoint = null; Action disposeChildSubscriptions = null; var pointsSubscription = pointsSource.Subscribe( timestampedPoint => { latestPoint = timestampedPoint; }, ex => { subj.OnError(ex); disposeChildSubscriptions(); }); var triggerSubscription = triggerSource .Where(_ => disposed == false) .Subscribe( b => { Point? latestUsefulPoint = latestPoint.HasValue && DateTimeOffset.Now.Subtract(latestPoint.Value.Timestamp) <= pointsMaxReleventAge ? latestPoint.Value.Value : (Point?) null; float signal = b ? 1 : -1; subj.OnNext(new TriggerSignalWithPoint(signal, latestUsefulPoint)); }, ex => { subj.OnError(ex); disposeChildSubscriptions(); }, () => { subj.OnCompleted(); disposeChildSubscriptions(); }); disposeChildSubscriptions = () => { disposed = true; if (triggerSubscription != null) { triggerSubscription.Dispose(); triggerSubscription = null; } if (pointsSubscription != null) { pointsSubscription.Dispose(); pointsSubscription = null; } }; return disposeChildSubscriptions; }); }
N.B. This solution is adapted from Paul Bett's answer here; CombineLatest, but only push for the left
Any criticism or help would be hugely helpful as I am only just brushing the surface of Rx.