Fixed unawaited SendAsync calls of Websocket

This commit is contained in:
Yinyin Liu 2026-02-26 13:52:43 +01:00
parent abedc6c203
commit d464c9cd71
3 changed files with 55 additions and 39 deletions

View File

@ -160,8 +160,8 @@ public static class SessionMethods
var installation = Db.GetInstallationById(action.InstallationId); var installation = Db.GetInstallationById(action.InstallationId);
installation.TestingMode = action.TestingMode; installation.TestingMode = action.TestingMode;
installation.Apply(Db.Update); installation.Apply(Db.Update);
WebsocketManager.InformWebsocketsForInstallation(action.InstallationId); await WebsocketManager.InformWebsocketsForInstallation(action.InstallationId);
// Save the configuration change to the database // Save the configuration change to the database
Db.HandleAction(action); Db.HandleAction(action);
return true; return true;
@ -179,7 +179,7 @@ public static class SessionMethods
{ {
installation.TestingMode = action.TestingMode; installation.TestingMode = action.TestingMode;
installation.Apply(Db.Update); installation.Apply(Db.Update);
WebsocketManager.InformWebsocketsForInstallation(action.InstallationId); await WebsocketManager.InformWebsocketsForInstallation(action.InstallationId);
} }
Db.UpdateAction(action); Db.UpdateAction(action);
@ -199,7 +199,7 @@ public static class SessionMethods
var installation = Db.GetInstallationById(action.InstallationId); var installation = Db.GetInstallationById(action.InstallationId);
installation.TestingMode = false; installation.TestingMode = false;
installation.Apply(Db.Update); installation.Apply(Db.Update);
WebsocketManager.InformWebsocketsForInstallation(action.InstallationId); await WebsocketManager.InformWebsocketsForInstallation(action.InstallationId);
} }
Db.Delete(action); Db.Delete(action);

View File

@ -181,7 +181,7 @@ public static class RabbitMqManager
//If the status has changed, update all the connected front-ends regarding this installation //If the status has changed, update all the connected front-ends regarding this installation
if(prevStatus != receivedStatusMessage.Status && WebsocketManager.InstallationConnections[installationId].Connections.Count > 0) if(prevStatus != receivedStatusMessage.Status && WebsocketManager.InstallationConnections[installationId].Connections.Count > 0)
{ {
WebsocketManager.InformWebsocketsForInstallation(installationId); _ = WebsocketManager.InformWebsocketsForInstallation(installationId); // fire-and-forget: sync event handler, can't await
} }
} }
} }

View File

