a8f0d6ccba
Fonctionnalités : - Lecture RS485 Modbus Epever Tracer 4210N (115200 bps, FC03/FC04/FC16) - Moteur de règles JSON (LittleFS) — commande automatique des relais - Interface web mobile-first (dashboard, règles, config, historique, EPEVER, debug) - WiFi AP+STA simultanés avec reconnexion automatique et portail captif - mDNS configurable (pv.local par défaut) - Configuration registres EPEVER depuis l'UI (18 registres holding) - Historique basse/haute résolution avec graphes canvas - VPN WireGuard optionnel (désactivé par défaut, config via UI) - OTA firmware + filesystem via ElegantOTA - Deep sleep / économie d'énergie Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
139 lines
5.0 KiB
Python
139 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Simulateur Modbus RTU — émule les registres de l'Epever Tracer 4210N.
|
||
|
||
Se connecte au serveur TCP exposé par QEMU pour UART2 (port 1235).
|
||
Répond aux requêtes FC04 (Read Input Registers) avec des valeurs simulées
|
||
qui varient dans le temps pour reproduire un comportement réaliste.
|
||
"""
|
||
import math
|
||
import socket
|
||
import time
|
||
|
||
# TCP endpoint on which QEMU exposes the emulated UART2.
QEMU_HOST: str = '127.0.0.1'
QEMU_PORT: int = 1235

# Pause between two connection attempts, in seconds.
RECONNECT_DELAY: int = 2
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CRC16 Modbus
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def crc16(data: bytes) -> int:
    """Compute the Modbus RTU CRC-16 of *data*.

    The register starts at 0xFFFF. Each input byte is XOR-ed into it and the
    register is then shifted right eight times, applying the reflected
    polynomial 0xA001 whenever a 1 bit is shifted out.

    Returns the 16-bit CRC as an integer; an empty input yields 0xFFFF.
    """
    register = 0xFFFF
    for octet in data:
        register ^= octet
        for _bit in range(8):
            carry = register & 1
            register >>= 1
            if carry:
                register ^= 0xA001
    return register
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Registres simulés Epever Tracer 4210N
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Registers whose simulated value never changes, keyed by register address.
_CONSTANT_REGISTERS = {
    # Load and battery temperature (0x310C..0x3110)
    0x310C: 1340,    # load voltage x 100 = 13.40 V
    0x310D: 200,     # load current x 100 = 2.00 A
    0x310E: 2680,    # load power x 100 = 26.80 W
    0x3110: 2500,    # battery temperature x 100 = 25.00 degC
    # State of charge
    0x311A: 75,      # SOC = 75 %
    # Charging status
    0x3200: 0x0004,  # bits 3-2 = 01 -> float charging
    # Energy counters (0x3300..0x3307), 32-bit values split low/high
    0x3300: 150,     # energy generated today, low word = 1.50 kWh
    0x3301: 0,
    0x3302: 12000,   # total energy generated, low word = 120.00 kWh
    0x3303: 0,
    0x3304: 80,      # energy consumed today, low word = 0.80 kWh
    0x3305: 0,
    0x3306: 8500,    # total energy consumed, low word = 85.00 kWh
    0x3307: 0,
    # Day/night
    0x200C: 0x0008,  # bit 3 = 1 -> charging active (day)
}


def get_reg(addr: int) -> int:
    """Return the simulated value of the register at *addr*.

    PV voltage (0x3100), PV current (0x3101) and battery voltage (0x3104)
    follow a sinusoid of the current wall-clock time so that successive
    reads differ. The other known registers return fixed values, and any
    unknown address reads as 0.
    """
    now = time.time()

    # Time-varying registers.
    if addr == 0x3100:
        # PV voltage x 100
        swing = int(50 * math.sin(now / 60))
        return max(0, 1872 + swing)
    if addr == 0x3101:
        # PV current x 100
        swing = int(30 * abs(math.sin(now / 45)))
        return max(0, 420 + swing)
    if addr == 0x3104:
        # Battery voltage x 100
        swing = int(20 * math.sin(now / 120))
        return 1345 + swing

    # Everything else is either a fixed value or unmapped (reads as 0).
    return _CONSTANT_REGISTERS.get(addr, 0)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Construction de la réponse FC04
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def build_fc04_response(slave: int, start: int, count: int) -> bytes:
    """Build the Modbus RTU reply to an FC04 (Read Input Registers) request.

    Args:
        slave: Unit address, echoed back as the first byte of the reply.
        start: Address of the first register to read.
        count: Number of consecutive registers requested.

    Returns:
        The complete reply frame, CRC included (low byte first).

        * For a quantity in 1..125 the frame is
          ``slave, 0x04, byte count, register data (big-endian), CRC``.
        * For any other quantity the frame is the Modbus exception reply
          ``slave, 0x84, 0x03, CRC`` (ILLEGAL DATA VALUE). The byte-count
          field is a single byte, so an oversized request cannot be
          answered normally, and a quantity of 0 is not a valid read.
    """
    if not 1 <= count <= 125:
        # Exception reply: function code with the high bit set, then code 0x03.
        frame = bytes([slave, 0x04 | 0x80, 0x03])
    else:
        # Mask to 16 bits so an out-of-range simulated value is wrapped
        # instead of making to_bytes() raise OverflowError.
        words = (get_reg(start + offset) & 0xFFFF for offset in range(count))
        frame = bytes([slave, 0x04, count * 2])
        frame += b''.join(word.to_bytes(2, 'big') for word in words)

    # Modbus RTU transmits the CRC low byte first.
    return frame + crc16(frame).to_bytes(2, 'little')
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Gestion d'une connexion QEMU
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def handle(sock: socket.socket) -> None:
    """Serve FC04 requests arriving on *sock* until the link goes down.

    Received bytes are accumulated in a buffer and scanned for 8-byte RTU
    request frames. A frame with a valid CRC and function code 0x04 is
    answered with the frame from ``build_fc04_response``; anything else is
    treated as a stray byte and skipped one byte at a time so the parser
    can resynchronise.

    The function returns when the peer closes the connection or when a
    socket error occurs (reset, broken pipe, or a timeout if one is set on
    the socket). The error is reported on stdout rather than raised.

    Args:
        sock: Connected stream socket carrying the UART traffic.
    """
    print('[Modbus] ✓ Connecté à QEMU UART2', flush=True)
    buf = bytearray()
    try:
        while True:
            chunk = sock.recv(256)
            if not chunk:
                # An empty read means the peer closed the connection.
                break
            buf.extend(chunk)

            # An FC04 RTU request is exactly 8 bytes: slave, function,
            # start address (big-endian), register count (big-endian),
            # CRC (low byte first).
            while len(buf) >= 8:
                slave, fc = buf[0], buf[1]
                start = (buf[2] << 8) | buf[3]
                count = (buf[4] << 8) | buf[5]
                crc_rx = buf[6] | (buf[7] << 8)

                if crc16(bytes(buf[:6])) == crc_rx and fc == 0x04:
                    resp = build_fc04_response(slave, start, count)
                    sock.sendall(resp)
                    print(f'[Modbus] FC04 0x{start:04X} × {count} reg → envoyé', flush=True)
                    del buf[:8]
                else:
                    # Stray byte: drop it and retry from the next offset.
                    del buf[:1]

    except OSError as exc:
        # ConnectionResetError, BrokenPipeError and socket timeouts are all
        # OSError subclasses. Report the cause instead of hiding it.
        print(f'[Modbus] Erreur socket : {exc}', flush=True)
    print('[Modbus] Déconnecté', flush=True)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Boucle principale : reconnexion automatique
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Keep a session open with the QEMU UART2 TCP endpoint, forever.

    Each iteration connects to ``QEMU_HOST:QEMU_PORT`` with a 60 s socket
    timeout and hands the socket to ``handle``. When the connection attempt
    or the session raises ``OSError``, the error is printed and the loop
    waits ``RECONNECT_DELAY`` seconds before retrying. After a session that
    ends without an exception it reconnects immediately.
    """
    print(f'[Modbus] Attente de QEMU UART2 sur tcp://{QEMU_HOST}:{QEMU_PORT}', flush=True)
    while True:
        try:
            link = socket.create_connection((QEMU_HOST, QEMU_PORT), timeout=60)
            # The socket is closed when the session ends, whatever the reason.
            with link:
                handle(link)
        except OSError as exc:
            # ConnectionRefusedError is an OSError subclass, so this single
            # clause also covers "QEMU is not listening yet".
            print(f'[Modbus] {exc} — retry dans {RECONNECT_DELAY}s', flush=True)
            time.sleep(RECONNECT_DELAY)
|
||
|
||
|
||
# Start the simulator only when this file is executed as a script.
if __name__ == '__main__':
    main()
|