using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.IO.Compression;
using System.Text.Json;
using System.Text.RegularExpressions;
using InnovEnergy.App.Backend.DataTypes;
using InnovEnergy.App.Backend.DataTypes.Methods;
using InnovEnergy.Lib.S3Utils;
using S3Bucket = InnovEnergy.Lib.S3Utils.DataTypes.S3Bucket;
using S3Region = InnovEnergy.Lib.S3Utils.DataTypes.S3Region;

namespace InnovEnergy.App.Backend.Services;

/// <summary>
/// One data point of the dynamic-pricing "Current Price" history (CHF/kWh).
/// </summary>
public class PricePoint
{
    public Int64  Timestamp { get; set; } // unix seconds
    public Double Price     { get; set; } // CHF/kWh
}

/// <summary>
/// Builds the "Current Price" history shown on the Configuration tab for
/// Growatt (device 3) and Sinexcel (device 4) installations.
///
/// CurrentPrice only lives inside the per-10-second chunk files (Config.CurrentPrice);
/// there is no pre-aggregated source. To keep load bounded we sample ONE chunk per
/// 15-minute slot, fetch those in parallel, and cache each fully-past day (immutable).
/// </summary>
public static class CurrentPriceHistoryService
{
    private const Int64  BucketSeconds      = 900;   // 15-minute resolution
    private const Int64  SecondsPerDay      = 86400;
    private const Int32  MaxParallelFetches = 24;
    private const Int32  CacheRetentionDays = 14;    // bound the in-memory day cache
    private const Int32  PrefixLength       = 5;     // leading timestamp digits used as the `s3cmd ls` prefix
    private const String S3CfgPath          = "/home/ubuntu/.s3cfg";

    private static readonly Regex ChunkFileRegex = new(@"/([0-9]+)\.(csv|json)$", RegexOptions.Compiled);

    // Immutable past days cached as "{installationId}:{yyyyMMdd}".
    private static readonly ConcurrentDictionary<String, List<PricePoint>> DayCache = new();

    /// <summary>
    /// Returns the sampled price points of <paramref name="installation"/> whose timestamp lies in
    /// [<paramref name="startSec"/>, <paramref name="endSec"/>] (unix seconds, both inclusive),
    /// ordered by timestamp.
    /// </summary>
    /// <param name="installation">Installation whose chunk bucket is read.</param>
    /// <param name="startSec">Inclusive range start, unix seconds.</param>
    /// <param name="endSec">Inclusive range end, unix seconds.</param>
    /// <returns>At most one point per 15-minute slot; empty when nothing could be read.</returns>
    public static async Task<List<PricePoint>> GetHistory(Installation installation, Int64 startSec, Int64 endSec)
    {
        var todayUtc = DateTime.UtcNow.Date;
        var firstDay = DateTimeOffset.FromUnixTimeSeconds(startSec).UtcDateTime.Date;
        var lastDay  = DateTimeOffset.FromUnixTimeSeconds(endSec).UtcDateTime.Date;

        var points = new List<PricePoint>();

        // Days are processed one after another; only days strictly before today (UTC) are cacheable.
        for (var day = firstDay; day <= lastDay; day = day.AddDays(1))
            points.AddRange(await GetDay(installation, day, cacheable: day < todayUtc));

        return points
              .Where(p => p.Timestamp >= startSec && p.Timestamp <= endSec)
              .OrderBy(p => p.Timestamp)
              .ToList();
    }

    /// <summary>
    /// Returns the sampled points of one UTC day, served from <c>DayCache</c> when
    /// <paramref name="cacheable"/> is true and the day is already cached.
    /// </summary>
    private static async Task<List<PricePoint>> GetDay(Installation installation, DateTime dayUtc, Boolean cacheable)
    {
        var key = $"{installation.Id}:{dayUtc:yyyyMMdd}";

        if (cacheable && DayCache.TryGetValue(key, out var cached))
            return cached;

        var dayStart = new DateTimeOffset(dayUtc, TimeSpan.Zero).ToUnixTimeSeconds();
        var dayEnd   = dayStart + SecondsPerDay - 1;

        var listed     = await ListChunkTimestamps(installation, dayStart, dayEnd);
        var timestamps = SampleByBucket(listed);
        var points     = await FetchPrices(installation, timestamps);

        // Only cache non-empty past days: an empty result can mean a transient s3cmd/S3
        // failure, and caching that would serve "no data" forever until restart.
        if (cacheable && points.Count > 0)
            CacheDay(key, points);

        return points;
    }

