261 lines
9.8 KiB
C#
261 lines
9.8 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Diagnostics;
|
|
using System.Globalization;
|
|
using System.IO.Compression;
|
|
using System.Text.Json;
|
|
using System.Text.RegularExpressions;
|
|
using InnovEnergy.App.Backend.DataTypes;
|
|
using InnovEnergy.App.Backend.DataTypes.Methods;
|
|
using InnovEnergy.Lib.S3Utils;
|
|
using S3Bucket = InnovEnergy.Lib.S3Utils.DataTypes.S3Bucket;
|
|
using S3Region = InnovEnergy.Lib.S3Utils.DataTypes.S3Region;
|
|
|
|
namespace InnovEnergy.App.Backend.Services;
|
|
|
|
/// <summary>
/// A single sample of the dynamic-pricing "Current Price" history.
/// </summary>
public class PricePoint
{
    /// <summary>Time of the sample, in unix seconds.</summary>
    public Int64 Timestamp { get; set; }

    /// <summary>Price of the sample, in CHF/kWh.</summary>
    public Double Price { get; set; }
}
|
|
|
|
/// <summary>
|
|
/// Builds the "Current Price" history shown on the Configuration tab for
|
|
/// Growatt (device 3) and Sinexcel (device 4) installations.
|
|
///
|
|
/// CurrentPrice only lives inside the per-10-second chunk files (Config.CurrentPrice);
|
|
/// there is no pre-aggregated source. To keep load bounded we sample ONE chunk per
|
|
/// 15-minute slot, fetch those in parallel, and cache each fully-past day (immutable).
|
|
/// </summary>
|
|
public static class CurrentPriceHistoryService
|
|
{
|
|
private const Int64 BucketSeconds = 900; // 15-minute resolution
|
|
private const Int32 MaxParallelFetches = 24;
|
|
private const Int32 CacheRetentionDays = 14; // bound the in-memory day cache
|
|
private const String S3CfgPath = "/home/ubuntu/.s3cfg";
|
|
|
|
private static readonly Regex ChunkFileRegex = new(@"/([0-9]+)\.(csv|json)$", RegexOptions.Compiled);
|
|
|
|
// Immutable past days cached as "{installationId}:{yyyyMMdd}".
|
|
private static readonly ConcurrentDictionary<String, List<PricePoint>> DayCache = new();
|
|
|
|
public static async Task<List<PricePoint>> GetHistory(Installation installation, Int64 startSec, Int64 endSec)
|
|
{
|
|
var todayUtc = DateTime.UtcNow.Date;
|
|
var firstDay = DateTimeOffset.FromUnixTimeSeconds(startSec).UtcDateTime.Date;
|
|
var lastDay = DateTimeOffset.FromUnixTimeSeconds(endSec).UtcDateTime.Date;
|
|
|
|
var points = new List<PricePoint>();
|
|
for (var day = firstDay; day <= lastDay; day = day.AddDays(1))
|
|
points.AddRange(await GetDay(installation, day, cacheable: day < todayUtc));
|
|
|
|
return points
|
|
.Where(p => p.Timestamp >= startSec && p.Timestamp <= endSec)
|
|
.OrderBy(p => p.Timestamp)
|
|
.ToList();
|
|
}
|
|
|
|
private static async Task<List<PricePoint>> GetDay(Installation installation, DateTime dayUtc, Boolean cacheable)
|
|
{
|
|
var key = $"{installation.Id}:{dayUtc:yyyyMMdd}";
|
|
if (cacheable && DayCache.TryGetValue(key, out var cached))
|
|
return cached;
|
|
|
|
var dayStart = new DateTimeOffset(dayUtc, TimeSpan.Zero).ToUnixTimeSeconds();
|
|
var dayEnd = dayStart + 86400 - 1;
|
|
|
|
var timestamps = SampleByBucket(ListChunkTimestamps(installation, dayStart, dayEnd));
|
|
var points = await FetchPrices(installation, timestamps);
|
|
|
|
// Only cache non-empty past days: an empty result can mean a transient s3cmd/S3
|
|
// failure, and caching that would serve "no data" forever until restart.
|
|
if (cacheable && points.Count > 0)
|
|
CacheDay(key, points);
|
|
|
|
return points;
|
|
}
|
|
|
|
private static void CacheDay(String key, List<PricePoint> points)
|
|
{
|
|
DayCache[key] = points;
|
|
|
|
// Prune entries older than the retention window to bound memory growth.
|
|
var cutoff = DateTime.UtcNow.Date.AddDays(-CacheRetentionDays);
|
|
foreach (var existingKey in DayCache.Keys)
|
|
{
|
|
var datePart = existingKey.Substring(existingKey.IndexOf(':') + 1);
|
|
if (DateTime.TryParseExact(datePart, "yyyyMMdd", CultureInfo.InvariantCulture,
|
|
DateTimeStyles.None, out var day) && day < cutoff)
|
|
DayCache.TryRemove(existingKey, out _);
|
|
}
|
|
}
|
|
|
|
// Keep the first chunk in each 15-minute slot.
|
|
private static List<Int64> SampleByBucket(List<Int64> timestamps)
|
|
{
|
|
var seenBuckets = new HashSet<Int64>();
|
|
var picked = new List<Int64>();
|
|
foreach (var t in timestamps.OrderBy(x => x))
|
|
if (seenBuckets.Add(t / BucketSeconds))
|
|
picked.Add(t);
|
|
return picked;
|
|
}
|
|
|
|
// List every chunk filename in range via `s3cmd ls` over the 5-digit timestamp prefixes
|
|
// (same listing approach as Controller.GetCsvTimestampsForInstallation).
|
|
private static List<Int64> ListChunkTimestamps(Installation installation, Int64 start, Int64 end)
|
|
{
|
|
var all = new List<Int64>();
|
|
var startPrefix = Int64.Parse(start.ToString().Substring(0, 5));
|
|
var endPrefix = Int64.Parse(end.ToString().Substring(0, 5));
|
|
|
|
for (var prefix = startPrefix; prefix <= endPrefix; prefix++)
|
|
{
|
|
var output = RunS3cmdLs("s3://" + installation.BucketName() + "/" + prefix);
|
|
foreach (var line in output.Split('\n'))
|
|
{
|
|
var match = ChunkFileRegex.Match(line);
|
|
if (match.Success && Int64.TryParse(match.Groups[1].Value, out var t) && t >= start && t <= end)
|
|
all.Add(t);
|
|
}
|
|
}
|
|
return all;
|
|
}
|
|
|
|
private static String RunS3cmdLs(String bucketPath)
|
|
{
|
|
try
|
|
{
|
|
var startInfo = new ProcessStartInfo
|
|
{
|
|
FileName = "s3cmd",
|
|
Arguments = $"--config {S3CfgPath} ls {bucketPath}",
|
|
RedirectStandardOutput = true,
|
|
RedirectStandardError = true,
|
|
UseShellExecute = false,
|
|
CreateNoWindow = true
|
|
};
|
|
using var process = new Process { StartInfo = startInfo };
|
|
process.Start();
|
|
var output = process.StandardOutput.ReadToEnd();
|
|
process.StandardError.ReadToEnd();
|
|
process.WaitForExit();
|
|
return process.ExitCode == 0 ? output : "";
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
Console.WriteLine($"[CurrentPriceHistory] s3cmd ls failed for {bucketPath}: {e.Message}");
|
|
return "";
|
|
}
|
|
}
|
|
|
|
private static async Task<List<PricePoint>> FetchPrices(Installation installation, List<Int64> timestamps)
|
|
{
|
|
var region = new S3Region($"https://{installation.S3Region}.{installation.S3Provider}", ExoCmd.S3Credentials!);
|
|
var bucket = region.Bucket(installation.BucketName());
|
|
|
|
using var gate = new SemaphoreSlim(MaxParallelFetches);
|
|
|
|
var tasks = timestamps.Select(async ts =>
|
|
{
|
|
await gate.WaitAsync();
|
|
try
|
|
{
|
|
var price = await FetchPriceAt(bucket, ts);
|
|
return price.HasValue ? new PricePoint { Timestamp = ts, Price = price.Value } : null;
|
|
}
|
|
finally
|
|
{
|
|
gate.Release();
|
|
}
|
|
});
|
|
|
|
var results = await Task.WhenAll(tasks);
|
|
return results.Where(p => p != null).Select(p => p!).OrderBy(p => p.Timestamp).ToList();
|
|
}
|
|
|
|
private static async Task<Double?> FetchPriceAt(S3Bucket bucket, Int64 ts)
|
|
{
|
|
try
|
|
{
|
|
var raw = await bucket.Path($"{ts}.json").GetObjectAsString();
|
|
var json = DecodeChunk(raw);
|
|
return json == null ? null : ExtractCurrentPrice(json);
|
|
}
|
|
catch
|
|
{
|
|
return null; // missing chunk / decode error -> just skip this slot
|
|
}
|
|
}
|
|
|
|
// Chunk objects are Base64-encoded ZIP archives whose inner "data.json" holds the record.
|
|
private static String? DecodeChunk(String raw)
|
|
{
|
|
try
|
|
{
|
|
var trimmed = raw.Trim();
|
|
if (trimmed.StartsWith('{'))
|
|
return raw; // defensive: already plain JSON
|
|
|
|
var bytes = Convert.FromBase64String(trimmed);
|
|
using var zip = new ZipArchive(new MemoryStream(bytes), ZipArchiveMode.Read);
|
|
var entry = zip.GetEntry("data.json");
|
|
if (entry == null)
|
|
return null;
|
|
|
|
using var reader = new StreamReader(entry.Open());
|
|
return reader.ReadToEnd();
|
|
}
|
|
catch
|
|
{
|
|
return null;
|
|
}
|
|
}
|
|
|
|
// data.json is a line-based dump (same format the frontend parseChunkJson consumes):
|
|
// Timestamp;<unix>;
|
|
// {"Config":{"CurrentPrice":0.19,...},"InverterRecord":{...}}
|
|
// i.e. a header line then a JSON-record line, possibly repeated. It is NOT a single
|
|
// JSON object, so parse line by line and return CurrentPrice from the last record.
|
|
private static Double? ExtractCurrentPrice(String dataJson)
|
|
{
|
|
Double? last = null;
|
|
foreach (var line in dataJson.Split('\n'))
|
|
{
|
|
var price = TryReadLinePrice(line.Trim());
|
|
if (price.HasValue)
|
|
last = price;
|
|
}
|
|
return last;
|
|
}
|
|
|
|
private static Double? TryReadLinePrice(String line)
|
|
{
|
|
if (line.Length == 0 || line[0] != '{')
|
|
return null; // skip "Timestamp;...;" headers and blank lines
|
|
|
|
try
|
|
{
|
|
using var doc = JsonDocument.Parse(line);
|
|
if (doc.RootElement.ValueKind != JsonValueKind.Object
|
|
|| !doc.RootElement.TryGetProperty("Config", out var config)
|
|
|| !config.TryGetProperty("CurrentPrice", out var currentPrice))
|
|
return null;
|
|
|
|
return currentPrice.ValueKind switch
|
|
{
|
|
JsonValueKind.Number => currentPrice.GetDouble(),
|
|
JsonValueKind.String when Double.TryParse(currentPrice.GetString(),
|
|
NumberStyles.Any, CultureInfo.InvariantCulture, out var parsed) => parsed,
|
|
_ => null
|
|
};
|
|
}
|
|
catch
|
|
{
|
|
return null; // skip a malformed record line
|
|
}
|
|
}
|
|
}
|