From 15638f639cb0271ac6daa01ea62e65a9abb7efb6 Mon Sep 17 00:00:00 2001 From: Yinyin Liu Date: Thu, 4 Jul 2024 10:46:28 +0200 Subject: [PATCH] add aggregator.py to Cerbo and Venus --- .../dbus-fzsonick-48tl/aggregator.py | 284 +++++++++++++++++ .../dbus-fzsonick-48tl/start.sh | 10 +- .../update_all_cerbo_installations.sh | 8 + .../dbus-fzsonick-48tl/aggregator.py | 296 ++++++++++++++++++ .../dbus-fzsonick-48tl/service/run | 3 +- .../dbus-fzsonick-48tl/start.sh | 9 + .../update_all_venus_installations.sh | 22 +- 7 files changed, 626 insertions(+), 6 deletions(-) create mode 100755 firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py create mode 100755 firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py diff --git a/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py new file mode 100755 index 000000000..ae8365650 --- /dev/null +++ b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py @@ -0,0 +1,284 @@ +#!/usr/bin/python3 -u +# coding=utf-8 + +import os +import csv +import time +from datetime import datetime, timedelta +import requests +import zipfile +import base64 +import io +import hmac +import hashlib +from threading import Thread, Event +import config as cfg + +CSV_DIR = "/data/csv_files/" +HOURLY_DIR = "/data/csv_files/HourlyData" +DAILY_DIR = "/data/csv_files/DailyData" + +# S3 Credentials +print("Start with the correct credentials") + +S3BUCKET = cfg.S3BUCKET +S3KEY = cfg.S3KEY +S3SECRET = cfg.S3SECRET + +stop_event = Event() + +def datetime_to_timestamp(dt): + return time.mktime(dt.timetuple()) + +class AggregatedData: + def __init__(self, min_soc, max_soc, discharging_battery_power, charging_battery_power, heating_power): + self.min_soc = min_soc + self.max_soc = max_soc + self.discharging_battery_power = discharging_battery_power + self.charging_battery_power = charging_battery_power + self.heating_power = heating_power + + def to_csv(self): + return ("/MinSoc;{};\n" + "/MaxSoc;{};\n" + "/DischargingBatteryPower;{};\n" + "/ChargingBatteryPower;{};\n" + "/HeatingPower;{};").format( + self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power) + + def save(self, directory): + timestamp = int(time.time()) + if not os.path.exists(directory): + os.makedirs(directory) + csv_path = os.path.join(directory, "{}.csv".format(timestamp)) + with open(csv_path, 'w') as file: + file.write(self.to_csv()) + print("Saved file to:", csv_path) + print("File content:\n", self.to_csv()) + + @staticmethod + def delete_data(directory): + if not os.path.exists(directory): + return + for file in os.listdir(directory): + file_path = os.path.join(directory, file) + if os.path.isfile(file_path): + os.remove(file_path) + print("Deleted file: {}".format(file_path)) + + def push_to_s3(self, s3_config): + csv_data = self.to_csv() + compressed_csv = self.compress_csv_data(csv_data) + s3_path = datetime.now().strftime("%Y-%m-%d") + ".csv" + response = s3_config.create_put_request(s3_path, compressed_csv) + if response.status_code != 200: + print("ERROR: PUT", response.text) + return False + print("Successfully uploaded to S3:", s3_path) + return True + + @staticmethod + def compress_csv_data(csv_data): + memory_stream = io.BytesIO() + with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: + archive.writestr("data.csv", csv_data.encode('utf-8')) + compressed_bytes = memory_stream.getvalue() + return base64.b64encode(compressed_bytes).decode('utf-8') + + +class S3config: + def __init__(self, bucket, region, provider, key, secret): + self.bucket = bucket + self.region = region + self.provider = provider + self.key = key + self.secret = secret + self.content_type = "application/base64; charset=utf-8" + + @property + def host(self): + return "{}.{}.{}".format(self.bucket, self.region, self.provider) + + @property + def url(self): + return "https://{}".format(self.host) + + def create_put_request(self, s3_path, data): + headers = self._create_request_headers("PUT", s3_path) + url = "{}/{}".format(self.url, s3_path) + response = requests.put(url, headers=headers, data=data) + return response + + def _create_request_headers(self, method, s3_path): + date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') + auth = self._create_authorization(method, s3_path, date) + return { + "Host": self.host, + "Date": date, + "Authorization": auth, + "Content-Type": self.content_type + } + + def _create_authorization(self, method, s3_path, date): + payload = "{}\n\n{}\n{}\n/{}/{}".format(method, self.content_type, date, self.bucket, s3_path) + signature = base64.b64encode( + hmac.new(self.secret.encode(), payload.encode(), hashlib.sha1).digest() + ).decode() + return "AWS {}:{}".format(self.key, signature) + + +class Aggregator: + @staticmethod + def hourly_data_aggregation_manager(): + try: + current_time = datetime.now() + next_rounded_hour = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + time_until_next_hour = (next_rounded_hour - current_time).total_seconds() + + print("Waiting for {} seconds until the next rounded hour.".format(time_until_next_hour)) + if stop_event.wait(time_until_next_hour): + return + + while not stop_event.is_set(): + try: + current_time = datetime.now() + after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1)) + before_timestamp = datetime_to_timestamp(current_time) + aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp) + print("Saving in hourly directory") + aggregated_data.save(HOURLY_DIR) + except Exception as e: + print("An error occurred during hourly data aggregation:", str(e)) + if stop_event.wait(3600): # Sleep for 1 hour + return + except KeyboardInterrupt: + print("Hourly data aggregation manager stopped.") + + @staticmethod + def daily_data_aggregation_manager(): + try: + current_time = datetime.now() + next_rounded_day = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) + time_until_next_day = (next_rounded_day - current_time).total_seconds() + + print("Waiting for {} seconds until the next rounded day.".format(time_until_next_day)) + if stop_event.wait(time_until_next_day): + return + + while not stop_event.is_set(): + try: + current_time = datetime.now() + after_timestamp = datetime_to_timestamp(current_time - timedelta(days=1)) + before_timestamp = datetime_to_timestamp(current_time) + aggregated_data = Aggregator.create_daily_data(HOURLY_DIR, after_timestamp, before_timestamp) + print("Saving in daily directory") + aggregated_data.save(DAILY_DIR) + s3_config = S3config(S3BUCKET, "sos-ch-dk-2", "exo.io", S3KEY, S3SECRET) + if aggregated_data.push_to_s3(s3_config): + print("Deleting from hourly directory") + AggregatedData.delete_data(HOURLY_DIR) + print("Deleting from daily directory") + AggregatedData.delete_data(DAILY_DIR) + except Exception as e: + print("An error occurred during daily data aggregation:", str(e)) + if stop_event.wait(86400): # Sleep for 1 day + return + except KeyboardInterrupt: + print("Daily data aggregation manager stopped.") + + @staticmethod + def create_hourly_data(directory, after_timestamp, before_timestamp): + node_data = {} + + for filename in os.listdir(directory): + file_path = os.path.join(directory, filename) + if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp): + with open(file_path, 'r') as file: + reader = csv.reader(file, delimiter=';') + for row in reader: + if len(row) >= 2: + variable_name, value = row[0], row[1] + try: + value = float(value) + node_number = Aggregator.extract_node_number(variable_name) + if node_number not in node_data: + node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} + if "Soc" in variable_name: + node_data[node_number]['soc'].append(value) + elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: + if value < 0: + node_data[node_number]['discharge'].append(value) + else: + node_data[node_number]['charge'].append(value) + elif "/HeatingPower" in variable_name: + node_data[node_number]['heating'].append(value) + except ValueError: + pass + + if len(node_data) == 1: + # Directly use the values for a single node + for node_number, data in node_data.items(): + min_soc = data['soc'][0] if data['soc'] else 0.0 + max_soc = data['soc'][0] if data['soc'] else 0.0 + avg_discharging_power = data['discharge'][0] if data['discharge'] else 0.0 + avg_charging_power = data['charge'][0] if data['charge'] else 0.0 + avg_heating_power = data['heating'][0] if data['heating'] else 0.0 + return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) + else: + min_soc = min([min(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0 + max_soc = max([max(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0 + total_discharging_power = sum([sum(data['discharge']) for data in node_data.values() if data['discharge']]) + total_charging_power = sum([sum(data['charge']) for data in node_data.values() if data['charge']]) + total_heating_power = sum([sum(data['heating']) for data in node_data.values() if data['heating']]) + count_discharging_power = sum([len(data['discharge']) for data in node_data.values() if data['discharge']]) + count_charging_power = sum([len(data['charge']) for data in node_data.values() if data['charge']]) + count_heating_power = sum([len(data['heating']) for data in node_data.values() if data['heating']]) + avg_discharging_power = total_discharging_power / count_discharging_power if count_discharging_power else 0.0 + avg_charging_power = total_charging_power / count_charging_power if count_charging_power else 0.0 + avg_heating_power = total_heating_power / count_heating_power if count_heating_power else 0.0 + return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) + + @staticmethod + def create_daily_data(directory, after_timestamp, before_timestamp): + return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp) + + @staticmethod + def is_file_within_time_range(filename, start_time, end_time): + try: + file_timestamp = float(os.path.splitext(filename)[0]) + return start_time <= file_timestamp < end_time + except ValueError: + return False + + @staticmethod + def extract_node_number(variable_name): + parts = variable_name.split('/') + try: + return int(parts[3]) + except (IndexError, ValueError): + return 0 + +if __name__ == "__main__": + print("Aggregator has started AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + + def run_hourly_manager(): + Aggregator.hourly_data_aggregation_manager() + + def run_daily_manager(): + Aggregator.daily_data_aggregation_manager() + + hourly_thread = Thread(target=run_hourly_manager) + daily_thread = Thread(target=run_daily_manager) + + hourly_thread.start() + daily_thread.start() + + try: + hourly_thread.join() + daily_thread.join() + except KeyboardInterrupt: + print("Program interrupted. Stopping threads...") + stop_event.set() + hourly_thread.join() + daily_thread.join() + print("Program stopped.") diff --git a/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/start.sh b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/start.sh index d818ffc57..e47d76801 100755 --- a/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/start.sh +++ b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/start.sh @@ -2,6 +2,14 @@ . /opt/victronenergy/serial-starter/run-service.sh -app=/opt/victronenergy/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py +app="/opt/victronenergy/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py" args="$tty" + +# Start aggregator.py in the background +/opt/victronenergy/dbus-fzsonick-48tl/aggregator.py & + +# Start dbus-fzsonick-48tl.py using the start command start $args + +# Wait for all background processes to finish +wait diff --git a/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/update_all_cerbo_installations.sh b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/update_all_cerbo_installations.sh index 054449aae..b308cec6d 100755 --- a/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/update_all_cerbo_installations.sh +++ b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/update_all_cerbo_installations.sh @@ -17,6 +17,10 @@ for ip_address in "${ip_addresses_usb0[@]}"; do scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/" ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB0" @@ -30,6 +34,10 @@ for ip_address in "${ip_addresses_usb1[@]}"; do scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/" ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB1" diff --git a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py new file mode 100755 index 000000000..388b0b116 --- /dev/null +++ b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py @@ -0,0 +1,296 @@ +#!/usr/bin/python2 -u +# coding=utf-8 + +import os +import csv +import time +from datetime import datetime, timedelta +import requests +import zipfile +import base64 +import io +import hmac +import hashlib +from threading import Thread, Event +import config as cfg + +CSV_DIR = "/data/csv_files/" +HOURLY_DIR = "/data/csv_files/HourlyData" +DAILY_DIR = "/data/csv_files/DailyData" + +# S3 Credentials +#S3BUCKET = "6-c0436b6a-d276-4cd8-9c44-1eae86cf5d0e" +#S3KEY = "EXO2a6cd837ae9279271b1710af" +#S3SECRET = "IAK2wc7mL0HWD9LHFeiv1nl5jvousOLLAHKCQwmwniI" + + +print("start with the correct credentials") + +S3BUCKET = cfg.S3BUCKET +S3KEY = cfg.S3KEY +S3SECRET = cfg.S3SECRET + +stop_event = Event() + +def datetime_to_timestamp(dt): + return time.mktime(dt.timetuple()) + +class AggregatedData: + def __init__(self, min_soc, max_soc, discharging_battery_power, charging_battery_power, heating_power): + self.min_soc = min_soc + self.max_soc = max_soc + self.discharging_battery_power = discharging_battery_power + self.charging_battery_power = charging_battery_power + self.heating_power = heating_power + + def to_csv(self): + return ("/MinSoc;{};\n" + "/MaxSoc;{};\n" + "/DischargingBatteryPower;{};\n" + "/ChargingBatteryPower;{};\n" + "/HeatingPower;{};").format( + self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power) + + def save(self, directory): + timestamp = int(time.time()) + if not os.path.exists(directory): + os.makedirs(directory) + csv_path = os.path.join(directory, "{}.csv".format(timestamp)) + with open(csv_path, 'w') as file: + file.write(self.to_csv()) + print("Saved file to:", csv_path) + print("File content:\n", self.to_csv()) + + @staticmethod + def delete_data(directory): + if not os.path.exists(directory): + return + for file in os.listdir(directory): + file_path = os.path.join(directory, file) + if os.path.isfile(file_path): + os.remove(file_path) + print("Deleted file: {}".format(file_path)) + + def push_to_s3(self, s3_config): + csv_data = self.to_csv() + compressed_csv = self.compress_csv_data(csv_data) + s3_path = datetime.now().strftime("%Y-%m-%d") + ".csv" + response = s3_config.create_put_request(s3_path, compressed_csv) + if response.status_code != 200: + print("ERROR: PUT", response.text) + return False + print("Successfully uploaded to S3:", s3_path) + return True + + @staticmethod + def compress_csv_data(csv_data): + memory_stream = io.BytesIO() + with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: + archive.writestr("data.csv", csv_data.encode('utf-8')) + compressed_bytes = memory_stream.getvalue() + return base64.b64encode(compressed_bytes).decode('utf-8') + + +class S3config: + def __init__(self, bucket, region, provider, key, secret): + self.bucket = bucket + self.region = region + self.provider = provider + self.key = key + self.secret = secret + self.content_type = "application/base64; charset=utf-8" + + @property + def host(self): + return "{}.{}.{}".format(self.bucket, self.region, self.provider) + + @property + def url(self): + return "https://{}".format(self.host) + + def create_put_request(self, s3_path, data): + headers = self._create_request_headers("PUT", s3_path) + url = "{}/{}".format(self.url, s3_path) + response = requests.put(url, headers=headers, data=data) + return response + + def _create_request_headers(self, method, s3_path): + date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') + auth = self._create_authorization(method, s3_path, date) + return { + "Host": self.host, + "Date": date, + "Authorization": auth, + "Content-Type": self.content_type + } + + def _create_authorization(self, method, s3_path, date): + payload = "{}\n\n{}\n{}\n/{}/{}".format(method, self.content_type, date, self.bucket, s3_path) + signature = base64.b64encode( + hmac.new(self.secret.encode(), payload.encode(), hashlib.sha1).digest() + ).decode() + return "AWS {}:{}".format(self.key, signature) + + +class Aggregator: + @staticmethod + def hourly_data_aggregation_manager(): + try: + current_time = datetime.now() + #next_rounded_minute = (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0) + next_rounded_hour = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + #time_until_next_minute = (next_rounded_minute - current_time).total_seconds() + time_until_next_hour = (next_rounded_hour - current_time).total_seconds() + + #print("Waiting for {} seconds until the next rounded minute.".format(time_until_next_minute)) + print("Waiting for {} seconds until the next rounded hour.".format(time_until_next_hour)) + if stop_event.wait(time_until_next_hour): + return + + while not stop_event.is_set(): + try: + current_time = datetime.now() + after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1)) + before_timestamp = datetime_to_timestamp(current_time) + aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp) + print("save in hourly dir") + aggregated_data.save(HOURLY_DIR) + except Exception as e: + print("An error occurred during hourly data aggregation:", str(e)) + if stop_event.wait(3600): # Sleep for 1 hour + return + except KeyboardInterrupt: + print("Hourly data aggregation manager stopped.") + + @staticmethod + def daily_data_aggregation_manager(): + try: + current_time = datetime.now() + #next_rounded_five_minutes = (current_time + timedelta(minutes=5)).replace(second=0, microsecond=0) + next_rounded_day = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) + #time_until_next_five_minutes = (next_rounded_five_minutes - current_time).total_seconds() + time_until_next_day = (next_rounded_day - current_time).total_seconds() + + #print("Waiting for {} seconds until the next rounded 5 minutes.".format(time_until_next_five_minutes)) + print("Waiting for {} seconds until the next rounded day.".format(time_until_next_day)) + if stop_event.wait(time_until_next_day): + return + + while not stop_event.is_set(): + try: + current_time = datetime.now() + after_timestamp = datetime_to_timestamp(current_time - timedelta(days=1)) + before_timestamp = datetime_to_timestamp(current_time) + aggregated_data = Aggregator.create_daily_data(HOURLY_DIR, after_timestamp, before_timestamp) + print("save in daily dir") + aggregated_data.save(DAILY_DIR) + s3_config = S3config(S3BUCKET, "sos-ch-dk-2", "exo.io", S3KEY, S3SECRET) + if aggregated_data.push_to_s3(s3_config): + print("delete from hourly dir") + AggregatedData.delete_data(HOURLY_DIR) + print("delete from daily dir") + AggregatedData.delete_data(DAILY_DIR) + except Exception as e: + print("An error occurred during daily data aggregation:", str(e)) + if stop_event.wait(86400): # Sleep for 1 day + return + except KeyboardInterrupt: + print("Daily data aggregation manager stopped.") + + @staticmethod + def create_hourly_data(directory, after_timestamp, before_timestamp): + node_data = {} + + for filename in os.listdir(directory): + file_path = os.path.join(directory, filename) + if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp): + with open(file_path, 'r') as file: + reader = csv.reader(file, delimiter=';') + for row in reader: + if len(row) >= 2: + variable_name, value = row[0], row[1] + try: + value = float(value) + node_number = Aggregator.extract_node_number(variable_name) + if node_number not in node_data: + node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} + if "Soc" in variable_name: + node_data[node_number]['soc'].append(value) + elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: + if value < 0: + node_data[node_number]['discharge'].append(value) + else: + node_data[node_number]['charge'].append(value) + elif "/HeatingPower" in variable_name: + node_data[node_number]['heating'].append(value) + except ValueError: + pass + + if len(node_data) == 1: + # Directly use the values for a single node + for node_number, data in node_data.items(): + min_soc = data['soc'][0] if data['soc'] else 0.0 + max_soc = data['soc'][0] if data['soc'] else 0.0 + avg_discharging_power = data['discharge'][0] if data['discharge'] else 0.0 + avg_charging_power = data['charge'][0] if data['charge'] else 0.0 + avg_heating_power = data['heating'][0] if data['heating'] else 0.0 + return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) + else: + min_soc = min([min(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0 + max_soc = max([max(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0 + total_discharging_power = sum([sum(data['discharge']) for data in node_data.values() if data['discharge']]) + total_charging_power = sum([sum(data['charge']) for data in node_data.values() if data['charge']]) + total_heating_power = sum([sum(data['heating']) for data in node_data.values() if data['heating']]) + count_discharging_power = sum([len(data['discharge']) for data in node_data.values() if data['discharge']]) + count_charging_power = sum([len(data['charge']) for data in node_data.values() if data['charge']]) + count_heating_power = sum([len(data['heating']) for data in node_data.values() if data['heating']]) + avg_discharging_power = total_discharging_power / count_discharging_power if count_discharging_power else 0.0 + avg_charging_power = total_charging_power / count_charging_power if count_charging_power else 0.0 + avg_heating_power = total_heating_power / count_heating_power if count_heating_power else 0.0 + return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) + + @staticmethod + def create_daily_data(directory, after_timestamp, before_timestamp): + return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp) + + @staticmethod + def is_file_within_time_range(filename, start_time, end_time): + try: + file_timestamp = float(os.path.splitext(filename)[0]) + return start_time <= file_timestamp < end_time + except ValueError: + return False + + @staticmethod + def extract_node_number(variable_name): + parts = variable_name.split('/') + try: + return int(parts[3]) + except (IndexError, ValueError): + return 0 + +if __name__ == "__main__": + print("aggregator has started aAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + #exit(0) + def run_hourly_manager(): + Aggregator.hourly_data_aggregation_manager() + + def run_daily_manager(): + Aggregator.daily_data_aggregation_manager() + + hourly_thread = Thread(target=run_hourly_manager) + daily_thread = Thread(target=run_daily_manager) + + hourly_thread.start() + daily_thread.start() + + try: + hourly_thread.join() + daily_thread.join() + except KeyboardInterrupt: + print("Program interrupted. Stopping threads...") + stop_event.set() + hourly_thread.join() + daily_thread.join() + print("Program stopped.") + diff --git a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/service/run b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/service/run index 7f5301435..4aa0c7684 100755 --- a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/service/run +++ b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/service/run @@ -1,4 +1,5 @@ #!/bin/sh exec 2>&1 -exec softlimit -d 100000000 -s 1000000 -a 100000000 /opt/innovenergy/dbus-fzsonick-48tl/start.sh TTY +softlimit -d 100000000 -s 1000000 -a 100000000 /opt/innovenergy/dbus-fzsonick-48tl/start.sh + diff --git a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/start.sh b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/start.sh index 83860d3e4..92522f8f7 100755 --- a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/start.sh +++ b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/start.sh @@ -4,4 +4,13 @@ app="/opt/innovenergy/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py" args="$tty" + +# Start aggregator.py in the background +/opt/innovenergy/dbus-fzsonick-48tl/aggregator.py & + +# Start dbus-fzsonick-48tl.py using the start command start $args + +# Wait for all background processes to finish +wait + diff --git a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/update_all_venus_installations.sh b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/update_all_venus_installations.sh index d28de40ae..2c13b49dd 100755 --- a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/update_all_venus_installations.sh +++ b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/update_all_venus_installations.sh @@ -7,10 +7,8 @@ set -e echo -e "\n============================ Deploy ============================\n" -#ip_addresses_usb0=("10.2.0.155" "10.2.1.97" "10.2.0.104" "10.2.1.159" "10.2.0.224" "10.2.0.209" "10.2.2.36") -ip_addresses_usb0=("10.2.2.36") - -#ip_addresses_usb1=("10.2.1.35") +ip_addresses_usb0=("10.2.0.155" "10.2.1.97" "10.2.0.104" "10.2.1.159" "10.2.0.224" "10.2.0.209" "10.2.0.227") +ip_addresses_usb1=("10.2.1.35") for ip_address in "${ip_addresses_usb0[@]}"; do @@ -19,6 +17,14 @@ for ip_address in "${ip_addresses_usb0[@]}"; do scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "signals.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "signals.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "service/run" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/service" + scp "service/run" "root@"$ip_address":/data/dbus-fzsonick-48tl/service" ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB0" @@ -32,6 +38,14 @@ for ip_address in "${ip_addresses_usb1[@]}"; do scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "signals.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/" + scp "signals.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/" + scp "service/run" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/service" + scp "service/run" "root@"$ip_address":/data/dbus-fzsonick-48tl/service" ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB1"