tl;dr: "I would like to read an SslStream continuously in C# but I can't figure out the best way."

Some background: I'm getting stock data from a stream that I would like to present in a web interface. The stream sends a heartbeat every 5 seconds and you can also subscribe/unsubscribe to stock prices and news etc.

https://api.test.nordnet.se/next/2/api-docs/docs/feeds

Currently I'm using the SslTcpClient example from MSDN to read from and write to the stream, and it works OK.

https://msdn.microsoft.com/en-us/library/system.net.security.sslstream.aspx

My problem is that the stream continuously sends data and I'm not really sure how to consume this and present it in the best way.

My current solution is to make an AJAX call every five seconds to "clear" the stream and get the new data. This does not feel like a robust solution, and I quite often get the error "The Read method cannot be called when another read operation is pending.".

Update:

@phuzi: The difference between the AJAX call and the SignalR call is shown below. Unfortunately this does not help me read the stream continuously, since I still need to call the method every five seconds. A solution might be Hangfire in combination with SignalR, although I do not know how best to implement this.

$.connection.hub.start().done(function () {
    setInterval(function () {
        stockHub.server.getHeartBeat();
    }, 5000);
});

setInterval(function () {
    $.ajax({
        url: '@Url.Action("getHeartBeat")',
        type: 'POST',
        data: {
        },
        success: function (message) {
            console.log(message);
        },
        error: function () {
        }
    });
}, 5000);

@usr: Here is some example data:

I subscribe to stock depth at a specific market using this string:

"{"cmd":"subscribe", "args":{"t":"depth", "i":"5110", "m":11}}\n"

Then I read the feed and pass my read method an expected value that the response must contain to be the string I'm looking for, for example:

response.Contains("{\"type\":\"depth\",\"data\":{\"i\":\"5110\",\"m\":11")

And then I get a string that looks something like this in return:

"{"type":"depth","data":{"i":"5110","m":11,"tick_timestamp":1439894747189,"bid1":1.02,"bid_volume1":734827,"bid_orders1":30,"ask1":1.03,"ask_volume1":393598,"ask_orders1":8,"bid2":1.01,"bid_volume2":805705,"bid_orders2":35,"ask2":1.04,"ask_volume2":404815,"ask_orders2":15,"bid3":1.00,"bid_volume3":1387177,"bid_orders3":62,"ask3":1.05,"ask_volume3":601579,"ask_orders3":29,"bid4":0.995,"bid_volume4":123610,"bid_orders4":9,"ask4":1.06,"ask_volume4":313060,"ask_orders4":15,"bid5":0.990,"bid_volume5":386543,"bid_orders5":31,"ask5":1.07,"ask_volume5":741100,"ask_orders5":11}}"
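Since each message is a single JSON object on its own line, matching on the parsed `type` and `data` fields may be more reliable than a substring check. The following is only a sketch: `FeedLineParser`, `GetMessageType` and `IsDepthFor` are hypothetical names, and it assumes `System.Text.Json` is available and that every message carries a string `type` property like the sample above.

```csharp
using System;
using System.Text.Json;

public static class FeedLineParser
{
    // Returns the top-level "type" value, or null when the line is not
    // a JSON object with a string "type" property.
    public static string GetMessageType(string line)
    {
        if (string.IsNullOrWhiteSpace(line)) return null;
        try
        {
            using (JsonDocument doc = JsonDocument.Parse(line))
            {
                JsonElement root = doc.RootElement;
                if (root.ValueKind == JsonValueKind.Object
                    && root.TryGetProperty("type", out JsonElement type)
                    && type.ValueKind == JsonValueKind.String)
                {
                    return type.GetString();
                }
                return null;
            }
        }
        catch (JsonException)
        {
            // Not valid JSON (for example a partial line).
            return null;
        }
    }

    // True when the line is a "depth" message for the given instrument ("i")
    // and market ("m"), using the field names from the sample message.
    public static bool IsDepthFor(string line, string instrumentId, int marketId)
    {
        if (GetMessageType(line) != "depth") return false;
        using (JsonDocument doc = JsonDocument.Parse(line))
        {
            JsonElement root = doc.RootElement;
            if (!root.TryGetProperty("data", out JsonElement data)
                || data.ValueKind != JsonValueKind.Object)
            {
                return false;
            }
            bool sameInstrument =
                data.TryGetProperty("i", out JsonElement i)
                && i.ValueKind == JsonValueKind.String
                && i.GetString() == instrumentId;
            bool sameMarket =
                data.TryGetProperty("m", out JsonElement m)
                && m.ValueKind == JsonValueKind.Number
                && m.TryGetInt32(out int market)
                && market == marketId;
            return sameInstrument && sameMarket;
        }
    }
}
```

With this, the check above would become something like `FeedLineParser.IsDepthFor(line, "5110", 11)`, applied to one line at a time.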

Solved the error "The Read method cannot be called when another read operation is pending." using a lock.

private static readonly object _readBytesLock = new object();

private static volatile bool _readingBytes = false;

public static string ReadMessage(SslStream sslStream, string expectedValue = "heartbeat")
{
    // Read the message sent by the server.
    // Messages are newline-terminated ("\n"); this method returns as soon as
    // the accumulated text contains expectedValue.
    string message;
    byte[] buffer = new byte[2048];
    StringBuilder messageData = new StringBuilder();
    // Create the Decoder once, outside the loop, so that it keeps its state
    // when a UTF-8 character spans two buffers.
    Decoder decoder = Encoding.UTF8.GetDecoder();
    int bytes = -1;
    do
    {
        bytes = GetBytes(sslStream, buffer);
        char[] chars = new char[decoder.GetCharCount(buffer, 0, bytes)];
        decoder.GetChars(buffer, 0, bytes, chars, 0);
        message = messageData.Append(chars).ToString();
        Debug.WriteLine(messageData);
        if (message.Contains(expectedValue))
        {
            return message;
        }
        if (Regex.Matches(messageData.ToString(), "heartbeat").Count >= 3)
        {
            Debug.WriteLine("Something went wrong, 3 heartbeats received and nothing with the expected value: " + expectedValue);
            return message;
        }
    } while (bytes != 0);

    return message;
}

public static int GetBytes(SslStream sslStream, byte[] buffer)
{
    try
    {
        if (buffer == null)
            return -1;
        var bytes = -1;
        lock (_readBytesLock)
        {
            if (!_readingBytes)
            {
                _readingBytes = true;
                bytes = sslStream.Read(buffer, 0, buffer.Length);
            }
            _readingBytes = false;
            return bytes;
        }
    }
    catch (Exception ex)
    {
        var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
        Debug.WriteLine("Method " + methodName + " failed " + ex.Message);
        return 0;
        //throw;
    }
}
Ogglas
  • Have you looked at signalR? – phuzi Aug 14 '15 at 14:09
  • Can you make an example for the format that the server uses to send data? – usr Aug 14 '15 at 14:10
  • It is not clear what your architecture is. It sounds like you are trying to fake a scheduler in an `ASP.NET` service by having a client continuously ping the server. Or are you trying to get a push-based subscription rather than a pull-based poll? – Aron Aug 14 '15 at 19:01
  • @phuzi Yes I have looked at SignalR but the problem still remains that the feed needs to be read and written to continuously. Added some example code if I misunderstood you. – Ogglas Aug 18 '15 at 12:00
  • @usr Added example to original post. – Ogglas Aug 18 '15 at 12:01
  • @Aron: Ideally I would like a push based service, which I think can be achieved with SignalR but my real problem is to read and write to the feed continuously. Like I said to phuzi I have thought about Hangfire but I'm not sure how to implement this. – Ogglas Aug 18 '15 at 12:01
  • Is the format line-based? Are you sending whole lines and receiving whole lines? – usr Aug 18 '15 at 12:01
  • @Ogglas What was the original reason for the heartbeat? Is it still relevant with signalR? – phuzi Aug 18 '15 at 12:04
  • @usr Correct, the end of each message is signaled with a newline, "\n". – Ogglas Aug 18 '15 at 12:04
  • @phuzi The heartbeats are sent so that the client knows that the connection is still valid. However, I do not control the server, so I cannot choose to disable heartbeats; I need to consume these as well. – Ogglas Aug 18 '15 at 12:08
  • Post the C# code that is failing together with the exception ToString output. – usr Aug 18 '15 at 12:12
  • @usr I solved the code that generated the error myself. However, the application stops working after a while anyway. Either the stream stacks up a lot of heartbeats that have to be read before I can get any financial data, or, if I read the heartbeats too fast, the system becomes very slow to work with. As it is now, I basically only get exactly the data I want when the system opens the stream for the first time. – Ogglas Aug 18 '15 at 12:30
  • How often is ReadMessage being called? You need to ensure that at any given time there is a read call outstanding so that all incoming data is drained. I think you should be using a `StreamReader` and call `ReadLine` in a loop. That gets rid of 90% of your code. Process each line immediately. Why are you not processing the data line by line? You said that this is a line-based protocol. You could be receiving lots of unrelated data in one `message`. – usr Aug 18 '15 at 12:43
  • @usr Switched to StreamReader and StreamWriter and it works really well. Thank you for that! Do you have any pointers to what I could use to call ReadMessage in a loop and at the same time process the messages? I was thinking about System.Threading.Timer but I would like something that automatically starts over when the ReadMessage method has been processed. – Ogglas Aug 21 '15 at 08:50
  • Why a timer? Run a loop that reads lines until you get null. Always have that loop running. `while (true) { ReadLine(); ProcessLine(); }` – usr Aug 21 '15 at 09:42
  • @usr This is of course the best solution. I have implemented this and it works like a charm! Thank you very much! – Ogglas Sep 28 '15 at 10:30

1 Answer

Final code that works really well. A big thanks to @usr and everyone else who helped.

private Task _feedTask;
private SslStream _sslStream;
private StreamReader _streamReader;
private static readonly log4net.ILog _logger
        = log4net.LogManager.GetLogger(
                System.Reflection.MethodBase.GetCurrentMethod()
                 .DeclaringType);

public void CreateNewTaskAndStartReadingFeed()
{
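    // The loop blocks on ReadLine for as long as the feed is open, so hint
    // to the scheduler that this is a long-running task.
    _feedTask = Task.Factory.StartNew(() => StartReadingFeed(), TaskCreationOptions.LongRunning);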
}

public void StartReadingFeed()
{
   try
   {
       while (true)
       {
           var message = GetStreamMessage();

           if (message != null)
           {
               Debug.WriteLine("StartReadingFeed message: " + message);
               OnReceivedSomething(message);
           }
           else
           {
               Debug.WriteLine("StartReadingFeed message was null");
           }
       }
   }
   catch (Exception ex)
   {
       var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
       Debug.WriteLine($"Method {methodName} failed " + ex.Message);
       Debug.WriteLine("Creating a new task and starting to read the feed again");
       _logger.Error("StartReadingFeed failed", ex);
       CreateNewTaskAndStartReadingFeed();
   }
}

public string GetStreamMessage()
{
    try
    {
        return _sslStream.CanRead ? _streamReader.ReadLine() : null;
    }
    catch (Exception ex)
    {
        var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
        Debug.WriteLine($"Method {methodName} failed " + ex.Message);
        _logger.Error($"{methodName} failed.", ex);
        return null;
    }
}
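
One thing to watch with the loop above: `StreamReader.ReadLine` returns null once the server has closed the connection, and `GetStreamMessage` also returns null after an exception, so `while (true)` can end up spinning on nulls. A minimal sketch of a variant that stops at end of stream is shown below. `FeedLoop` and `ReadLines` are hypothetical names, the method takes any `Stream` so it can be tried without a live `SslStream`, and reconnecting is deliberately left to the caller.

```csharp
using System;
using System.IO;
using System.Text;

public static class FeedLoop
{
    // Reads newline-terminated messages and hands each non-empty line to
    // onLine. Returns the number of lines delivered once the stream ends.
    // Exceptions from the stream or from onLine propagate to the caller,
    // which can then decide whether to reconnect.
    public static int ReadLines(Stream stream, Action<string> onLine)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        if (onLine == null) throw new ArgumentNullException(nameof(onLine));

        int count = 0;
        // Disposing the reader also closes the underlying stream.
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            string line;
            // ReadLine blocks until a full line arrives; null means the
            // remote side closed the connection.
            while ((line = reader.ReadLine()) != null)
            {
                if (line.Length == 0) continue; // skip blank lines
                onLine(line);
                count++;
            }
        }
        return count;
    }
}
```

It could be started the same way as before, for example `Task.Factory.StartNew(() => FeedLoop.ReadLines(_sslStream, OnReceivedSomething), TaskCreationOptions.LongRunning)`, with the reconnect logic placed where the task completes or faults.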
Ogglas