@ -17,6 +17,8 @@ public static class WebsocketManager
{ {
while (true) while (true)
{ {
var idsToInform = new List<Int64>();
lock (InstallationConnections) lock (InstallationConnections)
{ {
Console.WriteLine("Monitoring installation table..."); Console.WriteLine("Monitoring installation table...");
@ -31,10 +33,8 @@ public static class WebsocketManager
(installationConnection.Value.Product == (int)ProductType.SodiStoreMax && (DateTime.Now - installationConnection.Value.Timestamp) > TimeSpan.FromMinutes(2)) (installationConnection.Value.Product == (int)ProductType.SodiStoreMax && (DateTime.Now - installationConnection.Value.Timestamp) > TimeSpan.FromMinutes(2))
) )
{ {
Console.WriteLine("Installation ID is " + installationConnection.Key); Console.WriteLine("Installation ID is " + installationConnection.Key);
Console.WriteLine("installationConnection.Value.Timestamp is " + installationConnection.Value.Timestamp); Console.WriteLine("installationConnection.Value.Timestamp is " + installationConnection.Value.Timestamp);
// Console.WriteLine("diff is "+(DateTime.Now-installationConnection.Value.Timestamp));
installationConnection.Value.Status = (int)StatusType.Offline; installationConnection.Value.Status = (int)StatusType.Offline;
Installation installation = Db.Installations.FirstOrDefault(f => f.Product == installationConnection.Value.Product && f.Id == installationConnection.Key); Installation installation = Db.Installations.FirstOrDefault(f => f.Product == installationConnection.Value.Product && f.Id == installationConnection.Key);
@ -42,42 +42,59 @@ public static class WebsocketManager
installation.Apply(Db.Update); installation.Apply(Db.Update);
if (installationConnection.Value.Connections.Count > 0) if (installationConnection.Value.Connections.Count > 0)
{ {
InformWebsocketsForInstallation(installationConnection.Key); idsToInform.Add(installationConnection.Key);
} }
} }
} }
} }
// Send notifications outside the lock so we can await the async SendAsync calls
foreach (var id in idsToInform)
await InformWebsocketsForInstallation(id);
await Task.Delay(TimeSpan.FromMinutes(1)); await Task.Delay(TimeSpan.FromMinutes(1));
} }
} }
//Inform all the connected websockets regarding installation "installationId" //Inform all the connected websockets regarding installation "installationId"
public static void InformWebsocketsForInstallation(Int64 installationId) public static async Task InformWebsocketsForInstallation(Int64 installationId)
{ {
var installation = Db.GetInstallationById(installationId); var installation = Db.GetInstallationById(installationId);
var installationConnection = InstallationConnections[installationId]; byte[] dataToSend;
Console.WriteLine("Update all the connected websockets for installation " + installation.Name); List<WebSocket> connections;
var jsonObject = new
{
id = installationId,
status = installationConnection.Status,
testingMode = installation.TestingMode
};
string jsonString = JsonSerializer.Serialize(jsonObject); lock (InstallationConnections)
byte[] dataToSend = Encoding.UTF8.GetBytes(jsonString);
foreach (var connection in installationConnection.Connections)
{ {
connection.SendAsync( var installationConnection = InstallationConnections[installationId];
new ArraySegment<byte>(dataToSend, 0, dataToSend.Length), Console.WriteLine("Update all the connected websockets for installation " + installation.Name);
WebSocketMessageType.Text,
true, // Indicates that this is the end of the message var jsonObject = new
CancellationToken.None {
); id = installationId,
status = installationConnection.Status,
testingMode = installation.TestingMode
};
dataToSend = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(jsonObject));
connections = installationConnection.Connections.ToList(); // snapshot before releasing lock
} }
// Send to all connections concurrently (preserves original fire-and-forget intent),
// but isolate failures so one closed socket doesn't affect others or crash the caller.
await Task.WhenAll(connections
.Where(c => c.State == WebSocketState.Open)
.Select(async c =>
{
try
{
await c.SendAsync(new ArraySegment<byte>(dataToSend, 0, dataToSend.Length),
WebSocketMessageType.Text, true, CancellationToken.None);
}
catch (Exception ex)
{
Console.WriteLine($"WebSocket send failed for installation {installationId}: {ex.Message}");
}
}));
} }
@ -109,9 +126,9 @@ public static class WebsocketManager
var jsonString = JsonSerializer.Serialize(jsonObject); var jsonString = JsonSerializer.Serialize(jsonObject);
var dataToSend = Encoding.UTF8.GetBytes(jsonString); var dataToSend = Encoding.UTF8.GetBytes(jsonString);
currentWebSocket.SendAsync(dataToSend, await currentWebSocket.SendAsync(dataToSend,
WebSocketMessageType.Text, WebSocketMessageType.Text,
true, true,
CancellationToken.None CancellationToken.None
); );
@ -120,6 +137,7 @@ public static class WebsocketManager
//Received a new message from this websocket. //Received a new message from this websocket.
//We have a HandleWebSocketConnection per connected frontend //We have a HandleWebSocketConnection per connected frontend
byte[] encodedDataToSend;
lock (InstallationConnections) lock (InstallationConnections)
{ {
List<WebsocketMessage> dataToSend = new List<WebsocketMessage>(); List<WebsocketMessage> dataToSend = new List<WebsocketMessage>();
@ -157,15 +175,7 @@ public static class WebsocketManager
} }
var jsonString = JsonSerializer.Serialize(dataToSend); var jsonString = JsonSerializer.Serialize(dataToSend);
var encodedDataToSend = Encoding.UTF8.GetBytes(jsonString); encodedDataToSend = Encoding.UTF8.GetBytes(jsonString);
currentWebSocket.SendAsync(encodedDataToSend,
WebSocketMessageType.Text,
true, // Indicates that this is the end of the message
CancellationToken.None
);
// Console.WriteLine("Printing installation connection list"); // Console.WriteLine("Printing installation connection list");
// Console.WriteLine("----------------------------------------------"); // Console.WriteLine("----------------------------------------------");
@ -175,6 +185,12 @@ public static class WebsocketManager
// } // }
// Console.WriteLine("----------------------------------------------"); // Console.WriteLine("----------------------------------------------");
} }
await currentWebSocket.SendAsync(encodedDataToSend,
WebSocketMessageType.Text,
true,
CancellationToken.None
);
} }
lock (InstallationConnections) lock (InstallationConnections)