2

ExecuteSellAsync method is being called twice at the same time and as you can see on the logs below. It works fine when I put 15 seconds on Observable.Interval(TimeSpan.FromSeconds(15)). How can I prevent that situation? Maybe something like locking or I don't know.

2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: New | Price: 0.06783960 | Last filled price: 0.00000000 | Stop price: 0.00000000 | Quantity: 0.00000000 | Quote quantity: 0.00000000 | Commission: 0 
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: Filled | Price: 0.06783960 | Last filled price: 0.06784260 | Stop price: 0.00000000 | Quantity: 5420.00000000 | Quote quantity: 367.70689200 | Commission: 0.00201210 BNB
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Sell order was filled | Close date: 2021/02/12 17:04:09 | Close rate (price): 0.06784260
2021-02-12 19:04:13 [9] INFO  Wallets Wallets synced.
2021-02-12 19:04:14 [10] DEBUG LiveTradeManager Timer triggered. Price: 0.06783910 | Timestamp: 2/12/2021 5:03:00 PM | Close: 0.06790680
2021-02-12 19:04:17 [9] DEBUG BinanceSpotClient Limit sell order has failed | Error code: -2010 | Error message: Account has insufficient balance for requested action. | Pair: DOGEUSDT | Quantity: 0.00000000 | Price: 0.06782540

_throttlerObservable = Observable.Interval(TimeSpan.FromSeconds(5))
   .SelectMany(_ => Observable.FromAsync(async () =>
   {
       var lastCandle = _candles.Last();

       _logger.Debug($"Timer triggered. Price: {_ticker.LastPrice} | Open time: {lastCandle.Timestamp} | Close: {lastCandle.Close}");

       if (_orderSide == OrderSide.Sell)
       {
           var trade = _trades.FirstOrDefault(e => e.Pair.Equals(_tradeOptions.Pair) && e.IsOpen);

           if (trade.NotNull())
           {
               var shouldSell = _tradingStrategy.ShouldSell(trade, _ticker.LastPrice, _tradeAdvice);

               if (shouldSell.SellFlag)
               {
                   await ExecuteSellAsync(trade, lastCandle.Timestamp, shouldSell.SellType).ConfigureAwait(false);
               }
           }
       }
   }))
   .Subscribe();

Edit

I see what the problem is. _tradingStrategy.ShouldSell takes a few seconds to execute and the next execution starts executing the next check in the same time. Can I use lock into that logic?

That's what solves it but I need to lock the entire check:

bool test = false;
public (bool SellFlag, SellType SellType) ShouldSell(Trade trade, decimal rate, TradeAdvice tradeAdvice, decimal? low = null, decimal? high = null)
{
    if (!test)
    {
        test = true;

        // my logic is here. It takes a few seconds.

        test = false;
    }

    return (false, SellType.None);
}

Edit2

