Skip to content

Worker Configuration

Comprehensive guide to configuring and scaling the worker component in your Aegis Stack application.

Worker Settings

Workers use arq's native WorkerSettings classes with direct task imports. Each queue has its own configuration:

System Worker (app/components/worker/queues/system.py):

from arq import RedisSettings
from app.components.worker.tasks.system import system_health_check

class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 15
    job_timeout = 300

Load Test Worker (app/components/worker/queues/load_test.py):

from app.components.worker.tasks.load_tasks import (
    cpu_intensive_task,
    io_simulation_task,
    memory_operations_task,
    failure_testing_task,
)

class WorkerSettings:
    functions = [cpu_intensive_task, io_simulation_task, memory_operations_task, failure_testing_task]
    queue_name = "arq:queue:load_test"
    max_jobs = 50
    job_timeout = 60

Global Configuration (app/core/config.py):

class Settings(BaseSettings):
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0
    WORKER_KEEP_RESULT_SECONDS: int = 3600
    WORKER_MAX_TRIES: int = 3

    @property
    def redis_settings(self) -> RedisSettings:
        """Get Redis settings for arq workers."""
        return RedisSettings.from_dsn(self.REDIS_URL, database=self.REDIS_DB)

Dramatiq uses a single global broker shared by all queues. The broker is configured once and set globally via dramatiq.set_broker():

Broker Setup (app/components/worker/broker.py):

import dramatiq
from app.components.worker.middleware import EventPublishMiddleware
from app.core.config import settings
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import AsyncIO
from dramatiq.results import Results
from dramatiq.results.backends.redis import RedisBackend

# Result backend for storing task return values
result_backend = RedisBackend(url=settings.REDIS_URL)

# Create a single global RedisBroker
broker = RedisBroker(url=settings.REDIS_URL)
broker.add_middleware(AsyncIO())
broker.add_middleware(Results(backend=result_backend))
broker.add_middleware(EventPublishMiddleware())

# Set as the global broker so @dramatiq.actor uses it
dramatiq.set_broker(broker)

AsyncIO Middleware

The AsyncIO middleware is part of Dramatiq's standard library (dramatiq.middleware). No external packages are required to run async actors. Simply define your actor as async def and Dramatiq handles the event loop.

Actor Definition (app/components/worker/queues/system.py):

import dramatiq
from app.components.worker.broker import broker  # noqa: F401 - imports trigger broker setup
from app.core.log import logger

@dramatiq.actor(queue_name="system", store_results=True)
async def system_health_check() -> dict:
    """Periodic system health check."""
    logger.info("Running system health check")
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}

@dramatiq.actor(queue_name="system", store_results=True, max_retries=3)
async def cleanup_temp_files(days_old: int = 7) -> dict:
    """Remove temporary files older than specified days."""
    cleaned = await remove_old_files(days_old)
    return {"files_removed": cleaned}

TaskIQ uses per-queue broker instances. Each queue module creates its own RedisStreamBroker with a unique queue name:

Queue Broker (app/components/worker/queues/system.py):

from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
from app.components.worker.middleware_taskiq import EventPublishMiddleware
from app.core.config import settings

redis_url = settings.redis_url_effective

broker = (
    RedisStreamBroker(url=redis_url, queue_name="taskiq:system")
    .with_result_backend(
        RedisAsyncResultBackend(redis_url=redis_url, result_ex_time=60)
    )
    .with_middlewares(EventPublishMiddleware().set_queue_name("system"))
)

@broker.task
async def system_health_check() -> dict[str, str]:
    """Simple system health check task."""
    return {
        "status": "healthy",
        "timestamp": datetime.now(UTC).isoformat(),
    }

Key characteristics: - RedisStreamBroker uses Redis Streams for acknowledgement support - Unique queue_name per broker prevents worker cross-contamination - result_ex_time=60 sets result TTL in seconds - EventPublishMiddleware publishes lifecycle events for the dashboard


Environment Variables

Shared (All Backends)

# Redis connection
REDIS_URL=redis://redis:6379        # Used by Docker containers
REDIS_URL_LOCAL=redis://localhost:6379  # Used by CLI commands running locally
REDIS_DB=0

Backend-Specific

