
I am trying to transfer JSON between a client and a server over a persistent socket connection, and I have run into several problems.

At first, I found that when I send several messages, the last JSON is not processed; the receiver keeps waiting for further data. When I then send more data, the previously unread JSON is processed along with the new messages.

2019/05/06 12:34:44.495|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:34:44.603|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:34:54.787|INFO|Test.Server.Tcp.Program|100 [...json dump...]
2019/05/06 12:34:54.823|INFO|Test.Server.Tcp.Program|101 [...json dump...]

In this case, I first sent JSON 100 times and then, after 10 seconds, another 5 times. The logs show that 99 messages were processed first and the rest only after 10 seconds.

As I understand it, this happens because the Read() method on the open stream blocks the thread until more data is received. I tried setting the ReceiveTimeout property and ignoring the SocketError.TimedOut error. As a result, when sending JSON 100 times, a deserialization error is thrown on the last message once the timeout expires.

2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:58:39.528|FATAL|Test.Server.Tcp.Program|Newtonsoft.Json.JsonSerializationException: Unexpected token while deserializing object: PropertyName. Path 'Data.UserId', line 898, position 13.
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Newtonsoft.Json.JsonSerializer.Deserialize[T](JsonReader reader)
   at TcpJsonCarrier`1.<ReceiveRun>b__13_0() in D:\Projects\...path...\TcpJsonCarrier.cs:line 71

Class to work with stream:

public class TcpJsonCarrier<T> : IDisposable {
        private readonly ILogger _logger;
        private readonly CancellationToken _cancellationToken;

        private readonly ConcurrentQueue<T> _incomingData;
        private readonly ConcurrentQueue<T> _outgoingData;

        private readonly StreamReader _streamReader;
        private readonly StreamWriter _streamWriter;

        private readonly JsonTextReader _jsonTextReader;
        private readonly JsonTextWriter _jsonTextWriter;

        private readonly JsonSerializer _jsonSerializer;


        public TcpJsonCarrier(ILogger logger, TcpClient tcpClient) :
            this(logger, tcpClient, CancellationToken.None) { }

        public TcpJsonCarrier(ILogger logger, TcpClient tcpClient, CancellationToken cancellationToken) {
            _logger = logger;
//            tcpClient.ReceiveTimeout = 1000;
            _cancellationToken = cancellationToken;

            _incomingData = new ConcurrentQueue<T>();
            _outgoingData = new ConcurrentQueue<T>();

            _streamReader = new StreamReader(tcpClient.GetStream(), Encoding.UTF8, true, 1024, true);
            _streamWriter = new StreamWriter(tcpClient.GetStream(), Encoding.UTF8, 1024, true) {AutoFlush = true};

            _jsonTextReader = new JsonTextReader(_streamReader) {SupportMultipleContent = true};
            _jsonTextWriter = new JsonTextWriter(_streamWriter);

            _jsonSerializer = new JsonSerializer {
                Formatting = Formatting.Indented,
                TypeNameHandling = TypeNameHandling.Auto
            };

            ReceiveRun();
            SendRun();
        }

        public IEnumerable<T> Read() {
            while (_incomingData.TryDequeue(out var data)) {
                yield return data;
            }
        }

        public void Write(T data) {
            _outgoingData.Enqueue(data);
        }

        private Task ReceiveRun() {
            return Task.Factory.StartNew(async () => {
                while (true) {
                    await Task.Delay(1, _cancellationToken);

                    try {
                        while (_jsonTextReader.Read())
                            _incomingData.Enqueue(_jsonSerializer.Deserialize<T>(_jsonTextReader));
                    }
                    catch (IOException ioe) when (ioe.InnerException is SocketException se &&
                                                  se.SocketErrorCode == SocketError.TimedOut) { }
                    catch (Exception e) {
                        _logger.LogCritical(e.ToString());
                        throw;
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        private Task SendRun() {
            return Task.Factory.StartNew(async () => {
                while (true) {
                    try {
                        await Task.Delay(1, _cancellationToken);

                        _cancellationToken.ThrowIfCancellationRequested();

                        while (_outgoingData.TryDequeue(out var data)) {
                            _jsonSerializer.Serialize(_jsonTextWriter, data);
                        }
                    }
                    catch (Exception e) {
                        _logger.LogCritical(e.ToString());
                        throw;
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        public void Dispose() {
            _streamReader?.Dispose();
            _streamWriter?.Dispose();
            ((IDisposable) _jsonTextReader)?.Dispose();
            ((IDisposable) _jsonTextWriter)?.Dispose();
        }
    }

Server:

class Program {
        static void Main(string[] args) {
            var loggerFactory = new LoggerFactory().AddNLog();
            var logger = loggerFactory.CreateLogger<Program>();

            var tcpListener = new TcpListener(IPAddress.Any, 9100);
            tcpListener.Start();

            var sem = new SemaphoreSlim(1, 1);
            while (true) {
                sem.Wait();

                tcpListener
                    .AcceptTcpClientAsync()
                    .ContinueWith(async task => {
                        var tcpClient = await task;

                        logger.LogInformation("Client accepted");

                        sem.Release();

                        var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, tcpClient);

                        var count = 0;
                        while (true) {
                            await Task.Delay(1);

                            foreach (var message in tcpJsonCarrier.Read()) {
                                logger.LogInformation($"{++count} {message}");
                            }
                        }
                    }, TaskContinuationOptions.LongRunning);
            }
        }
    }

Client:

class Program {
        static void Main(string[] args) {
            var loggerFactory = new LoggerFactory().AddNLog();
            var logger = loggerFactory.CreateLogger<Program>();

            var client = new TcpClient();
            client
                .ConnectAsync("...remoteIp...", 9100)
                .ContinueWith(async task => {
                    await task;

                    logger.LogInformation("Connected");

                    var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, client);

                    for (int i = 0; i < 100; i++) {
                        await Task.Delay(100);

                        tcpJsonCarrier.Write(new Message(...messageData...));
                    }

                    await Task.Delay(10000);

                    for (int i = 0; i < 5; i++) {
                        await Task.Delay(100);

                        tcpJsonCarrier.Write(new Message(...messageData...));
                    }
                }, TaskContinuationOptions.LongRunning);

            while (true) { }
        }
    }

At the moment, I'm at a dead end.

  • If the individual JSON strings aren't huge, you could use message framing to send and receive each JSON string. Then deserialize each JSON string after it has been fully received. See: [Broken TCP messages](https://stackoverflow.com/q/7257139), https://blog.stephencleary.com/2009/04/message-framing.html and https://blogs.msdn.microsoft.com/joncole/2006/03/20/simple-message-framing-sample-for-tcp-socket/ – dbc May 06 '19 at 17:28
  • Yes, I can see that such a problem exists. What is not clear is where the first problem I described comes from. It occurs with any number of messages: it is always the last message that is not processed until new data arrives. If the recipient had received the message only partially and waited a long time for the rest, that would mean the sender did not send the rest, presumably because it did not call Flush. But AutoFlush is on, and calling Flush manually does not help. – Антон Лебедев May 07 '19 at 10:06
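
For reference, the message framing suggested in the first comment could look roughly like the sketch below. It is only an illustration under stated assumptions, not a confirmed fix for the behaviour described in the question: the JsonFraming class and its methods are hypothetical names that do not appear in the original code, and the 4-byte big-endian length prefix is just one common framing choice. Each JSON string is written with its byte length in front, so the receiver knows exactly how many bytes belong to one message and never has to read ahead into the next one.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper (not part of the question's code): frames each JSON
// string with a 4-byte big-endian length prefix.
public static class JsonFraming {
    // Writes the payload length, then the UTF-8 payload, then flushes.
    public static void WriteFrame(Stream stream, string json) {
        byte[] payload = Encoding.UTF8.GetBytes(json);
        int len = payload.Length;
        byte[] header = {
            (byte) (len >> 24), (byte) (len >> 16), (byte) (len >> 8), (byte) len
        };
        stream.Write(header, 0, header.Length);
        stream.Write(payload, 0, payload.Length);
        stream.Flush();
    }

    // Reads one complete frame. Returns null if the stream ends cleanly
    // before a new frame starts.
    public static string ReadFrame(Stream stream) {
        byte[] header = new byte[4];
        if (!TryReadExactly(stream, header)) return null;

        int length = (header[0] << 24) | (header[1] << 16) | (header[2] << 8) | header[3];
        if (length < 0) throw new InvalidDataException("Negative frame length.");

        byte[] payload = new byte[length];
        if (!TryReadExactly(stream, payload))
            throw new EndOfStreamException("Stream ended inside a frame.");

        return Encoding.UTF8.GetString(payload);
    }

    // Stream.Read may return fewer bytes than requested, so loop until the
    // buffer is full. Returns false only if the stream ends before any byte
    // of this buffer was read.
    private static bool TryReadExactly(Stream stream, byte[] buffer) {
        int offset = 0;
        while (offset < buffer.Length) {
            int read = stream.Read(buffer, offset, buffer.Length - offset);
            if (read == 0) {
                if (offset == 0) return false;
                throw new EndOfStreamException("Stream ended inside a frame.");
            }
            offset += read;
        }
        return true;
    }
}
```

With something like this, the sender would serialize each message to a string first (for example with JsonConvert.SerializeObject) and pass it to WriteFrame on tcpClient.GetStream(), and the receiver would call ReadFrame in its loop and deserialize each returned string on its own (for example with JsonConvert.DeserializeObject<T>). The deserializer then only ever sees one complete document at a time.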
