1

My goal is to download files via ftp and somehow process them asynchronously. I turn list of files to IObservable and process it using SelectMany combinator. Inside there is some manipulations to download blocked files: try to download file with number of retries and return Tuple, or in case of failure return Tuple and wrap it into Observable. Sample of "Observable retriable after delay" I have taken there and slightly modified it. Problem is that my code randomly stops after downloading couple of files. Sometimes it reaches the "OnNext" callback in "Subscribe" method. I never have detected code reaching "OnComplete" callback. No exception were thrown either.

        files.ToObservable().SelectMany(f =>
        {
            var source = Observable.Defer(() => Observable.Start(() =>
            {
                ftpConnection.DownloadFile(avroPath, f.Name);
                return Tuple.Create(true, f.Name);
            }));
            int attempt = 0;
            return Observable.Defer(() => ((++attempt == 1)
                ? source
                : source.DelaySubscription(TimeSpan.FromSeconds(1))))
                .Retry(4)
                .Catch(Observable.Return(Tuple.Create(false, f.Name)));
        }).Subscribe(
            res =>
            {
                Console.Write("Damn, its only rarely gets there, however some files were downloaded succesfully");
                if (res.Item1) Process(res.Item2);
                else LogOrQueueOrWhatever(res.Item2);
            },
            (Exception ex) =>
            {
                Console.Write("Never was thrown");
            },
            () =>
            {
                Console.Write("Never entered this section");
                ProcessLogs();
                ScheduleNExtDownloadRoutine();
            });

I will be grateful if someone will show the more idiomatic way to mess with combinators on Observables.

Community
  • 1
  • 1
  • 1
    Is your application exiting before you've given it enough time to asynchronously download all of the files (`Subscribe` does not block the current thread)? – Brandon Jul 09 '14 at 15:04
  • Exactly the case. Just realized after message was posted, so newbie me. – Sergey Prytkov Jul 09 '14 at 16:26
  • FYI see this question for a Retry operator: http://stackoverflow.com/questions/24655590/reactive-extensions-and-retry – Brandon Jul 09 '14 at 16:33

1 Answers1

1

As Brandon mentioned, there was no synchronization/blocking after defining observable's behavior. So I deal with it by replacing "Subscribe" call with "ForEachAsync", transforming that Observable to Task and blocking caller with Tasks's "Wait" method:

    files.ToObservable().SelectMany(f =>
    {
        var source = Observable.Defer(() => Observable.Start(() =>
        {
            ftpConnection.DownloadFile(avroPath, f.Name);
            return Tuple.Create(true, f.Name);
        }));
        int attempt = 0;
        return Observable.Defer(() => ((++attempt == 1)
            ? source
            : source.DelaySubscription(TimeSpan.FromSeconds(1))))
            .Retry(4)
            .Catch(Observable.Return(Tuple.Create(false, f.Name)));
    }).ForEachAsync(res =>
    {
        if (res.Item1) Process(res.Item2);
        else LogOrQueueOrWhatever(res.Item2);
    }).Wait();

    ProcessLogs();
    ScheduleNExtDownloadRoutine();