Skip to content

Examples

Real-world usage patterns for the Communications Service.

Basic Usage

Send Welcome Email

from app.services.comms.email import send_email_simple

async def send_welcome_email(user_email: str, user_name: str) -> None:
    """Send welcome email to new user.

    Args:
        user_email: Address the welcome message is sent to.
        user_name: Name shown in the greeting. It is HTML-escaped before
            being placed in the message body.
    """
    from html import escape

    # The name is user-supplied text; escape it so characters such as "<" or
    # "&" are rendered literally instead of being interpreted as markup.
    safe_name = escape(user_name)

    await send_email_simple(
        to=user_email,
        subject="Welcome to Our Platform!",
        html=f"""
        <h1>Welcome, {safe_name}!</h1>
        <p>Thanks for signing up. Here's what you can do next:</p>
        <ul>
            <li>Complete your profile</li>
            <li>Explore our features</li>
            <li>Join our community</li>
        </ul>
        <p>If you have any questions, reply to this email.</p>
        """,
    )

Send Verification Code

from app.services.comms.sms import send_sms_simple

async def send_verification_code(phone: str, code: str) -> None:
    """Text a verification code to the given phone number.

    Args:
        phone: Destination phone number.
        code: Verification code to include in the message.
    """
    # Build the message first so the send call stays a one-liner.
    message = f"Your verification code is: {code}\n\nThis code expires in 10 minutes."
    await send_sms_simple(to=phone, body=message)

Password Reset Flow

from app.services.comms.email import send_email_simple

async def send_password_reset(email: str, reset_token: str) -> None:
    """Send password reset email.

    The message carries the reset link both as an HTML anchor and as a
    plain-text fallback.

    Args:
        email: Address the reset message is sent to.
        reset_token: Token placed in the ``token`` query parameter of the
            reset link.
    """
    from html import escape
    from urllib.parse import urlencode

    # Percent-encode the token so characters with a meaning inside a query
    # string ("+", "&", "=", "#", ...) reach the server unchanged. Tokens made
    # only of URL-safe characters produce exactly the same URL as before.
    reset_url = f"https://yourapp.com/reset?{urlencode({'token': reset_token})}"

    # The URL sits inside a double-quoted attribute, so escape quotes as well.
    href = escape(reset_url, quote=True)

    await send_email_simple(
        to=email,
        subject="Reset Your Password",
        html=f"""
        <h1>Password Reset Request</h1>
        <p>Click the link below to reset your password:</p>
        <p><a href="{href}">Reset Password</a></p>
        <p>This link expires in 1 hour.</p>
        <p>If you didn't request this, ignore this email.</p>
        """,
        text=f"Reset your password: {reset_url}",
    )

Worker Integration with arq

Use the worker component to send emails and SMS asynchronously as background jobs.

Why Use Workers?

  • Non-blocking - Don't make users wait for email/SMS to send
  • Retry logic - Automatic retries on failure
  • Rate limiting - Control throughput to providers
  • Reliability - Jobs persist in the Redis queue

Project Setup

Generate a project with both comms and worker components:

# Scaffold a new project with the comms service and the worker component
aegis init my-app --services comms --components worker
cd my-app
# Sync the project's dependencies with uv, then activate its virtual environment
uv sync && source .venv/bin/activate

Define Email Worker Task

Create a worker task for sending emails:

# app/components/worker/tasks/comms.py
from arq import Retry
from app.services.comms.email import send_email
from app.services.comms.models import SendEmailRequest
from app.core.log import logger


async def send_email_task(
    ctx: dict,
    to: list[str],
    subject: str,
    text: str | None = None,
    html: str | None = None,
) -> dict:
    """
    Worker task to send email asynchronously.

    Args:
        ctx: arq context; ``job_try`` is read from it to size the retry delay
        to: List of recipient emails
        subject: Email subject
        text: Plain text body
        html: HTML body

    Returns:
        Dict with ``status`` ("sent") plus the ``id`` and ``to`` values
        taken from the send result

    Raises:
        Retry: If building the request or sending it raises any exception;
            the original exception is attached as the cause.
    """
    logger.info(f"Sending email to {to}")

    try:
        request = SendEmailRequest(
            to=to,
            subject=subject,
            text=text,
            html=html,
        )
        result = await send_email(request)

        logger.info(f"Email sent successfully: {result.id}")
        return {
            "status": "sent",
            "id": result.id,
            "to": result.to,
        }

    except Exception as e:
        logger.error(f"Failed to send email: {e}")
        # Linear backoff: the delay is 60 seconds times the current attempt
        # number (60s, 120s, 180s, ...). No attempt limit is enforced here;
        # how often the job is retried is up to the worker's configuration.
        # Chain the cause so the original error stays visible in tracebacks.
        raise Retry(defer=ctx.get("job_try", 1) * 60) from e