WORKER_KEEP_RESULT_SECONDS=3600     # How long to retain task results
WORKER_MAX_TRIES=3                  # Retry attempts on failure
WORKER_PROCESSES=1                  # Number of OS processes
WORKER_THREADS=8                    # Threads per process

The WORKER_PROCESSES x WORKER_THREADS formula determines total concurrency. For I/O-bound async actors, increasing threads is usually sufficient. For CPU-bound work, increase processes to utilize multiple cores.

WORKER_PROCESSES=2                  # Number of worker processes (--workers flag)

TaskIQ only uses process-based concurrency (no thread configuration). Each process runs its own async event loop.

Local Development

# .env file for local development
REDIS_URL=redis://redis:6379        # Used by Docker containers
REDIS_URL_LOCAL=redis://localhost:6379  # Used by CLI commands running locally
REDIS_DB=0
ENVIRONMENT=development
APP_ENV=dev                         # Enables auto-reload in Docker

When running CLI commands locally (outside Docker), the system automatically uses REDIS_URL_LOCAL if configured. This dual-configuration approach allows:

  • Docker containers to connect to the redis service
  • Local CLI commands to connect to localhost Redis without starting containers

Production Settings

ENVIRONMENT=production
WORKER_KEEP_RESULT_SECONDS=1800     # Shorter retention in production
WORKER_MAX_TRIES=5                  # More retries in production
REDIS_URL=redis://redis-prod:6379
REDIS_DB=1
REDIS_PASSWORD=your-secure-password
ENVIRONMENT=production
WORKER_PROCESSES=4                  # Match CPU count
WORKER_THREADS=8                    # Per-process threads
REDIS_URL=redis://redis-prod:6379
REDIS_DB=1
REDIS_PASSWORD=your-secure-password
ENVIRONMENT=production
WORKER_PROCESSES=4                  # Match CPU count
REDIS_URL=redis://redis-prod:6379
REDIS_DB=1
REDIS_PASSWORD=your-secure-password

Docker Integration

# docker-compose.yml
services:
  worker-system:
    build: .
    command:
      - worker
      - system
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=system
      - WORKER_TIMEOUT_SECONDS=1800
      - WORKER_MAX_TRIES=5
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped

  worker-load-test:
    build: .
    command:
      - worker
      - load_test
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=load_test
      - WORKER_TIMEOUT_SECONDS=60
      - WORKER_MAX_TRIES=1
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
# docker-compose.yml
services:
  worker-system:
    build: .
    command:
      - worker
      - system
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=system
      - WORKER_PROCESSES=${SYSTEM_WORKER_PROCESSES:-2}
      - WORKER_THREADS=${SYSTEM_WORKER_THREADS:-4}
      - WORKER_TIMEOUT_SECONDS=1800
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped

  worker-load-test:
    build: .
    command:
      - worker
      - load_test
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=load_test
      - WORKER_PROCESSES=${LOAD_TEST_WORKER_PROCESSES:-4}
      - WORKER_THREADS=${LOAD_TEST_WORKER_THREADS:-8}
      - WORKER_TIMEOUT_SECONDS=60
      - REDIS_URL=redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
# docker-compose.yml
services:
  worker-system:
    build: .
    command:
      - worker
      - system
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=system
      - WORKER_PROCESSES=${SYSTEM_WORKER_PROCESSES:-2}
      - WORKER_TIMEOUT_SECONDS=1800
      - REDIS_URL=redis://redis:6379
      # Docker file watching fix
      - WATCHFILES_FORCE_POLLING=true
      - WATCHFILES_POLL_DELAY_MS=800
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped

  worker-load-test:
    build: .
    command:
      - worker
      - load_test
    environment:
      - DOCKER_CONTAINER=1
      - WORKER_QUEUE_TYPE=load_test
      - WORKER_PROCESSES=${LOAD_TEST_WORKER_PROCESSES:-4}
      - WORKER_TIMEOUT_SECONDS=60
      - REDIS_URL=redis://redis:6379
      - WATCHFILES_FORCE_POLLING=true
      - WATCHFILES_POLL_DELAY_MS=800
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped

Watchfiles Polling

TaskIQ uses watchfiles for auto-reload in development. Docker volume mounts require polling mode (WATCHFILES_FORCE_POLLING=true) since filesystem events don't propagate through bind mounts.


