Files
mesh/server/test_gotify.py
Gilles Soulier 1d177e96a6 first
2026-01-05 13:20:54 +01:00

209 lines
6.3 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# Created by: Claude
# Date: 2026-01-03
# Purpose: Test script for Gotify notifications
# Refs: server/CLAUDE.md
import asyncio
import os
from datetime import datetime

import httpx
# Configuration.
# Every value can be overridden through the environment so the script can be
# pointed at another deployment without editing the source. The literals are
# only fallbacks; an unset or empty variable selects the fallback.
API_URL = os.environ.get("MESH_API_URL") or "http://localhost:8000"
GOTIFY_URL = os.environ.get("GOTIFY_URL") or "http://10.0.0.5:8185"
# NOTE(review): an application token is committed here as the fallback value.
# Prefer exporting GOTIFY_TOKEN and rotating this one.
GOTIFY_TOKEN = os.environ.get("GOTIFY_TOKEN") or "AvKcy9o-yvVhyKd"

# Test users (payloads sent as-is to the registration endpoint).
USER1 = {
    "email": "alice@test.com",
    "username": "alice",
    "password": "password123",
}
USER2 = {
    "email": "bob@test.com",
    "username": "bob",
    "password": "password123",
}
async def test_gotify_direct():
    """Push one notification straight to the Gotify server.

    POSTs a fixed title/message/priority payload to ``{GOTIFY_URL}/message``,
    passing ``GOTIFY_TOKEN`` as the ``token`` query parameter.

    Returns:
        ``True`` when the request succeeded and the JSON reply was printed,
        ``False`` when any exception occurred (the error is printed).
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 1: Envoi direct à Gotify")
    print(banner)

    notification = {
        "title": "🧪 Test Mesh",
        "message": "Ceci est un test de notification depuis Mesh",
        "priority": 5,
    }
    try:
        async with httpx.AsyncClient(timeout=5.0) as http:
            reply = await http.post(
                f"{GOTIFY_URL}/message",
                params={"token": GOTIFY_TOKEN},
                json=notification,
            )
            # Any 4xx/5xx status is turned into an exception handled below.
            reply.raise_for_status()
            print("✅ Notification envoyée avec succès à Gotify")
            print(f" Response: {reply.json()}")
    except Exception as err:
        print(f"❌ Erreur: {err}")
        return False
    return True
async def register_user(user_data):
    """Register a test user and return its access token.

    POSTs ``user_data`` as JSON to ``/api/auth/register``. When the API
    answers 400 the account is treated as already existing, and the function
    falls back to a form POST on ``/api/auth/login`` using the e-mail address
    as the ``username`` field.

    Args:
        user_data: Mapping providing the ``email``, ``username`` and
            ``password`` keys.

    Returns:
        The ``access_token`` value from the JSON reply, or ``None`` when
        neither registration nor login produced a token (the reason is
        printed).
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{API_URL}/api/auth/register",
                json=user_data,
            )
            # 201 Created is accepted as well as 200: it is a common success
            # status for a creation endpoint and previously fell through
            # silently to an implicit ``None``.
            if response.status_code in (200, 201):
                data = response.json()
                print(f"✅ Utilisateur {user_data['username']} créé")
                return data['access_token']
            if response.status_code == 400:
                # The user already exists: try to log in instead.
                response = await client.post(
                    f"{API_URL}/api/auth/login",
                    data={
                        "username": user_data["email"],
                        "password": user_data["password"],
                    },
                )
                if response.status_code == 200:
                    data = response.json()
                    print(f" Utilisateur {user_data['username']} existe déjà (login)")
                    return data['access_token']
            # Turn 4xx/5xx replies into an exception reported below.
            response.raise_for_status()
            # Reaching this point means a non-error status that carries no
            # usable token: report it instead of returning None silently.
            print(
                f"❌ Erreur création/login {user_data['username']}: "
                f"statut HTTP inattendu {response.status_code}"
            )
    except Exception as e:
        print(f"❌ Erreur création/login {user_data['username']}: {e}")
    return None
