Files
gilles a8f0d6ccba Initial commit — KC868-A2 contrôleur solaire ESP32
Fonctionnalités :
- Lecture RS485 Modbus Epever Tracer 4210N (115200 bps, FC03/FC04/FC16)
- Moteur de règles JSON (LittleFS) — commande automatique des relais
- Interface web mobile-first (dashboard, règles, config, historique, EPEVER, debug)
- WiFi AP+STA simultanés avec reconnexion automatique et portail captif
- mDNS configurable (pv.local par défaut)
- Configuration registres EPEVER depuis l'UI (18 registres holding)
- Historique basse/haute résolution avec graphes canvas
- VPN WireGuard optionnel (désactivé par défaut, config via UI)
- OTA firmware + filesystem via ElegantOTA
- Deep sleep / économie d'énergie

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 19:25:01 +02:00

247 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
Serveur de simulation KC868-A2 — port 8080.
Sert les fichiers statiques depuis ../data/ et implémente tous les
endpoints /api/* avec un état en mémoire qui varie dans le temps.
Lancement : python3 sim.py
Accès : http://localhost:8080
"""
import http.server
import json
import math
import os
import socketserver
import threading
import time
from urllib.parse import urlparse, parse_qs
# Chemin vers les fichiers web du projet
DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
PORT = int(os.environ.get('SIM_PORT', 8080))
MIME = {
'.html': 'text/html; charset=utf-8',
'.js': 'application/javascript',
'.css': 'text/css',
'.json': 'application/json',
'.ico': 'image/x-icon',
}
# ---------------------------------------------------------------------------
# État simulé — modifiable via les endpoints POST
# ---------------------------------------------------------------------------
_lock = threading.Lock()
state = {
'pv': 18.72, 'pvCurrent': 4.20,
'battery': 13.45, 'batSOC': 75,
'batTemperature': 25.0, 'batStatut': 1,
'batSousVoltage': False, 'batSurVoltage': False,
'loadVoltage': 13.40, 'loadCurrent': 2.00, 'loadPower': 26.80,
'energieGenJour': 1.50, 'energieGenTotal': 120.00,
'energieConJour': 0.80, 'energieConTotal': 85.00,
'sun': True, 'relay1': False, 'relay2': False,
'di1': False, 'di2': False,
'autoMode': True, 'rs485_ok': True, 'last_update': 0,
}
rules = []
next_id = [1]
sleep_cfg = {'actif': False, 'intervalle': 600, 'seuil': 2.0}
# Historique circulaire (échantillonnage toutes les 5s en simulation)
history = {'b': [], 'p': [], 'l': [], 's': []}
MAX_HIST = 288
def _update():
"""Mise à jour périodique de l'état et de l'historique."""
while True:
t = time.time()
with _lock:
state['pv'] = round(18.72 + 0.8 * math.sin(t / 60), 2)
state['pvCurrent'] = round(max(0, 4.20 + 0.4 * math.sin(t / 45)), 2)
state['battery'] = round(13.45 + 0.3 * math.sin(t / 120), 2)
state['loadPower'] = round(max(0, 26.80 + 3.0 * math.sin(t / 30)), 1)
state['batSOC'] = min(100, max(0, int(75 + 5 * math.sin(t / 180))))
state['sun'] = (int(t / 30) % 2) == 0 # alterne jour/nuit toutes les 30s
state['rs485_ok'] = True
state['last_update'] = int(t * 1000)
# Historique — 1 point toutes les 5s (= 5 min en temps réel)
h = history
if len(h['b']) >= MAX_HIST:
for k in h: h[k].pop(0)
h['b'].append(round(state['battery'], 2))
h['p'].append(round(state['pv'], 2))
h['l'].append(round(state['loadPower'], 1))
h['s'].append(state['batSOC'])
time.sleep(5)
threading.Thread(target=_update, daemon=True).start()
# ---------------------------------------------------------------------------
# Handler HTTP
# ---------------------------------------------------------------------------
class Handler(http.server.BaseHTTPRequestHandler):
def do_OPTIONS(self):
self._cors(200)
def do_GET(self):
p = urlparse(self.path)
if p.path == '/api/state':
with _lock: self._json(state)
elif p.path == '/api/rules':
with _lock: self._json(rules)
elif p.path == '/api/sleep':
with _lock: self._json(sleep_cfg)
elif p.path == '/api/history':
with _lock:
out = {'n': len(history['b'])}
out.update(history)
self._json(out)
else:
self._static(p.path)
def do_POST(self):
p = urlparse(self.path)
qs = parse_qs(p.query)
body = self._read_body()
# --- Relais ---
if p.path.startswith('/api/relay/'):
parts = p.path.split('/') # ['','api','relay','1','on']
n, cmd = int(parts[3]), parts[4]
key = f'relay{n}'
with _lock:
state[key] = (cmd == 'on')
self._json({'ok': True})
# --- Mode ---
elif p.path == '/api/mode/auto':
with _lock: state['autoMode'] = True
self._json({'ok': True})
elif p.path == '/api/mode/manuel':
with _lock: state['autoMode'] = False
self._json({'ok': True})
# --- Règles ---
elif p.path == '/api/rules' and body:
try:
r = json.loads(body)
with _lock:
r['id'] = next_id[0]; next_id[0] += 1
rules.append(r)
self._json({'ok': True}, 201)
except Exception:
self._json({'ok': False}, 400)
elif p.path == '/api/rules/toggle':
rid = int(qs.get('id', [0])[0])
with _lock:
found = next((r for r in rules if r['id'] == rid), None)
if found: found['enabled'] = not found['enabled']
self._json({'ok': bool(found)}, 200 if found else 404)
elif p.path == '/api/rules/delete':
rid = int(qs.get('id', [0])[0])
with _lock:
before = len(rules)
rules[:] = [r for r in rules if r['id'] != rid]
ok = len(rules) < before
self._json({'ok': ok}, 200 if ok else 404)
# --- Sleep ---
elif p.path == '/api/sleep' and body:
try:
cfg = json.loads(body)
with _lock:
sleep_cfg.update({
'actif': bool(cfg.get('actif', sleep_cfg['actif'])),
'intervalle': int(cfg.get('intervalle', sleep_cfg['intervalle'])),
'seuil': float(cfg.get('seuil', sleep_cfg['seuil'])),
})
self._json({'ok': True})
except Exception:
self._json({'ok': False}, 400)
else:
self.send_error(404)
# ------------------------------------------------------------------
def _static(self, path):
if path in ('', '/'):
path = '/index.html'
filepath = os.path.normpath(os.path.join(DATA_DIR, path.lstrip('/')))
# Sécurité : rester dans DATA_DIR
if not filepath.startswith(os.path.realpath(DATA_DIR)):
self.send_error(403); return
if not os.path.isfile(filepath):
self.send_error(404); return
ext = os.path.splitext(filepath)[1]
with open(filepath, 'rb') as f:
data = f.read()
self.send_response(200)
self.send_header('Content-Type', MIME.get(ext, 'application/octet-stream'))
self.send_header('Content-Length', str(len(data)))
self._cors_headers()
self.end_headers()
self.wfile.write(data)
def _json(self, obj, code=200):
data = json.dumps(obj).encode()
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(data)))
self._cors_headers()
self.end_headers()
self.wfile.write(data)
def _cors(self, code):
self.send_response(code)
self._cors_headers()
self.end_headers()
def _cors_headers(self):
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
def _read_body(self):
length = int(self.headers.get('Content-Length', 0))
return self.rfile.read(length).decode() if length else ''
def log_message(self, fmt, *args):
print(f'[Sim] {self.address_string()} {fmt % args}', flush=True)
# ---------------------------------------------------------------------------
if __name__ == '__main__':
data_real = os.path.realpath(DATA_DIR)
if not os.path.isdir(data_real):
print(f'[Sim] ERREUR : dossier data introuvable : {data_real}')
raise SystemExit(1)
with socketserver.ThreadingTCPServer(('', PORT), Handler) as httpd:
httpd.allow_reuse_address = True
print(f'[Sim] Serveur de simulation sur http://localhost:{PORT}')
print(f'[Sim] Fichiers web depuis : {data_real}')
print(f'[Sim] Historique : 1 point / 5s (288 pts max = ~24 min simulées)')
httpd.serve_forever()