    /// <summary>
    /// Stores a day in the cache and removes every entry whose date part is older than
    /// <c>CacheRetentionDays</c> days before today (UTC).
    /// </summary>
    private static void CacheDay(String key, List<PricePoint> points)
    {
        DayCache[key] = points;

        // Prune entries older than the retention window to bound memory growth.
        var cutoff = DateTime.UtcNow.Date.AddDays(-CacheRetentionDays);

        foreach (var existingKey in DayCache.Keys)
        {
            var datePart = existingKey.Substring(existingKey.IndexOf(':') + 1);

            var parsed = DateTime.TryParseExact(datePart, "yyyyMMdd", CultureInfo.InvariantCulture,
                                                DateTimeStyles.None, out var day);

            if (parsed && day < cutoff)
                DayCache.TryRemove(existingKey, out _);
        }
    }

    // Keep the first chunk in each 15-minute slot.
    private static List<Int64> SampleByBucket(List<Int64> timestamps)
    {
        var seenBuckets = new HashSet<Int64>();
        var picked      = new List<Int64>();

        foreach (var t in timestamps.OrderBy(x => x))
            if (seenBuckets.Add(t / BucketSeconds))
                picked.Add(t);

        return picked;
    }

    // List every chunk filename in range via `s3cmd ls` over the 5-digit timestamp prefixes
    // (same listing approach as Controller.GetCsvTimestampsForInstallation).
    private static async Task<List<Int64>> ListChunkTimestamps(Installation installation, Int64 start, Int64 end)
    {
        var all = new List<Int64>();

        var startText = start.ToString(CultureInfo.InvariantCulture);
        var endText   = end.ToString(CultureInfo.InvariantCulture);

        // The prefix walk only works when both bounds are non-negative, have at least
        // PrefixLength digits and the same number of digits. Anything else (e.g. a range
        // starting at 0) previously crashed in Substring; such ranges yield no timestamps.
        var prefixable = start >= 0
                      && end >= start
                      && startText.Length >= PrefixLength
                      && startText.Length == endText.Length;

        if (!prefixable)
            return all;

        var startPrefix = Int64.Parse(startText.Substring(0, PrefixLength), CultureInfo.InvariantCulture);
        var endPrefix   = Int64.Parse(endText.Substring(0, PrefixLength), CultureInfo.InvariantCulture);

        for (var prefix = startPrefix; prefix <= endPrefix; prefix++)
        {
            var output = await RunS3cmdLs("s3://" + installation.BucketName() + "/" + prefix);

            foreach (var line in output.Split('\n'))
            {
                var match = ChunkFileRegex.Match(line);

                if (match.Success
                 && Int64.TryParse(match.Groups[1].Value, out var t)
                 && t >= start
                 && t <= end)
                    all.Add(t);
            }
        }

        return all;
    }

    /// <summary>
    /// Runs <c>s3cmd ls</c> on <paramref name="bucketPath"/> and returns its standard output,
    /// or an empty string when the process cannot be started or exits with a non-zero code.
    /// </summary>
    private static async Task<String> RunS3cmdLs(String bucketPath)
    {
        try
        {
            var startInfo = new ProcessStartInfo
            {
                FileName               = "s3cmd",
                RedirectStandardOutput = true,
                RedirectStandardError  = true,
                UseShellExecute        = false,
                CreateNoWindow         = true
            };

            // ArgumentList passes each value as exactly one argument, whatever characters it contains.
            startInfo.ArgumentList.Add("--config");
            startInfo.ArgumentList.Add(S3CfgPath);
            startInfo.ArgumentList.Add("ls");
            startInfo.ArgumentList.Add(bucketPath);

            using var process = new Process { StartInfo = startInfo };
            process.Start();

            // Drain both redirected pipes concurrently: reading one to the end before touching
            // the other can deadlock once the child fills the pipe nobody is reading.
            var stdoutTask = process.StandardOutput.ReadToEndAsync();
            var stderrTask = process.StandardError.ReadToEndAsync();

            await Task.WhenAll(stdoutTask, stderrTask);
            await process.WaitForExitAsync();

            if (process.ExitCode == 0)
                return await stdoutTask;

            var errors = (await stderrTask).Trim();
            Console.WriteLine($"[CurrentPriceHistory] s3cmd ls exited with code {process.ExitCode} for {bucketPath}: {errors}");
            return "";
        }
        catch (Exception e)
        {
            Console.WriteLine($"[CurrentPriceHistory] s3cmd ls failed for {bucketPath}: {e.Message}");
            return "";
        }
    }