async def create_room(token, room_name):
    """Create a test room via ``POST /api/rooms``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        room_name: Value sent as the ``name`` field of the JSON body.

    Returns:
        The ``room_id`` value from the JSON reply, or ``None`` when any
        exception occurred (the error is printed).
    """
    try:
        auth_header = {"Authorization": f"Bearer {token}"}
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.post(
                f"{API_URL}/api/rooms",
                headers=auth_header,
                json={"name": room_name},
            )
            # Any 4xx/5xx status is turned into an exception handled below.
            reply.raise_for_status()
            room_id = reply.json()['room_id']
            print(f"✅ Room '{room_name}' créée: {room_id}")
            return room_id
    except Exception as err:
        print(f"❌ Erreur création room: {err}")
        return None
async def send_message(token, room_id, message):
    """Try to post a message to a room through the REST API.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        room_id: Room identifier interpolated into the endpoint path.
        message: Text sent as the ``content`` field of the JSON body.

    Returns:
        ``True`` when the request succeeded; ``False`` when the endpoint
        answered 404 or any exception occurred (a warning is printed).
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.post(
                f"{API_URL}/api/rooms/{room_id}/messages",
                headers={"Authorization": f"Bearer {token}"},
                json={"content": message},
            )
            if reply.status_code != 404:
                # Any other 4xx/5xx status becomes an exception handled below.
                reply.raise_for_status()
                print(f"✅ Message envoyé: {message[:50]}...")
                return True
            # 404: the REST endpoint is absent, which the script treats as
            # expected rather than as an error.
            print(f"⚠️ Endpoint POST /api/rooms/{room_id}/messages n'existe pas")
            print(" (Normal: envoi de messages via WebSocket uniquement)")
            return False
    except Exception as err:
        print(f"⚠️ Pas d'API REST pour messages: {err}")
        return False
async def test_chat_notification():
    """Prepare the chat-notification scenario (no WebSocket involved).

    Registers or logs in both test users, creates a room with the first
    user's token, then prints the manual steps needed to finish the test.

    Returns:
        ``True`` when both users and the room were obtained, ``False``
        otherwise (the failing step is printed).
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 2: Notification de chat (setup)")
    print(banner)

    # Register (or log in) both users, one after the other.
    alice_token = await register_user(USER1)
    bob_token = await register_user(USER2)
    if not (alice_token and bob_token):
        print("❌ Impossible de créer les utilisateurs")
        return False

    # The room is created with the first user's token.
    room_id = await create_room(alice_token, "Test Gotify Chat")
    if not room_id:
        print("❌ Impossible de créer la room")
        return False

    manual_steps = (
        "\n📝 NOTE:",
        " Les notifications de chat nécessitent une connexion WebSocket.",
        " Pour tester complètement:",
        " 1. Bob doit se déconnecter de la room",
        " 2. Alice envoie un message via WebSocket",
        " 3. Bob devrait recevoir une notification Gotify",
        "",
        " Utilisez le client web pour tester end-to-end.",
    )
    for line in manual_steps:
        print(line)
    return True
async def main():
    """Run both tests in order and print a summary.

    The closing diagnostic depends only on the direct Gotify test; the chat
    setup result appears in the summary lines only.
    """
    banner = "=" * 60

    def verdict(passed):
        # Label shown next to each test in the summary.
        return '✅ PASS' if passed else '❌ FAIL'

    print("\n" + banner)
    print("🧪 TESTS NOTIFICATIONS GOTIFY - MESH")
    print(banner)
    print(f"API URL: {API_URL}")
    print(f"Gotify URL: {GOTIFY_URL}")
    print(f"Timestamp: {datetime.now().isoformat()}")

    # Test 1: direct delivery, then test 2: chat setup.
    success1 = await test_gotify_direct()
    success2 = await test_chat_notification()

    # Summary
    print("\n" + banner)
    print("RÉSUMÉ DES TESTS")
    print(banner)
    print(f"Test 1 - Envoi direct Gotify: {verdict(success1)}")
    print(f"Test 2 - Setup chat: {verdict(success2)}")
    print("")

    if success1:
        closing_lines = (
            "✅ Gotify est correctement configuré et accessible",
            "✅ Les notifications peuvent être envoyées",
            "",
            "📱 Vérifiez votre application Gotify pour voir la notification",
        )
    else:
        closing_lines = (
            "❌ Problème de configuration Gotify",
            " Vérifiez:",
            " - GOTIFY_URL est correct dans .env",
            " - GOTIFY_TOKEN est valide",
            " - Le serveur Gotify est accessible",
        )
    for line in closing_lines:
        print(line)
# Script entry point: run the async test sequence only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())