I am facing a problem with Rx.Window

Below is my code. Notice that some work items do not reach the inner subscription.

var subscription
    = m_BufferBlock
        .AsObservable()
        .Synchronize()
        .Where(InValue => InValue.Region == Region)
        .Do(W => logger.Debug("Side Effect => " + W.ToString()))
        // Work item is lost after this point: it does not reach Window,
        // and I don't see it being received inside Subscribe.
        .Window(TimeSpan.FromMilliseconds(10000))
        .SubscribeOn(Scheduler.Default)
        .Subscribe(window =>
            {
                window
                    .ToList()
                    .SubscribeOn(Scheduler.Default)
                    .Subscribe(workItems =>
                        {
                            foreach (WorkItem W in workItems)
                            {
                                // Some work items do not reach this line
                                logger.Debug("Came inside subscriber => " + W);
                            }

                            if (workItems.Count > 0)
                            {
                                ProcessWorkItems(workItems.ToList<WorkItem>());
                            }
                        });
            });

WorkItem Class

public class WorkItem
{

    public int Region { get; set; }
    public int Priority { get; set; }

    public string PortfolioId { get; set; }
    public string SecurityId { get; set; }
    public string Status { get; set; }
    public string Message { get; set; }

    public Int64 Guid { get; set; }
    public DateTime RequestedDateTime { get; set; }
    public WorkItemType WorkItemType { get; set; }
    public long RecordCount { get; set; }

    public WorkItem()
    {
        PortfolioId = string.Empty;
        SecurityId = string.Empty;
        Message = string.Empty;
        Status = string.Empty;
        Region = 0;
        WorkItemType = WorkItemType.REALTIME;
        RequestedDateTime = DateTime.Now;
        RecordCount = 0;
    }

    public override string ToString()
    {
        return string.Format("WorkItemType : {0} Region : {1} PortfolioId : {2} SecurityId : {3} Guid : {4} Priority : {5} Status : {6} Message : {7}", 
            WorkItemType.ToString(), Region, PortfolioId, SecurityId, Guid, Priority, Status, Message);
    }
}

I am using a BufferBlock with unbounded capacity.

Any help would be appreciated...

svick
user2757350

3 Answers

Do not call window.ToList().SubscribeOn(...).Subscribe(...). Instead, call window.ToList().Subscribe(...), or use Chris's version, which uses SelectMany.

The inner window observables are hot. That means you must subscribe to them before returning from the callback, or you will lose items in the window. SubscribeOn delays the subscription, because the call to Subscribe is scheduled on another thread or task. You therefore return from your callback before you have actually subscribed to the window, and it becomes a race to see whether the subscription runs before an item arrives in the window. If an item arrives first, it is lost because the subscription has not been set up yet.

So not only is the SubscribeOn not useful, as James noted, it is actually the cause of your problem.
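
The race described above can be reproduced deterministically. The following is only a minimal sketch, assuming the System.Reactive NuGet package is referenced. The class and method names (WindowRaceDemo, SubscribeImmediately, SubscribeLate) are hypothetical, a count-based Window(2) stands in for the time-based window so that no timing is involved, and a queue stands in for the delay that SubscribeOn introduces.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; not part of the original question's code.
public static class WindowRaceDemo
{
    // Subscribes to each window inside the outer callback, before returning.
    public static List<int> SubscribeImmediately()
    {
        var received = new List<int>();
        var source = new Subject<int>();

        using (source.Window(2).Subscribe(window =>
        {
            // The inner subscription exists before any item can reach the window.
            window.ToList().Subscribe(items => received.AddRange(items));
        }))
        {
            source.OnNext(1);
            source.OnNext(2); // closes the first window, so ToList emits
        }

        return received;
    }

    // Defers the inner subscription, which is what SubscribeOn effectively does.
    public static List<int> SubscribeLate()
    {
        var received = new List<int>();
        var source = new Subject<int>();
        var pending = new Queue<IObservable<int>>();

        // The callback returns without subscribing to the window.
        using (source.Window(2).Subscribe(window => pending.Enqueue(window)))
        {
            source.OnNext(1); // arrives while nobody is listening to the window

            // The "scheduled" subscription finally runs, but too late for item 1.
            pending.Dequeue().ToList().Subscribe(items => received.AddRange(items));

            source.OnNext(2); // closes the first window, so ToList emits
        }

        return received;
    }
}
```

Under these assumptions, SubscribeImmediately should collect both items, while SubscribeLate should collect only the item that arrived after the late subscription. In the original query the same loss happens only occasionally, because there the outcome depends on how quickly the scheduler runs the deferred Subscribe call.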

Brandon

Are you absolutely certain that you are losing elements? (If you are, I am almost certain it's not a bug in Window)

The reason I ask is that your Rx query has been written in such a way that it is entirely possible for the log output sent in the Do() of window n+1 to be written out before the items captured in window n have been logged.

I wonder if perusing a little further down the log might resolve things?

To check this, we could modify your query slightly to get a window number and include it in your log output like this:

var subscription
    = m_BufferBlock
    .AsObservable()
    .Synchronize()
    .Where(InValue => InValue.Region == Region)
    .Do(W => logger.Debug("Side Effect => " + W.ToString())) 
    .Window(TimeSpan.FromMilliseconds(10000))
    .Select((window, index) => Tuple.Create(window,index))
    .SubscribeOn(Scheduler.Default)
    .Subscribe(window =>
        {
            window.Item1
                .ToList()
                .SubscribeOn(Scheduler.Default)
                .Subscribe(workItems =>
                    {
                        foreach (WorkItem W in workItems)
                        {
                            // Some work items do not reach this line
                            logger.Debug("Came inside window " + window.Item2 + " and subscriber => " + W);
                        }

                        if (workItems.Count > 0)
                        {
                            ProcessWorkItems(workItems.ToList<WorkItem>());
                        }
                    });
        });

I should add that nesting subscriptions in the way that you have is not best practice. It brings issues in terms of linearity (the repercussions of which you are suffering here), performance, flexibility and readability. You are better off keeping things "in the monad" (i.e. composing observables) and holding off to make a final subscription as long as possible.

Look at how Chris has translated your query with a SelectMany in place of the nested subscription to see how to approach this. Also, your SubscribeOn is probably gaining you nothing here: it only controls which scheduler the subscription itself is made on, not where notifications are delivered. Have a look at this question to see what SubscribeOn gives you.

James World
  • SelectMany is the first thing I noticed; I have changed my code accordingly and I have kept only one Subscribe rather than nested Subscribe... I am testing this code; I will keep you posted.. – user2757350 Jan 10 '14 at 18:21

Window will yield an observable for each window period, regardless of whether any items arrive during that period. You may therefore end up with windows containing no events. When you use ToList on such a window, the result is an observable that yields an empty list, which is why the query below filters out empty lists before processing.

var subscription = m_BufferBlock
    .AsObservable()
    .Synchronize()
    .Where(item => item.Region == Region)
    .Do(item => logger.Debug("Side Effect => " + item))
    .Window(TimeSpan.FromMilliseconds(10000))
    .SelectMany(window => window.ToList())
    .Where(workItems => workItems.Count > 0)
    .Subscribe(ProcessWorkItems);

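This could also work. Buffer collects each time slice into a list directly, so Window followed by ToList is no longer needed: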

var subscription = m_BufferBlock
    .AsObservable()
    .Synchronize()
    .Where(item => item.Region == Region)
    .Do(item => logger.Debug("Side Effect => " + item))
    .Buffer(TimeSpan.FromMilliseconds(10000))
    .Where(workItems => workItems.Count > 0)
    .Subscribe(ProcessWorkItems);
cwharris
  • OK; I know that not every window will contain items, but some of the WorkItems never come through in any window. They just disappear. I can see them up to Do(W => logger.Debug("Side Effect => " + W.ToString())), but after that they never reach logger.Debug("Came inside subscriber => " + W.ToString()). This does not happen for all of them; it happens very occasionally, for a few WorkItems. – user2757350 Jan 09 '14 at 21:20
  • let me take a look. So they get logged to "Side Effect => item", but they don't appear in a window? – cwharris Jan 09 '14 at 21:22
  • I'm really not sure what's going on. Could you provide some output from your application, showing where a work item has been lost? – cwharris Jan 09 '14 at 21:35
  • I have added a comment in the code; the item gets lost after .Do. Only a few WorkItems get lost, not all of them. I have also added my WorkItem class. Please refer to the original code at the top. – user2757350 Jan 09 '14 at 22:01
  • I'll see if I can take a look at the implementation of Window later to see if there's a bug. – cwharris Jan 09 '14 at 23:31