A testable code. Observable.Interval is executed on each second and ShouldSellAsync's logic takes 5 seconds to execute. Once _completed becomes true, the message is no longer printed. It executes the message 5 times, when I expect it only once.

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace RxNETDispose
{
    class Program
    {
        private static bool _completed = false;

        public static async Task ShouldSellAsync()
        {
            if (!_completed)
            {
                await Task.Delay(5000);

                Console.WriteLine($"{DateTime.UtcNow} - ShouldSell called");

                _completed = true;
            }
        }

        static void Main(string[] args)
        {
            Observable.Interval(TimeSpan.FromSeconds(1))
                .SelectMany(_ => Observable.FromAsync(async () =>
                {
                    await ShouldSellAsync().ConfigureAwait(false);
                }))
            .Subscribe();

            Console.ReadLine();
        }
    }
}
nop
  • 4,711
  • 6
  • 32
  • 93
  • Please [edit] your question to include the full source code you have as a [mcve], which can be compiled and tested by others. – Progman Feb 12 '21 at 17:36
  • Exactly what is `Observable.Interval(5)`? That's not one of the standard methods, and there's no implicit conversion from `int` to `TimeSpan`. – StackOverthrow Feb 12 '21 at 17:42
  • @StackOverthrow, sorry, it's in seconds. I accidentally removed it while trying to get a better code for tests. – nop Feb 12 '21 at 17:44
  • 1
    Related: [What's a good way to run periodic tasks using Rx, with a single concurrent execution restriction?](https://stackoverflow.com/questions/31399054/whats-a-good-way-to-run-periodic-tasks-using-rx-with-a-single-concurrent-execu/) – Theodor Zoulias Feb 12 '21 at 18:10
  • @Progman, added one. If u execute the code, u will see that the message doesnt appear instantly, but it waits 8 seconds and then it gets printed, even tho it should be in reversed order. – nop Feb 12 '21 at 18:15
  • 1
    @nop Your MCVE will generate the next output at every 5 seconds (not 8), as defined by the `Observable.Interval()`. It also does not start "immediately" with the first value as the value is generated *after* the configured period has passed. – Progman Feb 12 '21 at 18:27
  • 1
    The `SelectMany` operator is equivalent to `Select`+`Merge`, and the `Merge` merges the subsequences by interleaving their elements. If you want each subsequence to be subscribed exclusively, so that the next subsequence is subscribed after the completion of the previous one, you need the `Select`+`Concat` or `Select`+`Merge(1)`. But this has the nasty consequence that all the non-subscribed subsequences are buffered by the `Concat` or `Merge(1)` operators, and so it is possible that later your timer will start ticking faster than you want (among other memory-related issues). – Theodor Zoulias Feb 12 '21 at 18:28
  • @Progman, yes, and that's why my ExecuteSellAsync check is executed twice because the thing is generated later. – nop Feb 12 '21 at 18:36
  • 1
    @nop Based on the MCVE, what result/output do you expect and when should each line be printed? Can you add some kind of time diagram (or time table) when what event should occure in your MCVE? – Progman Feb 12 '21 at 18:38
  • @Progman, I can't create a diagram but the stuff should be executed at time – nop Feb 12 '21 at 20:39
  • @nop "Should be executed at time" does not explain when exactly the events or output lines in your MCVE should appear. Please [edit] your question to include some kind of time table which shows when the `Console.WriteLine()` calls should be executed. Start with the time `00:00` in the format "mm:ss" and write the time of each event/output you want to have. – Progman Feb 12 '21 at 20:51
  • @Progman, check the updated question, mate. I know why that is happening. Will `lock` work on the entire check? I kinda dislike having a bool variable like the way I did – nop Feb 13 '21 at 17:12
  • @nop Locks might help, but that depends on the actual specific problem you have. Please provide a [mcve], which clearly shows the problem you have when calling the `Main()` method. – Progman Feb 13 '21 at 17:15
  • @Progman, I finally composed a testable code. That's what happens. The message is executed 5 times because the boolean takes 5 seconds to become `true`. I expect the logic to be locked and not executed twice, when it's already running. – nop Feb 13 '21 at 18:04
  • Kinda can't use `lock` with awaited methods :/ – nop Feb 13 '21 at 22:58

1 Answers1

2

SelectMany does indeed introduce concurrency. We want to control that concurrency so the answer here is to roll your own operator to be able to ensure that there's a fixed gap between the calls to ExecuteSellAsync.

Thankfully there's a beautiful, but non-obvious, way to do this with the Rx schedulers.

The method we're looking for is this:

public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)

To use this call be need to define the Func<IScheduler, CancellationToken, Task<IDisposable>> action to be recursive so as to call itself to reschedule once the call to ExecuteSellAsync is complete.

So, to do this every 2.0 seconds, for example, we do this:

Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
handler = async (s, ct) =>
{
    await ExecuteSellAsync();
    return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
};

We can kick it off by calling this:

IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);

Of course, like all good Rx operations we can call subscription.Dispose() to stop it running.