Scaling Strategies

Horizontal Scaling

Scale by running multiple worker containers. arq workers coordinate through Redis:

docker compose up --scale worker-system=3

Vertical Scaling

Increase per-worker concurrency via WorkerSettings:

# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"
    max_jobs = 30  # Increased from 15
    job_timeout = 300

Environment-Based Configuration

# app/components/worker/queues/system.py
from app.core.config import settings

class WorkerSettings:
    functions = [system_health_check]
    queue_name = "arq:queue:system"

    keep_result = settings.WORKER_KEEP_RESULT_SECONDS
    max_tries = settings.WORKER_MAX_TRIES
    redis_settings = settings.redis_settings

    max_jobs = 30 if settings.ENVIRONMENT == "production" else 15
    job_timeout = 300 if settings.ENVIRONMENT == "production" else 600

Process x Thread Scaling

Dramatiq's concurrency model is explicit:

# Total concurrent messages = WORKER_PROCESSES x WORKER_THREADS
# Example: 2 processes x 8 threads = 16 concurrent messages per container

I/O-bound workloads — increase threads:

WORKER_PROCESSES=1
WORKER_THREADS=32    # Async actors spend most time awaiting I/O

CPU-bound workloads — increase processes:

WORKER_PROCESSES=4   # One per CPU core
WORKER_THREADS=4     # Fewer threads per process

Mixed workloads — balanced:

WORKER_PROCESSES=2
WORKER_THREADS=8

Horizontal Scaling

For horizontal scaling, run multiple worker containers:

docker compose up --scale worker-system=3

Each container runs its own set of processes and threads.

Process Scaling

TaskIQ uses process-based concurrency via the --workers flag:

taskiq worker app.components.worker.queues.system:broker --workers 4

Each process runs its own async event loop, making it effective for both I/O-bound and CPU-bound workloads.

In Docker, configure via environment variable:

WORKER_PROCESSES=4   # Passed to --workers flag

Horizontal Scaling

Run multiple worker containers for additional capacity:

docker compose up --scale worker-system=3

Redis Streams ensure each message is consumed by exactly one worker, even across multiple containers.


Redis Configuration

Connection Settings

# app/core/config.py
class Settings(BaseSettings):
    REDIS_URL: str = "redis://localhost:6379"
    REDIS_DB: int = 0
    REDIS_PASSWORD: str | None = None
    REDIS_MAX_CONNECTIONS: int = 100
    REDIS_CONNECTION_TIMEOUT: int = 5

Redis Persistence

For production, configure Redis persistence:

# docker-compose.yml
redis:
  image: redis:7-alpine
  command: >
    redis-server
    --maxmemory 512mb
    --maxmemory-policy allkeys-lru
    --save 900 1
    --save 300 10
    --save 60 10000
    --appendonly yes
  volumes:
    - redis-data:/data
    - ./redis.conf:/usr/local/etc/redis/redis.conf

Performance Tuning

Memory Management

class WorkerSettings:
    functions = [system_health_check]
    keep_result = 1800  # 30 minutes for production
    max_jobs = 15       # Limit concurrent jobs to manage memory

Timeout Configuration

class WorkerSettings:  # system queue
    functions = [system_health_check, cleanup_temp_files]
    job_timeout = 300  # 5 minutes for system tasks

class WorkerSettings:  # load_test queue
    functions = [cpu_intensive_task, io_simulation_task]
    job_timeout = 60   # Quick test tasks
    max_jobs = 50

Process/Thread Tuning

The WORKER_PROCESSES x WORKER_THREADS formula determines total concurrency:

# I/O-bound: more threads, fewer processes
WORKER_PROCESSES=1
WORKER_THREADS=16

# CPU-bound: more processes (one per core), fewer threads
WORKER_PROCESSES=4
WORKER_THREADS=4

Actor-Level Tuning

@dramatiq.actor(
    queue_name="system",
    store_results=True,
    max_retries=3,
    min_backoff=1000,     # 1s minimum backoff
    max_backoff=600_000,  # 10m maximum backoff
    time_limit=300_000,   # 5 minute time limit (ms)
)
async def my_task() -> dict:
    ...

