Добавлен транспорт TCP
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
using System;
|
||||
using System.Net.Http;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
@@ -16,7 +17,12 @@ if (args.Length < 2)
|
||||
|
||||
var protocol = args[0];
|
||||
var serialization = args[1];
|
||||
IClient? client = protocol == "http" ? new HttpClientWrapper(serialization == "json" ? ConvertJsonResponse : ConvertMessagePackResponse) : null;
|
||||
IClient? client = protocol switch
|
||||
{
|
||||
"http" => new HttpClientWrapper(serialization == "json" ? ConvertJsonResponse : ConvertMessagePackResponse),
|
||||
"tcp" => new TcpClientWrapper(serialization == "json" ? ConvertBytesJson : ConvertBytesMessagePack),
|
||||
_ => null
|
||||
};
|
||||
|
||||
client?.Start();
|
||||
System.Console.WriteLine("Client started:");
|
||||
@@ -51,3 +57,16 @@ static async Task<Data?> ConvertMessagePackResponse(HttpResponseMessage response
|
||||
var msgPackData = MessagePackSerializer.Deserialize<MessagePackData>(bytes);
|
||||
return msgPackData?.ToData();
|
||||
}
|
||||
|
||||
static Task<Data?> ConvertBytesJson(byte[] bytes)
|
||||
{
|
||||
var json = Encoding.UTF8.GetString(bytes);
|
||||
var jsonData = JsonSerializer.Deserialize<JsonData>(json);
|
||||
return Task.FromResult(jsonData?.ToData());
|
||||
}
|
||||
|
||||
static Task<Data?> ConvertBytesMessagePack(byte[] bytes)
|
||||
{
|
||||
var msgPackData = MessagePackSerializer.Deserialize<MessagePackData>(bytes);
|
||||
return Task.FromResult(msgPackData?.ToData());
|
||||
}
|
||||
|
||||
115
Client/TcpClient.cs
Normal file
115
Client/TcpClient.cs
Normal file
@@ -0,0 +1,115 @@
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Domain;
|
||||
|
||||
namespace Client;
|
||||
|
||||
public class TcpClientWrapper : IClient
|
||||
{
|
||||
private readonly string _host;
|
||||
private readonly int _port;
|
||||
private CancellationTokenSource _cts = new CancellationTokenSource();
|
||||
private Task? _runningTask;
|
||||
private Func<byte[], Task<Data?>>? _responseConverter;
|
||||
private TcpClient? _tcpClient;
|
||||
|
||||
public TcpClientWrapper(Func<byte[], Task<Data?>> responseConverter, string host = "192.168.1.117", int port = 5555)
|
||||
{
|
||||
_host = host;
|
||||
_port = port;
|
||||
_responseConverter = responseConverter;
|
||||
}
|
||||
|
||||
public void Start()
|
||||
{
|
||||
_cts = new CancellationTokenSource();
|
||||
_runningTask = Task.Run(() => RunAsync(_cts.Token));
|
||||
}
|
||||
|
||||
public void Stop()
|
||||
{
|
||||
_cts.Cancel();
|
||||
_runningTask?.Wait();
|
||||
_tcpClient?.Close();
|
||||
}
|
||||
|
||||
private async Task RunAsync(CancellationToken token)
|
||||
{
|
||||
_tcpClient = new TcpClient();
|
||||
await _tcpClient.ConnectAsync(_host, _port, token);
|
||||
using (var stream = _tcpClient.GetStream())
|
||||
{
|
||||
long index = 0;
|
||||
var sw = Stopwatch.StartNew();
|
||||
var lastMs = 0L;
|
||||
var lastIndex = 0L;
|
||||
var ms = 1000;
|
||||
while (!token.IsCancellationRequested && _tcpClient.Connected)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Читаем длину полностью
|
||||
var lengthBytes = new byte[4];
|
||||
int totalRead = 0;
|
||||
while (totalRead < 4)
|
||||
{
|
||||
var read = await stream.ReadAsync(lengthBytes, totalRead, 4 - totalRead, token);
|
||||
if (read == 0) break; // соединение закрыто
|
||||
totalRead += read;
|
||||
}
|
||||
if (totalRead < 4) break;
|
||||
var length = BitConverter.ToInt32(lengthBytes, 0);
|
||||
|
||||
// Читаем данные полностью
|
||||
var dataBytes = new byte[length];
|
||||
totalRead = 0;
|
||||
while (totalRead < length)
|
||||
{
|
||||
var read = await stream.ReadAsync(dataBytes, totalRead, length - totalRead, token);
|
||||
if (read == 0) break;
|
||||
totalRead += read;
|
||||
}
|
||||
if (totalRead < length) break;
|
||||
|
||||
if (_responseConverter != null)
|
||||
{
|
||||
var data = await _responseConverter(dataBytes);
|
||||
if (data != null)
|
||||
{
|
||||
var diff = sw.ElapsedMilliseconds - lastMs;
|
||||
if (diff >= ms)
|
||||
{
|
||||
var fetched = index - lastIndex;
|
||||
System.Console.WriteLine($"Fetched {fetched} data packages in {diff} ms.");
|
||||
lastIndex = index;
|
||||
lastMs = sw.ElapsedMilliseconds;
|
||||
}
|
||||
//Console.WriteLine(data);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.WriteLine("Response converter not set.");
|
||||
}
|
||||
index++;
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"Error: {ex.Message}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
return $"TcpClient connected to {_host}:{_port}";
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user