using System.Drawing.Printing;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Text.Json;
using InnovEnergy.App.Backend.Database;
using InnovEnergy.App.Backend.DataTypes;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace InnovEnergy.App.Backend.Websockets;

/// <summary>
/// Owns the RabbitMQ connection of the backend: declares and consumes the status queue,
/// stores received warnings/alarms through <c>Db</c>, tracks the latest status per installation
/// in <c>WebsocketManager.InstallationConnections</c>, and asks installations over UDP to
/// subscribe to the queue.
/// </summary>
public static class RabbitMqManager
{
    // Name of the queue that is declared in InitializeEnvironment and consumed in StartRabbitMqConsumer.
    private const string StatusQueueName = "statusQueue";

    // UDP port the installations are contacted on.
    private const int InstallationUdpPort = 9000;

    // How long (in milliseconds) to wait for a UDP reply before an attempt counts as timed out.
    private const int UdpReceiveTimeoutMs = 2000;

    // Maximum number of UDP attempts per installation.
    private const int MaxRetransmissions = 2;

    // Status value used as "previous status" for an installation that was not tracked yet.
    // NOTE(review): presumably no real status equals -2, so the first message always counts as a change - confirm.
    private const int UnknownPreviousStatus = -2;

    public static ConnectionFactory Factory = null!;
    public static IConnection Connection = null!;
    public static IModel Channel = null!;

    /// <summary>
    /// Creates the connection factory, opens the connection and channel, and declares the
    /// durable, non-exclusive, non-auto-delete status queue.
    /// </summary>
    public static void InitializeEnvironment()
    {
        // An earlier revision used "194.182.190.208" here.
        string vpnServerIp = "10.2.0.11";

        // NOTE(review): Ssl is not configured on this factory. Earlier, now removed, experiments tried
        // SslOption with ServerName "Webserver-FrontAndBack", certificates under /etc/rabbitmq/testca and
        // /etc/rabbitmq/client, and a custom CertificateValidationCallback; see version control if TLS
        // support is picked up again.
        // NOTE(review): the credentials below are hard-coded in source; they should be moved to
        // configuration/secret storage.
        Factory = new ConnectionFactory
        {
            HostName = vpnServerIp,
            Port = 5672,
            VirtualHost = "/",
            UserName = "consumer",
            Password = "faceaddb5005815199f8366d3d15ff8a",
        };

        Connection = Factory.CreateConnection();
        Channel = Connection.CreateModel();
        Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages");
        Channel.QueueDeclare(queue: StatusQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    }

    /// <summary>
    /// Registers a consumer on the status queue (auto-acknowledge mode). Every received message is
    /// deserialized to a <c>StatusMessage</c> and processed while holding the lock on
    /// <c>WebsocketManager.InstallationConnections</c>.
    /// </summary>
    /// <returns>
    /// A completed task once the consumer is registered, or a faulted task if registration threw.
    /// </returns>
    public static Task StartRabbitMqConsumer()
    {
        // The method contains no awaits, so it is not marked async; failures are still reported
        // through the returned task, exactly as the former async version did.
        try
        {
            var consumer = new EventingBasicConsumer(Channel);
            consumer.Received += (_, ea) => OnMessageReceived(ea);
            Channel.BasicConsume(queue: StatusQueueName, autoAck: true, consumer: consumer);
            return Task.CompletedTask;
        }
        catch (Exception e)
        {
            return Task.FromException(e);
        }
    }

    /// <summary>
    /// Sends a UDP message to every installation that has a VPN IP, telling it to subscribe to the
    /// RabbitMQ queue, and waits for a reply. Each installation is tried at most
    /// <c>MaxRetransmissions</c> times; entries with a missing or unparsable IP are skipped.
    /// </summary>
    public static void InformInstallationsToSubscribeToRabbitMq()
    {
        var installationIps = Db.Installations.Select(inst => inst.VpnIp).ToList();
        Console.WriteLine("Count is " + installationIps.Count);

        using (var udpClient = new UdpClient())
        {
            udpClient.Client.ReceiveTimeout = UdpReceiveTimeoutMs;

            foreach (var installationIp in installationIps)
            {
                if (string.IsNullOrWhiteSpace(installationIp))
                {
                    continue;
                }

                Console.WriteLine("-----------------------------------------------------------");
                Console.WriteLine("Trying to reach installation with IP: " + installationIp);

                // A malformed address must not abort the notification of the remaining installations.
                if (!IPAddress.TryParse(installationIp, out var address))
                {
                    Console.WriteLine("Error: invalid IP address " + installationIp);
                    continue;
                }

                NotifyInstallation(udpClient, installationIp, address);
            }
        }

        Console.WriteLine("Start RabbitMQ Consumer");
    }

    // Decodes one delivery from the status queue and processes it under the InstallationConnections lock.
    private static void OnMessageReceived(BasicDeliverEventArgs ea)
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);

