I am facing problem with Rx.Window
Below is my code. Notice that some work items do not reach the inner subscription.
var subscription
= m_BufferBlock
.AsObservable()
.Synchronize()
.Where(InValue => InValue.Region == Region)
.Do(W => logger.Debug("Side Effect => " + W.ToString()))
Work Item Lost after this; does not hit window and I dont see it being received inside Subscribe
.Window(TimeSpan.FromMilliseconds(10000))
.SubscribeOn(Scheduler.Default)
.Subscribe(window =>
{
window
.ToList()
.SubscribeOn(Scheduler.Default)
.Subscribe(workItems =>
{
foreach (WorkItem W in workItems)
{
// Some work items do not reach this line
logger.Debug("Came inside subscriber => " + W);
}
if (workItems.Count > 0)
{
ProcessWorkItems(workItems.ToList<WorkItem>());
}
});
});
WorkItem Class
public class WorkItem
{
public int Region { get; set; }
public int Priority { get; set; }
public string PortfolioId { get; set; }
public string SecurityId { get; set; }
public string Status { get; set; }
public string Message { get; set; }
public Int64 Guid { get; set; }
public DateTime RequestedDateTime { get; set; }
public WorkItemType WorkItemType { get; set; }
public long RecordCount { get; set; }
public WorkItem()
{
PortfolioId = string.Empty;
SecurityId = string.Empty;
Message = string.Empty;
Status = string.Empty;
Region = 0;
WorkItemType = WorkItemType.REALTIME;
RequestedDateTime = DateTime.Now;
RecordCount = 0;
}
public override string ToString()
{
return string.Format("WorkItemType : {0} Region : {1} PortfolioId : {2} SecurityId : {3} Guid : {4} Priority : {5} Status : {6} Message : {7}",
WorkItemType.ToString(), Region, PortfolioId, SecurityId, Guid, Priority, Status, Message);
}
}
I am using BufferBlock with Unbounded Capacity...
Any help would be appreciated...