From 028ff7dcae34731dae428eb5356f62875babf6b9 Mon Sep 17 00:00:00 2001 From: ig Date: Thu, 4 May 2023 09:45:32 +0200 Subject: [PATCH] experiment "Observable Process", unstable --- csharp/Lib/Utils/WIP/ObservablePipeTarget.cs | 66 ++++++++++++++++++++ csharp/Lib/Utils/WIP/ObservableProcess.cs | 56 +++++++++++++++++ csharp/Lib/Utils/WIP/ObserverPipeSource.cs | 55 ++++++++++++++++ csharp/Lib/Utils/WIP/ReadablePipeTarget.cs | 63 +++++++++++++++++++ 4 files changed, 240 insertions(+) create mode 100644 csharp/Lib/Utils/WIP/ObservablePipeTarget.cs create mode 100644 csharp/Lib/Utils/WIP/ObservableProcess.cs create mode 100644 csharp/Lib/Utils/WIP/ObserverPipeSource.cs create mode 100644 csharp/Lib/Utils/WIP/ReadablePipeTarget.cs diff --git a/csharp/Lib/Utils/WIP/ObservablePipeTarget.cs b/csharp/Lib/Utils/WIP/ObservablePipeTarget.cs new file mode 100644 index 000000000..ba7e41a50 --- /dev/null +++ b/csharp/Lib/Utils/WIP/ObservablePipeTarget.cs @@ -0,0 +1,66 @@ +using System.Buffers; +using System.Reactive.Subjects; +using CliWrap; + +namespace InnovEnergy.Lib.Utils.WIP; + +using Data = IReadOnlyList; + + +public class ObservablePipeTarget : PipeTarget, IObservable +{ + private readonly Subject _Data = new(); + + + public override async Task CopyFromAsync(Stream source, CancellationToken ct = new CancellationToken()) + { + using var buffer = MemoryPool.Shared.Rent(81920); + + while (true) + { + Int32 bytesRead; + + try + { + bytesRead = await source + .ReadAsync(buffer.Memory, ct) + .ConfigureAwait(false); + } + catch (Exception e) + { + _Data.OnError(e); + break; + } + + + if (ct.IsCancellationRequested || bytesRead == 0) + { + _Data.OnCompleted(); + break; + } + + buffer.Memory[..bytesRead] + .ToArray() + .Apply(_Data.OnNext); + } + + + // var memory = buffer.Memory; + // + // return Observable + // .Repeat(Unit.Default) + // .TakeWhile(_ => !ct.IsCancellationRequested) + // .Select(async _ => + // { + // var bytesRead = await source.ReadAsync(memory, ct); + // _Data.OnNext(memory[..bytesRead]); + // }) + // .Select(t => t.ToObservable()) + // .Merge(maxConcurrent: 1) + // .ToTask(ct); + } + + public IDisposable Subscribe(IObserver> observer) => _Data.Subscribe(observer); + + //public IDisposable Subscribe(IObserver observer) => _Lines.Subscribe(observer); +} \ No newline at end of file diff --git a/csharp/Lib/Utils/WIP/ObservableProcess.cs b/csharp/Lib/Utils/WIP/ObservableProcess.cs new file mode 100644 index 000000000..3f605076d --- /dev/null +++ b/csharp/Lib/Utils/WIP/ObservableProcess.cs @@ -0,0 +1,56 @@ +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Reactive.Threading.Tasks; +using CliWrap; + +namespace InnovEnergy.Lib.Utils.WIP; + +using Data = IReadOnlyList; + +public class ObservableProcess +{ + private readonly Command _Command; + private readonly CancellationTokenSource _Kill = new CancellationTokenSource(); + private readonly CancellationTokenSource _Interrupt = new CancellationTokenSource(); + + private readonly ObserverPipeSource _StdIn = new ObserverPipeSource(); + private readonly ReadablePipeTarget _StdOut = new ReadablePipeTarget(); + private readonly ReadablePipeTarget _StdErr = new ReadablePipeTarget(); + + public IObserver StdIn => _StdIn; + + private readonly Subject _Pid = new Subject(); + private readonly Subject _ExitCode = new Subject(); + + public Task Pid { get; } + public Task ExitCode { get; } + + + public Task Read(Int32 nBytes) => _StdOut.Read(nBytes); + + public ObservableProcess(Command command) + { + _Command = command; + Pid = _Pid.ToTask(); + ExitCode = _ExitCode.ToTask(); + } + + public void Start() + { + var commandTask = _Command + .WithStandardInputPipe(_StdIn) + .WithStandardOutputPipe(_StdOut) + .WithStandardErrorPipe(_StdErr) + .ExecuteAsync(_Kill.Token, _Interrupt.Token); + + _Pid.OnNext(commandTask.ProcessId); + + commandTask.Task + .ToObservable() + .Select(t => t.ExitCode) + .Subscribe(_ExitCode); + } + + public void Kill() => _Kill.Cancel(); + public void Interrupt() => _Interrupt.Cancel(); +} \ No newline at end of file diff --git a/csharp/Lib/Utils/WIP/ObserverPipeSource.cs b/csharp/Lib/Utils/WIP/ObserverPipeSource.cs new file mode 100644 index 000000000..fd101b9fe --- /dev/null +++ b/csharp/Lib/Utils/WIP/ObserverPipeSource.cs @@ -0,0 +1,55 @@ +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Reactive.Threading.Tasks; +using CliWrap; + +namespace InnovEnergy.Lib.Utils.WIP; + +using Data = IReadOnlyList; + + +public class ObserverPipeSource : PipeSource, IObserver +{ + private readonly IObservable _Observable; + private readonly IObserver _Observer; + + public ObserverPipeSource() + { + var subject = new Subject(); + + _Observer = subject; + _Observable = subject + .Synchronize() // make thread-safe + .Publish() + .RefCount(); + } + + public override Task CopyToAsync(Stream dest, CancellationToken ct = new CancellationToken()) + { + return _Observable + .Select(Push) + .Concat() + .ToTask(ct); + + IObservable Push(Data data) => Observable.FromAsync(async () => + { + if (ct.IsCancellationRequested) + throw new Exception("Broken Pipe"); + + await dest.WriteAsync(data.ToArray(), ct); + await dest.FlushAsync(ct); + + return Unit.Default; + }); + } + + + + public void OnCompleted() => _Observer.OnCompleted(); + + public void OnError(Exception error) => _Observer.OnError(error); + + public void OnNext (Data value) => _Observer.OnNext(value); + +} \ No newline at end of file diff --git a/csharp/Lib/Utils/WIP/ReadablePipeTarget.cs b/csharp/Lib/Utils/WIP/ReadablePipeTarget.cs new file mode 100644 index 000000000..dd972d721 --- /dev/null +++ b/csharp/Lib/Utils/WIP/ReadablePipeTarget.cs @@ -0,0 +1,63 @@ +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Reactive.Threading.Tasks; +using CliWrap; + +namespace InnovEnergy.Lib.Utils.WIP; + +using Data = IReadOnlyList; + + +public class ReadablePipeTarget : PipeTarget +{ + private readonly ISubject _Requests = new ReplaySubject(); + private readonly Subject _Replies = new Subject(); + + public async Task Read(Int32 nBytes) + { + var data = _Replies.FirstAsync(); + _Requests.OnNext(nBytes); + return await data; + } + + public override Task CopyFromAsync(Stream source, CancellationToken ct = new CancellationToken()) + { + var replies = _Requests + .Synchronize() + .Select(Pull) + .Concat() + .Publish() + .RefCount(); + + replies.Subscribe(_Replies, ct); + + return replies.ToTask(ct); + + IObservable Pull(Int32 nToRead) => Observable.FromAsync(async () => + { + if (ct.IsCancellationRequested) + throw new Exception("End of Stream"); + + var buffer = new Byte[nToRead]; + var nReceived = 0; + + do + { + var nRemaining = nToRead - nReceived; + var read = await source.ReadAsync(buffer, nReceived, nRemaining, ct); + if (read <= 0 || ct.IsCancellationRequested) + throw new Exception("End of Stream"); + + nReceived += read; + } + while (nReceived < nToRead); + + return buffer; + }); + } + + + + + +} \ No newline at end of file