I have a blocking queue that keeps receiving messages from an app. In an ASP.NET app I am trying to consume the queue and write the output to a CSV/JSON file.

I want to hold the messages received from the blocking queue until they reach roughly 1 MB, write them out, then hold the next 1 MB of data and write again, and so on.

In the code below I am using a System.Reactive buffer. I can buffer by the number of items and write them to JSON, but is there any way to buffer by the size of the items instead?

class Program
{
    private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();

    private static void Producer()
    {
        int ctr = 1;
        while (ctr <= 11)
        {
            MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
            Thread.Sleep(1000);
            ctr++;
        }
    }

    private static void Consumer()
    {
        var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();

        var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
                                    .Subscribe(ts =>
                                    {
                                        WriteToFile(ts.ToList());
                                    });
    }

    private static void WriteToFile(List<Message> listToWrite)
    {
        using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
        {
            outFile.Write(JsonConvert.SerializeObject(listToWrite));
        }
    }

    static void Main(string[] args)
    {
        var producer = Task.Factory.StartNew(() => Producer());
        var consumer = Task.Factory.StartNew(() => Consumer());
        Console.Read();
    }
}

Rx extension method:

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
                                                                      TimeSpan threshold, int noOfStream)
{
    return Observable.Create<IList<TSource>>(obs =>
    {
        // Emit the current buffer either when 'noOfStream' items have accumulated
        // or when the source has been quiet for 'threshold'.
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}
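
A rough sketch of the kind of size-based operator I have in mind (BufferBySize and its sizeSelector parameter are my own hypothetical names, not an existing Rx operator; it just accumulates items until their estimated byte size crosses the limit):

public static IObservable<IList<TSource>> BufferBySize<TSource>(this IObservable<TSource> source,
                                                                Func<TSource, int> sizeSelector, int maxBytes)
{
    return Observable.Create<IList<TSource>>(obs =>
    {
        var batch = new List<TSource>();
        var batchBytes = 0;

        return source.Subscribe(
            item =>
            {
                // Accumulate items until the estimated size of the batch reaches maxBytes.
                batch.Add(item);
                batchBytes += sizeSelector(item);
                if (batchBytes >= maxBytes)
                {
                    obs.OnNext(batch);
                    batch = new List<TSource>();
                    batchBytes = 0;
                }
            },
            obs.OnError,
            () =>
            {
                // Flush whatever is left when the source completes.
                if (batch.Count > 0) obs.OnNext(batch);
                obs.OnCompleted();
            });
    });
}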

Message class:

public class Message
{
    public int Id { get; set; }
    public string Name { get; set; }
}
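
If an operator like that is the right direction, I imagine Consumer would estimate each message's serialized size and use 1 MB as the limit, along these lines (again only a sketch, using the hypothetical BufferBySize above; it would serialize each message twice, once for sizing and once in WriteToFile, but it shows the idea):

var subscription = observable
    .BufferBySize(m => Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(m)), 1024 * 1024)
    .Subscribe(batch => WriteToFile(batch.ToList()));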
  • Why do you want to buffer the data in Rx? If you must buffer the write, can you buffer using a [BufferedStream](https://learn.microsoft.com/en-us/dotnet/api/system.io.bufferedstream?view=netframework-4.7.2)? – Aron Jan 09 '19 at 07:54
  • @Aron, I tried something using Rx `Buffer`, but I was only able to do it by the number of items, not by size. Please have a look at this question: https://stackoverflow.com/questions/54104055/buffer-with-stream-size-instead-of-number-of-streams-reactive-extension-c-sharp – user584018 Jan 09 '19 at 09:13
  • Question edited – user584018 Jan 09 '19 at 09:28
  • I didn't say use Rx buffer. I said you use BufferedStream. You just wrap your FileStream with a BufferedStream. Message has no "size". Only the serialized form does... – Aron Jan 09 '19 at 09:42
  • OK, meaning we can't determine an observable message's size? Is there no `Rx` way? – user584018 Jan 09 '19 at 09:51
  • I am also reading this: https://stackoverflow.com/questions/53152134/buffer-by-time-or-running-sum-for-reactive-extensions – user584018 Jan 09 '19 at 09:55
  • I mean that it is a meaningless question in .net (nothing to do with Rx). You are using managed memory. Alternatively there are 101 different answers to "message size". – Aron Jan 09 '19 at 09:55
  • @Enigmativity, please re-open this question https://stackoverflow.com/questions/54104055/buffer-with-stream-size-instead-of-number-of-streams-reactive-extension-c-sharp, I will delete this one. Thanks! – user584018 Jan 09 '19 at 10:38