async def send_sms_task(
    ctx: dict,
    to: str,
    body: str,
) -> dict:
    """
    Worker task to send SMS asynchronously.

    Args:
        ctx: arq context; ``job_try`` is read from it to size the retry delay
        to: Recipient phone number
        body: Message text

    Returns:
        Dict with ``status`` ("sent") plus the ``sid``, ``to`` and
        ``segments`` values taken from the send result

    Raises:
        Retry: If building the request or sending it raises any exception;
            the original exception is attached as the cause.
    """
    from app.services.comms.sms import send_sms
    from app.services.comms.models import SendSMSRequest

    logger.info(f"Sending SMS to {to}")

    try:
        request = SendSMSRequest(to=to, body=body)
        result = await send_sms(request)

        logger.info(f"SMS sent successfully: {result.sid}")
        return {
            "status": "sent",
            "sid": result.sid,
            "to": result.to,
            "segments": result.segments,
        }

    except Exception as e:
        logger.error(f"Failed to send SMS: {e}")
        # Linear backoff: 60 seconds times the current attempt number. The
        # attempt limit is not set here; it is up to the worker's
        # configuration. Chain the cause so the original error is preserved.
        raise Retry(defer=ctx.get("job_try", 1) * 60) from e

Register Tasks with Worker

Add tasks to your worker configuration:

# app/components/worker/main.py
from app.components.worker.tasks.comms import send_email_task, send_sms_task

# Add to your worker functions list.
# The application enqueues these jobs by function name (for example
# "send_email_task"), so the names listed here must match those strings.
functions = [
    send_email_task,
    send_sms_task,
    # ... other tasks
]

Queue Jobs from Your Application

from app.components.worker.pools import get_queue_pool

async def queue_welcome_email(user_email: str, user_name: str) -> None:
    """Queue welcome email as background job.

    Args:
        user_email: Address the welcome message is sent to.
        user_name: Name shown in the greeting. It is HTML-escaped before
            being placed in the message body.
    """
    from html import escape

    pool, queue_name = await get_queue_pool()

    # The name is user-supplied text; escape it so it cannot inject markup
    # into the HTML body that the worker will send.
    safe_name = escape(user_name)

    await pool.enqueue_job(
        "send_email_task",
        to=[user_email],
        subject="Welcome to Our Platform!",
        html=f"<h1>Welcome, {safe_name}!</h1><p>Thanks for joining!</p>",
        _queue_name=queue_name,
    )


async def queue_verification_sms(phone: str, code: str) -> None:
    """Enqueue a background job that texts a verification code.

    Args:
        phone: Destination phone number.
        code: Verification code to include in the message.
    """
    job_pool, target_queue = await get_queue_pool()

    # Keyword arguments are forwarded to the "send_sms_task" worker function.
    sms_kwargs = {"to": phone, "body": f"Your code: {code}"}
    await job_pool.enqueue_job(
        "send_sms_task",
        **sms_kwargs,
        _queue_name=target_queue,
    )

Integration with Auth Service

Send welcome email when user registers:

# app/components/backend/api/auth/router.py
from app.components.worker.pools import get_queue_pool

@router.post("/register")
async def register(user_data: UserCreate) -> User:
    """Register new user and send welcome email.

    The user is created first; the welcome email is then queued as a
    background job rather than sent inline.

    Args:
        user_data: Registration payload passed to ``user_service.create_user``.

    Returns:
        The newly created user.
    """
    from html import escape

    # Create user
    user = await user_service.create_user(user_data)

    # Escape the name so any markup characters in it are rendered literally
    # in the HTML body instead of being interpreted by the mail client.
    safe_name = escape(str(user.name))

    # Queue welcome email as background job (non-blocking)
    pool, queue_name = await get_queue_pool()
    await pool.enqueue_job(
        "send_email_task",
        to=[user.email],
        subject="Welcome!",
        html=f"<h1>Welcome, {safe_name}!</h1>",
        _queue_name=queue_name,
    )

    return user

The worker task uses the actual send_email function:

# app/components/worker/tasks/comms.py
async def send_email_task(
    ctx: dict,
    to: list[str],
    subject: str,
    text: str | None = None,
    html: str | None = None,
) -> dict:
    """Wrap the job arguments in a SendEmailRequest and hand it to send_email.

    Returns:
        Dict with ``status`` ("sent") and the ``id`` / ``to`` values of the
        send result.
    """
    # send_email performs the real delivery (the Resend API call).
    delivery = await send_email(
        SendEmailRequest(to=to, subject=subject, text=text, html=html)
    )

    return {"status": "sent", "id": delivery.id, "to": delivery.to}

