My goal is to download files via ftp and somehow process them asynchronously. I turn list of files to IObservable and process it using SelectMany combinator. Inside there is some manipulations to download blocked files: try to download file with number of retries and return Tuple, or in case of failure return Tuple and wrap it into Observable. Sample of "Observable retriable after delay" I have taken there and slightly modified it. Problem is that my code randomly stops after downloading couple of files. Sometimes it reaches the "OnNext" callback in "Subscribe" method. I never have detected code reaching "OnComplete" callback. No exception were thrown either.
files.ToObservable().SelectMany(f =>
{
var source = Observable.Defer(() => Observable.Start(() =>
{
ftpConnection.DownloadFile(avroPath, f.Name);
return Tuple.Create(true, f.Name);
}));
int attempt = 0;
return Observable.Defer(() => ((++attempt == 1)
? source
: source.DelaySubscription(TimeSpan.FromSeconds(1))))
.Retry(4)
.Catch(Observable.Return(Tuple.Create(false, f.Name)));
}).Subscribe(
res =>
{
Console.Write("Damn, its only rarely gets there, however some files were downloaded succesfully");
if (res.Item1) Process(res.Item2);
else LogOrQueueOrWhatever(res.Item2);
},
(Exception ex) =>
{
Console.Write("Never was thrown");
},
() =>
{
Console.Write("Never entered this section");
ProcessLogs();
ScheduleNExtDownloadRoutine();
});
I will be grateful if someone will show the more idiomatic way to mess with combinators on Observables.