Worker Configuration
Comprehensive guide to configuring and scaling the worker component in your Aegis Stack application.
Worker Settings
Workers use arq's native WorkerSettings classes with direct task imports. Each queue has its own configuration:
System Worker (app/components/worker/queues/system.py):
from arq.connections import RedisSettings

from app.components.worker.tasks.system import system_health_check


class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 15
    job_timeout = 300
Load Test Worker (app/components/worker/queues/load_test.py):
from app.components.worker.tasks.load_tasks import (
    cpu_intensive_task,
    io_simulation_task,
    memory_operations_task,
    failure_testing_task,
)


class WorkerSettings:
    functions = [
        cpu_intensive_task,
        io_simulation_task,
        memory_operations_task,
        failure_testing_task,
    ]
    queue_name = "arq:queue:load_test"
    max_jobs = 50
    job_timeout = 60
Global Configuration (app/core/config.py):
class Settings(BaseSettings):
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0
    WORKER_KEEP_RESULT_SECONDS: int = 3600
    WORKER_MAX_TRIES: int = 3

    @property
    def redis_settings(self) -> RedisSettings:
        """Get Redis settings for arq workers."""
        # RedisSettings.from_dsn() accepts only the DSN, so the database
        # index is applied to the returned settings object afterwards.
        conf = RedisSettings.from_dsn(self.REDIS_URL)
        conf.database = self.REDIS_DB
        return conf
Dramatiq uses a single global broker shared by all queues. The broker is configured once and set globally via dramatiq.set_broker():
Broker Setup (app/components/worker/broker.py):
import dramatiq
from app.components.worker.middleware import EventPublishMiddleware
from app.core.config import settings
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import AsyncIO
from dramatiq.results import Results
from dramatiq.results.backends.redis import RedisBackend
# Result backend for storing task return values
result_backend = RedisBackend(url=settings.REDIS_URL)
# Create a single global RedisBroker
broker = RedisBroker(url=settings.REDIS_URL)
broker.add_middleware(AsyncIO())
broker.add_middleware(Results(backend=result_backend))
broker.add_middleware(EventPublishMiddleware())
# Set as the global broker so @dramatiq.actor uses it
dramatiq.set_broker(broker)
AsyncIO Middleware
The AsyncIO middleware ships with Dramatiq itself (dramatiq.middleware), so no external packages are required to run async actors. Define your actor with async def and Dramatiq manages the event loop.
Actor Definition (app/components/worker/queues/system.py):
from datetime import datetime

import dramatiq

from app.components.worker.broker import broker  # noqa: F401 - imports trigger broker setup
from app.core.log import logger


@dramatiq.actor(queue_name="system", store_results=True)
async def system_health_check() -> dict:
    """Periodic system health check."""
    logger.info("Running system health check")
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}


@dramatiq.actor(queue_name="system", store_results=True, max_retries=3)
async def cleanup_temp_files(days_old: int = 7) -> dict:
    """Remove temporary files older than specified days."""
    # remove_old_files is a helper defined elsewhere (not shown here)
    cleaned = await remove_old_files(days_old)
    return {"files_removed": cleaned}
TaskIQ uses per-queue broker instances. Each queue module creates its own RedisStreamBroker with a unique queue name:
Queue Broker (app/components/worker/queues/system.py):
from datetime import UTC, datetime

from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

from app.components.worker.middleware_taskiq import EventPublishMiddleware
from app.core.config import settings

redis_url = settings.redis_url_effective

broker = (
    RedisStreamBroker(url=redis_url, queue_name="taskiq:system")
    .with_result_backend(
        RedisAsyncResultBackend(redis_url=redis_url, result_ex_time=60)
    )
    .with_middlewares(EventPublishMiddleware().set_queue_name("system"))
)


@broker.task
async def system_health_check() -> dict[str, str]:
    """Simple system health check task."""
    return {
        "status": "healthy",
        "timestamp": datetime.now(UTC).isoformat(),
    }
Key characteristics:
- RedisStreamBroker uses Redis Streams for acknowledgement support
- Unique queue_name per broker prevents worker cross-contamination
- result_ex_time=60 sets result TTL in seconds
- EventPublishMiddleware publishes lifecycle events for the dashboard
Environment Variables
Shared (All Backends)
# Redis connection
REDIS_URL=redis://redis:6379 # Used by Docker containers
REDIS_URL_LOCAL=redis://localhost:6379 # Used by CLI commands running locally
REDIS_DB=0
Backend-Specific
For Dramatiq, total concurrency per container is WORKER_PROCESSES x WORKER_THREADS. For I/O-bound async actors, increasing threads is usually sufficient. For CPU-bound work, increase processes to use multiple cores.
Local Development
# .env file for local development
REDIS_URL=redis://redis:6379 # Used by Docker containers
REDIS_URL_LOCAL=redis://localhost:6379 # Used by CLI commands running locally
REDIS_DB=0
ENVIRONMENT=development
APP_ENV=dev # Enables auto-reload in Docker
When running CLI commands locally (outside Docker), the system automatically uses REDIS_URL_LOCAL if configured. This dual-configuration approach allows:
- Docker containers to connect to the redis service
- Local CLI commands to connect to localhost Redis without starting containers
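The selection between the two URLs can be sketched as follows. This is a minimal illustration, not the project's actual implementation: the function name effective_redis_url is hypothetical, and the use of DOCKER_CONTAINER=1 as the "running in Docker" signal is an assumption based on the compose files in this guide.

```python
import os
from collections.abc import Mapping


def effective_redis_url(env: Mapping[str, str] = os.environ) -> str:
    """Pick the Redis URL for the current process.

    Assumption: DOCKER_CONTAINER=1 marks a containerized process; the real
    selection logic in the application may differ.
    """
    in_docker = env.get("DOCKER_CONTAINER") == "1"
    local_url = env.get("REDIS_URL_LOCAL")
    if not in_docker and local_url:
        # Local CLI run: prefer the localhost URL when it is configured.
        return local_url
    # Inside Docker, or no local override: fall back to REDIS_URL.
    return env.get("REDIS_URL", "redis://localhost:6379")
```

Passing the environment as a mapping keeps the function easy to exercise with plain dictionaries instead of mutating os.environ.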
Production Settings
Docker Integration
# docker-compose.yml (arq)
services:
  worker-system:
    build: .
    command:
      - worker
      - system
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=system
      - WORKER_TIMEOUT_SECONDS=1800
      - WORKER_MAX_TRIES=5
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
  worker-load-test:
    build: .
    command:
      - worker
      - load_test
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=load_test
      - WORKER_TIMEOUT_SECONDS=60
      - WORKER_MAX_TRIES=1
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
# docker-compose.yml (Dramatiq)
services:
  worker-system:
    build: .
    command:
      - worker
      - system
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=system
      - WORKER_PROCESSES=${SYSTEM_WORKER_PROCESSES:-2}
      - WORKER_THREADS=${SYSTEM_WORKER_THREADS:-4}
      - WORKER_TIMEOUT_SECONDS=1800
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
  worker-load-test:
    build: .
    command:
      - worker
      - load_test
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=load_test
      - WORKER_PROCESSES=${LOAD_TEST_WORKER_PROCESSES:-4}
      - WORKER_THREADS=${LOAD_TEST_WORKER_THREADS:-8}
      - WORKER_TIMEOUT_SECONDS=60
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
# docker-compose.yml (TaskIQ)
services:
  worker-system:
    build: .
    command:
      - worker
      - system
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=system
      - WORKER_PROCESSES=${SYSTEM_WORKER_PROCESSES:-2}
      - WORKER_TIMEOUT_SECONDS=1800
      - REDIS_URL=redis://redis:6379
      # Docker file watching fix
      - WATCHFILES_FORCE_POLLING=true
      - WATCHFILES_POLL_DELAY_MS=800
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
  worker-load-test:
    build: .
    command:
      - worker
      - load_test
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=load_test
      - WORKER_PROCESSES=${LOAD_TEST_WORKER_PROCESSES:-4}
      - WORKER_TIMEOUT_SECONDS=60
      - REDIS_URL=redis://redis:6379
      - WATCHFILES_FORCE_POLLING=true
      - WATCHFILES_POLL_DELAY_MS=800
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
Watchfiles Polling
TaskIQ uses watchfiles for auto-reload in development. Bind-mounted Docker volumes often need polling mode (WATCHFILES_FORCE_POLLING=true) because filesystem change events do not reliably propagate from the host into the container.
Scaling Strategies
Horizontal Scaling
Scale by running multiple worker containers. arq workers coordinate through Redis:
Vertical Scaling
Increase per-worker concurrency via WorkerSettings:
# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 30  # Increased from 15
    job_timeout = 300
Environment-Based Configuration
# app/components/worker/queues/system.py
from app.core.config import settings


class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    keep_result = settings.WORKER_KEEP_RESULT_SECONDS
    max_tries = settings.WORKER_MAX_TRIES
    redis_settings = settings.redis_settings
    max_jobs = 30 if settings.ENVIRONMENT == "production" else 15
    job_timeout = 300 if settings.ENVIRONMENT == "production" else 600
Process x Thread Scaling
Dramatiq's concurrency model is explicit:
# Total concurrent messages = WORKER_PROCESSES x WORKER_THREADS
# Example: 2 processes x 8 threads = 16 concurrent messages per container
I/O-bound workloads — increase threads:
CPU-bound workloads — increase processes:
Mixed workloads — balanced:
Horizontal Scaling
For horizontal scaling, run multiple worker containers:
Each container runs its own set of processes and threads.
Process Scaling
TaskIQ uses process-based concurrency via the --workers flag:
Each process runs its own async event loop, making it effective for both I/O-bound and CPU-bound workloads.
In Docker, configure via environment variable:
Horizontal Scaling
Run multiple worker containers for additional capacity:
Redis Streams consumer groups deliver each message to a single worker in the group, even when workers run across multiple containers.
Redis Configuration
Connection Settings
# app/core/config.py
class Settings(BaseSettings):
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0
    REDIS_PASSWORD: str | None = None
    REDIS_MAX_CONNECTIONS: int = 100
    REDIS_CONNECTION_TIMEOUT: int = 5
Redis Persistence
For production, configure Redis persistence:
# docker-compose.yml
redis:
  image: redis:7-alpine
  command: >
    redis-server
    --maxmemory 512mb
    --maxmemory-policy allkeys-lru
    --save 900 1
    --save 300 10
    --save 60 10000
    --appendonly yes
  volumes:
    - redis-data:/data
    - ./redis.conf:/usr/local/etc/redis/redis.conf
Performance Tuning
Memory Management
class WorkerSettings:
    functions = [system_health_check]
    keep_result = 1800  # 30 minutes for production
    max_jobs = 15  # Limit concurrent jobs to manage memory
Timeout Configuration
Process/Thread Tuning
The WORKER_PROCESSES x WORKER_THREADS formula determines total concurrency:
# I/O-bound: more threads, fewer processes
WORKER_PROCESSES=1
WORKER_THREADS=16
# CPU-bound: more processes (one per core), fewer threads
WORKER_PROCESSES=4
WORKER_THREADS=4
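The arithmetic behind these two profiles can be checked with a short sketch. The WorkerProfile class and total_concurrency helper are illustrative names, not part of Dramatiq or the application; only the processes x threads formula comes from this guide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerProfile:
    processes: int  # value of WORKER_PROCESSES
    threads: int    # value of WORKER_THREADS

    @property
    def concurrency(self) -> int:
        """Concurrent messages handled by one container."""
        return self.processes * self.threads


def total_concurrency(profile: WorkerProfile, containers: int = 1) -> int:
    """Concurrent messages across several identical containers."""
    return profile.concurrency * containers


io_bound = WorkerProfile(processes=1, threads=16)
cpu_bound = WorkerProfile(processes=4, threads=4)
```

Both profiles give the same per-container concurrency of 16; they differ in how that capacity is split between cores (processes) and in-process parallelism (threads).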
Actor-Level Tuning
Connection Pooling
REDIS_MAX_CONNECTIONS = 50 # Per worker process
REDIS_CONNECTION_TIMEOUT = 5
REDIS_SOCKET_KEEPALIVE = True
Monitoring & Health Checks
Health checks use Redis heartbeat keys with TTL. Each worker writes a heartbeat key on startup and updates it periodically:
# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    health_check_key = "worker:health:system"
    health_check_interval = 30  # seconds
    keep_result = 3600
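Because the heartbeat key carries a TTL, a missing key means the worker has not reported recently. A liveness probe built on that idea might look like the sketch below. The function name is_worker_alive is hypothetical; the client is assumed to expose an exists() method that returns the number of matching keys, as redis-py's Redis.exists does.

```python
from typing import Protocol


class SupportsExists(Protocol):
    """Minimal slice of a Redis client needed for the probe."""

    def exists(self, *names: str) -> int: ...


def is_worker_alive(client: SupportsExists, health_check_key: str) -> bool:
    """Return True while the heartbeat key has not expired."""
    return client.exists(health_check_key) > 0
```

With redis-py installed, the probe could be called as is_worker_alive(redis.Redis.from_url(url), "worker:health:system").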
Check queue metrics:
The API uses LLEN on queue keys for depth metrics.
Dramatiq health is monitored via Redis queue depth and delay queue inspection:
# Query Redis directly
KEYS dramatiq:* # List all Dramatiq keys
LLEN dramatiq:system # System queue depth
LLEN dramatiq:system.DQ # System delay queue depth
The Overseer dashboard reads these metrics automatically for real-time display.
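The same two LLEN lookups can be wrapped in a small helper. This is a sketch under assumptions: dramatiq_queue_depth is a hypothetical name, the "dramatiq" key prefix is taken from the commands above, and the client only needs an llen() method such as the one redis-py provides.

```python
from typing import Protocol


class SupportsLlen(Protocol):
    """Minimal slice of a Redis client needed for depth metrics."""

    def llen(self, name: str) -> int: ...


def dramatiq_queue_depth(
    client: SupportsLlen, queue_name: str, namespace: str = "dramatiq"
) -> dict[str, int]:
    """Return ready and delayed message counts for one queue."""
    key = f"{namespace}:{queue_name}"
    return {
        "ready": client.llen(key),             # e.g. LLEN dramatiq:system
        "delayed": client.llen(f"{key}.DQ"),   # e.g. LLEN dramatiq:system.DQ
    }
```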
TaskIQ publishes lifecycle events to Redis Streams via EventPublishMiddleware:
Event types:
- worker.started / worker.stopped: Worker lifecycle
- job.enqueued: Task enqueued (from client side)
- job.started / job.completed / job.failed: Task execution
class EventPublishMiddleware(TaskiqMiddleware):
    async def startup(self) -> None:
        await publish_event(redis, "worker.started", queue_name)

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        await publish_event(redis, "job.started", queue_name, {...})
        return message  # pre_execute must hand the message back to TaskIQ

    async def post_execute(self, message, result) -> None:
        event_type = "job.failed" if result.is_err else "job.completed"
        await publish_event(redis, event_type, queue_name, {...})
The Overseer dashboard subscribes to these events via SSE for real-time updates.
Security Configuration
Authentication
import os

REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
REDIS_USERNAME = os.getenv("REDIS_USERNAME")  # Redis 6.0+
REDIS_SSL = os.getenv("REDIS_SSL", "false").lower() == "true"
Network Security
# docker-compose.yml - Internal network only
services:
  worker-system:
    networks:
      - internal
    # No ports exposed
  redis:
    networks:
      - internal
    ports:
      - "127.0.0.1:6379:6379"  # Localhost only
networks:
  internal:
    driver: bridge
    internal: true
Advanced Configuration
Custom Worker Classes
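# app/components/worker/custom.py
from arq import Worker


async def startup(ctx: dict) -> None:
    """Initialize resources on worker startup."""
    await init_database_pool()
    await setup_monitoring()


async def shutdown(ctx: dict) -> None:
    """Cleanup on worker shutdown."""
    await close_database_pool()


class CustomWorker(Worker):
    """Extended worker with custom behavior."""

    def __init__(self, *args, **kwargs):
        # arq runs lifecycle code through the on_startup / on_shutdown hooks
        kwargs.setdefault("on_startup", startup)
        kwargs.setdefault("on_shutdown", shutdown)
        super().__init__(*args, **kwargs)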
Dramatiq Result Retrieval
Results are stored using a key derived from the actor name, queue name, and message ID. The Results middleware with RedisBackend handles storage automatically when store_results=True:
Result keys follow the pattern: MD5(namespace:queue_name:actor_name:message_id).
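The key pattern can be reproduced with a few lines of standard-library code, which helps when inspecting results in Redis by hand. This is a sketch: the function name result_key is hypothetical, and the default namespace "dramatiq-results" is an assumption about the backend's default that should be checked against the installed Dramatiq version.

```python
import hashlib


def result_key(
    queue_name: str,
    actor_name: str,
    message_id: str,
    namespace: str = "dramatiq-results",  # assumed default namespace
) -> str:
    """Hash namespace:queue_name:actor_name:message_id into a result key."""
    raw = f"{namespace}:{queue_name}:{actor_name}:{message_id}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()
```

The digest is deterministic, so the same message always maps to the same 32-character hexadecimal key.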
Custom Middleware
TaskIQ middleware hooks into the task lifecycle:
from taskiq import TaskiqMiddleware, TaskiqMessage, TaskiqResult

from app.core.log import logger


class MyMiddleware(TaskiqMiddleware):
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """Run before each task."""
        logger.info(f"Starting task: {message.task_name}")
        return message

    async def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None:
        """Run after each task."""
        if result.is_err:
            logger.error(f"Task failed: {message.task_name}")
Attach middleware when creating the broker:
Next Steps
- Examples - See configuration in action
- Load Testing - Test your configuration
- Back to Overview - Return to worker component overview