Batch Email Sending

Queue one background job per recipient, so each email is sent (and retried) independently:

from app.components.worker.pools import get_queue_pool

async def send_newsletter(
    recipients: list[str],
    subject: str,
    content: str,
) -> None:
    """Enqueue one "send_email_task" job per newsletter recipient.

    Args:
        recipients: Addresses to send the newsletter to.
        subject: Subject line shared by every message.
        content: HTML body shared by every message.
    """
    job_pool, target_queue = await get_queue_pool()

    # Everything except the recipient is identical across jobs.
    shared_kwargs = {
        "subject": subject,
        "html": content,
        "_queue_name": target_queue,
    }
    for address in recipients:
        await job_pool.enqueue_job("send_email_task", to=[address], **shared_kwargs)

    queued_count = len(recipients)
    print(f"Queued {queued_count} emails")

Running the Worker

Start the worker to process queued jobs:

# Terminal 1: Run your application
make serve

# Terminal 2: Run the arq worker, pointing it at the WorkerSettings
# object in app/components/worker/main.py
arq app.components.worker.main.WorkerSettings

Or with Docker Compose:

# Start the services defined in the project's Compose file
docker compose up
# Worker runs automatically as a separate service

Advanced Patterns

Scheduled Communications

Use the scheduler component with comms:

# app/components/scheduler/main.py
from app.services.comms.email import send_email_simple

async def send_daily_digest() -> None:
    """Email the daily digest to every subscribed user, one at a time."""
    subscribers = await get_subscribed_users()

    for subscriber in subscribers:
        # Each digest is rendered per user right before it is sent.
        recipient = subscriber.email
        digest_html = generate_digest_html(subscriber)
        await send_email_simple(
            to=recipient,
            subject="Your Daily Digest",
            html=digest_html,
        )

# Schedule to run daily at 8am: a "cron" trigger firing at hour 8, minute 0.
# NOTE(review): which timezone "8am" refers to depends on how the scheduler
# is configured — confirm before relying on it.
scheduler.add_job(
    send_daily_digest,
    "cron",
    hour=8,
    minute=0,
)

Multi-Channel Notifications

Send to multiple channels based on user preferences:

from app.services.comms.email import send_email_simple
from app.services.comms.sms import send_sms_simple

async def notify_user(
    user: User,
    message: str,
    subject: str = "Notification",
) -> None:
    """Deliver a message over every channel the user has enabled.

    Args:
        user: Recipient; its ``notify_email`` / ``notify_sms`` flags select
            the channels.
        message: Text sent as the email body and as the SMS body.
        subject: Subject line for the email channel.
    """
    if user.notify_email:
        await send_email_simple(to=user.email, subject=subject, text=message)

    # SMS needs both the opt-in flag and a phone number on file.
    if not (user.notify_sms and user.phone):
        return

    await send_sms_simple(to=user.phone, body=message)

Order Confirmation

Complete e-commerce order confirmation:

async def send_order_confirmation(order: Order) -> None:
    """Send order confirmation email and SMS.

    The email lists the order number, total, items and expected delivery
    date. A shorter SMS summary is sent only when the order has a customer
    phone number.

    Args:
        order: Order to confirm.
    """
    from html import escape

    # Item names are free text; escape them so a name containing "<" or "&"
    # is shown literally instead of altering the email's markup. Building the
    # list up front also keeps the template below free of nested f-strings.
    items_html = "".join(
        f"<li>{escape(str(item.name))} x {item.qty}</li>" for item in order.items
    )

    # Email with full details
    await send_email_simple(
        to=order.customer_email,
        subject=f"Order Confirmed: #{order.id}",
        html=f"""
        <h1>Order Confirmed!</h1>
        <p>Order #: {order.id}</p>
        <p>Total: ${order.total:.2f}</p>
        <h2>Items:</h2>
        <ul>
            {items_html}
        </ul>
        <p>Expected delivery: {order.delivery_date}</p>
        """,
    )

    # SMS with summary
    if order.customer_phone:
        await send_sms_simple(
            to=order.customer_phone,
            body=f"Order #{order.id} confirmed! Total: ${order.total:.2f}. Delivery: {order.delivery_date}",
        )

Two-Factor Authentication

Implement 2FA with SMS:

import secrets
from datetime import datetime, timedelta

