Теперь TCP клиент всё время пытается переподключиться к серверу

This commit is contained in:
Пытков Роман
2025-09-20 00:25:57 +03:00
parent 956ab61660
commit 632b618722

View File

@@ -45,114 +45,114 @@ public class TcpClientWrapper : IClient
private async Task RunAsync(CancellationToken token) private async Task RunAsync(CancellationToken token)
{ {
const int maxRetries = 5;
const int retryDelayMs = 1000; const int retryDelayMs = 1000;
for (int attempt = 1; attempt <= maxRetries; attempt++) while (!token.IsCancellationRequested)
{ {
try try
{ {
_tcpClient = new TcpClient(); _tcpClient = new TcpClient();
await _tcpClient.ConnectAsync(_host, _port, token); await _tcpClient.ConnectAsync(_host, _port, token);
break; // Connection successful Console.WriteLine("Connected to server.");
using (var stream = _tcpClient.GetStream())
{
long index = 0;
var sw = Stopwatch.StartNew();
var lastMs = 0L;
var lastIndex = 0L;
var ms = 1000;
while (!token.IsCancellationRequested && _tcpClient.Connected)
{
try
{
// Читаем magic header полностью
var magicBytes = new byte[4];
int totalRead = 0;
while (totalRead < 4)
{
var read = await stream.ReadAsync(magicBytes, totalRead, 4 - totalRead, token);
if (read == 0) break; // соединение закрыто
totalRead += read;
}
if (totalRead < 4) break;
var magic = BitConverter.ToUInt32(magicBytes, 0);
if (magic != MagicHeader)
{
Console.WriteLine($"Invalid magic header: {magic:X8}, expected {MagicHeader:X8}. Skipping packet.");
continue; // or break, depending on policy
}
// Читаем длину полностью
var lengthBytes = new byte[4];
totalRead = 0;
while (totalRead < 4)
{
var read = await stream.ReadAsync(lengthBytes, totalRead, 4 - totalRead, token);
if (read == 0) break; // соединение закрыто
totalRead += read;
}
if (totalRead < 4) break;
var length = BitConverter.ToInt32(lengthBytes, 0);
// Читаем данные полностью
var dataBytes = new byte[length];
totalRead = 0;
while (totalRead < length)
{
var read = await stream.ReadAsync(dataBytes, totalRead, length - totalRead, token);
if (read == 0) break;
totalRead += read;
}
if (totalRead < length) break;
if (_responseConverter != null)
{
var data = await _responseConverter(dataBytes);
if (data != null)
{
var diff = sw.ElapsedMilliseconds - lastMs;
if (diff >= ms)
{
var fetched = index - lastIndex;
System.Console.WriteLine($"Fetched {fetched} data packages in {diff} ms.");
lastIndex = index;
lastMs = sw.ElapsedMilliseconds;
}
//Console.WriteLine(data);
_callback?.Invoke(data);
}
}
else
{
Console.WriteLine("Response converter not set.");
}
index++;
}
catch (TaskCanceledException)
{
break;
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
break; // выход из внутреннего цикла для переподключения
}
}
}
Console.WriteLine("Disconnected from server. Attempting to reconnect...");
} }
catch (Exception ex) when (attempt < maxRetries) catch (TaskCanceledException)
{ {
Console.WriteLine($"Connection attempt {attempt} failed: {ex.Message}. Retrying in {retryDelayMs}ms..."); break;
await Task.Delay(retryDelayMs, token);
} }
catch (Exception ex) catch (Exception ex)
{ {
Console.WriteLine($"Failed to connect after {maxRetries} attempts: {ex.Message}"); Console.WriteLine($"Connection failed: {ex.Message}. Retrying in {retryDelayMs}ms...");
throw; await Task.Delay(retryDelayMs, token);
} }
} }
System.Console.WriteLine("Client stopped.");
using (var stream = _tcpClient!.GetStream())
{
long index = 0;
var sw = Stopwatch.StartNew();
var lastMs = 0L;
var lastIndex = 0L;
var ms = 1000;
while (!token.IsCancellationRequested && _tcpClient.Connected)
{
try
{
// Читаем magic header полностью
var magicBytes = new byte[4];
int totalRead = 0;
while (totalRead < 4)
{
var read = await stream.ReadAsync(magicBytes, totalRead, 4 - totalRead, token);
if (read == 0) break; // соединение закрыто
totalRead += read;
}
if (totalRead < 4) break;
var magic = BitConverter.ToUInt32(magicBytes, 0);
if (magic != MagicHeader)
{
Console.WriteLine($"Invalid magic header: {magic:X8}, expected {MagicHeader:X8}. Skipping packet.");
continue; // or break, depending on policy
}
// Читаем длину полностью
var lengthBytes = new byte[4];
totalRead = 0;
while (totalRead < 4)
{
var read = await stream.ReadAsync(lengthBytes, totalRead, 4 - totalRead, token);
if (read == 0) break; // соединение закрыто
totalRead += read;
}
if (totalRead < 4) break;
var length = BitConverter.ToInt32(lengthBytes, 0);
// Читаем данные полностью
var dataBytes = new byte[length];
totalRead = 0;
while (totalRead < length)
{
var read = await stream.ReadAsync(dataBytes, totalRead, length - totalRead, token);
if (read == 0) break;
totalRead += read;
}
if (totalRead < length) break;
if (_responseConverter != null)
{
var data = await _responseConverter(dataBytes);
if (data != null)
{
var diff = sw.ElapsedMilliseconds - lastMs;
if (diff >= ms)
{
var fetched = index - lastIndex;
System.Console.WriteLine($"Fetched {fetched} data packages in {diff} ms.");
lastIndex = index;
lastMs = sw.ElapsedMilliseconds;
}
//Console.WriteLine(data);
_callback?.Invoke(data);
}
}
else
{
Console.WriteLine("Response converter not set.");
}
index++;
}
catch (TaskCanceledException)
{
break;
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
}
System.Console.WriteLine("End ");
} }
public override string ToString() public override string ToString()