Skip to content

Auth Service Integration

Reference for integrating auth services into your own endpoints and background tasks. All services are async, database-backed, and injected via FastAPI's dependency system.

Service Architecture

┌─────────────────────────────────────────────────────────┐
│                    FastAPI Endpoint                      │
│   Depends(get_user_service)  Depends(get_org_service)   │
└──────────┬───────────────────────────┬──────────────────┘
           │                           │
           ▼                           ▼
    ┌─────────────┐           ┌─────────────────┐
    │ UserService │           │   OrgService    │
    │             │           │                 │
    │  create     │           │  create         │
    │  get        │           │  get / update   │
    │  update     │           │  delete / list  │
    │  delete     │           └────────┬────────┘
    │  lockout    │                    │
    │  pw reset   │                    ▼
    │  email ver  │       ┌──────────────────────┐
    └─────────────┘       │  MembershipService   │
           │              │                      │
           │              │  add / remove member │
           │              │  update role         │
           │              │  transfer ownership  │
           │              │  bulk add            │
           │              │  list with details   │
           └──────────────┴──────────┬───────────┘
                          ┌─────────────────────┐
                          │   InviteService     │
                          │                     │
                          │  create invite      │
                          │  accept by token    │
                          │  accept pending     │
                          │  list pending       │
                          └─────────────────────┘

Infrastructure (singletons, no DB dependency):
  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
  │ TokenBlacklist  │  │  AuditEmitter   │  │   RateLimiter   │
  └─────────────────┘  └─────────────────┘  └─────────────────┘

UserService, OrgService, MembershipService, and InviteService each accept an AsyncSession in their constructor. Use the dependency functions in deps.py to get properly scoped instances — never instantiate services directly in endpoints.


UserService

Manages user lifecycle: creation, lookup, updates, deactivation, password reset, email verification, and account lockout.

class UserService:
    def __init__(self, db: AsyncSession) -> None

Methods

Method Signature Description
create_user (user_data: UserCreate) -> User Hash password and persist new user. Email is lowercased.
get_user_by_email (email: str) -> User \| None Case-insensitive email lookup.
get_user_by_id (user_id: int) -> User \| None Fetch by primary key.
update_user (user_id: int, **updates) -> User \| None Set arbitrary model fields. Sets updated_at.
activate_user (user_id: int) -> User \| None Set is_active=True.
deactivate_user (user_id: int) -> User \| None Set is_active=False.
delete_user (user_id: int) -> bool Permanent delete. Cleans up memberships and invites first.
list_users () -> list[User] All users, newest first.
find_existing_emails_with_prefix (prefix: str, domain: str) -> list[str] Find emails matching prefix*@domain. Used by CLI to auto-increment test users.
create_password_reset_token (email: str) -> str \| None Create a PasswordResetToken record, return the token string. Returns None if user not found — callers should not reveal this.
reset_password (token: str, new_password: str) -> None Validate token, set new hashed password, mark token used. Raises ValueError on invalid/expired token.
create_email_verification_token (user_id: int) -> str Create an EmailVerificationToken record, return the token string.
verify_email (token: str) -> None Set user.is_verified=True, mark token used. Raises ValueError on invalid/expired token.
record_failed_login (email: str) -> None Increment failed_login_attempts. Locks account when threshold (ACCOUNT_LOCKOUT_ATTEMPTS) is reached.
reset_login_attempts (user_id: int) -> None Zero out failed attempts and clear locked_until.
is_account_locked (user: User) -> bool Check lockout. Auto-unlocks and persists to DB when locked_until has passed.

Example: Registration with email verification

from app.components.backend.api.deps import get_user_service
from app.services.auth.user_service import UserService
from app.models.user import UserCreate

@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    user_service: UserService = Depends(get_user_service),
):
    existing = await user_service.get_user_by_email(user_data.email)
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")

    user = await user_service.create_user(user_data)
    token = await user_service.create_email_verification_token(user.id)

    # Send token via your email provider
    await send_verification_email(user.email, token)
    return UserResponse.model_validate(user)

Example: Login with lockout and failed-attempt tracking

