Skip to main content
Callbacks allow you to extend AuthX with custom logic for user retrieval, token validation, and permission checks. This enables integration with databases, custom authorization systems, and business logic.

Available callbacks

AuthX supports two main callback types:

Subject getter

Retrieve and serialize user data based on token subject

Token blocklist

Check if a token has been revoked or blocked

Subject getter callback

The subject getter callback retrieves user information from your data store:

Basic usage

from fastapi import FastAPI, HTTPException
from authx import AuthX, AuthXConfig
from typing import Optional, Dict, Any

app = FastAPI()

# NOTE(review): "your-secret-key" is a placeholder literal — presumably a real
# deployment loads the signing key from configuration or the environment.
auth_config = AuthXConfig(
    JWT_ALGORITHM="HS256",
    JWT_SECRET_KEY="your-secret-key",
)

# Build the AuthX instance from the config, then hand the app to handle_errors
auth = AuthX(config=auth_config)
auth.handle_errors(app)

# In-memory stand-in for a user store, keyed by the token subject
USERS_DB = {
    "user1": {
        "username": "user1",
        "email": "user1@example.com",
        "full_name": "User One",
        "role": "admin",
    },
    "user2": {
        "username": "user2",
        "email": "user2@example.com",
        "full_name": "User Two",
        "role": "user",
    },
}


# Subject getter callback
def get_user_from_uid(uid: str) -> Optional[Dict[str, Any]]:
    """Look up the user record stored under the token's subject.

    Args:
        uid: The user identifier taken from the token's 'sub' claim

    Returns:
        The matching entry of USERS_DB, or None when the key is absent
    """
    try:
        return USERS_DB[uid]
    except KeyError:
        return None

# Register the callback
auth.set_subject_getter(get_user_from_uid)

Using the callback in routes

from fastapi import Request

@app.get("/me")
async def get_current_user(request: Request):
    """Return the subject resolved for the current request.

    Raises:
        HTTPException: 404 when no subject is resolved.
    """
    # AuthX invokes the registered subject getter as part of this call
    subject = await auth.get_current_subject(request)
    if subject:
        return subject
    raise HTTPException(status_code=404, detail="User not found")

# Or use the CURRENT_SUBJECT dependency
@app.get("/profile")
async def get_profile(user: Dict[str, Any] = auth.CURRENT_SUBJECT):
    """Return the profile fields of the injected user dictionary."""
    # Only these keys are exposed, in this order
    exposed_fields = ("username", "email", "full_name", "role")
    return {field: user[field] for field in exposed_fields}

Async callback

Use async callbacks for database queries:
from motor.motor_asyncio import AsyncIOMotorClient
from typing import Optional, Dict, Any

# MongoDB client
mongo_client = AsyncIOMotorClient("mongodb://localhost:27017")
db = mongo_client.myapp

async def get_user_from_database(uid: str) -> Optional[Dict[str, Any]]:
    """Fetch the user document whose ``_id`` equals the token subject.

    Args:
        uid: The user identifier from the token

    Returns:
        The document with ``_id`` converted to a string, or the falsy
        result of the query when nothing matched
    """
    document = await db.users.find_one({"_id": uid})
    if not document:
        return document
    # Stringify the id so the document can be JSON-serialized
    document["_id"] = str(document["_id"])
    return document

# Register async callback
auth.set_subject_getter(get_user_from_database)

Type-safe callbacks with Pydantic

Use Pydantic models for type safety:
from pydantic import BaseModel, EmailStr
from typing import Optional

class User(BaseModel):
    """User model.

    Passed to AuthX as ``model=User`` and returned by the subject getter
    in this example.
    """
    username: str
    email: EmailStr
    full_name: str
    role: str
    # Defaults to True when no value is supplied
    is_active: bool = True

# Configure AuthX with the User model
auth = AuthX(config=auth_config, model=User)

async def get_user_model(uid: str) -> Optional[User]:
    """Load a user by username and wrap it in the User model.

    Args:
        uid: User identifier

    Returns:
        A User built from the stored document, or None if the query
        returned nothing
    """
    record = await db.users.find_one({"username": uid})
    return User(**record) if record else None

auth.set_subject_getter(get_user_model)

# Now the dependency is type-safe
@app.get("/profile")
async def get_profile(user: User = auth.CURRENT_SUBJECT):
    """Return the profile fields of the injected User model."""
    return dict(
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        role=user.role,
    )

Token blocklist callback

The blocklist callback checks if a token has been revoked:

Basic implementation

# Process-local blocklist (use Redis in production)
REVOKED_TOKENS: set[str] = set()


def check_if_token_revoked(token: str) -> bool:
    """Report whether the raw token string is in REVOKED_TOKENS.

    Args:
        token: The raw JWT token string

    Returns:
        True when the token has been added to the blocklist, else False
    """
    if token in REVOKED_TOKENS:
        return True
    return False

# Register the callback
auth.set_token_blocklist(check_if_token_revoked)

Using with logout

from fastapi import Request

@app.post("/logout")
async def logout(request: Request):
    """Revoke the access token carried by the request."""
    request_token = await auth.get_access_token_from_request(request)

    # The blocklist stores the raw token string
    raw_jwt = request_token.token
    REVOKED_TOKENS.add(raw_jwt)

    return {"message": "Successfully logged out"}

# Protected routes automatically check the blocklist
@app.get("/protected")
async def protected_route(user: Dict = auth.CURRENT_SUBJECT):
    """Return an access confirmation together with the injected user.

    Per this guide, a token found in the blocklist raises RevokedTokenError.
    """
    response = {"message": "Access granted"}
    response["user"] = user
    return response

Async blocklist with Redis

import aioredis
from typing import Optional

# Redis client
redis: Optional[aioredis.Redis] = None

@app.on_event("startup")
async def startup():
    """Create the Redis client and publish it in the module-level ``redis``."""
    global redis
    client = await aioredis.from_url("redis://localhost")
    redis = client

@app.on_event("shutdown")
async def shutdown():
    """Close the module-level Redis client if one is set."""
    if not redis:
        return
    await redis.close()

async def check_token_in_redis(token: str) -> bool:
    """Look for a ``revoked_token:<token>`` key in Redis.

    Args:
        token: The JWT token string

    Returns:
        True if the key exists; False if it does not or if no Redis
        client has been set
    """
    if redis is None:
        return False

    key_count = await redis.exists(f"revoked_token:{token}")
    return bool(key_count)

# Register async callback
auth.set_token_blocklist(check_token_in_redis)

@app.post("/logout")
async def logout(request: Request):
    """Logout and store the revoked token in Redis.

    The blocklist entry is written with an expiry equal to the token's
    remaining lifetime, so it disappears once the token has expired.

    Args:
        request: Incoming request carrying the access token to revoke

    Returns:
        A confirmation message
    """
    import time

    token = await auth.get_access_token_from_request(request)
    payload = auth.verify_token(token)

    # Remaining lifetime in whole seconds. Clamp to at least 1: Redis rejects
    # SETEX with a zero expiry, and int() truncates a sub-second remainder
    # to 0, which would turn a logout in the token's last second into an error.
    ttl = max(1, int(payload.exp - time.time()))

    # Store in Redis with expiration
    if redis:
        await redis.setex(
            f"revoked_token:{token.token}",
            ttl,
            "revoked"
        )

    return {"message": "Successfully logged out"}

Using JTI for efficient revocation

# Store only JTIs instead of full tokens
REVOKED_JTIS: set[str] = set()

def check_jti_revoked(token: str) -> bool:
    """Check if token's JTI is revoked.

    Args:
        token: The JWT token string

    Returns:
        True if the token's JTI is in REVOKED_JTIS; False if it is not,
        or if reading the JTI from the token raised an error
    """
    try:
        # Decode token without verification to get JTI
        from authx.token import decode_token
        payload = decode_token(
            token,
            key=auth.config.public_key,
            verify=False
        )
        jti = payload.get("jti")
        return jti in REVOKED_JTIS
    except Exception:
        # Fail open when the JTI cannot be read. Catch Exception rather than
        # using a bare except so KeyboardInterrupt, SystemExit and task
        # cancellation are not swallowed here.
        return False

auth.set_token_blocklist(check_jti_revoked)

@app.post("/logout")
async def logout(request: Request):
    """Revoke the current token by recording its JTI."""
    request_token = await auth.get_access_token_from_request(request)
    verified = auth.verify_token(request_token)

    # Only the JTI is kept, not the token itself
    revoked_jti = verified.jti
    REVOKED_JTIS.add(revoked_jti)

    return {"message": "Successfully logged out"}

Combining callbacks

Use both callbacks together:
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel, EmailStr
from typing import Optional
import aioredis

app = FastAPI()

auth_config = AuthXConfig(
    JWT_ALGORITHM="HS256",
    JWT_SECRET_KEY="your-secret-key",
)

auth = AuthX(config=auth_config)
auth.handle_errors(app)

# User model
class User(BaseModel):
    """User model returned by the subject getter in this example."""
    username: str
    email: EmailStr
    full_name: str
    role: str
    # Defaults to True when no value is supplied
    is_active: bool = True

# Configure with model
auth = AuthX(config=auth_config, model=User)

# Database (example with SQLAlchemy)
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

async def get_user_by_username(uid: str) -> Optional[User]:
    """Query the user table by username and map the row to a User.

    Args:
        uid: Username from token subject

    Returns:
        A User populated from the matching row, or None when no row matched
    """
    async with get_db_session() as session:
        query = select(UserTable).where(UserTable.username == uid)
        result = await session.execute(query)
        row = result.scalar_one_or_none()

        if not row:
            return None

        return User(
            username=row.username,
            email=row.email,
            full_name=row.full_name,
            role=row.role,
            is_active=row.is_active,
        )

# Redis for token revocation
redis_client: Optional[aioredis.Redis] = None

async def is_token_revoked(token: str) -> bool:
    """Look for a ``revoked:<token>`` key in Redis.

    Args:
        token: JWT token string

    Returns:
        True if the key exists; False if it does not or if no Redis
        client has been set
    """
    if redis_client is None:
        return False

    key_count = await redis_client.exists(f"revoked:{token}")
    return bool(key_count)

# Register callbacks
auth.set_subject_getter(get_user_by_username)
auth.set_token_blocklist(is_token_revoked)

@app.post("/login")
async def login(username: str, password: str):
    """Login endpoint.

    Args:
        username: Name used to look up the user; also used as the token uid
        password: Candidate password passed to verify_password

    Returns:
        Dict with the new access token and token type "bearer"

    Raises:
        HTTPException: 401 when no user is found or the password check fails
    """
    # Verify credentials (use proper password hashing)
    # NOTE(review): get_user_by_username returns a User model that declares no
    # password_hash field, so user.password_hash below looks like it would
    # fail — confirm where the stored hash is meant to come from.
    # NOTE(review): verify_password is not defined in this example; presumably
    # it is provided by the application.
    user = await get_user_by_username(username)
    if not user or not verify_password(password, user.password_hash):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    
    # Create token
    access_token = auth.create_access_token(uid=username)
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/me")
async def get_current_user(user: User = auth.CURRENT_SUBJECT):
    """Get current user (uses subject getter callback).

    Returns:
        The User injected through auth.CURRENT_SUBJECT, unchanged
    """
    return user

@app.post("/logout")
async def logout(request: Request):
    """Logout (uses blocklist callback).

    Writes a ``revoked:<token>`` key to Redis that expires together with
    the token.

    Args:
        request: Incoming request carrying the access token to revoke

    Returns:
        A confirmation message
    """
    token = await auth.get_access_token_from_request(request)
    payload = auth.verify_token(token)

    # Add to Redis blocklist
    if redis_client:
        import time

        # Clamp to at least 1 second: Redis rejects SETEX with a zero expiry,
        # and int() truncates a sub-second remainder to 0.
        ttl = max(1, int(payload.exp - time.time()))
        await redis_client.setex(f"revoked:{token.token}", ttl, "1")

    return {"message": "Logged out successfully"}

@app.get("/protected")
async def protected_route(user: User = auth.CURRENT_SUBJECT):
    """Protected route.

    According to this guide, the CURRENT_SUBJECT dependency checks that:
    1. Token exists and is valid
    2. Token is not revoked (blocklist callback)
    3. User exists in database (subject getter callback)
    """
    return dict(
        message="Access granted",
        user=user.username,
        role=user.role,
    )

Error handling in callbacks

Handle errors gracefully:
import logging
from typing import Optional

logger = logging.getLogger(__name__)

async def safe_get_user(uid: str) -> Optional[User]:
    """Fetch a user by username, logging instead of raising on failure.

    Args:
        uid: User identifier

    Returns:
        A User built from the matching row; None when no row matched or
        when any exception was raised during the lookup
    """
    try:
        async with get_db_session() as session:
            query = select(UserTable).where(UserTable.username == uid)
            row = (await session.execute(query)).scalar_one_or_none()

            if not row:
                logger.warning(f"User not found: {uid}")
                return None

            return User(**row.__dict__)

    except Exception as e:
        logger.error(f"Error fetching user {uid}: {e}")
        return None

async def safe_check_blocklist(token: str) -> bool:
    """Check the Redis blocklist, logging instead of raising on failure.

    Args:
        token: JWT token

    Returns:
        True if a ``revoked:<token>`` key exists; False if it does not,
        if no Redis client is set, or if the check raised an exception
    """
    try:
        if redis_client is not None:
            key_count = await redis_client.exists(f"revoked:{token}")
            return bool(key_count)
        return False

    except Exception as e:
        logger.error(f"Error checking token blocklist: {e}")
        # Returning False fails open (access is allowed when the check
        # cannot run); return True here instead to fail closed.
        return False

auth.set_subject_getter(safe_get_user)
auth.set_token_blocklist(safe_check_blocklist)

Complete example

from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, EmailStr
from typing import Optional
from authx import AuthX, AuthXConfig
import logging

logger = logging.getLogger(__name__)

app = FastAPI(title="Custom Callbacks Example")

# Configuration
auth_config = AuthXConfig(
    JWT_ALGORITHM="HS256",
    JWT_SECRET_KEY="your-secret-key",
)

# User model
class User(BaseModel):
    """User model stored in USERS_DB and injected into route handlers."""
    username: str
    email: EmailStr
    full_name: str
    # Compared against "admin" by the admin-only endpoint
    role: str

# Initialize with model
auth = AuthX(config=auth_config, model=User)
auth.handle_errors(app)

# Mock database
USERS_DB = {
    "admin": User(
        username="admin",
        email="admin@example.com",
        full_name="Admin User",
        role="admin",
    ),
    "user1": User(
        username="user1",
        email="user1@example.com",
        full_name="Regular User",
        role="user",
    ),
}

# Mock blocklist
REVOKED_TOKENS: set[str] = set()

# Subject getter callback
async def get_user(uid: str) -> Optional[User]:
    """Return the USERS_DB entry for ``uid``, logging a warning on a miss."""
    found = USERS_DB.get(uid)
    if found:
        return found
    logger.warning(f"User not found: {uid}")
    return found

# Blocklist callback
def is_token_revoked(token: str) -> bool:
    """Report whether the raw token string is in REVOKED_TOKENS."""
    if token in REVOKED_TOKENS:
        return True
    return False

# Register callbacks
auth.set_subject_getter(get_user)
auth.set_token_blocklist(is_token_revoked)

@app.post("/login")
def login(username: str, password: str):
    """Login endpoint.

    Args:
        username: Key looked up in USERS_DB; also used as the token uid
        password: Accepted but not read by this function

    Returns:
        Dict with the new access token and token type "bearer"

    Raises:
        HTTPException: 401 when the username is not in USERS_DB
    """
    # NOTE(review): password is never checked here, so any password is accepted
    # for a known username — add real credential verification before reuse.
    user = USERS_DB.get(username)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    
    access_token = auth.create_access_token(uid=username)
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/me")
async def get_current_user(user: User = auth.CURRENT_SUBJECT):
    """Get current user information.

    Returns:
        The User injected through auth.CURRENT_SUBJECT, unchanged
    """
    return user

@app.post("/logout")
async def logout(request: Request):
    """Revoke the request's access token by adding it to REVOKED_TOKENS."""
    request_token = await auth.get_access_token_from_request(request)
    raw_jwt = request_token.token
    REVOKED_TOKENS.add(raw_jwt)
    return {"message": "Logged out successfully"}

@app.get("/admin")
async def admin_only(user: User = auth.CURRENT_SUBJECT):
    """Grant access only to users whose role is "admin".

    Raises:
        HTTPException: 403 for any other role.
    """
    if user.role == "admin":
        return {"message": "Admin access granted", "user": user.username}

    raise HTTPException(status_code=403, detail="Admin access required")

Best practices

Callback recommendations:
  • Use async callbacks for database/Redis operations
  • Handle errors gracefully and log failures
  • Use Pydantic models for type safety
  • Cache user data to reduce database queries
  • Use Redis for token blocklists in production
  • Store JTI instead of full tokens for efficiency
  • Set appropriate TTL for revoked tokens
  • Return None from subject getter if user not found
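  • Decide explicitly what the blocklist returns if the check fails: False fails open (revoked tokens are accepted while the check is unavailable), True fails closed (all requests are denied)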
  • Test callbacks thoroughly

Next steps

Token revocation

Deep dive into token revocation strategies

Error handling

Handle errors in callbacks

Scopes and permissions

Combine callbacks with scope-based permissions

Basic usage

Return to basic authentication concepts

Build docs developers (and LLMs) love