#!/usr/bin/env python3 """ Modbus TCP client for KACO/Inesco SodistoreGrid battery storage system. Reads system, battery, grid meter, and inverter registers. Writes to inverter control registers to test server handling. Register Map based on: inesco_Modbus_Register_Map_SodistoreGird.pdf - Base address = 1 - Combines 32-bit low/high registers (low word first) - Applies scaling factors (0.1, 0.01, etc.) """ from pymodbus.client.sync import ModbusTcpClient import datetime INVALID_16BIT = 0xFFFF INVALID_32BIT = 0xFFFFFFFF # -------------------------- # KACO/Inesco Register Map # -------------------------- REGISTER_MAP = { # ---- Read Only ---- # System Data (30001-30003) 30001: {"name": "Protocol Version", "type": "INT16", "scale": 0.1, "unit": "version"}, 30002: {"name": "System Timestamp (Low)", "type": "UINT32", "scale": 1, "unit": "unix_ts"}, # 30002+30003 # Battery Data (31002-31010) 31002: {"name": "Avg Battery Voltage", "type": "INT16", "scale": 0.1, "unit": "V"}, 31003: {"name": "Sum Battery Current", "type": "INT32", "scale": 0.1, "unit": "A"}, # 31003+31004 (Pos: Charge, Neg: Discharge) 31005: {"name": "Avg SOC", "type": "UINT16", "scale": 0.01, "unit": "%"}, 31006: {"name": "Sum Battery Power", "type": "INT32", "scale": 0.1, "unit": "W"}, # 31006+31007 (Pos: Charge, Neg: Discharge) 31010: {"name": "Avg SOH", "type": "UINT16", "scale": 0.01, "unit": "%"}, # Grid Meter Data (33000-33011) 33000: {"name": "Grid Active Power", "type": "INT32", "scale": 0.1, "unit": "W"}, # 33000+33001 33002: {"name": "Grid Frequency", "type": "UINT16", "scale": 0.1, "unit": "Hz"}, 33003: {"name": "Grid Voltage U1", "type": "UINT16", "scale": 0.1, "unit": "V"}, # Single register (PDF typo: listed as UINT32) 33004: {"name": "Grid Voltage U2", "type": "UINT16", "scale": 0.1, "unit": "V"}, # Single register (PDF typo: listed as UINT32) 33005: {"name": "Grid Voltage U3", "type": "UINT16", "scale": 0.1, "unit": "V"}, # Single register (PDF typo: listed as UINT32) 33006: {"name": "Grid Current I1", "type": "INT32", "scale": 0.1, "unit": "A"}, # 33006+33007 33008: {"name": "Grid Current I2", "type": "INT32", "scale": 0.1, "unit": "A"}, # 33008+33009 33010: {"name": "Grid Current I3", "type": "INT32", "scale": 0.1, "unit": "A"}, # 33010+33011 # Inverter Data - Read Only (34001-34005) 34001: {"name": "Inverter Power", "type": "UINT16", "scale": 1, "unit": "W"}, 34002: {"name": "Max Charge Current (RO)", "type": "UINT16", "scale": 1, "unit": "A"}, 34003: {"name": "Max Discharge Current (RO)", "type": "UINT16", "scale": 1, "unit": "A"}, 34004: {"name": "Max Charge Voltage (RO)", "type": "UINT16", "scale": 1, "unit": "V"}, 34005: {"name": "Min Discharge Voltage (RO)", "type": "UINT16", "scale": 1, "unit": "V"}, # ---- Write Registers (40001-40005) ---- 40001: {"name": "Inverter Power Percentage", "type": "UINT16", "scale": 1, "unit": "%"}, 40002: {"name": "Max Charge Current", "type": "UINT16", "scale": 1, "unit": "A"}, 40003: {"name": "Max Discharge Current", "type": "UINT16", "scale": 1, "unit": "A"}, 40004: {"name": "Max Charge Voltage", "type": "UINT16", "scale": 1, "unit": "V"}, 40005: {"name": "Min Discharge Voltage", "type": "UINT16", "scale": 1, "unit": "V"}, } # -------------------------- # Decode helpers # -------------------------- def decode_signed_16bit(val): if val is None: return None if val == 0x8000: return None if not 0 <= val <= 0xFFFF: return None return val - 0x10000 if val >= 0x8000 else val def decode_unsigned_16bit(val): if val is None: return None if val == INVALID_16BIT: return None if not 0 <= val <= 0xFFFF: return None return val def decode_signed_32bit(low, high): if low is None or high is None: return None combined = (high << 16) | low if combined == INVALID_32BIT: return None if combined >= 0x80000000: combined -= 0x100000000 return combined def decode_unsigned_32bit(low, high): if low is None or high is None: return None combined = (high << 16) | low if combined == INVALID_32BIT: return None return combined def scale(val, factor): return round(val * factor, 2) if val is not None else "Missing/Invalid" def timestamp_str_from_words(low, high): ts = decode_unsigned_32bit(low, high) if ts is None: return "Missing/Invalid" try: dt = datetime.datetime.fromtimestamp(ts) return dt.strftime("%Y-%m-%d %H:%M:%S") except Exception: return f"Invalid timestamp ({ts})" # -------------------------- # Modbus read helper (1-based -> pymodbus 0-based) # -------------------------- def read_modbus_data(client, start_address, count): try: result = client.read_holding_registers(start_address - 1, count, unit=1) if result.isError(): print(f"Error reading data from address {start_address}") return None return result.registers except Exception as e: print(f"Error reading from Modbus server: {e}") return None # -------------------------- # Generic block reader/printer # -------------------------- def read_and_print_block(client, title, start_addr, end_addr): """ Reads [start_addr..end_addr] inclusive and prints values using REGISTER_MAP. Handles INT32/UINT32 by consuming (low+high). """ count = end_addr - start_addr + 1 regs = read_modbus_data(client, start_addr, count) if not regs: print(f"\n--- {title} ---") print("No data available.") return print(f"\n--- {title} ({start_addr}..{end_addr}) ---") i = 0 while i < count: addr = start_addr + i info = REGISTER_MAP.get(addr) # If not mapped, just skip printing (still consumes 1 word) if info is None: i += 1 continue raw = regs[i] dtype = info["type"] factor = info.get("scale", 1) unit = info.get("unit", "") if dtype in ("INT32", "UINT32"): if i + 1 >= count: print(f"{info['name']} ({addr}+{addr+1}): Missing/Invalid (no high word)") i += 1 continue low = regs[i] high = regs[i + 1] if dtype == "INT32": decoded = decode_signed_32bit(low, high) else: decoded = decode_unsigned_32bit(low, high) # Special pretty-print for timestamp at 30002+30003 if addr == 30002 and dtype == "UINT32": print(f"System Timestamp (30002+30003): {timestamp_str_from_words(low, high)}") else: scaled_val = scale(decoded, factor) print(f"{info['name']} ({addr}+{addr+1}): {scaled_val} {unit}") i += 2 continue if dtype == "INT16": decoded = decode_signed_16bit(raw) elif dtype == "UINT16": decoded = decode_unsigned_16bit(raw) else: decoded = raw scaled_val = scale(decoded, factor) print(f"{info['name']} ({addr}): {scaled_val} {unit}") i += 1 # -------------------------- # Write helpers (1-based -> pymodbus 0-based) # -------------------------- def to_u16(val): return val & 0xFFFF def write_u16(client, addr_1based, value): try: res = client.write_register(addr_1based - 1, to_u16(value), unit=1) ok = not res.isError() print(f"WRITE UINT16 {addr_1based} = {value} -> {'OK' if ok else 'FAIL'}") return ok except Exception as e: print(f"WRITE UINT16 {addr_1based} exception: {e}") return False def write_test_kaco(client): """ Writes to KACO inverter control registers (40001-40005) to test server behavior. Values are chosen as safe examples - adjust to your actual system limits. """ print("\n=== WRITE TEST (KACO INVERTER CONTROL REGISTERS) ===") # 40001: Inverter Power Percentage (0-100%) write_u16(client, 40001, 80) # Set to 80% of inverter capacity # 40002: Max Charge Current (Amps) write_u16(client, 40002, 50) # 50A max charge # 40003: Max Discharge Current (Amps) write_u16(client, 40003, 50) # 50A max discharge # 40004: Max Charge Voltage (Volts) write_u16(client, 40004, 580) # 580V max charge voltage # 40005: Min Discharge Voltage (Volts) write_u16(client, 40005, 420) # 420V min discharge voltage def main(): client = ModbusTcpClient("localhost", port=502) if not client.connect(): print("Failed to connect to Modbus server.") print("Make sure modbus_tcp_server_kaco.py is running on port 502.") return try: print("=" * 70) print("KACO/Inesco SodistoreGrid - Modbus TCP Client Test") print("=" * 70) # ---- Read-only blocks ---- read_and_print_block(client, "System Data", 30001, 30003) read_and_print_block(client, "Battery Data", 31002, 31010) read_and_print_block(client, "Grid Meter Data", 33000, 33011) read_and_print_block(client, "Inverter Data (Read-Only)", 34001, 34005) # ---- Write tests ---- write_test_kaco(client) # ---- Re-read write registers to verify ---- read_and_print_block(client, "Inverter Control (After Write)", 40001, 40005) print("\n" + "=" * 70) print("Test completed successfully!") print("=" * 70) finally: client.close() if __name__ == "__main__": main()