Result TTL

Configure how long results are stored in Redis:

broker = (
    RedisStreamBroker(url=redis_url, queue_name="taskiq:system")
    .with_result_backend(
        RedisAsyncResultBackend(
            redis_url=redis_url,
            result_ex_time=60,  # Result TTL in seconds
        )
    )
)

Process Count

# Match CPU count for CPU-bound tasks
WORKER_PROCESSES=4

# Fewer processes for I/O-bound tasks (async handles concurrency)
WORKER_PROCESSES=2

Connection Pooling

REDIS_MAX_CONNECTIONS = 50  # Per worker process
REDIS_CONNECTION_TIMEOUT = 5
REDIS_SOCKET_KEEPALIVE = True

Monitoring & Health Checks

Health checks use Redis heartbeat keys with TTL. Each worker writes a heartbeat key on startup and updates it periodically:

# app/components/worker/queues/system.py
class WorkerSettings:
    functions = [system_health_check]
    health_check_key = "worker:health:system"
    health_check_interval = 30  # seconds
    keep_result = 3600

Check queue metrics:

arq app.components.worker.queues.system.WorkerSettings --check

The API uses LLEN on queue keys for depth metrics.

Dramatiq health is monitored via Redis queue depth and delay queue inspection:

# Query Redis directly
KEYS dramatiq:*           # List all Dramatiq keys
LLEN dramatiq:system      # System queue depth
LLEN dramatiq:system.DQ   # System delay queue depth

The Overseer dashboard reads these metrics automatically for real-time display.

TaskIQ publishes lifecycle events to Redis Streams via EventPublishMiddleware:

Event types:

  • worker.started / worker.stopped — Worker lifecycle
  • job.enqueued — Task enqueued (from client side)
  • job.started / job.completed / job.failed — Task execution
class EventPublishMiddleware(TaskiqMiddleware):
    async def startup(self) -> None:
        await publish_event(redis, "worker.started", queue_name)

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        await publish_event(redis, "job.started", queue_name, {...})

    async def post_execute(self, message, result) -> None:
        event_type = "job.failed" if result.is_err else "job.completed"
        await publish_event(redis, event_type, queue_name, {...})

The Overseer dashboard subscribes to these events via SSE for real-time updates.


Security Configuration

Authentication

REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
REDIS_USERNAME = os.getenv("REDIS_USERNAME")  # Redis 6.0+
REDIS_SSL = os.getenv("REDIS_SSL", "false").lower() == "true"

Network Security

# docker-compose.yml - Internal network only
services:
  worker-system:
    networks:
      - internal
    # No ports exposed

  redis:
    networks:
      - internal
    ports:
      - "127.0.0.1:6379:6379"  # Localhost only

networks:
  internal:
    driver: bridge
    internal: true

Advanced Configuration

Custom Worker Classes

# app/components/worker/custom.py
from arq import Worker

class CustomWorker(Worker):
    """Extended worker with custom behavior."""

    async def startup(self):
        """Initialize resources on worker startup."""
        await super().startup()
        await init_database_pool()
        await setup_monitoring()

    async def shutdown(self):
        """Cleanup on worker shutdown."""
        await close_database_pool()
        await super().shutdown()

Dramatiq Result Retrieval

Results are stored using a key derived from the actor name, queue name, and message ID. The Results middleware with RedisBackend handles storage automatically when store_results=True:

Result keys follow the pattern: MD5(namespace:queue_name:actor_name:message_id).

curl "http://localhost:8000/api/v1/tasks/result/{message_id}?actor_name=io_simulation_task&queue_name=load_test"

Custom Middleware

TaskIQ middleware hooks into the task lifecycle:

from taskiq import TaskiqMiddleware, TaskiqMessage, TaskiqResult

class MyMiddleware(TaskiqMiddleware):
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        """Run before each task."""
        logger.info(f"Starting task: {message.task_name}")
        return message

    async def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None:
        """Run after each task."""
        if result.is_err:
            logger.error(f"Task failed: {message.task_name}")

Attach middleware when creating the broker:

broker = (
    RedisStreamBroker(url=redis_url, queue_name="taskiq:system")
    .with_middlewares(MyMiddleware())
)

Next Steps

Was this page helpful?