@router.post("/token")
async def login(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends(),
    user_service: UserService = Depends(get_user_service),
):
    user = await user_service.get_user_by_email(form_data.username)

    if not user:
        raise HTTPException(status_code=401, detail="Incorrect email or password")

    if await user_service.is_account_locked(user):
        raise HTTPException(
            status_code=403,
            detail="Account locked due to too many failed login attempts. Please try again later.",
        )

    if not verify_password(form_data.password, user.hashed_password):
        await user_service.record_failed_login(user.email)
        raise HTTPException(status_code=401, detail="Incorrect email or password")

    await user_service.reset_login_attempts(user.id)
    access_token = create_access_token(data={"sub": user.email})
    return {"access_token": access_token, "token_type": "bearer"}

OrgService

CRUD operations for organizations. Organizations are identified by both integer ID and a unique string slug.

class OrgService:
    def __init__(self, db: AsyncSession) -> None

Methods

Method Signature Description
create_org (org_data: OrgCreate) -> Organization Persist a new organization.
get_org_by_id (org_id: int) -> Organization \| None Fetch by primary key.
get_org_by_slug (slug: str) -> Organization \| None Fetch by unique slug.
update_org (org_id: int, **updates: str) -> Organization \| None Update arbitrary fields. Sets updated_at.
delete_org (org_id: int) -> bool Delete org and all its memberships.
list_orgs () -> list[Organization] All organizations, newest first.

Example

from app.components.backend.api.deps import get_org_service
from app.services.auth.org_service import OrgService
from app.models.org import OrgCreate, OrgResponse

@router.post("/orgs", response_model=OrgResponse)
async def create_org(
    org_data: OrgCreate,
    org_service: OrgService = Depends(get_org_service),
):
    existing = await org_service.get_org_by_slug(org_data.slug)
    if existing:
        raise HTTPException(status_code=400, detail="Slug already in use")
    org = await org_service.create_org(org_data)
    return OrgResponse.model_validate(org)

MembershipService

Manages the relationship between users and organizations. Valid roles are owner, admin, and member (defined in VALID_ORG_ROLES).

class MembershipService:
    def __init__(self, db: AsyncSession) -> None

Methods

Method Signature Description
add_member (org_id: int, user_id: int, role: str = "member") -> OrganizationMember Add a user to an org. Raises ValueError for invalid roles.
remove_member (org_id: int, user_id: int) -> bool Remove a membership record. Returns False if not found.
get_member (org_id: int, user_id: int) -> OrganizationMember \| None Fetch a specific membership.
update_member_role (org_id: int, user_id: int, role: str) -> OrganizationMember \| None Change a member's role. Raises ValueError for invalid roles.
transfer_ownership (org_id: int, current_owner_id: int, new_owner_id: int) -> None Atomically demote current owner to admin and promote new owner. New owner must already be a member.
bulk_add_members (org_id: int, user_ids: list[int], role: str = "member") -> list[OrganizationMember] Batch-add users. Skips existing members. Single query to fetch existing IDs, then bulk insert. Returns only newly added records.
list_org_members (org_id: int) -> list[OrganizationMember] All membership records for an org.
list_org_members_with_details (org_id: int) -> list[dict] Members with user email and full name. Single JOIN query.
list_user_orgs (user_id: int) -> list[Organization] All organizations a user belongs to.
list_all_memberships () -> list[dict] All memberships with org name and slug. Single JOIN query.

Example: Bulk-add members after org creation

from app.components.backend.api.deps import get_membership_service
from app.services.auth.membership_service import MembershipService

@router.post("/orgs/{org_id}/members/bulk")
async def bulk_add(
    org_id: int,
    body: BulkAddMembersRequest,
    current_user: User = Depends(require_role("admin", "owner")),
    membership_service: MembershipService = Depends(get_membership_service),
):
    added = await membership_service.bulk_add_members(org_id, body.user_ids, body.role)
    return {"added": len(added)}

Example: Transfer ownership

@router.post("/orgs/{org_id}/transfer-ownership")
async def transfer_ownership(
    org_id: int,
    body: TransferOwnershipRequest,
    current_user: User = Depends(require_role("owner")),
    membership_service: MembershipService = Depends(get_membership_service),
):
    await membership_service.transfer_ownership(org_id, current_user.id, body.user_id)
    return {"status": "ownership transferred"}

Example: List members with user details

@router.get("/orgs/{org_id}/members/details", response_model=list[MemberDetailResponse])
async def list_member_details(
    org_id: int,
    user=Depends(_get_current_user),
    membership_service: MembershipService = Depends(get_membership_service),
):
    return await membership_service.list_org_members_with_details(org_id)

