Worker Configuration

Comprehensive guide to configuring and scaling the worker component in your Aegis Stack application.

Worker Settings

Workers use arq's native WorkerSettings classes with direct task imports. Each queue has its own configuration:

System Worker (app/components/worker/queues/system.py)

from app.components.worker.tasks.system import system_health_check
from app.core.config import settings

class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 15
    job_timeout = 300
    redis_settings = settings.redis_settings
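With a WorkerSettings class in place, the queue can be started with arq's CLI, one process per queue (module path shown for the system queue above):

```shell
# Start the system queue worker
arq app.components.worker.queues.system.WorkerSettings

# During development, restart automatically when code under the given path changes
arq --watch app/components/worker app.components.worker.queues.system.WorkerSettings
```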

Load Test Worker (app/components/worker/queues/load_test.py)

from app.components.worker.tasks.load_tasks import (
    cpu_intensive_task,
    io_simulation_task,
    memory_operations_task,
    failure_testing_task,
)

class WorkerSettings:
    functions = [
        cpu_intensive_task,
        io_simulation_task,
        memory_operations_task,
        failure_testing_task,
    ]
    queue_name = "arq:queue:load_test"
    max_jobs = 50
    job_timeout = 60
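The task functions referenced above are plain async callables that receive arq's job context dict as their first argument. A minimal sketch (the body is illustrative, not the stack's actual implementation):

```python
import asyncio

async def io_simulation_task(ctx: dict, duration: float = 0.1) -> dict:
    """Simulate an I/O-bound workload by sleeping for `duration` seconds."""
    await asyncio.sleep(duration)
    # ctx carries arq metadata such as job_id, job_try, and the Redis connection
    return {"job_id": ctx.get("job_id"), "slept": duration}
```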

Global Configuration (app/core/config.py)

class Settings(BaseSettings):
    # Redis settings for arq background tasks
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0

    # Shared arq worker settings
    WORKER_KEEP_RESULT_SECONDS: int = 3600
    WORKER_MAX_TRIES: int = 3

    # Pure arq: no configuration dictionary needed.
    # WorkerSettings classes read these values directly.

    @property
    def redis_settings(self) -> RedisSettings:
        """Get Redis settings for arq workers."""
        # RedisSettings.from_dsn() accepts only the DSN string,
        # so the database number is encoded in the URL path.
        return RedisSettings.from_dsn(f"{self.REDIS_URL}/{self.REDIS_DB}")

Environment Variables

Configure worker behavior through environment variables:

# Docker environment (docker-compose.yml)
# WorkerSettings uses these environment variables through app.core.config
REDIS_URL=redis://redis:6379        # Redis connection for all workers
REDIS_DB=0                          # Redis database number
ENVIRONMENT=production              # Affects WorkerSettings logic
WORKER_KEEP_RESULT_SECONDS=3600     # Used by WorkerSettings.keep_result
WORKER_MAX_TRIES=3                  # Used by WorkerSettings.max_tries

Local Development

# .env file for local development
REDIS_URL=redis://redis:6379        # Used by Docker containers
REDIS_URL_LOCAL=redis://localhost:6379  # Used by CLI commands running locally
REDIS_DB=0
WORKER_KEEP_RESULT_SECONDS=3600     # WorkerSettings will use this
WORKER_MAX_TRIES=3                  # WorkerSettings will use this  
ENVIRONMENT=development             # WorkerSettings can check this
APP_ENV=dev                         # Enables auto-reload in Docker

CLI Command Configuration

When running CLI commands locally (outside Docker), the system automatically uses REDIS_URL_LOCAL if configured:

# Local CLI commands use this pattern:
# 1. Docker services connect to: redis://redis:6379
# 2. Local CLI commands connect to: redis://localhost:6379

# In your .env file:
REDIS_URL=redis://redis:6379        # For Docker containers
REDIS_URL_LOCAL=redis://localhost:6379  # For local CLI commands

# Example CLI usage (automatically uses REDIS_URL_LOCAL):
my-app load-test io
my-app health status

This dual-configuration approach allows:

  • Docker containers to connect to the redis service
  • Local CLI commands to connect to localhost Redis without starting containers
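The selection logic can be sketched as a small helper (resolve_redis_url is a hypothetical name; the actual lookup lives in app.core.config):

```python
import os

def resolve_redis_url() -> str:
    """Prefer REDIS_URL_LOCAL (CLI runs outside Docker), fall back to REDIS_URL."""
    return os.environ.get("REDIS_URL_LOCAL") or os.environ.get(
        "REDIS_URL", "redis://localhost:6379"
    )
```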

Production Settings

# Production environment variables
# These are used by WorkerSettings classes through app.core.config
ENVIRONMENT=production              # WorkerSettings can scale based on this
WORKER_KEEP_RESULT_SECONDS=1800     # WorkerSettings.keep_result
WORKER_MAX_TRIES=5                  # WorkerSettings.max_tries
REDIS_URL=redis://redis-prod:6379
REDIS_DB=1                          # Separate DB for production
REDIS_PASSWORD=your-secure-password

Docker Integration

docker-compose.yml Configuration

services:
  worker:
    build: .
    command: ["worker"]
    environment:
      - ENVIRONMENT=production
      - WORKER_KEEP_RESULT_SECONDS=1800
      - WORKER_MAX_TRIES=5
      - REDIS_URL=redis://redis:6379
      - REDIS_DB=0
    depends_on:
      - redis
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '1'
          memory: 1G

  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

Scaling Strategies

Horizontal Scaling

Scale by running multiple worker containers:

# Scale to 3 worker instances
docker compose up --scale worker=3

# Or in docker-compose.yml
deploy:
  replicas: 3

Vertical Scaling

Increase per-worker concurrency by modifying WorkerSettings classes:

# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 30  # Increased from 15
    job_timeout = 300

# app/components/worker/queues/load_test.py
class WorkerSettings:
    functions = [cpu_intensive_task, io_simulation_task]
    queue_name = "arq:queue:load_test"
    max_jobs = 100  # Increased from 50
    job_timeout = 60

Environment-Based Configuration

WorkerSettings classes can dynamically configure based on environment variables:

# app/components/worker/queues/system.py
from app.components.worker.tasks.system import system_health_check
from app.core.config import settings

class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"

    # Use environment values from settings
    keep_result = settings.WORKER_KEEP_RESULT_SECONDS
    max_tries = settings.WORKER_MAX_TRIES
    redis_settings = settings.redis_settings

    # Dynamic scaling based on environment
    max_jobs = 30 if settings.ENVIRONMENT == "production" else 15
    job_timeout = 300 if settings.ENVIRONMENT == "production" else 600

Queue-Specific Scaling

Scale by modifying WorkerSettings classes directly:

# app/components/worker/queues/system.py
from arq import RedisSettings

from app.components.worker.tasks.system import system_health_check
from app.core.config import settings

class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"

    # Environment-based scaling
    max_jobs = 25 if settings.ENVIRONMENT == "production" else 15
    job_timeout = 300
    keep_result = 1800 if settings.ENVIRONMENT == "production" else 3600
    max_tries = 5 if settings.ENVIRONMENT == "production" else 3

    # Redis settings
    redis_settings = RedisSettings.from_dsn(settings.REDIS_URL)

Redis Configuration

Connection Settings

# app/core/config.py
from urllib.parse import urlparse

from arq import RedisSettings

class Settings(BaseSettings):
    # Redis configuration
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0
    REDIS_PASSWORD: str | None = None
    REDIS_MAX_CONNECTIONS: int = 100
    REDIS_CONNECTION_TIMEOUT: int = 5

    @property
    def redis_settings(self) -> RedisSettings:
        """Get Redis settings for arq."""
        # RedisSettings expects a bare hostname, not a full DSN
        url = urlparse(self.REDIS_URL)
        return RedisSettings(
            host=url.hostname or "localhost",
            port=url.port or 6379,
            database=self.REDIS_DB,
            password=self.REDIS_PASSWORD,
            max_connections=self.REDIS_MAX_CONNECTIONS,
            conn_timeout=self.REDIS_CONNECTION_TIMEOUT,
        )

Redis Persistence

For production, configure Redis persistence:

# docker-compose.yml
redis:
  image: redis:7-alpine
  command: >
    redis-server
    --maxmemory 512mb
    --maxmemory-policy allkeys-lru
    --save 900 1
    --save 300 10
    --save 60 10000
    --appendonly yes
  volumes:
    - redis-data:/data
    - ./redis.conf:/usr/local/etc/redis/redis.conf

Performance Tuning

Memory Management

Control memory usage through WorkerSettings properties:

# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    keep_result = 1800  # 30 minutes for production
    max_jobs = 15       # Limit concurrent jobs to manage memory

# app/components/worker/queues/load_test.py
class WorkerSettings:
    functions = [light_cpu_task, light_io_task]
    keep_result = 300   # Shorter retention for high-volume tests
    max_jobs = 50       # Higher concurrency for testing

Timeout Configuration

Set timeouts based on task complexity in each WorkerSettings class:

# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check, cleanup_temp_files]
    job_timeout = 300  # 5 minutes for system tasks

# app/components/worker/queues/load_test.py
class WorkerSettings:
    functions = [cpu_intensive_task, io_simulation_task]
    job_timeout = 60  # Quick test tasks
    max_jobs = 50

Connection Pooling

# Optimize Redis connection pooling
import socket

REDIS_MAX_CONNECTIONS = 50  # Per worker process
REDIS_CONNECTION_TIMEOUT = 5
REDIS_SOCKET_KEEPALIVE = True
REDIS_SOCKET_KEEPALIVE_OPTIONS = {
    socket.TCP_KEEPIDLE: 3,   # Seconds of idle time before sending keepalives
    socket.TCP_KEEPINTVL: 3,  # Seconds between keepalive probes
    socket.TCP_KEEPCNT: 3,    # Failed probes before the connection is dropped
}

Monitoring Configuration

Health Check Settings

Health checks are built into each WorkerSettings configuration:

# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]  # This task provides health data

    # Health check behavior
    health_check_key = "worker:health:system"
    health_check_interval = 30  # seconds

    # arq built-in health monitoring
    keep_result = 3600  # Keep results for health queries

Metrics Collection

Metrics are automatically collected by arq and available via Redis:

# Check worker health with the arq CLI
arq app.components.worker.queues.system.WorkerSettings --check

# Or inspect the queues directly in Redis
redis-cli KEYS "arq:*"
redis-cli ZCARD arq:queue:system    # pending jobs (arq queues are sorted sets)

Security Configuration

Authentication

# Redis authentication
import os

REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
REDIS_USERNAME = os.getenv("REDIS_USERNAME")  # Requires Redis 6.0+ ACLs
REDIS_SSL = os.getenv("REDIS_SSL", "false").lower() == "true"
REDIS_SSL_CERT_REQS = "required" if REDIS_SSL else None

Network Security

# docker-compose.yml - Internal network only
services:
  worker:
    networks:
      - internal
    # No ports exposed

  redis:
    networks:
      - internal
    # Only expose for development
    ports:
      - "127.0.0.1:6379:6379"  # Localhost only

networks:
  internal:
    driver: bridge
    internal: true

Advanced Configuration

Custom Startup and Shutdown Hooks

arq's Worker does not call overridable startup()/shutdown() methods; lifecycle hooks are registered as on_startup and on_shutdown coroutines on the WorkerSettings class. Anything stored in ctx is available to every task:

# app/components/worker/queues/system.py
async def startup(ctx: dict) -> None:
    """Initialize shared resources when the worker starts."""
    # e.g. ctx["db_pool"] = await create_database_pool()
    ...

async def shutdown(ctx: dict) -> None:
    """Clean up shared resources when the worker stops."""
    # e.g. await ctx["db_pool"].close()
    ...

class WorkerSettings:
    functions = [system_health_check]
    on_startup = startup
    on_shutdown = shutdown

Task Registration

See Examples for complete task registration patterns.

Next Steps