I have a blocking queue that keeps receiving messages from another app. In an ASP.NET app, I consume the queue and write the output to a CSV/JSON file.
I want to hold the messages received from the blocking queue until they add up to 1 MB, write them out, then accumulate the next 1 MB and write again, and so on.
In the code below I'm using the System.Reactive
Buffer operator, which lets me hold a given number of items from the observable and write them to JSON. Is there any way to buffer by the size of the items instead of their count?
/// <summary>
/// Console demo: a producer task fills a blocking queue while a consumer task drains it
/// through an Rx pipeline and writes each emitted batch to its own JSON file.
/// </summary>
class Program
{
    /// <summary>Directory that receives one JSON file per batch.</summary>
    private const string OutputDirectory = @"C:\TEMP";

    private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();

    /// <summary>
    /// Adds 11 sample messages to the queue, one per second, then marks the queue as complete.
    /// </summary>
    private static void Producer()
    {
        try
        {
            int ctr = 1;
            while (ctr <= 11)
            {
                MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
                Thread.Sleep(1000);
                ctr++;
            }
        }
        finally
        {
            // Without this the consuming enumerable never ends, so the observable never
            // completes and the last partial batch is only flushed by the throttle timeout.
            MessagesBlockingCollection.CompleteAdding();
        }
    }

    /// <summary>
    /// Drains the queue as an observable and writes a file for every batch produced by
    /// <c>BufferWithThrottle</c> (called with a 60-second threshold and a count of 5).
    /// </summary>
    private static void Consumer()
    {
        var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();

        // The subscription is intentionally not stored: it lives until the source completes.
        observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
            .Subscribe(ts =>
            {
                WriteToFile(ts.ToList());
            });
    }

    /// <summary>
    /// Serializes the batch to JSON and writes it to a timestamp-named file in <see cref="OutputDirectory"/>.
    /// </summary>
    /// <param name="listToWrite">Messages that make up one batch.</param>
    private static void WriteToFile(List<Message> listToWrite)
    {
        // No-op when the directory already exists; avoids DirectoryNotFoundException on a fresh machine.
        System.IO.Directory.CreateDirectory(OutputDirectory);

        // Invariant culture keeps the file name stable regardless of the machine's calendar/digit settings.
        string timestamp = DateTime.Now.ToString("yyyyMMddHHmmssfff", System.Globalization.CultureInfo.InvariantCulture);

        using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(OutputDirectory, $"{timestamp}.json")))
        {
            outFile.Write(JsonConvert.SerializeObject(listToWrite));
        }
    }

    /// <summary>
    /// Starts the producer and the consumer, then waits for a key press before exiting.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Both delegates block or sleep for long stretches, so ask for dedicated threads
        // instead of tying up thread-pool workers.
        var producer = Task.Factory.StartNew(() => Producer(), TaskCreationOptions.LongRunning);
        var consumer = Task.Factory.StartNew(() => Consumer(), TaskCreationOptions.LongRunning);
        Console.Read();
    }
}
Reactive extension method:
/// <summary>
/// Collects elements of <paramref name="source"/> into lists. The current list is closed and emitted
/// when either the throttle on <paramref name="threshold"/> fires for the current group or
/// <paramref name="noOfStream"/> elements have been collected, whichever signal arrives first.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence whose elements are batched.</param>
/// <param name="threshold">Throttle interval used as the time-based closing signal.</param>
/// <param name="noOfStream">Number of elements that closes a batch.</param>
/// <returns>An observable of the batches, each as a list of elements.</returns>
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
    TimeSpan threshold, int noOfStream)
{
    return Observable.Create<IList<TSource>>(observer =>
    {
        // Every element shares the same key, so there is only ever one open group at a time;
        // the duration selector decides when that group is closed.
        var batches = source
            .GroupByUntil(
                element => true,
                window =>
                {
                    var quietPeriodElapsed = window.Throttle(threshold).Select(element => Unit.Default);
                    var batchFilled = window.Buffer(noOfStream).Select(elements => Unit.Default);
                    return quietPeriodElapsed.Merge(batchFilled);
                })
            .SelectMany(window => window.ToList());

        return batches.Subscribe(observer);
    });
}
Message class:
/// <summary>
/// Item carried through the blocking queue and serialized to JSON when a batch is written.
/// </summary>
public class Message
{
/// <summary>Numeric identifier; the producer assigns its loop counter here.</summary>
public int Id { get; set; }
/// <summary>Display name; the producer sets it to "Name-" followed by the identifier.</summary>
public string Name { get; set; }
}