Добавлен предсказуемый генератор данных

This commit is contained in:
Пытков Роман
2025-09-19 22:51:21 +03:00
parent 38f3880be8
commit c27e78cffe
11 changed files with 234 additions and 130 deletions

30
.vscode/launch.json vendored
View File

@@ -6,7 +6,7 @@
"type": "coreclr",
"request": "launch",
"program": "${workspaceFolder}/Server/bin/Debug/net8.0/Server.dll",
"args": [ "http", "json" ],
"args": [ "http", "json", "random" ],
"cwd": "${workspaceFolder}/Server",
"console": "integratedTerminal",
"stopAtEntry": false,
@@ -32,7 +32,7 @@
"type": "coreclr",
"request": "launch",
"program": "${workspaceFolder}/Server/bin/Debug/net8.0/Server.dll",
"args": [ "http", "bin" ],
"args": [ "http", "bin", "random" ],
"cwd": "${workspaceFolder}/Server",
"console": "integratedTerminal",
"stopAtEntry": false,
@@ -59,7 +59,7 @@
"type": "coreclr",
"request": "launch",
"program": "${workspaceFolder}/Server/bin/Debug/net8.0/Server.dll",
"args": [ "tcp", "json" ],
"args": [ "tcp", "json", "random" ],
"cwd": "${workspaceFolder}/Server",
"console": "integratedTerminal",
"stopAtEntry": false,
@@ -85,7 +85,7 @@
"type": "coreclr",
"request": "launch",
"program": "${workspaceFolder}/Server/bin/Debug/net8.0/Server.dll",
"args": [ "tcp", "bin" ],
"args": [ "tcp", "bin", "random" ],
"cwd": "${workspaceFolder}/Server",
"console": "integratedTerminal",
"stopAtEntry": false,
@@ -105,7 +105,21 @@
"presentation": {
"hidden": true
}
}
},
{
"name": "TCP/BIN (Predictable) Server",
"type": "coreclr",
"request": "launch",
"program": "${workspaceFolder}/Server/bin/Debug/net8.0/Server.dll",
"args": [ "tcp", "bin", "predictable", "2000" ],
"cwd": "${workspaceFolder}/Server",
"console": "integratedTerminal",
"stopAtEntry": false,
"presentation": {
"hidden": true
}
},
],
"compounds": [
{
@@ -132,6 +146,12 @@
"configurations": ["TCP/BIN Server", "TCP/BIN Client"],
"preLaunchTask": "dotnet: build",
"stopAll": true
},
{
"name": "TCP/BIN: (Predictable) Server and Client",
"configurations": ["TCP/BIN (Predictable) Server", "TCP/BIN Client"],
"preLaunchTask": "dotnet: build",
"stopAll": true
}
]
}

View File

@@ -123,8 +123,8 @@ public class TcpClientWrapper : IClient
Console.WriteLine($"Error: {ex.Message}");
}
}
System.Console.WriteLine("End cycle");
}
System.Console.WriteLine("End ");
}
public override string ToString()

View File

