#!/usr/bin/env python3
"""
Modbus RTU simulator — emulates the registers of the Epever Tracer 4210N.

Connects to the TCP server exposed by QEMU for UART2 (port 1235).
Answers FC04 (Read Input Registers) requests with simulated values that
vary over time to reproduce realistic behaviour.
"""

import math
import socket
import time

QEMU_HOST = '127.0.0.1'
QEMU_PORT = 1235
RECONNECT_DELAY = 2  # seconds

FC_READ_INPUT_REGISTERS = 0x04
# The Modbus application protocol limits one FC04 request to 1..125 registers
# (the byte-count field of the response is a single byte).
MAX_READ_REGISTERS = 125
EXC_ILLEGAL_DATA_VALUE = 0x03
# An FC04 request is always: slave, fc, start(2), count(2), crc(2).
REQUEST_FRAME_LEN = 8


# ---------------------------------------------------------------------------
# Modbus CRC16
# ---------------------------------------------------------------------------

def crc16(data: bytes) -> int:
    """Return the Modbus CRC16 of *data* (init 0xFFFF, reflected poly 0xA001)."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
    return crc


def _with_crc(payload: bytes) -> bytes:
    """Return *payload* followed by its CRC16, low byte first."""
    return payload + crc16(payload).to_bytes(2, 'little')


# ---------------------------------------------------------------------------
# Simulated Epever Tracer 4210N registers
# ---------------------------------------------------------------------------

# Registers whose simulated value never changes.
_STATIC_REGISTERS = {
    # --- Load + battery temperature (0x310C..0x3110) ---
    0x310C: 1340,    # Load voltage × 100 = 13.40 V
    0x310D: 200,     # Load current × 100 = 2.00 A
    0x310E: 2680,    # Load power × 100 = 26.80 W
    0x3110: 2500,    # Battery temperature × 100 = 25.00 °C
    # --- SOC (0x311A) ---
    0x311A: 75,      # SOC = 75 %
    # --- Charging status (0x3200) ---
    0x3200: 0x0004,  # bits 3-2 = 01 → float charging
    # --- Energy (0x3300..0x3307) — 32-bit registers (low/high words) ---
    0x3300: 150,     # Generated today, low word = 1.50 kWh
    0x3301: 0,
    0x3302: 12000,   # Generated total, low word = 120.00 kWh
    0x3303: 0,
    0x3304: 80,      # Consumed today, low word = 0.80 kWh
    0x3305: 0,
    0x3306: 8500,    # Consumed total, low word = 85.00 kWh
    0x3307: 0,
    # --- Day/night (0x200C) ---
    0x200C: 0x0008,  # Bit 3 = 1 → charging active (day)
}


def get_reg(addr: int) -> int:
    """Return the simulated value of register *addr*.

    The PV voltage, PV current and battery voltage registers vary
    sinusoidally with wall-clock time; the others are constants.
    Unknown addresses read as 0.
    """
    # --- PV and battery (0x3100..0x3107) ---
    if addr == 0x3100:  # PV voltage × 100
        return max(0, 1872 + int(50 * math.sin(time.time() / 60)))
    if addr == 0x3101:  # PV current × 100
        return max(0, 420 + int(30 * abs(math.sin(time.time() / 45))))
    if addr == 0x3104:  # Battery voltage × 100
        return 1345 + int(20 * math.sin(time.time() / 120))
    return _STATIC_REGISTERS.get(addr, 0)


# ---------------------------------------------------------------------------
# FC04 response construction
# ---------------------------------------------------------------------------

def build_fc04_response(slave: int, start: int, count: int) -> bytes:
    """Build the RTU reply to an FC04 request, CRC included.

    Args:
        slave: slave address to echo back in the reply.
        start: address of the first register requested.
        count: number of consecutive registers requested.

    Returns:
        The normal response (slave, 0x04, byte count, big-endian register
        values, CRC) when *count* is within 1..125. Otherwise a Modbus
        exception response (slave, 0x84, 0x03, CRC): the byte-count field
        cannot represent a larger quantity, and building it anyway would
        raise ValueError and take the simulator down.
    """
    if not 1 <= count <= MAX_READ_REGISTERS:
        return _with_crc(bytes([
            slave,
            FC_READ_INPUT_REGISTERS | 0x80,
            EXC_ILLEGAL_DATA_VALUE,
        ]))
    header = bytes([slave, FC_READ_INPUT_REGISTERS, count * 2])
    data = b''.join(
        get_reg(start + offset).to_bytes(2, 'big') for offset in range(count)
    )
    return _with_crc(header + data)


# ---------------------------------------------------------------------------
# Handling of one QEMU connection
# ---------------------------------------------------------------------------

def _process_buffer(sock: socket.socket, buf: bytearray) -> None:
    """Answer every complete FC04 request in *buf*, consuming handled bytes."""
    while len(buf) >= REQUEST_FRAME_LEN:
        slave, fc = buf[0], buf[1]
        start = int.from_bytes(buf[2:4], 'big')
        count = int.from_bytes(buf[4:6], 'big')
        crc_rx = int.from_bytes(buf[6:8], 'little')
        if fc != FC_READ_INPUT_REGISTERS or crc16(bytes(buf[:6])) != crc_rx:
            # Stray byte — shift by one and try to resynchronise.
            del buf[:1]
            continue
        resp = build_fc04_response(slave, start, count)
        sock.sendall(resp)
        if resp[1] & 0x80:
            print(f'[Modbus] FC04 0x{start:04X} × {count} reg → exception 0x{resp[2]:02X}',
                  flush=True)
        else:
            print(f'[Modbus] FC04 0x{start:04X} × {count} reg → envoyé', flush=True)
        del buf[:REQUEST_FRAME_LEN]


def handle(sock: socket.socket) -> None:
    """Serve FC04 requests on *sock* until the peer closes or the link fails.

    The socket is not closed here; that is left to the caller.
    """
    print('[Modbus] ✓ Connecté à QEMU UART2', flush=True)
    buf = bytearray()
    try:
        while True:
            try:
                chunk = sock.recv(256)
            except socket.timeout:
                # The connect timeout stays active on the socket, so a quiet
                # bus ends up here. Silence is not a disconnection: drop any
                # partial frame and keep waiting.
                buf.clear()
                continue
            if not chunk:
                break
            buf.extend(chunk)
            _process_buffer(sock, buf)
    except OSError as exc:
        # Covers ConnectionResetError and BrokenPipeError (OSError subclasses).
        print(f'[Modbus] Connexion interrompue : {exc}', flush=True)
    print('[Modbus] Déconnecté', flush=True)


# ---------------------------------------------------------------------------
# Main loop: automatic reconnection
# ---------------------------------------------------------------------------

def main() -> None:
    """Connect to QEMU forever, reconnecting after every failure or disconnect."""
    print(f'[Modbus] Attente de QEMU UART2 sur tcp://{QEMU_HOST}:{QEMU_PORT}', flush=True)
    while True:
        try:
            with socket.create_connection((QEMU_HOST, QEMU_PORT), timeout=60) as sock:
                handle(sock)
        except OSError as exc:  # includes ConnectionRefusedError
            print(f'[Modbus] {exc} — retry dans {RECONNECT_DELAY}s', flush=True)
        time.sleep(RECONNECT_DELAY)


if __name__ == '__main__':
    main()