77 lines
1.8 KiB
Python
Executable File
77 lines
1.8 KiB
Python
Executable File
"""
|
|
Tests pour le scheduler APScheduler
|
|
"""
|
|
import pytest
|
|
import asyncio
|
|
from backend.app.services.scheduler import ScanScheduler
|
|
|
|
|
|
class TestScheduler:
|
|
"""Tests pour le scheduler"""
|
|
|
|
@pytest.fixture
|
|
def scheduler(self):
|
|
"""Fixture scheduler"""
|
|
sched = ScanScheduler()
|
|
yield sched
|
|
if sched.is_running:
|
|
sched.stop()
|
|
|
|
def test_scheduler_start_stop(self, scheduler):
|
|
"""Test démarrage/arrêt du scheduler"""
|
|
assert scheduler.is_running is False
|
|
|
|
scheduler.start()
|
|
assert scheduler.is_running is True
|
|
|
|
scheduler.stop()
|
|
assert scheduler.is_running is False
|
|
|
|
def test_add_ping_scan_job(self, scheduler):
|
|
"""Test ajout tâche ping scan"""
|
|
scheduler.start()
|
|
|
|
async def dummy_scan():
|
|
pass
|
|
|
|
scheduler.add_ping_scan_job(dummy_scan, interval_seconds=60)
|
|
|
|
jobs = scheduler.get_jobs()
|
|
job_ids = [job.id for job in jobs]
|
|
|
|
assert 'ping_scan' in job_ids
|
|
|
|
def test_add_port_scan_job(self, scheduler):
|
|
"""Test ajout tâche port scan"""
|
|
scheduler.start()
|
|
|
|
async def dummy_scan():
|
|
pass
|
|
|
|
scheduler.add_port_scan_job(dummy_scan, interval_seconds=300)
|
|
|
|
jobs = scheduler.get_jobs()
|
|
job_ids = [job.id for job in jobs]
|
|
|
|
assert 'port_scan' in job_ids
|
|
|
|
def test_remove_job(self, scheduler):
|
|
"""Test suppression de tâche"""
|
|
scheduler.start()
|
|
|
|
async def dummy_scan():
|
|
pass
|
|
|
|
scheduler.add_ping_scan_job(dummy_scan, interval_seconds=60)
|
|
|
|
# Vérifier présence
|
|
jobs = scheduler.get_jobs()
|
|
assert len(jobs) == 1
|
|
|
|
# Supprimer
|
|
scheduler.remove_job('ping_scan')
|
|
|
|
# Vérifier absence
|
|
jobs = scheduler.get_jobs()
|
|
assert len(jobs) == 0
|