InviteService

Handles org invitations. Invites can be sent to users who don't yet have an account — pending invites are automatically accepted when the user registers.

class InviteService:
    def __init__(self, db: AsyncSession) -> None

Methods

Method Signature Description
create_invite (org_id: int, email: str, role: str, invited_by: int) -> OrgInvite Create invite. If user already exists, adds them immediately and marks invite accepted. Raises ValueError on duplicate pending invite or existing membership.
accept_invite_by_token (token: str, caller_email: str, caller_user_id: int) -> OrgInvite Accept a pending invite. In email mode (INVITE_ACCEPTANCE_MODE), caller_email must match the invite. Adds user to the org and marks invite accepted.
accept_pending_invites (email: str, user_id: int) -> int Accept all pending invites for an email address. Call this after user registration. Returns the count of invites accepted.
list_pending_invites (org_id: int) -> list[OrgInvite] All pending invites for an org.

Invite status flow

create_invite()
    ├─ user exists  →  add to org immediately  →  status: "accepted"
    └─ user absent  →  store pending invite    →  status: "pending"
                            ┌───────────────────────────┘
        accept_pending_invites()  — called at registration
        accept_invite_by_token()  — called from invite link
                            └─  MembershipService.add_member()
                                invite.status = "accepted"

Example: Send invite

from app.components.backend.api.deps import get_invite_service
from app.services.auth.invite_service import InviteService

@router.post("/orgs/{org_id}/invites", response_model=InviteResponse)
async def invite_member(
    org_id: int,
    body: InviteCreate,
    current_user: User = Depends(require_role("admin", "owner")),
    invite_service: InviteService = Depends(get_invite_service),
):
    invite = await invite_service.create_invite(
        org_id=org_id,
        email=body.email,
        role=body.role,
        invited_by=current_user.id,
    )
    if invite.status == "pending":
        await send_invite_email(body.email, invite.token)
    return InviteResponse.model_validate(invite)

Example: Accept pending invites on registration

@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    user_service: UserService = Depends(get_user_service),
    invite_service: InviteService = Depends(get_invite_service),
):
    user = await user_service.create_user(user_data)

    # Fulfill any pending invites for this email
    await invite_service.accept_pending_invites(user.email, user.id)

    return UserResponse.model_validate(user)

Example: Accept invite by token

@router.post("/auth/accept-invite", response_model=InviteResponse)
async def accept_invite(
    body: AcceptInviteRequest,
    current_user: User = Depends(get_current_user),
    invite_service: InviteService = Depends(get_invite_service),
):
    invite = await invite_service.accept_invite_by_token(
        token=body.token,
        caller_email=current_user.email,
        caller_user_id=current_user.id,
    )
    return InviteResponse.model_validate(invite)

Dependency Injection

All services are wired in app/components/backend/api/deps.py. Each dependency function creates a fresh service instance bound to a scoped AsyncSession.

Available dependencies

# app/components/backend/api/deps.py

async def get_async_db() -> AsyncGenerator[AsyncSession]:
    """Scoped async session. Auto-commits on success, rolls back on exception."""

async def get_user_service(db: AsyncSession = Depends(get_async_db)) -> UserService:
    """UserService bound to a request-scoped session."""

async def get_org_service(db: AsyncSession = Depends(get_async_db)) -> OrgService:
    """OrgService bound to a request-scoped session."""

async def get_membership_service(db: AsyncSession = Depends(get_async_db)) -> MembershipService:
    """MembershipService bound to a request-scoped session."""

async def get_invite_service(db: AsyncSession = Depends(get_async_db)) -> InviteService:
    """InviteService bound to a request-scoped session."""

def get_audit() -> AuditEmitter:
    """Return the global audit_emitter singleton."""

Using multiple services in one endpoint

When an endpoint needs multiple services, each Depends(get_*_service) call chains through Depends(get_async_db). FastAPI deduplicates dependencies within a request, so all services in a single request share the same session.

from app.components.backend.api.deps import (
    get_org_service,
    get_membership_service,
    get_audit,
)

