I am trying to transfer Json between client and server through a constantly open socket. And I ran into several problems:
In the beginning, I encountered such a problem that when transferring several data, the last Json was not processed and was expecting further data. When after that I transferred more data, the unread Json was processed as the following ones.
2019/05/06 12:34:44.495|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:34:44.603|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:34:54.787|INFO|Test.Server.Tcp.Program|100 [...json dump...]
2019/05/06 12:34:54.823|INFO|Test.Server.Tcp.Program|101 [...json dump...]
In this case, first I sent Json 100 times, and after 10 seconds another 5 times. The logs show that 99 were processed first, and after 10 seconds, the rest.
As I understand it, this is somehow due to the fact that the Read () method from the open thread hangs up the thread until the following data is received. I tried to use the ReceiveTimeout property while ignoring the SocketError.TimedOut error. As a result, when sending Json 100 times, on the last one after waiting for a timeout, it throws a deserialization error.
2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:58:39.528|FATAL|Test.Server.Tcp.Program|Newtonsoft.Json.JsonSerializationException: Unexpected token while deserializing object: PropertyName. Path 'Data.UserId', line 898, position 13.
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
at Newtonsoft.Json.JsonSerializer.Deserialize[T](JsonReader reader)
at TcpJsonCarrier`1.<ReceiveRun>b__13_0() in D:\Projects\...path...\TcpJsonCarrier.cs:line 71
Class to work with stream:
public class TcpJsonCarrier<T> : IDisposable {
private readonly ILogger _logger;
private readonly CancellationToken _cancellationToken;
private readonly ConcurrentQueue<T> _incomingData;
private readonly ConcurrentQueue<T> _outgoingData;
private readonly StreamReader _streamReader;
private readonly StreamWriter _streamWriter;
private readonly JsonTextReader _jsonTextReader;
private readonly JsonTextWriter _jsonTextWriter;
private readonly JsonSerializer _jsonSerializer;
public TcpJsonCarrier(ILogger logger, TcpClient tcpClient) :
this(logger, tcpClient, CancellationToken.None) { }
public TcpJsonCarrier(ILogger logger, TcpClient tcpClient, CancellationToken cancellationToken) {
_logger = logger;
// tcpClient.ReceiveTimeout = 1000;
_cancellationToken = cancellationToken;
_incomingData = new ConcurrentQueue<T>();
_outgoingData = new ConcurrentQueue<T>();
_streamReader = new StreamReader(tcpClient.GetStream(), Encoding.UTF8, true, 1024, true);
_streamWriter = new StreamWriter(tcpClient.GetStream(), Encoding.UTF8, 1024, true) {AutoFlush = true};
_jsonTextReader = new JsonTextReader(_streamReader) {SupportMultipleContent = true};
_jsonTextWriter = new JsonTextWriter(_streamWriter);
_jsonSerializer = new JsonSerializer {
Formatting = Formatting.Indented,
TypeNameHandling = TypeNameHandling.Auto
};
ReceiveRun();
SendRun();
}
public IEnumerable<T> Read() {
while (_incomingData.TryDequeue(out var data)) {
yield return data;
}
}
public void Write(T data) {
_outgoingData.Enqueue(data);
}
private Task ReceiveRun() {
return Task.Factory.StartNew(async () => {
while (true) {
await Task.Delay(1, _cancellationToken);
try {
while (_jsonTextReader.Read())
_incomingData.Enqueue(_jsonSerializer.Deserialize<T>(_jsonTextReader));
}
catch (IOException ioe) when (ioe.InnerException is SocketException se &&
se.SocketErrorCode == SocketError.TimedOut) { }
catch (Exception e) {
_logger.LogCritical(e.ToString());
throw;
}
}
}, TaskCreationOptions.LongRunning);
}
private Task SendRun() {
return Task.Factory.StartNew(async () => {
while (true) {
try {
await Task.Delay(1, _cancellationToken);
_cancellationToken.ThrowIfCancellationRequested();
while (_outgoingData.TryDequeue(out var data)) {
_jsonSerializer.Serialize(_jsonTextWriter, data);
}
}
catch (Exception e) {
_logger.LogCritical(e.ToString());
throw;
}
}
}, TaskCreationOptions.LongRunning);
}
public void Dispose() {
_streamReader?.Dispose();
_streamWriter?.Dispose();
((IDisposable) _jsonTextReader)?.Dispose();
((IDisposable) _jsonTextWriter)?.Dispose();
}
}
Server:
class Program {
static void Main(string[] args) {
var loggerFactory = new LoggerFactory().AddNLog();
var logger = loggerFactory.CreateLogger<Program>();
var tcpListener = new TcpListener(IPAddress.Any, 9100);
tcpListener.Start();
var sem = new SemaphoreSlim(1, 1);
while (true) {
sem.Wait();
tcpListener
.AcceptTcpClientAsync()
.ContinueWith(async task => {
var tcpClient = await task;
logger.LogInformation("Client accepted");
sem.Release();
var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, tcpClient);
var count = 0;
while (true) {
await Task.Delay(1);
foreach (var message in tcpJsonCarrier.Read()) {
logger.LogInformation($"{++count} {message}");
}
}
}, TaskContinuationOptions.LongRunning);
}
}
}
Client:
class Program {
static void Main(string[] args) {
var loggerFactory = new LoggerFactory().AddNLog();
var logger = loggerFactory.CreateLogger<Program>();
var client = new TcpClient();
client
.ConnectAsync("...remoteIp...", 9100)
.ContinueWith(async task => {
await task;
logger.LogInformation("Connected");
var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, client);
for (int i = 0; i < 100; i++) {
await Task.Delay(100);
tcpJsonCarrier.Write(new Message(...messageData...));
}
await Task.Delay(10000);
for (int i = 0; i < 5; i++) {
await Task.Delay(100);
tcpJsonCarrier.Write(new Message(...messageData...));
}
}, TaskContinuationOptions.LongRunning);
while (true) { }
}
}
At the moment, I'm at a dead end.