Worker Configuration
Comprehensive guide to configuring and scaling the worker component in your Aegis Stack application.
Worker Settings
Workers use arq's native WorkerSettings classes with direct task imports. Each queue has its own configuration:
System Worker (app/components/worker/queues/system.py)
from arq import RedisSettings

from app.components.worker.tasks.system import system_health_check


class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 15
    job_timeout = 300
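For context, an arq task such as `system_health_check` is an async function whose first argument is the worker-supplied context dict. The body below is a hypothetical sketch (the real task's checks live in `app/components/worker/tasks/system.py`):

```python
import asyncio
import time


async def system_health_check(ctx: dict) -> dict:
    """Illustrative arq task: ctx is the worker context (redis
    connection, job id, try count, ...) that arq passes to every task."""
    started = time.monotonic()
    # Real checks (database ping, disk space, queue depth, ...) go here.
    await asyncio.sleep(0)
    return {
        "status": "ok",
        "duration_s": round(time.monotonic() - started, 3),
    }


if __name__ == "__main__":
    print(asyncio.run(system_health_check({})))
```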
Load Test Worker (app/components/worker/queues/load_test.py)
from app.components.worker.tasks.load_tasks import (
    cpu_intensive_task,
    io_simulation_task,
    memory_operations_task,
    failure_testing_task,
)


class WorkerSettings:
    functions = [
        cpu_intensive_task,
        io_simulation_task,
        memory_operations_task,
        failure_testing_task,
    ]
    queue_name = "arq:queue:load_test"
    max_jobs = 50
    job_timeout = 60
Global Configuration (app/core/config.py)
class Settings(BaseSettings):
    # Redis settings for arq background tasks
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0

    # Shared arq worker settings
    WORKER_KEEP_RESULT_SECONDS: int = 3600
    WORKER_MAX_TRIES: int = 3

    # Pure arq: no configuration dictionary needed.
    # WorkerSettings classes read these values directly.
    @property
    def redis_settings(self) -> RedisSettings:
        """Get Redis settings for arq workers."""
        # RedisSettings.from_dsn() accepts only the DSN itself,
        # so the database number is applied afterwards.
        redis_settings = RedisSettings.from_dsn(self.REDIS_URL)
        redis_settings.database = self.REDIS_DB
        return redis_settings
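To make the DSN-to-settings mapping concrete, here is a stdlib-only sketch of how a Redis URL decomposes into the pieces arq's RedisSettings expects; `parse_redis_dsn` is an illustrative helper, not part of the project:

```python
from urllib.parse import urlparse


def parse_redis_dsn(dsn: str, default_db: int = 0) -> dict:
    """Split a Redis DSN into host/port/database/password parts.

    Illustrative only; the project's Settings.redis_settings property
    is the real integration point.
    """
    parsed = urlparse(dsn)
    # A path like "/2" selects database 2; otherwise use the default.
    path = parsed.path.lstrip("/")
    return {
        "host": parsed.hostname or "localhost",
        "port": parsed.port or 6379,
        "database": int(path) if path else default_db,
        "password": parsed.password,
    }


print(parse_redis_dsn("redis://:secret@redis-prod:6380/1"))
# {'host': 'redis-prod', 'port': 6380, 'database': 1, 'password': 'secret'}
```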
Environment Variables
Configure worker behavior through environment variables:
# Docker environment (docker-compose.yml)
# WorkerSettings uses these environment variables through app.core.config
REDIS_URL=redis://redis:6379 # Redis connection for all workers
REDIS_DB=0 # Redis database number
ENVIRONMENT=production # Affects WorkerSettings logic
WORKER_KEEP_RESULT_SECONDS=3600 # Used by WorkerSettings.keep_result
WORKER_MAX_TRIES=3 # Used by WorkerSettings.max_tries
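These variables reach the WorkerSettings classes through `app.core.config`. A minimal stdlib sketch of the same read-with-defaults pattern (pydantic's BaseSettings does this for real; the defaults here mirror the values documented above):

```python
def read_worker_env(environ: dict) -> dict:
    """Read worker tuning values with the documented defaults.

    Sketch of what pydantic BaseSettings does for app.core.config.
    """
    return {
        "redis_url": environ.get("REDIS_URL", "redis://localhost:6379"),
        "redis_db": int(environ.get("REDIS_DB", "0")),
        "keep_result_seconds": int(environ.get("WORKER_KEEP_RESULT_SECONDS", "3600")),
        "max_tries": int(environ.get("WORKER_MAX_TRIES", "3")),
    }


print(read_worker_env({"WORKER_MAX_TRIES": "5"}))
```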
Local Development
# .env file for local development
REDIS_URL=redis://redis:6379 # Used by Docker containers
REDIS_URL_LOCAL=redis://localhost:6379 # Used by CLI commands running locally
REDIS_DB=0
WORKER_KEEP_RESULT_SECONDS=3600 # WorkerSettings will use this
WORKER_MAX_TRIES=3 # WorkerSettings will use this
ENVIRONMENT=development # WorkerSettings can check this
APP_ENV=dev # Enables auto-reload in Docker
CLI Command Configuration
When running CLI commands locally (outside Docker), the system automatically uses REDIS_URL_LOCAL if configured:
# Local CLI commands use this pattern:
# 1. Docker services connect to: redis://redis:6379
# 2. Local CLI commands connect to: redis://localhost:6379
# In your .env file:
REDIS_URL=redis://redis:6379 # For Docker containers
REDIS_URL_LOCAL=redis://localhost:6379 # For local CLI commands
# Example CLI usage (automatically uses REDIS_URL_LOCAL):
my-app load-test io
my-app health status
This dual-configuration approach allows:

- Docker containers to connect to the redis service
- Local CLI commands to connect to Redis on localhost without starting extra containers
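The selection logic can be sketched as a pure function. Note that `in_docker` is a stand-in here: how the real CLI detects its environment is not shown above, so detection is passed in explicitly:

```python
def resolve_redis_url(environ: dict, in_docker: bool) -> str:
    """Pick the Redis URL for the current process (hypothetical helper).

    Mirrors the behavior described above: local (non-Docker) processes
    prefer REDIS_URL_LOCAL when it is set.
    """
    if not in_docker and environ.get("REDIS_URL_LOCAL"):
        return environ["REDIS_URL_LOCAL"]
    return environ.get("REDIS_URL", "redis://localhost:6379")


env = {
    "REDIS_URL": "redis://redis:6379",
    "REDIS_URL_LOCAL": "redis://localhost:6379",
}
print(resolve_redis_url(env, in_docker=True))   # redis://redis:6379
print(resolve_redis_url(env, in_docker=False))  # redis://localhost:6379
```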
Production Settings
# Production environment variables
# These are used by WorkerSettings classes through app.core.config
ENVIRONMENT=production # WorkerSettings can scale based on this
WORKER_KEEP_RESULT_SECONDS=1800 # WorkerSettings.keep_result
WORKER_MAX_TRIES=5 # WorkerSettings.max_tries
REDIS_URL=redis://redis-prod:6379
REDIS_DB=1 # Separate DB for production
REDIS_PASSWORD=your-secure-password
Docker Integration
docker-compose.yml Configuration
services:
  worker:
    build: .
    command: ["worker"]
    environment:
      - ENVIRONMENT=production
      - WORKER_KEEP_RESULT_SECONDS=1800
      - WORKER_MAX_TRIES=5
      - REDIS_URL=redis://redis:6379
      - REDIS_DB=0
    depends_on:
      - redis
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '1'
          memory: 1G

  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
Scaling Strategies
Horizontal Scaling
Scale by running multiple worker containers:
# Scale to 3 worker instances
docker compose up --scale worker=3

# Or in docker-compose.yml
deploy:
  replicas: 3
Vertical Scaling
Increase per-worker concurrency by modifying WorkerSettings classes:
# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 30  # Increased from 15
    job_timeout = 300


# app/components/worker/queues/load_test.py
class WorkerSettings:
    functions = [cpu_intensive_task, io_simulation_task]
    queue_name = "arq:queue:load_test"
    max_jobs = 100  # Increased from 50
    job_timeout = 60
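When choosing max_jobs values, a rough heuristic is one job per core for CPU-bound queues and heavy oversubscription for IO-bound queues, since IO jobs spend most of their time waiting. The multiplier below is an assumption for illustration, not a project rule:

```python
def suggest_max_jobs(cpu_count: int, io_bound: bool) -> int:
    """Rough sizing heuristic (assumption, not a project rule):
    CPU-bound queues get about one concurrent job per core;
    IO-bound queues can oversubscribe heavily."""
    if io_bound:
        return cpu_count * 25
    return max(1, cpu_count)


print(suggest_max_jobs(2, io_bound=False))  # 2
print(suggest_max_jobs(2, io_bound=True))   # 50
```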
Environment-Based Configuration
WorkerSettings classes can dynamically configure based on environment variables:
# app/components/worker/queues/system.py
from app.core.config import settings


class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"

    # Use environment values from settings
    keep_result = settings.WORKER_KEEP_RESULT_SECONDS
    max_tries = settings.WORKER_MAX_TRIES
    redis_settings = settings.redis_settings

    # Dynamic scaling based on environment
    max_jobs = 30 if settings.ENVIRONMENT == "production" else 15
    job_timeout = 300 if settings.ENVIRONMENT == "production" else 600
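The same conditional can be restated as a pure function, which makes the production/development split trivial to unit test (a sketch; the values match the class attributes above):

```python
def tuning_for(environment: str) -> dict:
    """Pure-function restatement of the conditional settings above."""
    production = environment == "production"
    return {
        "max_jobs": 30 if production else 15,
        "job_timeout": 300 if production else 600,
    }


assert tuning_for("production") == {"max_jobs": 30, "job_timeout": 300}
assert tuning_for("development") == {"max_jobs": 15, "job_timeout": 600}
```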
Queue-Specific Scaling
Scale by modifying WorkerSettings classes directly:
# app/components/worker/queues/system.py
from arq import RedisSettings

from app.core.config import settings


class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"

    # Environment-based scaling
    max_jobs = 25 if settings.ENVIRONMENT == "production" else 15
    job_timeout = 300
    keep_result = 1800 if settings.ENVIRONMENT == "production" else 3600
    max_tries = 5 if settings.ENVIRONMENT == "production" else 3

    # Redis settings
    redis_settings = RedisSettings.from_dsn(settings.REDIS_URL)
Redis Configuration
Connection Settings
# app/core/config.py
from urllib.parse import urlparse


class Settings(BaseSettings):
    # Redis configuration
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0
    REDIS_PASSWORD: str | None = None
    REDIS_MAX_CONNECTIONS: int = 100
    REDIS_CONNECTION_TIMEOUT: int = 5

    @property
    def redis_settings(self) -> RedisSettings:
        """Get Redis settings for arq."""
        # RedisSettings expects a bare hostname, not a full URL,
        # so parse the DSN first.
        parsed = urlparse(self.REDIS_URL)
        return RedisSettings(
            host=parsed.hostname or "localhost",
            port=parsed.port or 6379,
            database=self.REDIS_DB,
            password=self.REDIS_PASSWORD,
            max_connections=self.REDIS_MAX_CONNECTIONS,
            conn_timeout=self.REDIS_CONNECTION_TIMEOUT,
        )
Redis Persistence
For production, configure Redis persistence:
# docker-compose.yml
redis:
  image: redis:7-alpine
  command: >
    redis-server
    --maxmemory 512mb
    --maxmemory-policy allkeys-lru
    --save 900 1
    --save 300 10
    --save 60 10000
    --appendonly yes
  volumes:
    - redis-data:/data
    - ./redis.conf:/usr/local/etc/redis/redis.conf
Performance Tuning
Memory Management
Control memory usage through WorkerSettings properties:
# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    keep_result = 1800  # 30 minutes for production
    max_jobs = 15       # Limit concurrent jobs to manage memory


# app/components/worker/queues/load_test.py
class WorkerSettings:
    functions = [light_cpu_task, light_io_task]
    keep_result = 300  # Shorter retention for high-volume tests
    max_jobs = 50      # Higher concurrency for testing
Timeout Configuration
Set timeouts based on task complexity in each WorkerSettings class:
# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check, cleanup_temp_files]
    job_timeout = 300  # 5 minutes for system tasks


# app/components/worker/queues/load_test.py
class WorkerSettings:
    functions = [cpu_intensive_task, io_simulation_task]
    job_timeout = 60  # Quick test tasks
    max_jobs = 50
Connection Pooling
import socket

# Optimize Redis connection pooling
REDIS_MAX_CONNECTIONS = 50  # Per worker process
REDIS_CONNECTION_TIMEOUT = 5
REDIS_SOCKET_KEEPALIVE = True
# TCP keepalive tuning (Linux socket constants)
REDIS_SOCKET_KEEPALIVE_OPTIONS = {
    socket.TCP_KEEPIDLE: 3,   # Seconds of idle time before probing
    socket.TCP_KEEPINTVL: 3,  # Seconds between keepalive probes
    socket.TCP_KEEPCNT: 3,    # Failed probes before dropping the connection
}
Monitoring Configuration
Health Check Settings
Health checks are built into each WorkerSettings configuration:
# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]  # This task provides health data

    # Health check behavior
    health_check_key = "worker:health:system"
    health_check_interval = 30  # seconds

    # arq built-in health monitoring
    keep_result = 3600  # Keep results for health queries
Metrics Collection
Metrics are automatically collected by arq and available via Redis:
# Check worker health with the arq CLI
arq app.components.worker.queues.system.WorkerSettings --check

# Or inspect queues in Redis directly (arq stores queues as sorted sets)
redis-cli KEYS 'arq:queue:*'
redis-cli ZCARD arq:queue:system
Security Configuration
Authentication
import os

# Redis authentication
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
REDIS_USERNAME = os.getenv("REDIS_USERNAME")  # Redis 6.0+ ACLs
REDIS_SSL = os.getenv("REDIS_SSL", "false").lower() == "true"
REDIS_SSL_CERT_REQS = "required" if REDIS_SSL else None
Network Security
# docker-compose.yml - Internal network only
services:
  worker:
    networks:
      - internal
    # No ports exposed

  redis:
    networks:
      - internal
    # Only expose for development
    ports:
      - "127.0.0.1:6379:6379"  # Localhost only

networks:
  internal:
    driver: bridge
    internal: true
Advanced Configuration
Custom Startup and Shutdown Hooks
arq runs lifecycle logic through the on_startup and on_shutdown hooks on WorkerSettings, rather than through Worker subclasses:

# app/components/worker/custom.py
async def startup(ctx: dict) -> None:
    """Initialize resources on worker startup."""
    await init_database_pool()
    await setup_monitoring()


async def shutdown(ctx: dict) -> None:
    """Cleanup on worker shutdown."""
    await close_database_pool()


# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    on_startup = startup
    on_shutdown = shutdown
Task Registration
See Examples for complete task registration patterns.
Next Steps
- Examples - See configuration in action
- Load Testing - Test your configuration
- Back to Overview - Return to worker component overview