        StatusMessage? receivedStatusMessage;
        try
        {
            receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
        }
        catch (JsonException e)
        {
            // The queue is consumed with autoAck, so a malformed message cannot be retried; log and drop it.
            Console.WriteLine("Error: could not deserialize status message: " + e.Message);
            return;
        }

        if (receivedStatusMessage == null)
        {
            return;
        }

        lock (WebsocketManager.InstallationConnections)
        {
            HandleStatusMessage(receivedStatusMessage);
        }
    }

    // Stores warnings/alarms of a non-heartbeat message, updates the tracked status and timestamp of the
    // installation, and informs connected front-ends when the status changed.
    // Must be called while holding the lock on WebsocketManager.InstallationConnections.
    private static void HandleStatusMessage(StatusMessage receivedStatusMessage)
    {
        var installationId = receivedStatusMessage.InstallationId;

        Console.WriteLine("----------------------------------------------");
        Console.WriteLine("Received a message from installation: " + installationId + " and status is: " + receivedStatusMessage.Status);

        if (receivedStatusMessage.Type == MessageType.Heartbit)
        {
            // Heartbeat: nothing is stored; only the status/timestamp bookkeeping below runs.
            Console.WriteLine("This is a heartbit message from installation: " + installationId);
        }
        else
        {
            StoreWarningsAndAlarms(receivedStatusMessage);
        }

        var connections = WebsocketManager.InstallationConnections;
        int previousStatus;

        if (!connections.TryGetValue(installationId, out var installationInfo))
        {
            // First message from this installation: start tracking it.
            previousStatus = UnknownPreviousStatus;
            Console.WriteLine("Create new empty list for installation: " + installationId);
            installationInfo = new InstallationInfo
            {
                Status = receivedStatusMessage.Status,
                Timestamp = DateTime.Now
            };
            connections[installationId] = installationInfo;
        }
        else
        {
            previousStatus = installationInfo.Status;
            installationInfo.Status = receivedStatusMessage.Status;
            installationInfo.Timestamp = DateTime.Now;
        }

        // Only notify front-ends when the status actually changed and somebody is connected.
        if (previousStatus != receivedStatusMessage.Status && installationInfo.Connections.Count > 0)
        {
            WebsocketManager.InformWebsocketsForInstallation(installationId);
        }
    }

    // Persists every warning and alarm carried by the message through Db.HandleWarning / Db.HandleError.
    private static void StoreWarningsAndAlarms(StatusMessage receivedStatusMessage)
    {
        var installationId = receivedStatusMessage.InstallationId;

        if (receivedStatusMessage.Warnings != null)
        {
            foreach (var warning in receivedStatusMessage.Warnings)
            {
                var newWarning = new Warning
                {
                    InstallationId = installationId,
                    Description = warning.Description,
                    Date = warning.Date,
                    Time = warning.Time,
                    DeviceCreatedTheMessage = warning.CreatedBy,
                    Seen = false
                };
                Db.HandleWarning(newWarning, installationId);
            }
        }

        if (receivedStatusMessage.Alarms != null)
        {
            Console.WriteLine("Add an alarm for installation " + installationId);
            foreach (var alarm in receivedStatusMessage.Alarms)
            {
                var newError = new Error
                {
                    InstallationId = installationId,
                    Description = alarm.Description,
                    Date = alarm.Date,
                    Time = alarm.Time,
                    DeviceCreatedTheMessage = alarm.CreatedBy,
                    Seen = false
                };
                Db.HandleError(newError, installationId);
            }
        }
    }

    // Sends the subscribe request to one installation and waits for its reply, retrying up to
    // MaxRetransmissions times. Socket errors are logged and never propagate to the caller.
    private static void NotifyInstallation(UdpClient udpClient, string installationIp, IPAddress address)
    {
        const string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue";
        byte[] data = Encoding.UTF8.GetBytes(message);
        var target = new IPEndPoint(address, InstallationUdpPort);

        for (int attempt = 0; attempt < MaxRetransmissions; attempt++)
        {
            try
            {
                // Send is inside the try so that an unreachable host does not abort the whole run.
                udpClient.Send(data, data.Length, target);
                Console.WriteLine($"Sent UDP message to {installationIp}:{InstallationUdpPort}: {message}");

                // Receive overwrites the endpoint with the actual sender, so use a fresh instance per attempt.
                var remoteEndPoint = new IPEndPoint(address, InstallationUdpPort);
                byte[] replyData = udpClient.Receive(ref remoteEndPoint);
                string replyMessage = Encoding.UTF8.GetString(replyData);
                Console.WriteLine("Received " + replyMessage + " from installation " + installationIp);
                return;
            }
            catch (SocketException ex)
            {
                if (ex.SocketErrorCode == SocketError.TimedOut)
                {
                    Console.WriteLine("Timed out waiting for a response. Retry...");
                }
                else
                {
                    Console.WriteLine("Error: " + ex.Message);
                }
            }
        }
    }
}