0
  1. I have a Producer() which pushes data to a blocking collection.
  2. In Consumer(), I subscribed to the blocking collection as Observable, using System.Reactive (4.1.2).
  3. I'm using Buffer, but I'm only able to buffer by the number of items in the stream.

Question - Can I use the Buffer operator with the total size of the buffered items rather than the number of items?

That is, when the buffer size crosses a limit (for example 1024 KB, i.e. 1 MB), can a new buffer be created?

    /// <summary>
    /// Demo program: a producer task adds messages to a shared
    /// <see cref="BlockingCollection{T}"/> while a consumer task exposes that
    /// collection as an observable, batches the messages with
    /// <c>BufferWithThrottle</c> and writes every batch to a JSON file.
    /// </summary>
    class Program
    {
        private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();

        /// <summary>
        /// Adds 11 messages to the shared collection, one per second, and then
        /// marks the collection as complete for adding.
        /// </summary>
        private static void Producer()
        {
            try
            {
                for (int ctr = 1; ctr <= 11; ctr++)
                {
                    MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
                    Thread.Sleep(1000);
                }
            }
            finally
            {
                // Without this the consuming enumerable never ends, so the observable
                // never completes and the last, partially filled batch is only written
                // once the 60-second throttle elapses.
                MessagesBlockingCollection.CompleteAdding();
            }
        }

        /// <summary>
        /// Subscribes to the shared collection as an observable and writes each
        /// batch produced by <c>BufferWithThrottle</c> (60-second threshold,
        /// 5 items) to its own file.
        /// </summary>
        private static void Consumer()
        {
            var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();

            // The subscription handle is intentionally not kept: the stream ends on
            // its own once the producer has called CompleteAdding().
            observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
                      .Subscribe(batch => WriteToFile(batch.ToList()));
        }

        /// <summary>
        /// Serializes the batch to JSON and writes it to a new file under C:\TEMP
        /// whose name is the current local timestamp.
        /// </summary>
        /// <param name="listToWrite">The messages that make up one batch.</param>
        private static void WriteToFile(List<Message> listToWrite)
        {
            // Invariant culture keeps the file name stable regardless of the
            // machine's regional calendar settings.
            string fileName = DateTime.Now.ToString("yyyyMMddHHmmssfff", System.Globalization.CultureInfo.InvariantCulture) + ".json";

            using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", fileName)))
            {
                outFile.Write(JsonConvert.SerializeObject(listToWrite));
            }
        }

        /// <summary>
        /// Starts the producer and the consumer as background tasks and keeps the
        /// process alive until a key is read from the console.
        /// </summary>
        static void Main(string[] args)
        {
            Task.Factory.StartNew(() => Producer());
            Task.Factory.StartNew(() => Consumer());
            Console.Read();
        }
    }

Observable extension method,

/// <summary>
/// Groups the items of <paramref name="source"/> into lists. The current list is
/// closed either when <paramref name="noOfStream"/> items have been collected or
/// when no new item has arrived for <paramref name="threshold"/>, whichever
/// happens first.
/// </summary>
/// <typeparam name="TSource">Type of the items in the source sequence.</typeparam>
/// <param name="source">The sequence to batch.</param>
/// <param name="threshold">Quiet period after which the current batch is closed.</param>
/// <param name="noOfStream">Number of items after which the current batch is closed.</param>
/// <returns>An observable of item lists, one list per closed batch.</returns>
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(
    this IObservable<TSource> source,
    TimeSpan threshold,
    int noOfStream)
{
    return Observable.Create<IList<TSource>>(observer =>
    {
        IObservable<IList<TSource>> batches = source
            .GroupByUntil(
                // A constant key puts every item into the single currently open group.
                item => true,
                window =>
                {
                    // Either signal ends the group: silence for `threshold`,
                    // or `noOfStream` items collected.
                    IObservable<Unit> quietPeriod = window.Throttle(threshold).Select(item => Unit.Default);
                    IObservable<Unit> countReached = window.Buffer(noOfStream).Select(chunk => Unit.Default);
                    return quietPeriod.Merge(countReached);
                })
            .SelectMany(window => window.ToList());

        return batches.Subscribe(observer);
    });
}
user584018
  • 10,186
  • 15
  • 74
  • 160

1 Answers1

1

Glad to see the extension method in use :)

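You can modify it slightly so that it uses Scan to keep a running total of the Message sizes. By doing so we lose the generic type parameter, because the method now needs to read the size from a concrete Message type.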

/// <summary>
/// Message type consumed by the size-based <c>BufferWithThrottle</c> overload,
/// which reads <see cref="Size"/> to keep a running total per batch.
/// </summary>
public class Message
{
    /// <summary>The message content.</summary>
    public string Payload { get; set; }

    /// <summary>
    /// Size of the message as counted towards the batch limit.
    /// NOTE(review): the unit (bytes, characters, ...) is not fixed here - it is
    /// whatever the code that populates this property uses.
    /// </summary>
    public int Size { get; set; }
}

/// <summary>
/// Groups messages into lists. The current list is closed either when the sum of
/// <see cref="Message.Size"/> over its messages reaches <paramref name="maxSize"/>
/// or when no new message has arrived for <paramref name="threshold"/>, whichever
/// happens first.
/// </summary>
/// <remarks>
/// The size check runs after a message has been added to the running total, so
/// the message that reaches the limit belongs to the batch it closes and a batch
/// total can exceed <paramref name="maxSize"/>.
/// </remarks>
/// <param name="source">The sequence of messages to batch.</param>
/// <param name="threshold">Quiet period after which the current batch is closed.</param>
/// <param name="maxSize">Accumulated size at which the current batch is closed.</param>
/// <returns>An observable of message lists, one list per closed batch.</returns>
public static IObservable<IList<Message>> BufferWithThrottle(this IObservable<Message> source,
                                                     TimeSpan threshold, int maxSize)
{
    return Observable.Create<IList<Message>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         // Accumulate in a long: an int running total can wrap
                                         // negative in an unchecked context, after which the
                                         // size limit would never be reached.
                                         .Merge(g.Scan(0L, (total, message) => total + message.Size)
                                                 .Where(total => total >= maxSize)
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}
supertopi
  • 3,469
  • 26
  • 38
  • Thanks :), but how to calculate message Size? – user584018 Jan 09 '19 at 10:25
  • 1
    I wrote one more class, `BatchMessage`, where I'm calculating the message size: `public class BatchMessage { public Message Message { get; set; } public int Size { get; set; } public BatchMessage(Message Message) { this.Message = Message; var msg = Newtonsoft.Json.JsonConvert.SerializeObject(Message); this.Size = msg.Length + 1; } }` and then I'm using the extension with `IObservable<BatchMessage>` – user584018 Jan 09 '19 at 11:02
  • Yep, that's roughly how you will get the JSON string size (or character length). Though it will not be the same as the number of bytes reserved by the file written to disk. – supertopi Jan 09 '19 at 12:49