# Store codes temporarily (use Redis in production)
# Maps user_id -> (code, expiry time). Only the latest code per user is kept.
verification_codes: dict[str, tuple[str, datetime]] = {}


async def send_2fa_code(user_id: str, phone: str) -> None:
    """Generate and send 2FA code.

    A new code replaces any code previously stored for the same user.

    Args:
        user_id: Key under which the code is stored for later verification.
        phone: Phone number the code is texted to.
    """
    code = secrets.token_hex(3).upper()  # 6 char hex code
    expires = datetime.now() + timedelta(minutes=5)

    # Store code
    verification_codes[user_id] = (code, expires)

    # Send SMS
    await send_sms_simple(
        to=phone,
        body=f"Your verification code: {code}\nExpires in 5 minutes.",
    )


def verify_2fa_code(user_id: str, code: str) -> bool:
    """Verify 2FA code.

    A stored code is removed once it has expired or been matched, so each
    code can succeed at most once. A wrong guess leaves the code in place.

    Args:
        user_id: User whose pending code should be checked.
        code: Code submitted by the user.

    Returns:
        True if ``code`` matches the unexpired code stored for ``user_id``,
        otherwise False.
    """
    entry = verification_codes.get(user_id)
    if entry is None:
        return False

    stored_code, expires = entry

    if datetime.now() > expires:
        del verification_codes[user_id]
        return False

    # Compare in constant time so response timing does not reveal how many
    # leading characters matched. Comparing encoded bytes (rather than str)
    # keeps non-ASCII input from raising TypeError in compare_digest.
    if secrets.compare_digest(code.encode(), stored_code.encode()):
        del verification_codes[user_id]
        return True

    return False

Appointment Reminders

Schedule reminders before appointments:

from datetime import timedelta
from app.components.worker.pools import get_queue_pool

async def schedule_appointment_reminder(
    appointment: Appointment,
    hours_before: int = 24,
) -> None:
    """Schedule SMS reminder before appointment.

    The reminder job is deferred until ``hours_before`` hours ahead of the
    appointment's ``datetime``.

    Args:
        appointment: Appointment to remind about; its ``datetime``, ``phone``
            and ``time`` attributes are used.
        hours_before: How many hours ahead of the appointment the reminder
            should go out.
    """
    reminder_time = appointment.datetime - timedelta(hours=hours_before)

    # "tomorrow" is only accurate for the default 24-hour lead time; for any
    # other value, state the lead time so the message matches when it is sent.
    if hours_before == 24:
        when = "tomorrow"
    else:
        unit = "hour" if hours_before == 1 else "hours"
        when = f"in {hours_before} {unit}"

    pool, queue_name = await get_queue_pool()
    await pool.enqueue_job(
        "send_sms_task",
        to=appointment.phone,
        body=f"Reminder: You have an appointment {when} at {appointment.time}. Reply CONFIRM to confirm or CANCEL to cancel.",
        _defer_until=reminder_time,
        _queue_name=queue_name,
    )

Error Handling

Graceful Degradation

from app.services.comms.email import send_email_simple, EmailError

async def send_notification_with_fallback(
    email: str,
    phone: str,
    message: str,
) -> None:
    """Try email first, fall back to SMS on failure.

    Only ``EmailError`` triggers the fallback; any other exception from the
    email send, and any exception from the SMS send, propagates to the caller.

    Args:
        email: Address tried first.
        phone: Phone number used if the email send raises ``EmailError``.
        message: Text sent as the email body or, on fallback, the SMS body.
    """
    # The fallback path needs the logger and the SMS sender, which this
    # example's import line does not bring in. Importing them here keeps the
    # except branch from failing with NameError and hiding the email error.
    from app.core.log import logger
    from app.services.comms.sms import send_sms_simple

    try:
        await send_email_simple(
            to=email,
            subject="Notification",
            text=message,
        )
    except EmailError as e:
        logger.warning(f"Email failed, trying SMS: {e}")
        await send_sms_simple(to=phone, body=message)

Logging and Monitoring

from app.core.log import logger

async def send_tracked_email(to: str, subject: str, body: str) -> str:
    """Send a plain-text email, logging the attempt and its outcome.

    Args:
        to: Recipient address.
        subject: Subject line.
        body: Plain-text body.

    Returns:
        The ``id`` of the send result.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    logger.info(f"Sending email to {to}: {subject}")

    try:
        outcome = await send_email_simple(to=to, subject=subject, text=body)
        message_id = outcome.id
        logger.info(f"Email sent successfully: {message_id}")
        return message_id
    except Exception as exc:
        # Log with the recipient for context, then let the caller handle it.
        logger.error(f"Failed to send email to {to}: {exc}")
        raise