Files
gilles a8f0d6ccba Initial commit — KC868-A2 contrôleur solaire ESP32
Fonctionnalités :
- Lecture RS485 Modbus Epever Tracer 4210N (115200 bps, FC03/FC04/FC16)
- Moteur de règles JSON (LittleFS) — commande automatique des relais
- Interface web mobile-first (dashboard, règles, config, historique, EPEVER, debug)
- WiFi AP+STA simultanés avec reconnexion automatique et portail captif
- mDNS configurable (pv.local par défaut)
- Configuration registres EPEVER depuis l'UI (18 registres holding)
- Historique basse/haute résolution avec graphes canvas
- VPN WireGuard optionnel (désactivé par défaut, config via UI)
- OTA firmware + filesystem via ElegantOTA
- Deep sleep / économie d'énergie

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 19:25:01 +02:00

139 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Simulateur Modbus RTU — émule les registres de l'Epever Tracer 4210N.
Se connecte au serveur TCP exposé par QEMU pour UART2 (port 1235).
Répond aux requêtes FC04 (Read Input Registers) avec des valeurs simulées
qui varient dans le temps pour reproduire un comportement réaliste.
"""
import math
import socket
import time
# TCP endpoint of the QEMU-exposed UART2 server this simulator connects to.
QEMU_HOST = '127.0.0.1'
QEMU_PORT = 1235
RECONNECT_DELAY = 2 # seconds to wait before the next connection attempt
# ---------------------------------------------------------------------------
# CRC16 Modbus
# ---------------------------------------------------------------------------
def crc16(data: bytes) -> int:
    """Return the Modbus CRC-16 of *data*.

    The register starts at 0xFFFF; each input byte is XORed into the low
    byte and the register is then shifted right eight times, XORing in
    0xA001 whenever the bit shifted out was set.

    Args:
        data: The bytes to checksum (may be empty).

    Returns:
        The 16-bit CRC as an integer; 0xFFFF for empty input.
    """
    remainder = 0xFFFF
    for octet in data:
        remainder ^= octet
        for _bit in range(8):
            carry_out = remainder & 1
            remainder >>= 1
            if carry_out:
                remainder ^= 0xA001
    return remainder
# ---------------------------------------------------------------------------
# Registres simulés Epever Tracer 4210N
# ---------------------------------------------------------------------------
# Registers whose simulated value never changes, keyed by register address.
_FIXED_REGISTERS = {
    # Load + battery temperature (0x310C..0x3110)
    0x310C: 1340,    # load voltage x 100 = 13.40 V
    0x310D: 200,     # load current x 100 = 2.00 A
    0x310E: 2680,    # load power x 100 = 26.80 W
    0x3110: 2500,    # battery temperature x 100 = 25.00 degC
    # State of charge
    0x311A: 75,      # SOC = 75 %
    # Charge status
    0x3200: 0x0004,  # bits 3-2 = 01 -> float charge
    # Energy counters (0x3300..0x3307), 32-bit values as low/high words
    0x3300: 150,     # energy generated today, low word = 1.50 kWh
    0x3301: 0,
    0x3302: 12000,   # total energy generated, low word = 120.00 kWh
    0x3303: 0,
    0x3304: 80,      # energy consumed today, low word = 0.80 kWh
    0x3305: 0,
    0x3306: 8500,    # total energy consumed, low word = 85.00 kWh
    0x3307: 0,
    # Day/night
    0x200C: 0x0008,  # bit 3 set -> charging active (day)
}


def get_reg(addr: int) -> int:
    """Return the simulated value of the register at *addr*.

    PV voltage (0x3100), PV current (0x3101) and battery voltage (0x3104)
    follow a sine of the current wall-clock time; the other known
    registers are constants. Any address not listed reads as 0.

    Args:
        addr: Register address.

    Returns:
        The simulated register value as a non-negative integer.
    """
    now = time.time()

    # Time-varying PV / battery readings, all scaled x 100.
    if addr == 0x3100:  # PV voltage
        swing = int(50 * math.sin(now / 60))
        return max(0, 1872 + swing)
    if addr == 0x3101:  # PV current
        swing = int(30 * abs(math.sin(now / 45)))
        return max(0, 420 + swing)
    if addr == 0x3104:  # battery voltage
        return 1345 + int(20 * math.sin(now / 120))

    # Everything else is either a fixed value or unmapped (reads as 0).
    return _FIXED_REGISTERS.get(addr, 0)
# ---------------------------------------------------------------------------
# Construction de la réponse FC04
# ---------------------------------------------------------------------------
def build_fc04_response(slave: int, start: int, count: int) -> bytes:
    """Build the Modbus RTU reply to a Read Input Registers (FC04) request.

    For a register count within the Modbus limit (1..125) the reply is
    ``slave, 0x04, byte_count, <count big-endian 16-bit values>, CRC``.
    For any other count the reply is the exception frame
    ``slave, 0x84, 0x03, CRC`` (illegal data value) instead of raising:
    the byte-count field is a single byte and cannot describe more than
    127 registers.

    Args:
        slave: Slave address to echo back in the reply.
        start: Address of the first register to read.
        count: Number of consecutive registers requested.

    Returns:
        The complete reply frame, CRC appended low byte first.
    """
    if 1 <= count <= 125:
        frame = bytearray((slave, 0x04, count * 2))
        for offset in range(count):
            frame += get_reg(start + offset).to_bytes(2, 'big')
    else:
        # Function code with the high bit set + exception code 0x03.
        frame = bytearray((slave, 0x84, 0x03))
    body = bytes(frame)
    return body + crc16(body).to_bytes(2, 'little')
# ---------------------------------------------------------------------------
# Gestion d'une connexion QEMU
# ---------------------------------------------------------------------------
def handle(sock: socket.socket) -> None:
    """Serve FC04 requests received on *sock* until the peer disconnects.

    Incoming bytes are accumulated and scanned for 8-byte request frames.
    A receive timeout is treated as an idle bus rather than a disconnect,
    so a socket that still carries a timeout (``main`` connects with
    ``timeout=60``) is not dropped just because nothing was sent for a
    while. Any other socket error is reported and ends the session.

    Args:
        sock: Connected stream socket carrying the raw Modbus RTU bytes.
    """
    print('[Modbus] ✓ Connecté à QEMU UART2', flush=True)
    buf = bytearray()
    try:
        while True:
            try:
                chunk = sock.recv(256)
            except socket.timeout:
                # No traffic within the socket timeout: keep waiting.
                continue
            if not chunk:
                break  # orderly shutdown by the peer
            buf.extend(chunk)
            _serve_buffered_requests(sock, buf)
    except OSError as exc:
        # Covers connection reset / broken pipe and other socket failures.
        print(f'[Modbus] Erreur socket : {exc}', flush=True)
    print('[Modbus] Déconnecté', flush=True)


def _serve_buffered_requests(sock: socket.socket, buf: bytearray) -> None:
    """Answer every complete request at the head of *buf*, consuming it in place."""
    # An FC04 RTU request is exactly 8 bytes:
    # slave, function, start (2, big-endian), count (2, big-endian), CRC (2, low byte first).
    while len(buf) >= 8:
        frame = bytes(buf[:8])
        crc_rx = frame[6] | (frame[7] << 8)
        if crc16(frame[:6]) != crc_rx:
            # Not aligned on a frame boundary: drop one byte and rescan.
            del buf[:1]
            continue

        # CRC is valid, so these 8 bytes form one frame; consume all of it.
        del buf[:8]
        slave, fc = frame[0], frame[1]
        start = int.from_bytes(frame[2:4], 'big')
        count = int.from_bytes(frame[4:6], 'big')

        if fc != 0x04:
            print(f'[Modbus] FC{fc:02X} non supporté — trame ignorée', flush=True)
            continue

        try:
            resp = build_fc04_response(slave, start, count)
        except ValueError as exc:
            # A register count too large for the reply's one-byte length
            # field must not take the whole simulator down.
            print(f'[Modbus] FC04 0x{start:04X} × {count} reg → requête invalide ({exc})', flush=True)
            continue

        sock.sendall(resp)
        print(f'[Modbus] FC04 0x{start:04X} × {count} reg → envoyé', flush=True)
# ---------------------------------------------------------------------------
# Boucle principale : reconnexion automatique
# ---------------------------------------------------------------------------
def main() -> None:
    """Connect to the QEMU UART2 TCP server and serve it, reconnecting forever.

    Each iteration opens a connection (60 s timeout), hands it to
    ``handle`` and closes it when ``handle`` returns. Socket errors are
    reported, and the loop pauses ``RECONNECT_DELAY`` seconds before the
    next attempt. This function never returns.
    """
    endpoint = (QEMU_HOST, QEMU_PORT)
    print(f'[Modbus] Attente de QEMU UART2 sur tcp://{QEMU_HOST}:{QEMU_PORT}', flush=True)
    while True:
        try:
            with socket.create_connection(endpoint, timeout=60) as link:
                handle(link)
        except OSError as exc:  # ConnectionRefusedError is an OSError subclass
            print(f'[Modbus] {exc} — retry dans {RECONNECT_DELAY}s', flush=True)
        time.sleep(RECONNECT_DELAY)
# Start the simulator only when this file is run as a script.
if __name__ == "__main__":
    main()