Scheduler Examples
Essential timing patterns and job configuration examples for the scheduler component.
Common Timing Patterns
Daily Tasks
# app/components/scheduler/main.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
def create_scheduler() -> AsyncIOScheduler:
    """Create and configure the application's async scheduler.

    Registers the recurring jobs; the caller is responsible for starting
    the returned scheduler.

    Returns:
        The configured (not yet started) ``AsyncIOScheduler``.
    """
    scheduler = AsyncIOScheduler()
    # Daily at 6:30 AM (server-local time unless the scheduler is configured with a tz)
    scheduler.add_job(
        send_daily_report,
        trigger="cron",
        hour=6, minute=30,
        id="daily_report",
        name="Send Daily Report"
    )
    # Daily at midnight
    scheduler.add_job(
        cleanup_temp_files,
        trigger="cron",
        hour=0, minute=0,
        id="daily_cleanup",
        name="Daily Cleanup"
    )
    return scheduler
Weekly Tasks
# Weekly report every Monday at 9 AM
# (cron day_of_week accepts three-letter weekday abbreviations)
scheduler.add_job(
generate_weekly_report,
trigger="cron",
day_of_week="mon",
hour=9, minute=0,
id="weekly_report",
name="Weekly Report Generation"
)
# Weekend maintenance every Saturday at 2 AM
# (off-peak window; adjust if backups also run at this hour)
scheduler.add_job(
weekend_maintenance,
trigger="cron",
day_of_week="sat",
hour=2, minute=0,
id="weekend_maintenance",
name="Weekend System Maintenance"
)
Interval-Based Tasks
# Every 15 minutes
# (interval triggers measure from scheduler start, not wall-clock boundaries)
scheduler.add_job(
health_check,
trigger="interval",
minutes=15,
id="health_check",
name="System Health Check"
)
# Every 2 hours during business hours (weekdays, 9 AM - 5 PM).
# NOTE: an "interval" trigger with start_date/end_date only fires inside that
# single fixed window (the original example would run on 2024-01-01 only);
# a cron trigger with an hour range expresses the recurring schedule correctly.
scheduler.add_job(
    business_sync,
    trigger="cron",
    day_of_week="mon-fri",
    hour="9-17/2",  # fires at 09:00, 11:00, 13:00, 15:00, 17:00
    id="business_sync",
    name="Business Hours Sync"
)
Business Hours Only
# Weekdays 9 AM - 5 PM only
# (fires at half past each hour from 09:30 through 17:30)
scheduler.add_job(
business_task,
trigger="cron",
day_of_week="mon-fri",
hour="9-17",
minute=30,
id="business_task",
name="Business Hours Task"
)
# Every 30 minutes during business hours
# ("*/30" fires at :00 and :30 of every hour in the 9-17 range)
scheduler.add_job(
frequent_business_task,
trigger="cron",
day_of_week="mon-fri",
hour="9-17",
minute="*/30",
id="frequent_business",
name="Frequent Business Task"
)
Job Configuration Options
Preventing Overlapping Jobs
# Only one instance can run at a time
# (with max_instances=1, a new fire time is skipped while a previous run is still executing)
scheduler.add_job(
long_running_task,
trigger="interval",
minutes=30,
max_instances=1,
id="long_task",
name="Long Running Task"
)
# Skip missed runs and coalesce into one
# (coalesce=True collapses a backlog of missed fire times into a single run)
scheduler.add_job(
batch_process,
trigger="interval",
hours=1,
coalesce=True,
max_instances=1,
id="batch_process",
name="Batch Processing"
)
Retry and Error Handling
# Job with retry logic built into function
async def robust_email_task() -> None:
    """Send the morning email batch, retrying transient failures.

    Retries up to ``max_retries`` times with a 60-second pause between
    attempts; re-raises the final ``EmailServiceError`` so the scheduler
    records the job run as failed.
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            await send_emails()
            break  # success -- stop retrying
        except EmailServiceError as e:
            if attempt == max_retries - 1:
                # Out of attempts: log and surface the failure to the scheduler
                logger.error(f"Email task failed after {max_retries} attempts: {e}")
                raise
            await asyncio.sleep(60)  # Wait 1 minute before retry
# Fire daily at 8:00 AM; retry handling lives inside the task itself
scheduler.add_job(
robust_email_task,
trigger="cron",
hour=8, minute=0,
id="morning_emails",
name="Morning Email Campaign"
)
Grace Time for Missed Jobs
# Allow jobs to run up to 5 minutes late
# (runs missed by more than misfire_grace_time seconds are skipped entirely)
scheduler.add_job(
time_sensitive_task,
trigger="cron",
hour=12, minute=0,
misfire_grace_time=300, # 5 minutes in seconds
id="lunch_task",
name="Lunchtime Task"
)
Real-World Task Functions
File Processing
# app/services/file_tasks.py
from pathlib import Path
from app.core.log import logger
async def process_uploads() -> None:
    """Process uploaded files in batches.

    Scans ``uploads/pending``; each file is handed to
    ``process_single_file`` and deleted on success, or moved to
    ``uploads/errors`` on failure so it can be inspected later.
    """
    upload_dir = Path("uploads/pending")
    if not upload_dir.exists():
        logger.info("No upload directory found, skipping")
        return
    files = list(upload_dir.glob("*"))
    if not files:
        logger.info("No files to process")
        return
    logger.info(f"Processing {len(files)} files")
    for file_path in files:
        try:
            await process_single_file(file_path)
            file_path.unlink()  # Delete after processing
        except Exception as e:
            logger.error(f"Failed to process {file_path}: {e}")
            # Move to error directory instead of deleting.
            # parents=True so the move also works when "uploads/" itself is missing.
            error_dir = Path("uploads/errors")
            error_dir.mkdir(parents=True, exist_ok=True)
            file_path.rename(error_dir / file_path.name)
Database Maintenance
# app/services/db_tasks.py
async def cleanup_old_records() -> None:
    """Clean up database log records older than 90 days."""
    from datetime import datetime, timedelta
    from app.core.db import get_async_session
    # NOTE(review): datetime.now() is naive -- confirm created_at is stored
    # in local time, otherwise use an aware UTC timestamp.
    cutoff_date = datetime.now() - timedelta(days=90)
    async with get_async_session() as session:
        # Clean up old log entries.
        # NOTE(review): SQLAlchemy 2.x requires raw SQL to be wrapped in
        # sqlalchemy.text(); confirm against the project's SQLAlchemy version.
        result = await session.execute(
            "DELETE FROM logs WHERE created_at < :cutoff",
            {"cutoff": cutoff_date}
        )
        deleted_count = result.rowcount
        await session.commit()
        logger.info(f"Cleaned up {deleted_count} old log records")
# Schedule for 3 AM daily
# (off-peak hour to avoid contending with user traffic)
scheduler.add_job(
cleanup_old_records,
trigger="cron",
hour=3, minute=0,
id="db_cleanup",
name="Database Cleanup"
)
API Integration
# app/services/sync_tasks.py
import httpx
from app.core.config import settings
async def sync_external_data() -> None:
    """Sync data from the external API.

    Fetches JSON from the external service, hands it to
    ``process_external_data``, and re-raises any network or HTTP error so
    the scheduler marks the run as failed.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                "https://api.external-service.com/data",
                headers={"Authorization": f"Bearer {settings.API_TOKEN}"},
                timeout=30.0
            )
            response.raise_for_status()
            data = response.json()
            await process_external_data(data)
            # assumes the endpoint returns a JSON array -- TODO confirm
            logger.info(f"Successfully synced {len(data)} records")
        except httpx.RequestError as e:
            # Network-level failure (DNS, connect, read timeout, ...)
            logger.error(f"API sync failed: {e}")
            raise
        except httpx.HTTPStatusError as e:
            # Non-2xx response surfaced by raise_for_status()
            logger.error(f"API returned error {e.response.status_code}: {e}")
            raise