    /// <summary>
    /// Fetches the chunk for every timestamp with at most <c>MaxParallelFetches</c> requests in
    /// flight and returns the points that could be read, ordered by timestamp.
    /// </summary>
    private static async Task<List<PricePoint>> FetchPrices(Installation installation, List<Int64> timestamps)
    {
        // Nothing listed: no need to build the S3 region/bucket handle at all.
        if (timestamps.Count == 0)
            return new List<PricePoint>();

        var region = new S3Region($"https://{installation.S3Region}.{installation.S3Provider}", ExoCmd.S3Credentials!);
        var bucket = region.Bucket(installation.BucketName());

        using var gate = new SemaphoreSlim(MaxParallelFetches);

        var tasks = timestamps.Select(async ts =>
        {
            await gate.WaitAsync();
            try
            {
                var price = await FetchPriceAt(bucket, ts);
                return price.HasValue
                     ? new PricePoint { Timestamp = ts, Price = price.Value }
                     : null;
            }
            finally
            {
                gate.Release();
            }
        });

        // All tasks complete here, before `gate` is disposed at the end of the method.
        var results = await Task.WhenAll(tasks);

        return results
              .Where(p => p != null)
              .Select(p => p!)
              .OrderBy(p => p.Timestamp)
              .ToList();
    }

    /// <summary>
    /// Reads <c>{ts}.json</c> from the bucket and returns its current price,
    /// or null when the chunk cannot be read, decoded or parsed.
    /// </summary>
    private static async Task<Double?> FetchPriceAt(S3Bucket bucket, Int64 ts)
    {
        try
        {
            var raw  = await bucket.Path($"{ts}.json").GetObjectAsString();
            var json = DecodeChunk(raw);

            return json == null
                 ? null
                 : ExtractCurrentPrice(json);
        }
        catch
        {
            return null; // missing chunk / decode error -> just skip this slot
        }
    }

    // Chunk objects are Base64-encoded ZIP archives whose inner "data.json" holds the record.
    private static String? DecodeChunk(String raw)
    {
        try
        {
            var trimmed = raw.Trim();

            if (trimmed.StartsWith('{'))
                return raw; // defensive: already plain JSON

            var bytes = Convert.FromBase64String(trimmed);

            using var zip = new ZipArchive(new MemoryStream(bytes), ZipArchiveMode.Read);

            var entry = zip.GetEntry("data.json");
            if (entry == null)
                return null;

            using var reader = new StreamReader(entry.Open());
            return reader.ReadToEnd();
        }
        catch
        {
            return null;
        }
    }

    // data.json is an object keyed by timestamp(s); read Config.CurrentPrice of the last record.
    private static Double? ExtractCurrentPrice(String json)
    {
        using var doc = JsonDocument.Parse(json);

        if (doc.RootElement.ValueKind != JsonValueKind.Object)
            return null;

        // Walk all properties and keep the value of the last one enumerated.
        var found  = false;
        var record = default(JsonElement);

        foreach (var prop in doc.RootElement.EnumerateObject())
        {
            record = prop.Value;
            found  = true;
        }

        if (!found
         || !record.TryGetProperty("Config", out var config)
         || !config.TryGetProperty("CurrentPrice", out var currentPrice))
            return null;

        // The price is accepted either as a JSON number or as a numeric string.
        return currentPrice.ValueKind switch
        {
            JsonValueKind.Number => currentPrice.GetDouble(),

            JsonValueKind.String when Double.TryParse(currentPrice.GetString(),
                                                      NumberStyles.Any,
                                                      CultureInfo.InvariantCulture,
                                                      out var parsed) => parsed,

            _ => null
        };
    }
}