From 095726786df2b998c619f39af56b41d20ac3b69a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D1=8B=D1=82=D0=BA=D0=BE=D0=B2=20=D0=A0=D0=BE=D0=BC?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Wed, 17 Sep 2025 11:07:45 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=20=D1=82=D1=80=D0=B0=D0=BD=D1=81=D0=BF=D0=BE=D1=80=D1=82?= =?UTF-8?q?=20TCP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 66 +++++++++++++++++++++++++ Client/Program.cs | 21 +++++++- Client/TcpClient.cs | 115 ++++++++++++++++++++++++++++++++++++++++++++ Server/Program.cs | 39 +++++++++++---- Server/TcpServer.cs | 86 +++++++++++++++++++++++++++++++++ 5 files changed, 317 insertions(+), 10 deletions(-) create mode 100644 Client/TcpClient.cs create mode 100644 Server/TcpServer.cs diff --git a/.vscode/launch.json b/.vscode/launch.json index 11d5f15..e59ee1b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -52,6 +52,59 @@ "presentation": { "hidden": true } + }, + + { + "name": "TCP/JSON Server", + "type": "coreclr", + "request": "launch", + "program": "${workspaceFolder}/Server/bin/Debug/net8.0/Server.dll", + "args": [ "tcp", "json" ], + "cwd": "${workspaceFolder}/Server", + "console": "integratedTerminal", + "stopAtEntry": false, + "presentation": { + "hidden": true + } + }, + { + "name": "TCP/JSON Client", + "type": "coreclr", + "request": "launch", + "program": "${workspaceFolder}/Client/bin/Debug/net8.0/Client.dll", + "args": [ "tcp", "json" ], + "cwd": "${workspaceFolder}/Client", + "console": "integratedTerminal", + "stopAtEntry": false, + "presentation": { + "hidden": true + } + }, + { + "name": "TCP/BIN Server", + "type": "coreclr", + "request": "launch", + "program": "${workspaceFolder}/Server/bin/Debug/net8.0/Server.dll", + "args": [ "tcp", "bin" ], + "cwd": "${workspaceFolder}/Server", + "console": "integratedTerminal", + "stopAtEntry": false, + "presentation": { + "hidden": true + } + }, + { + "name": "TCP/BIN Client", + "type": "coreclr", + "request": "launch", + "program": "${workspaceFolder}/Client/bin/Debug/net8.0/Client.dll", + "args": [ "tcp", "bin" ], + "cwd": "${workspaceFolder}/Client", + "console": "integratedTerminal", + "stopAtEntry": false, + "presentation": { + "hidden": true + } } ], "compounds": [ @@ -66,6 +119,19 @@ "configurations": ["HTTP/BIN Server", "HTTP/BIN Client"], "preLaunchTask": "dotnet: build", "stopAll": true + }, + + { + "name": "TCP/JSON: Server and Client", + "configurations": ["TCP/JSON Server", "TCP/JSON Client"], + "preLaunchTask": "dotnet: build", + "stopAll": true + }, + { + "name": "TCP/BIN: Server and Client", + "configurations": ["TCP/BIN Server", "TCP/BIN Client"], + "preLaunchTask": "dotnet: build", + "stopAll": true } ] } diff --git a/Client/Program.cs b/Client/Program.cs index 50245b0..6c7100f 100644 --- a/Client/Program.cs +++ b/Client/Program.cs @@ -1,5 +1,6 @@ using System; using System.Net.Http; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -16,7 +17,12 @@ if (args.Length < 2) var protocol = args[0]; var serialization = args[1]; -IClient? client = protocol == "http" ? new HttpClientWrapper(serialization == "json" ? ConvertJsonResponse : ConvertMessagePackResponse) : null; +IClient? client = protocol switch +{ + "http" => new HttpClientWrapper(serialization == "json" ? ConvertJsonResponse : ConvertMessagePackResponse), + "tcp" => new TcpClientWrapper(serialization == "json" ? ConvertBytesJson : ConvertBytesMessagePack), + _ => null +}; client?.Start(); System.Console.WriteLine("Client started:"); @@ -51,3 +57,16 @@ static async Task ConvertMessagePackResponse(HttpResponseMessage response var msgPackData = MessagePackSerializer.Deserialize(bytes); return msgPackData?.ToData(); } + +static Task ConvertBytesJson(byte[] bytes) +{ + var json = Encoding.UTF8.GetString(bytes); + var jsonData = JsonSerializer.Deserialize(json); + return Task.FromResult(jsonData?.ToData()); +} + +static Task ConvertBytesMessagePack(byte[] bytes) +{ + var msgPackData = MessagePackSerializer.Deserialize(bytes); + return Task.FromResult(msgPackData?.ToData()); +} diff --git a/Client/TcpClient.cs b/Client/TcpClient.cs new file mode 100644 index 0000000..592caa9 --- /dev/null +++ b/Client/TcpClient.cs @@ -0,0 +1,115 @@ +using System; +using System.Diagnostics; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Domain; + +namespace Client; + +public class TcpClientWrapper : IClient +{ + private readonly string _host; + private readonly int _port; + private CancellationTokenSource _cts = new CancellationTokenSource(); + private Task? _runningTask; + private Func>? _responseConverter; + private TcpClient? _tcpClient; + + public TcpClientWrapper(Func> responseConverter, string host = "192.168.1.117", int port = 5555) + { + _host = host; + _port = port; + _responseConverter = responseConverter; + } + + public void Start() + { + _cts = new CancellationTokenSource(); + _runningTask = Task.Run(() => RunAsync(_cts.Token)); + } + + public void Stop() + { + _cts.Cancel(); + _runningTask?.Wait(); + _tcpClient?.Close(); + } + + private async Task RunAsync(CancellationToken token) + { + _tcpClient = new TcpClient(); + await _tcpClient.ConnectAsync(_host, _port, token); + using (var stream = _tcpClient.GetStream()) + { + long index = 0; + var sw = Stopwatch.StartNew(); + var lastMs = 0L; + var lastIndex = 0L; + var ms = 1000; + while (!token.IsCancellationRequested && _tcpClient.Connected) + { + try + { + // Читаем длину полностью + var lengthBytes = new byte[4]; + int totalRead = 0; + while (totalRead < 4) + { + var read = await stream.ReadAsync(lengthBytes, totalRead, 4 - totalRead, token); + if (read == 0) break; // соединение закрыто + totalRead += read; + } + if (totalRead < 4) break; + var length = BitConverter.ToInt32(lengthBytes, 0); + + // Читаем данные полностью + var dataBytes = new byte[length]; + totalRead = 0; + while (totalRead < length) + { + var read = await stream.ReadAsync(dataBytes, totalRead, length - totalRead, token); + if (read == 0) break; + totalRead += read; + } + if (totalRead < length) break; + + if (_responseConverter != null) + { + var data = await _responseConverter(dataBytes); + if (data != null) + { + var diff = sw.ElapsedMilliseconds - lastMs; + if (diff >= ms) + { + var fetched = index - lastIndex; + System.Console.WriteLine($"Fetched {fetched} data packages in {diff} ms."); + lastIndex = index; + lastMs = sw.ElapsedMilliseconds; + } + //Console.WriteLine(data); + } + } + else + { + Console.WriteLine("Response converter not set."); + } + index++; + } + catch (TaskCanceledException) + { + break; + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } + } + } + } + + public override string ToString() + { + return $"TcpClient connected to {_host}:{_port}"; + } +} \ No newline at end of file diff --git a/Server/Program.cs b/Server/Program.cs index b35c6b9..4335385 100644 --- a/Server/Program.cs +++ b/Server/Program.cs @@ -63,32 +63,40 @@ if (protocol == "test") } } -IServer? server = protocol == "http" ? - new HttpServer(index => dataGenerator.GetPackage(index), - serialization == "json" ? PrepareResponseJson : PrepareResponseMessagePack) : - null; +IServer? server = protocol switch +{ + "http" => new HttpServer(index => dataGenerator.GetPackage(index), + serialization == "json" ? PrepareResponseJson : PrepareResponseMessagePack), + "tcp" => new TcpServer(index => dataGenerator.GetPackage(index), + serialization == "json" ? PrepareBytesJson : PrepareBytesMessagePack), + _ => null +}; server?.Start(); System.Console.WriteLine("Server started:"); System.Console.WriteLine(server); +// Создаем CancellationTokenSource для управления остановкой +var cts = new CancellationTokenSource(); + // Обработка выхода по Ctrl+C Console.CancelKeyPress += (sender, e) => { e.Cancel = true; // Prevent immediate termination Console.WriteLine("Shutdown signal received. Stopping server..."); + cts.Cancel(); server?.Stop(); Console.WriteLine("Goodbye!"); - Environment.Exit(0); }; - -// Бесконечный цикл ожидания -while (true) +// Бесконечный цикл ожидания с возможностью прерывания +while (!cts.Token.IsCancellationRequested) { - Thread.Sleep(1000); + cts.Token.WaitHandle.WaitOne(1000); } +return 0; + void PrepareResponseJson(Data data, HttpListenerResponse response) { @@ -109,3 +117,16 @@ void PrepareResponseMessagePack(Data data, HttpListenerResponse response) response.OutputStream.Write(buffer, 0, buffer.Length); } +static byte[] PrepareBytesJson(Data data) +{ + JsonData jsonData = new JsonData(data); + var responseText = JsonSerializer.Serialize(jsonData); + return Encoding.UTF8.GetBytes(responseText); +} + +static byte[] PrepareBytesMessagePack(Data data) +{ + MessagePackData msgPackData = new MessagePackData(data); + return MessagePackSerializer.Serialize(msgPackData); +} + diff --git a/Server/TcpServer.cs b/Server/TcpServer.cs new file mode 100644 index 0000000..122fcc1 --- /dev/null +++ b/Server/TcpServer.cs @@ -0,0 +1,86 @@ +using System; +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Domain; + +namespace NetworkTest; + +public class TcpServer : IServer +{ + private readonly TcpListener _listener; + private readonly Func _getData; + private readonly Func _prepareBytes; + private readonly int _port; + private CancellationTokenSource _cts = new CancellationTokenSource(); + + public TcpServer(Func getData, Func prepareBytes, int port = 5555) + { + _getData = getData; + _prepareBytes = prepareBytes; + _port = port; + _listener = new TcpListener(IPAddress.Any, _port); + } + + public void Start() + { + _cts = new CancellationTokenSource(); + _listener.Start(); + Task.Run(() => ListenAsync(_cts.Token)); + } + + public void Stop() + { + _cts.Cancel(); + _listener.Stop(); + } + + private async Task ListenAsync(CancellationToken token) + { + while (!token.IsCancellationRequested) + { + try + { + var client = await _listener.AcceptTcpClientAsync(token); + _ = Task.Run(() => HandleClientAsync(client, token)); + } + catch (SocketException) + { + break; + } + catch (OperationCanceledException) + { + break; + } + } + } + + private async Task HandleClientAsync(TcpClient client, CancellationToken token) + { + using (client) + using (var stream = client.GetStream()) + { + var index = 0L; + while (!token.IsCancellationRequested && client.Connected) + { + var data = _getData(index); + if (data != null) + { + var bytes = _prepareBytes(data); + var lengthBytes = BitConverter.GetBytes(bytes.Length); + await stream.WriteAsync(lengthBytes, 0, lengthBytes.Length, token); + await stream.WriteAsync(bytes, 0, bytes.Length, token); + index++; + } + // await Task.Delay(50, token); // Removed delay to test maximum throughput + } + } + } + + public override string ToString() + { + return $"TcpServer listening on port {_port}"; + } +} \ No newline at end of file