# Sync every 4 hours
# (interval is measured from scheduler start, not aligned to clock hours)
scheduler.add_job(
sync_external_data,
trigger="interval",
hours=4,
id="external_sync",
name="External Data Sync"
)
Advanced Patterns
Dynamic Job Scheduling
# app/services/dynamic_tasks.py
from datetime import datetime, timedelta
async def schedule_user_reminder(
    user_id: str,
    reminder_time: datetime,
    scheduler=None,
) -> None:
    """Dynamically schedule a one-shot reminder for *user_id*.

    Args:
        user_id: Target user identifier; embedded in the job id so that
            repeated calls for the same user/time replace the old job.
        reminder_time: Absolute datetime at which the reminder fires.
        scheduler: The *running* scheduler instance. If omitted, falls back
            to creating a fresh instance via ``create_scheduler()`` for
            backward compatibility -- but jobs added to a scheduler that is
            never started will not run, so pass the live instance.
    """
    if scheduler is None:
        # Fallback kept for backward compatibility with the original example;
        # prefer passing the application's running scheduler explicitly.
        from app.components.scheduler.main import create_scheduler
        scheduler = create_scheduler()
    scheduler.add_job(
        send_user_reminder,
        trigger="date",
        run_date=reminder_time,
        args=[user_id],
        id=f"reminder_{user_id}_{int(reminder_time.timestamp())}",
        name=f"Reminder for user {user_id}",
        replace_existing=True
    )