Here's a complete example:

async Task Main()
{
    Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
    handler = async (s, ct) =>
    {
        await ExecuteSellAsync();
        return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
    };
    
    IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);

    await Task.Delay(TimeSpan.FromSeconds(9.0));
    
    subscription.Dispose();
 }

private DateTime then = DateTime.Now;
private int __counter = 0;

async Task ExecuteSellAsync()
{
    var counter = Interlocked.Increment(ref __counter);
    Console.WriteLine($"ExecuteSellAsync() Start {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
    await Task.Delay(TimeSpan.FromSeconds(2.0));
    Console.WriteLine($"ExecuteSellAsync() End {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
}

When I run this I get this output:

ExecuteSellAsync() Start 1 - 0.0019952
ExecuteSellAsync() End 1 - 2.0095866
ExecuteSellAsync() Start 2 - 4.0185182
ExecuteSellAsync() End 2 - 6.0199157
ExecuteSellAsync() Start 3 - 8.0303588
ExecuteSellAsync() End 3 - 10.0417079

Note that ExecuteSellAsync() does not cooperatively cancel so it runs to completion. It's not hard to change it to async Task ExecuteSellAsync(CancellationToken ct) and allow it to cancel cooperatively.

Now, this can be extended to make it into a nice observable.

Try this:

IObservable<Unit> query =
    Observable.Create<Unit>(o =>
    {
        Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
        handler = async (s, ct) =>
        {
            if (ct.IsCancellationRequested)
            {
                o.OnCompleted();
            }
            else
            {
                await ExecuteSellAsync();
                o.OnNext(Unit.Default);
            }
            return s.ScheduleAsync(TimeSpan.FromSeconds(2.0), handler);
        };
    
        return Scheduler.Default.ScheduleAsync(TimeSpan.Zero, handler);
    });

IDisposable subscription = query.Take(3).Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));

await Task.Delay(TimeSpan.FromSeconds(11.0));

subscription.Dispose();

This has the following output:

ExecuteSellAsync() Start 1 - 0.0009972
ExecuteSellAsync() End 1 - 2.0115375
U
ExecuteSellAsync() Start 2 - 4.0128375
ExecuteSellAsync() End 2 - 6.0282818
U
ExecuteSellAsync() Start 3 - 8.0370135
ExecuteSellAsync() End 3 - 10.0521106
U
C

Note it completes. If you call subscription.Dispose(); before it naturally completes then it behaves properly and doesn't issue the OnComplete notification.

Let's wrap this in a nice set of extension methods:

public static class ObservableEx
{
    public static IObservable<Unit> IntervalAsync(TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
        TimerAsync(period, period, actionAsync, scheduler);

    public static IObservable<T> IntervalAsync<T>(TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
        TimerAsync(period, period, functionAsync, scheduler);

    public static IObservable<Unit> TimerAsync(TimeSpan dueTime, TimeSpan period, Func<Task> actionAsync, IScheduler scheduler) =>
        Observable.Create<Unit>(o =>
        {
            Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
            handler = async (s, ct) =>
            {
                if (ct.IsCancellationRequested)
                {
                    o.OnCompleted();
                }
                else
                {
                    await actionAsync();
                    o.OnNext(Unit.Default);
                }
                return s.ScheduleAsync(period, handler);
            };
            return scheduler.ScheduleAsync(dueTime, handler);
        });

    public static IObservable<T> TimerAsync<T>(TimeSpan dueTime, TimeSpan period, Func<Task<T>> functionAsync, IScheduler scheduler) =>
        Observable.Create<T>(o =>
        {
            Func<IScheduler, CancellationToken, Task<IDisposable>> handler = null;
            handler = async (s, ct) =>
            {
                if (ct.IsCancellationRequested)
                {
                    o.OnCompleted();
                }
                else
                {
                    o.OnNext(await functionAsync());
                }
                return s.ScheduleAsync(period, handler);
            };
            return scheduler.ScheduleAsync(dueTime, handler);
        });
}

Now, clearly there are a bunch of overloads that I didn't write - ones that use a default scheduler and ones that allow for cooperative cancellation - but I hope you get the idea.

Now with these extension methods I can do this:

IDisposable subscription =
    ObservableEx
        .IntervalAsync(TimeSpan.FromSeconds(2.0), () => ExecuteSellAsync(), Scheduler.Default)
        .Take(3)
        .Subscribe(x => Console.WriteLine("U"), () => Console.WriteLine("C"));

I get the same output as before.

I haven't fully tested the extension methods. They might require a little more love and attention.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • Thank you very much for your answer! Your answers on Rx are basically light in the tunnel. It works flawlessly. However, I have a question. Does that mean I shouldn't use `.SelectMany` concurrency at all with Rx? Because the async calls were the reason I did. What would the extensions be for `Observable.Timer(x)` and `Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(29))`. Here are two examples of mine: https://pastebin.com/CPxERqf7 – nop Feb 15 '21 at 06:48
  • I guess `Observable.Timer(x)` could be just the same with `.Take(1)`, but what about `Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(29)).` – nop Feb 15 '21 at 07:43
  • Good stuff. It should be noted though that the `TimeSpan period` parameter of the `IntervalAsync` API is semantically a delay and not a period. It configures the delay that will be imposed between completing one operation and starting the next. The actual period between starting consecutive operations will include the duration of each operation, and in the general case it will not be stable. Could you use the same mechanism (the `Scheduler.ScheduleAsync` method) and create an API for periodic execution that is actually periodic? (and still enforces the no-overlapping execution policy?) – Theodor Zoulias Feb 15 '21 at 08:29
  • @TheodorZoulias - Yes, you could use `Func> handler` and pass thru the `dueTime` as the state object, and then just recursively call the `ScheduleAsync` with `DateTimeOffset dueTime2 = ((DateTimeOffset)state).Add(period);` as both the `state` and the `dueTime`. It would then be mathematically computing the time based on the previous time. It could easily become overrun and may need logic to to drop off any "missed" times, i.e. skip while the next time is in the past. – Enigmativity Feb 16 '21 at 03:09
  • I remember that some time ago I tried to do something similar, something like starting a "hot scheduled operation" that I could `await` later, without success. I don't remember the details TBH. In your case you could also measure the duration of the `await functionAsync()` with a `Stopwatch`, and subtract this duration when you call the next `ScheduleAsync`. (but this would defeat the purpose of doing everything the Rx-way, in order to use test schedulers etc) – Theodor Zoulias Feb 16 '21 at 04:18
  • @Enigmativity, what about `.SelectMany` with the other examples that I gave above, so I can accept the answer – nop Feb 17 '21 at 20:24
  • @nop - Can you be more clear as to what examples you mean? Keep in mind that `SelectMany` introduces concurrency and that will prevent you placing a set interval between values. – Enigmativity Feb 17 '21 at 21:51
  • @Enigmativity, these examples here: https://pastebin.com/CPxERqf7. I could use your extension + `.Take(1)` for the first example, but for the second I can't. How could I do `Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(29)` – nop Feb 17 '21 at 22:44
  • 1
    @nop - I gave overloads for both examples. What problem are you facing using the overloads? – Enigmativity Feb 17 '21 at 22:50
  • @nop - Specifically it would be `ObservableEx.TimerAsync(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(29), KeepAliveListenKeyAsync)`. – Enigmativity Feb 17 '21 at 22:55
  • @Enigmativity, ohh, sorry, I was being retarded. One last question: should I call it with the await, like here: https://pastebin.com/5uLrd0Le or simply without it? – nop Feb 17 '21 at 23:04
  • @Enigmativity, Basically, the `Func` to be `async () => { await ... }` because I have two async calls. – nop Feb 17 '21 at 23:42
  • @nop - No, just return the task - don't await it. – Enigmativity Feb 17 '21 at 23:52
  • @nop - Here's a simplified example: `ObservableEx.IntervalStartAsync(TimeSpan.FromSeconds(2.0), () => { if (sellFlag) { return ExecuteSellAsync(); } return Task.FromResult(Unit.Default); }, Scheduler.Default)`. – Enigmativity Feb 17 '21 at 23:55
  • @Enigmativity, yes, that one had only one awaited call but this one: https://pastebin.com/x84Z6ZLa has two awaited calls and I can't return on the first call, because it wont reach the second. That's what I meant – nop Feb 18 '21 at 00:09
  • @nop - then `ContinueWith` is your friend - again a simplified example: `ObservableEx.IntervalStartAsync(TimeSpan.FromSeconds(2.0), () => { return GetOrderAsync().ContinueWith(t => { if (t.Result.Success) { return CancelOrderAsync(); } return new System.Threading.Tasks.Task(() => { }); }); }, Scheduler.Default)` – Enigmativity Feb 18 '21 at 01:07
  • @Enigmativity, thanks again! How do I also print the occurred exceptions and retry the thing after the exception gets printed out? – nop Feb 20 '21 at 14:12
  • @Enigmativity, basically I want to keep the timer running 24/7, even if an exception pops up at some point. And of course to log what the exception message was. – nop Feb 20 '21 at 14:31
  • 1
    @nop - Exception handling is a bit of a broad issue. It depends where the exceptions occur as to how to handle them. Here's a basic exception handler. https://dotnetfiddle.net/Ui6s3k – Enigmativity Feb 21 '21 at 01:04
  • @Enigmativity, thanks. The exception message somehow doesn't appear to me. I did something wrong: https://pastebin.com/r5w7d4bz – nop Feb 21 '21 at 02:46
  • @nop - I can't tell from that code. Again you need a [mcve] that I can play with. – Enigmativity Feb 21 '21 at 02:49
  • @Enigmativity, full code here: https://pastebin.com/jHdU38J2. Basically, I want it to print the exception message and retry the procedure/anonymous method. As a result, it should print: `Time triggered. Test message. Time triggered. Test message.` and so on. – nop Feb 21 '21 at 03:06
  • @nop - all of my `TimerAsync` methods weren't handling errors. I'm fixing that. – Enigmativity Feb 21 '21 at 05:33
  • @Enigmativity, sure let me know when you do – nop Feb 21 '21 at 09:48
  • @nop - and here's my complete set of overloads. https://pastebin.com/ztjjmQiy – Enigmativity Feb 21 '21 at 10:31
  • @Enigmativity, thanks a lot mate! It works flawlessly! What's the reason it works fine as `IObservable safe = null; safe = ...` and not as `IObservable safe = query.Catch...` tho? – nop Feb 21 '21 at 10:45
  • @nop - Yes, it has to avoid the compiler warning of using a non-initialized variable. Are you from Oz? – Enigmativity Feb 21 '21 at 12:14
  • @Enigmativity, hmm, on the test project it didn't let me compile it, but it did compile on mine and it didn't work properly (by properly I mean, it printed `Time triggered. Test message. Time triggered.` and that was it). Seems like it's a bug in the project or VS. Anyway, thanks a lot! :) – nop Feb 21 '21 at 12:39
  • @nop you from Australia? – Enigmativity Feb 21 '21 at 21:25
  • @Enigmativity, nope. I'm from Bulgaria. – nop Feb 21 '21 at 21:45
  • @nop - No worries - you were using the very Aussie phrase "mate" a couple of times. It made me wonder. If I'm ever in Bulgaria I'll look you up. I hear it's beautiful. – Enigmativity Feb 21 '21 at 21:48
  • @Enigmativity, awesome, mate! :) It is! Could you send your contacts? – nop Feb 21 '21 at 21:49
  • @nop - My name is James. Try `$"{firstname}@{stackoverflowusername}.com"`. – Enigmativity Feb 21 '21 at 21:51