@router.post("/orgs", response_model=OrgResponse)
async def create_org_with_owner(
    org_data: OrgCreate,
    current_user: User = Depends(get_current_user),
    org_service: OrgService = Depends(get_org_service),
    membership_service: MembershipService = Depends(get_membership_service),
    audit: AuditEmitter = Depends(get_audit),
):
    org = await org_service.create_org(org_data)
    await membership_service.add_member(org.id, current_user.id, role="owner")
    await audit.emit(
        "org.created",
        actor_id=current_user.id,
        actor_email=current_user.email,
        target_type="org",
        target_id=org.id,
    )
    return OrgResponse.model_validate(org)

Role-based access control

require_role() is a dependency factory from app.services.auth.auth_service. It validates the JWT, loads the user, and checks their system-level role.

from app.services.auth.auth_service import require_role

# Single role
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    current_user: User = Depends(require_role("admin")),
    user_service: UserService = Depends(get_user_service),
):
    deleted = await user_service.delete_user(user_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="User not found")

# Multiple system roles accepted
@router.get("/auth/users")
async def list_users(
    current_user: User = Depends(require_role("admin", "moderator")),
    user_service: UserService = Depends(get_user_service),
):
    return await user_service.list_users()

Development mode

When AUTH_ENABLED=false, require_role() bypasses token validation and returns a synthetic dev user with the role configured in DEV_USER_ROLE. This applies to all auth dependencies uniformly.


Infrastructure

TokenBlacklist

In-memory JWT revocation store. Tokens are keyed by their jti claim with an expiry timestamp. A cleanup pass removes expired entries on every read and write, so memory stays bounded.

# app/services/auth/token_blacklist.py
token_blacklist = TokenBlacklist()  # global singleton
# Revoke a token on logout
from app.services.auth.token_blacklist import token_blacklist

@router.post("/logout")
async def logout(current_user: User = Depends(get_current_user), token: str = ...):
    payload = verify_token(token)
    if payload:
        jti = payload.get("jti")
        exp = payload.get("exp")
        if jti and exp:
            token_blacklist.revoke(jti, exp)
    return {"status": "logged out"}

The blacklist is consulted automatically in get_current_user_from_token — you do not need to check it manually in your endpoints.

Process-local storage

TokenBlacklist is in-memory and not shared across processes. In multi-worker deployments, revoked tokens may still be accepted by other workers. For production multi-process environments, replace the backend with a Redis-backed implementation.


AuditEmitter

Structured audit logging with a pluggable backend. The default backend writes to the audit logger as structured JSON. All fields except event_type are optional.

# app/core/audit.py
audit_emitter = AuditEmitter()  # global singleton
# Inject via Depends(get_audit) in endpoints
from app.components.backend.api.deps import get_audit
from app.core.audit import AuditEmitter

@router.post("/users/{user_id}/deactivate")
async def deactivate_user(
    user_id: int,
    current_user: User = Depends(require_role("admin")),
    user_service: UserService = Depends(get_user_service),
    audit: AuditEmitter = Depends(get_audit),
):
    user = await user_service.deactivate_user(user_id)
    await audit.emit(
        "user.deactivated",
        actor_id=current_user.id,
        actor_email=current_user.email,
        target_type="user",
        target_id=user_id,
        detail=f"Deactivated by {current_user.email}",
    )
    return UserResponse.model_validate(user)

Standard event type naming: <domain>.<action> — for example auth.login_success, org.member_added, user.password_reset.


RateLimiter

Sliding-window, in-memory rate limiter. Three pre-configured instances cover the main auth endpoints:

# app/components/backend/middleware/rate_limit.py
login_limiter          = RateLimiter(max_requests=5,  window_seconds=60)
register_limiter       = RateLimiter(max_requests=3,  window_seconds=60)
password_reset_limiter = RateLimiter(max_requests=3,  window_seconds=60)

Call .check(request) at the top of an endpoint. It raises HTTP 429 with a Retry-After header if the limit is exceeded.

from app.components.backend.middleware.rate_limit import login_limiter

@router.post("/token")
async def login(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends(),
    user_service: UserService = Depends(get_user_service),
):
    login_limiter.check(request)  # raises 429 if over limit
    # ... authentication logic

Set TRUST_PROXY_HEADERS=true in settings when running behind a reverse proxy so the limiter reads the real client IP from X-Forwarded-For rather than the proxy's address.

Process-local storage

Like TokenBlacklist, rate limit counters are in-memory and not shared across processes. For multi-worker deployments, use a Redis-backed rate limiter.


Related:

Was this page helpful?