async def send_user_reminder(user_id: str) -> None:
    """Send reminder to specific user."""
    logger.info(f"Sending reminder to user {user_id}")
    await send_notification(user_id, "Your scheduled reminder!")
Conditional Job Execution
async def conditional_backup() -> None:
    """Only run a backup when conditions warrant one.

    Skips the run when a backup newer than six hours already exists or the
    CPU is busier than 80%; otherwise performs the backup and stamps
    ``backups/last_backup.txt`` with the completion time.
    """
    from datetime import datetime, timedelta
    from pathlib import Path
    # Check if backup is needed
    last_backup = Path("backups/last_backup.txt")
    if last_backup.exists():
        last_backup_time = datetime.fromtimestamp(last_backup.stat().st_mtime)
        if datetime.now() - last_backup_time < timedelta(hours=6):
            logger.info("Recent backup exists, skipping")
            return
    # Check system load (blocks for a 1-second CPU sample)
    import psutil
    if psutil.cpu_percent(interval=1) > 80:
        logger.info("System load too high, postponing backup")
        return
    # Perform backup
    await perform_database_backup()
    # Ensure the marker's directory exists before stamping it
    last_backup.parent.mkdir(parents=True, exist_ok=True)
    last_backup.touch()
# Re-check every 2 hours; the task itself decides whether to back up
scheduler.add_job(
conditional_backup,
trigger="interval",
hours=2,
id="smart_backup",
name="Smart Conditional Backup"
)
Testing Job Functions
Unit Testing Jobs
# tests/test_scheduler_jobs.py
import pytest
from unittest.mock import patch, AsyncMock
from app.services.file_tasks import process_uploads
@pytest.mark.asyncio
async def test_process_uploads_no_files():
    """Test upload processing with no files."""
    # With the upload directory reported missing, the task logs and returns.
    with patch("pathlib.Path.exists", return_value=False):
        await process_uploads()  # Should not raise
@pytest.mark.asyncio
async def test_process_uploads_success():
    """Test successful file processing."""
    from pathlib import Path
    mock_files = [Path("file1.txt"), Path("file2.txt")]
    with patch("pathlib.Path.exists", return_value=True), \
         patch("pathlib.Path.glob", return_value=mock_files), \
         patch("app.services.file_tasks.process_single_file", new_callable=AsyncMock) as mock_process, \
         patch.object(Path, "unlink"):  # Mock file deletion
        await process_uploads()
    # One processing call per mocked file
    assert mock_process.call_count == 2
Integration Testing
# tests/test_scheduler_integration.py
import pytest
from app.components.scheduler.main import create_scheduler
def test_scheduler_jobs_registered():
    """Test that all expected jobs are registered."""
    scheduler = create_scheduler()
    job_ids = [job.id for job in scheduler.get_jobs()]
    # NOTE(review): keep this list in sync with the jobs create_scheduler()
    # actually registers -- confirm "health_check" and "external_sync" are
    # added there and not only in other examples.
    expected_jobs = ["daily_cleanup", "health_check", "external_sync"]
    for job_id in expected_jobs:
        assert job_id in job_ids
def test_job_configuration():
    """Test job configuration is correct."""
    scheduler = create_scheduler()
    daily_job = scheduler.get_job("daily_cleanup")
    assert daily_job is not None
    assert daily_job.name == "Daily Cleanup"
    # Test trigger configuration. APScheduler's CronTrigger exposes its
    # schedule through the ``fields`` list, not as ``trigger.hour`` /
    # ``trigger.minute`` attributes (those raise AttributeError).
    fields = {field.name: str(field) for field in daily_job.trigger.fields}
    assert fields["hour"] == "0"
    assert fields["minute"] == "0"
Next Steps
- Scheduler Component - Return to scheduler overview
- Database Persistence - Job persistence and monitoring
- Component Overview - How components work together