@@ -2,6 +2,7 @@ namespace Domain.Dto;
using System.Text.Json.Serialization;
// Вообще то этот класс не следует размещать в Domain, но я не захотел дублировать его в двух проектах по соседству
public class JsonData
{
[JsonPropertyName("concentrationIndex")]

View File

@@ -2,6 +2,8 @@ namespace Domain.Dto;
using MessagePack;
// Вообще то этот класс не следует размещать в Domain, но я не захотел дублировать его в двух проектах по соседству
[MessagePackObject]
public class MessagePackData
{

View File

@@ -1,89 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Domain;
namespace Server;
public class DataGenerator
{
private static readonly Random _random = new Random();
private readonly int _minNewPackages;
private readonly int _maxPreviousPackages;
private readonly TimeSpan _generationInterval;
private readonly LinkedList<Data> _cache = new LinkedList<Data>();
private readonly ConcurrentDictionary<long, Data> _dict = [];
private readonly object _lock = new object();
private long _maxRequestedIndex = 0;
private Task _generationTask;
private CancellationTokenSource _cts = new();
public DataGenerator(int minNewPackages = 50, int maxPreviousPackages = 50, TimeSpan? generationInterval = null)
{
_minNewPackages = Math.Max(minNewPackages, 5);
_maxPreviousPackages = Math.Max(maxPreviousPackages, 5);
_generationInterval = generationInterval ?? TimeSpan.FromSeconds(1);
_generationTask = Task.Run(() => GenerateInBackground(_cts.Token));
}
private void GenerateInBackground(CancellationToken token)
{
var firstData = GenerateRandomData(0, "ALL");
_cache.AddLast(firstData);
_dict[firstData.FrameIndex] = firstData;
while (!token.IsCancellationRequested)
{
//await Task.Delay(_generationInterval, token);
var first = _cache.First!.Value;
var last = _cache.Last!.Value;
if (last.FrameIndex - _maxRequestedIndex < _minNewPackages)
{
var data = GenerateRandomData(last.FrameIndex + 1, "ALL");
_cache.AddLast(data);
_dict[data.FrameIndex] = data;
}
if (_maxRequestedIndex - first.FrameIndex > _maxPreviousPackages)
{
_cache.RemoveFirst();
_dict.TryRemove(first.FrameIndex, out _);
}
//System.Console.WriteLine($"[{first.PackageIndex}; {last.PackageIndex}]");
}
}
public Data? GetPackage(long packageIndex)
{
var res = _dict.TryGetValue(packageIndex, out var value);
_maxRequestedIndex = Math.Max(_maxRequestedIndex, packageIndex);
return res ? value : null;
}
private Data GenerateRandomData(long frameIndex, string overlayPoint)
{
var alpha = _random.NextDouble();
var beta = _random.NextDouble() * (1 - alpha);
var theta = 1 - alpha - beta;
var signalQuality = _random.NextDouble();
return new Data(
ConcentrationIndex: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
RelaxationIndex: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
CognitiveControl: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
CognitiveLoad: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
Alpha: _random.NextDouble() < 0.1 ? null : alpha,
Beta: _random.NextDouble() < 0.1 ? null : beta,
Theta: _random.NextDouble() < 0.1 ? null : theta,
Smr: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
MuWave: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
OverlayPoint: overlayPoint,
Artifact: _random.NextDouble() < 0.1 ? null : signalQuality < 0.5,
SignalQuality: _random.NextDouble() < 0.1 ? null : signalQuality,
FrameIndex: frameIndex,
TimeOfDataGenerate: _random.NextDouble() < 0.1 ? null : DateTime.Now
);
}
}

View File

@@ -0,0 +1,18 @@
using Domain;
namespace Server.DataGenerator;
interface IDataGenerator
{
/// <summary>
/// Получить следующий пакет (если вызывающий код не умеет работать с фреймами, его следует закешировать и выдавать по пакетно)
/// </summary>
/// <returns></returns>
public Data GetPackage();
/// <summary>
/// Получить новый фрейм данных
/// </summary>
/// <returns></returns>
public Data[] GetFrame();
}

View File

@@ -0,0 +1,80 @@
using System;
using System.Linq;
using System.Threading;
using Domain;
namespace Server.DataGenerator;
public class PredictableDataGenerator : IDataGenerator
{
private long _currentFrameIndex = 0;
private Data[] _currentFrame;
private int _currentPackageInFrame = 0;
private DateTime _lastFrameTime;
private bool _frameArtifact = false;
private int _delay;
public PredictableDataGenerator(int frameDelay = 0)
{
_delay = frameDelay;
_lastFrameTime = DateTime.Now;
_currentFrame = GenerateFrame(0, _lastFrameTime, _frameArtifact);
_currentFrameIndex = 1;
}
private Data[] GenerateFrame(long frameIndex, DateTime frameTime, bool artifact)
{
Thread.Sleep(_delay);
var overlayPoints = OverlayPoints.Points.Take(8).Append(OverlayPoints.All).ToArray();
return overlayPoints.Select((op, idx) => GeneratePredictableData(frameIndex, op, idx, frameTime, artifact)).ToArray();
}
private Data GeneratePredictableData(long frameIndex, string overlayPoint, int index, DateTime frameTime, bool artifact)
{
double value;
if (overlayPoint == OverlayPoints.All)
{
value = frameIndex;
}
else
{
value = 1000 * (index + 1) + frameIndex;
}
return new Data(
ConcentrationIndex: value,
RelaxationIndex: value,
CognitiveControl: value,
CognitiveLoad: value,
Alpha: value,
Beta: value,
Theta: value,
Smr: value,
MuWave: value,
OverlayPoint: overlayPoint,
Artifact: artifact,
SignalQuality: value,
FrameIndex: frameIndex,
TimeOfDataGenerate: frameTime
);
}
public Data GetPackage()
{
if (_currentPackageInFrame >= _currentFrame.Length)
{
_frameArtifact = !_frameArtifact;
_lastFrameTime = _lastFrameTime.AddSeconds(1);
_currentFrame = GenerateFrame(Interlocked.Increment(ref _currentFrameIndex), _lastFrameTime, _frameArtifact);
_currentPackageInFrame = 0;
}
return _currentFrame[_currentPackageInFrame++];
}
public Data[] GetFrame()
{
_frameArtifact = !_frameArtifact;
_lastFrameTime = _lastFrameTime.AddSeconds(1);
return GenerateFrame(Interlocked.Increment(ref _currentFrameIndex), _lastFrameTime, _frameArtifact);
}
}

View File

@@ -0,0 +1,67 @@
using System;
using System.Linq;
using System.Threading;
using Domain;
namespace Server.DataGenerator;
public class RandomDataGenerator : IDataGenerator
{
private static readonly Random _random = new Random();
private long _currentFrameIndex = 0;
private Data[] _currentFrame;
private int _currentPackageInFrame = 0;
public RandomDataGenerator(int minNewPackages = 50, int maxPreviousPackages = 50, TimeSpan? generationInterval = null)
{
_currentFrame = GenerateFrame(0);
_currentFrameIndex = 1;
}
private Data[] GenerateFrame(long frameIndex)
{
var overlayPoints = OverlayPoints.Points.Take(8).Append(OverlayPoints.All).ToArray();
return overlayPoints.Select(op => GenerateRandomData(frameIndex, op)).ToArray();
}
private Data GenerateRandomData(long frameIndex, string overlayPoint)
{
var alpha = _random.NextDouble();
var beta = _random.NextDouble() * (1 - alpha);
var theta = 1 - alpha - beta;
var signalQuality = _random.NextDouble();
return new Data(
ConcentrationIndex: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
RelaxationIndex: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
CognitiveControl: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
CognitiveLoad: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
Alpha: _random.NextDouble() < 0.1 ? null : alpha,
Beta: _random.NextDouble() < 0.1 ? null : beta,
Theta: _random.NextDouble() < 0.1 ? null : theta,
Smr: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
MuWave: _random.NextDouble() < 0.1 ? null : _random.NextDouble(),
OverlayPoint: overlayPoint,
Artifact: _random.NextDouble() < 0.1 ? null : signalQuality < 0.5,
SignalQuality: _random.NextDouble() < 0.1 ? null : signalQuality,
FrameIndex: frameIndex,
TimeOfDataGenerate: _random.NextDouble() < 0.1 ? null : DateTime.Now
);
}
public Data GetPackage()
{
if (_currentPackageInFrame >= _currentFrame.Length)
{
_currentFrame = GenerateFrame(Interlocked.Increment(ref _currentFrameIndex));
_currentPackageInFrame = 0;
}
return _currentFrame[_currentPackageInFrame++];
}
public Data[] GetFrame()
{
return GenerateFrame(Interlocked.Increment(ref _currentFrameIndex));
}
}

View File

@@ -14,12 +14,12 @@ public class HttpServer : IServer
private readonly HttpListener _listener;
private readonly string _url;
private CancellationTokenSource _cts = new CancellationTokenSource();
private Func<long, Data?> _getData;
private Func<Data?> _getPackage;
private Action<Data, HttpListenerResponse> _writeResponse;
public HttpServer(Func<long, Data?> getData, Action<Data, HttpListenerResponse> writeResponse, string url = "http://*:5555/")
public HttpServer(Func<Data?> getPackage, Action<Data, HttpListenerResponse> writeResponse, string url = "http://*:5555/")
{
_getData = getData;
_getPackage = getPackage;
_writeResponse = writeResponse;
_url = url;
_listener = new HttpListener();
@@ -63,28 +63,18 @@ public class HttpServer : IServer
{
if (context.Request.Url?.AbsolutePath.Equals("/fetchpackage", StringComparison.OrdinalIgnoreCase) == true)
{
string? indexStr = context.Request.QueryString["index"];
if (indexStr != null && long.TryParse(indexStr, out long index))
var data = _getPackage();
if (data != null)
{
var data = _getData(index);
if (data != null)
{
_writeResponse(data, context.Response);
}
else
{
var responseText = JsonSerializer.Serialize(new { error = "Data not found" });
context.Response.StatusCode = 404;
byte[] buffer = Encoding.UTF8.GetBytes(responseText);
context.Response.ContentType = "application/json";
context.Response.ContentLength64 = buffer.Length;
context.Response.OutputStream.Write(buffer, 0, buffer.Length);
}
_writeResponse(data, context.Response);
}
else
{
context.Response.StatusCode = 400;
byte[] buffer = Encoding.UTF8.GetBytes("Invalid or missing 'index' parameter.");
var responseText = JsonSerializer.Serialize(new { error = "Data not found" });
context.Response.StatusCode = 404;
byte[] buffer = Encoding.UTF8.GetBytes(responseText);
context.Response.ContentType = "application/json";
context.Response.ContentLength64 = buffer.Length;
context.Response.OutputStream.Write(buffer, 0, buffer.Length);
}
}

View File

@@ -7,17 +7,34 @@ using Domain.Dto;
using MessagePack;
using NetworkTest;
using Server;
using Server.DataGenerator;
if (args.Length < 2)
if (args.Length < 3)
{
System.Console.WriteLine("Pass twp arg: test/http/tcp and json/bin");
System.Console.WriteLine("Pass three args: test/http/tcp, json/bin, and random/predictable");
return -1;
}
var dataGenerator = new DataGenerator(generationInterval: TimeSpan.FromMilliseconds(1));
var protocol = args[0];
var serialization = args[1];
var generatorType = args[2];
int delay = 0;
if (generatorType == "predictable" && args.Length > 3)
{
if (!int.TryParse(args[3], out delay))
{
System.Console.WriteLine("Invalid delay value. Must be an integer.");
return -1;
}
}
IDataGenerator dataGenerator = generatorType switch
{
"random" => new RandomDataGenerator(generationInterval: TimeSpan.FromMilliseconds(1)),
"predictable" => new PredictableDataGenerator(delay),
_ => throw new ArgumentException("Generator type must be 'random' or 'predictable'")
};
if (protocol == "test")
{
@@ -31,7 +48,7 @@ if (protocol == "test")
var json = serialization == "json";
while (true)
{
var data = dataGenerator.GetPackage(index);
var data = dataGenerator.GetPackage();
if (data == null)
{
nullCount++;
@@ -65,9 +82,9 @@ if (protocol == "test")
IServer? server = protocol switch
{
"http" => new HttpServer(index => dataGenerator.GetPackage(index),
"http" => new HttpServer(() => dataGenerator.GetPackage(),
serialization == "json" ? PrepareResponseJson : PrepareResponseMessagePack),
"tcp" => new TcpServer(index => dataGenerator.GetPackage(index),
"tcp" => new TcpServer(() => dataGenerator.GetPackage(),
serialization == "json" ? PrepareBytesJson : PrepareBytesMessagePack),
_ => null
};

View File

@@ -12,14 +12,14 @@ public class TcpServer : IServer
{
private const uint MagicHeader = 0xDEADBEEF;
private readonly TcpListener _listener;
private readonly Func<long, Data?> _getData;
private readonly Func<Data?> _getPackage;
private readonly Func<Data, byte[]> _prepareBytes;
private readonly int _port;
private CancellationTokenSource _cts = new CancellationTokenSource();
public TcpServer(Func<long, Data?> getData, Func<Data, byte[]> prepareBytes, int port = 5555)
public TcpServer(Func<Data?> getPackage, Func<Data, byte[]> prepareBytes, int port = 5555)
{
_getData = getData;
_getPackage = getPackage;
_prepareBytes = prepareBytes;
_port = port;
_listener = new TcpListener(IPAddress.Any, _port);
@@ -63,10 +63,9 @@ public class TcpServer : IServer
using (client)
using (var stream = client.GetStream())
{
var index = 0L;
while (!token.IsCancellationRequested && client.Connected)
{
var data = _getData(index);
var data = _getPackage();
if (data != null)
{
var bytes = _prepareBytes(data);
@@ -75,7 +74,6 @@ public class TcpServer : IServer
await stream.WriteAsync(magicBytes, 0, magicBytes.Length, token);
await stream.WriteAsync(lengthBytes, 0, lengthBytes.Length, token);
await stream.WriteAsync(bytes, 0, bytes.Length, token);
index++;
}
// await Task.Delay(50, token); // Removed delay to test maximum throughput
}