Overview

Infrastructure services provide concrete implementations of service interfaces defined in the application layer. These handle cross-cutting concerns like security, token management, and external integrations.
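
The application-layer interface itself is not shown on this page. As a rough sketch of the idea, assuming the contract only covers the token methods documented below, it could look like the following. The method list and the FakeTokenService test double are illustrative assumptions, not the project's actual code.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class ITokenService(ABC):
    """Hypothetical application-layer contract; the real interface may differ."""

    @abstractmethod
    def create_access_token(self, data: Dict[str, Any], expires_in: int = 900) -> str: ...

    @abstractmethod
    def create_refresh_token(self, data: Dict[str, Any], expires_in: int = 2592000) -> str: ...

    @abstractmethod
    def verify_token(self, token: str) -> Optional[Dict[str, Any]]: ...


class FakeTokenService(ITokenService):
    """Illustrative test double: any implementation of the contract can be swapped in."""

    def create_access_token(self, data, expires_in=900):
        return f"access:{data['sub']}"

    def create_refresh_token(self, data, expires_in=2592000):
        return f"refresh:{data['sub']}"

    def verify_token(self, token):
        kind, _, subject = token.partition(":")
        return {"sub": subject, "type": kind} if subject else None
```

Code that depends only on the interface can then be tested with the fake and wired to the JWT implementation in production.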

Service Categories

  • Security Services: Password hashing, encryption
  • Token Services: JWT creation and verification
  • External Services: Email, SMS, cloud storage
  • Caching Services: Redis, in-memory caching

JWT Token Service

Location

src/features/auth/infrastructure/services/security/jwt_handler.py:8

Implementation

from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from jose import JWTError, jwt
from jose.exceptions import ExpiredSignatureError

class JWTService(ITokenService):
    """JWT token service implementation using python-jose"""
    
    def __init__(
        self,
        secret_key: str,
        algorithm: str = "HS256",
        issuer: Optional[str] = None,
        audience: Optional[str] = None
    ):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.issuer = issuer
        self.audience = audience

Creating Access Tokens

def create_access_token(self, data: Dict[str, Any], expires_in: int = 900) -> str:
    """Create a JWT access token"""
    to_encode = data.copy()
    
    # Add standard claims
    now = datetime.utcnow()
    expire = now + timedelta(seconds=expires_in)
    
    to_encode.update({
        "exp": expire,
        "iat": now,
        "nbf": now,
        "type": "access"
    })
    
    if self.issuer:
        to_encode["iss"] = self.issuer
    if self.audience:
        to_encode["aud"] = self.audience
    
    return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
Reference: jwt_handler.py:23-43

Key Features:
  • Default 15-minute expiration (900 seconds)
  • Standard JWT claims: exp, iat, nbf
  • Custom type claim to distinguish token types
  • Optional iss and aud claims, added only when an issuer or audience is configured
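
The access and refresh token methods build almost the same claim set. A minimal sketch of that shared step, using only the standard library, is shown here. The build_claims helper and its now parameter are hypothetical names introduced for illustration, and the timestamps are written as integers, which is the form the time claims take inside the encoded token.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_claims(
    data: Dict[str, Any],
    expires_in: int,
    token_type: str,
    issuer: Optional[str] = None,
    audience: Optional[str] = None,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble the claim set that gets signed."""
    now = now or datetime.now(timezone.utc)
    issued_at = int(now.timestamp())
    claims = dict(data)  # copy so the caller's dict is not mutated
    claims.update({
        "exp": issued_at + expires_in,
        "iat": issued_at,
        "nbf": issued_at,
        "type": token_type,
    })
    # iss and aud are only present when configured
    if issuer:
        claims["iss"] = issuer
    if audience:
        claims["aud"] = audience
    return claims


claims = build_claims({"sub": "user123"}, expires_in=900, token_type="access",
                      issuer="soft-bee-api")
print(claims["exp"] - claims["iat"])  # 900
```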

Creating Refresh Tokens

def create_refresh_token(self, data: Dict[str, Any], expires_in: int = 2592000) -> str:
    """Create a JWT refresh token"""
    to_encode = data.copy()
    
    now = datetime.utcnow()
    expire = now + timedelta(seconds=expires_in)
    
    to_encode.update({
        "exp": expire,
        "iat": now,
        "nbf": now,
        "type": "refresh"
    })
    
    if self.issuer:
        to_encode["iss"] = self.issuer
    if self.audience:
        to_encode["aud"] = self.audience
    
    return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
Reference: jwt_handler.py:45-64

Key Features:
  • Default 30-day expiration (2,592,000 seconds)
  • Marked with type: refresh so callers can reject it where an access token is expected (verify_token itself does not check the type claim)
  • Same security standards as access tokens
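
The type claim only helps if something checks it. The verify_token method documented on this page returns the payload without inspecting type, so the caller has to enforce it. A small sketch of such a guard follows; require_token_type is a hypothetical helper name, and raising ValueError is an arbitrary choice for the example.

```python
from typing import Any, Dict, Optional


def require_token_type(payload: Optional[Dict[str, Any]], expected: str) -> Dict[str, Any]:
    """Hypothetical guard: accept a verified payload only if its type matches."""
    if payload is None:
        raise ValueError("token is invalid or expired")
    if payload.get("type") != expected:
        raise ValueError(f"expected a {expected} token")
    return payload
```

A protected endpoint might call require_token_type(jwt_service.verify_token(token), "access"), while the refresh endpoint would pass "refresh".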

Token Verification

def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
    """Verify and decode a token"""
    try:
        payload = jwt.decode(
            token,
            self.secret_key,
            algorithms=[self.algorithm],
            issuer=self.issuer,
            audience=self.audience,
            options={
                "verify_exp": True,
                "verify_iss": bool(self.issuer),
                "verify_aud": bool(self.audience),
                "verify_signature": True
            }
        )
        return payload
    except (JWTError, ExpiredSignatureError):
        return None
Reference: jwt_handler.py:66-84

Verification includes:
  • Signature validation
  • Expiration check
  • Issuer validation (if configured)
  • Audience validation (if configured)
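
To make the signature and expiration checks concrete, here is a standard-library sketch of what HS256 verification involves. It is not how python-jose is implemented and is not meant to replace it. The sign_hs256 and verify_hs256 names are made up for this example, and issuer and audience checks are left out for brevity.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that the JWT encoding strips
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(claims: Dict[str, Any], secret: str) -> str:
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(claims).encode())
    digest = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(digest)}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> Optional[Dict[str, Any]]:
    try:
        header, body, signature = token.split(".")
        if json.loads(_b64url_decode(header)).get("alg") != "HS256":
            return None
        expected = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
        # Constant-time comparison avoids leaking how many bytes matched
        if not hmac.compare_digest(expected, _b64url_decode(signature)):
            return None
        claims = json.loads(_b64url_decode(body))
    except (ValueError, AttributeError):
        return None  # malformed token
    if not isinstance(claims, dict):
        return None
    current = time.time() if now is None else now
    if "exp" in claims and claims["exp"] < current:
        return None  # expired
    return claims
```

The signature is checked before the payload is parsed, so claims from a tampered token are never interpreted.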

Token Utilities

Decode Without Verification

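def decode_token(self, token: str) -> Optional[Dict[str, Any]]:
    """Decode a token without verifying it"""
    try:
        return jwt.decode(
            token,
            self.secret_key,
            algorithms=[self.algorithm],
            # python-jose still validates claims when only the signature
            # check is disabled, so turn those checks off as well
            options={
                "verify_signature": False,
                "verify_exp": False,
                "verify_nbf": False,
                "verify_aud": False,
            }
        )
    except JWTError:
        return None
Reference: jwt_handler.py:86-96

Useful for inspecting expired tokens or debugging. The returned claims are unverified, so never use them for authorization decisions.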

Check Expiration

def is_token_expired(self, token: str) -> bool:
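    """Check whether the token has expired"""
    payload = self.decode_token(token)
    if not payload or "exp" not in payload:
        return True
    
    # exp is a UTC timestamp, so convert with utcfromtimestamp to match utcnow();
    # fromtimestamp would return local time and skew the comparison
    expiry = datetime.utcfromtimestamp(payload["exp"])
    return expiry < datetime.utcnow()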
Reference: jwt_handler.py:98-105

Get Expiry Date

def get_token_expiry(self, token: str) -> Optional[datetime]:
    """Get the token's expiration date (naive UTC datetime)"""
    payload = self.decode_token(token)
    if payload and "exp" in payload:
        # Convert as UTC, consistent with the utcnow() calls used when creating tokens
        return datetime.utcfromtimestamp(payload["exp"])
    return None
Reference: jwt_handler.py:107-112
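
Naive datetimes make it easy to mix local time and UTC by accident. One alternative, sketched here under the assumption that callers can accept timezone-aware values, is to convert the exp claim with an explicit UTC timezone. The function names are illustrative and not part of jwt_handler.py.

```python
from datetime import datetime, timezone
from typing import Optional


def expiry_from_claim(exp: int) -> datetime:
    """Convert a numeric exp claim to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(exp, tz=timezone.utc)


def is_expired(exp: int, now: Optional[datetime] = None) -> bool:
    """Compare against an aware 'now' so the server's local timezone never matters."""
    now = now or datetime.now(timezone.utc)
    return expiry_from_claim(exp) < now
```

Aware datetimes cannot be compared with naive ones without raising TypeError, which surfaces mixing mistakes early.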

Password Hashing Service

Location

src/features/auth/infrastructure/services/security/password_hasher.py:7

Implementation

import bcrypt
from argon2 import PasswordHasher as Argon2Hasher
from argon2.exceptions import VerifyMismatchError, InvalidHashError
from typing import Tuple

class PasswordHasher:
    """Service for hashing and verifying passwords"""
    
    def __init__(self, algorithm: str = "argon2"):
        self.algorithm = algorithm
        # Created unconditionally so verify() can check Argon2 hashes
        # even when bcrypt is the configured algorithm
        self.argon2_hasher = Argon2Hasher(
            time_cost=2,
            memory_cost=102400,
            parallelism=8,
            hash_len=32,
            salt_len=16
        )
Reference: password_hasher.py:10-19

Argon2 Configuration

Argon2Hasher(
    time_cost=2,        # Number of iterations
    memory_cost=102400,  # Memory usage in KiB (100 MiB)
    parallelism=8,       # Number of parallel threads
    hash_len=32,         # Length of hash in bytes
    salt_len=16          # Length of salt in bytes
)

Why Argon2?
  • Winner of Password Hashing Competition (2015)
  • Resistant to GPU cracking
  • Memory-hard algorithm
  • Configurable resource usage
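
Because the algorithm is memory-hard, memory_cost translates directly into server memory. As a back-of-the-envelope sketch (the helper name and the figure of 20 simultaneous logins are made up for illustration), each hash needs roughly memory_cost KiB for its duration:

```python
def argon2_memory_mib(memory_cost_kib: int, concurrent_hashes: int = 1) -> float:
    """Approximate peak memory in MiB for a number of simultaneous hashes."""
    return memory_cost_kib * concurrent_hashes / 1024


print(argon2_memory_mib(102400))      # 100.0
print(argon2_memory_mib(102400, 20))  # 2000.0
```

With the configuration above, a burst of concurrent logins can therefore need a few GiB, which is worth checking against the deployment's memory limits.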

Hashing Passwords

def hash(self, password: str) -> Tuple[str, str]:
    """
    Hash a password
    
    Returns:
        Tuple[str, str]: (hashed_password, algorithm_used)
    """
    if self.algorithm == "bcrypt":
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode(), salt)
        return hashed.decode(), "bcrypt"
    
    elif self.algorithm == "argon2":
        hashed = self.argon2_hasher.hash(password)
        return hashed, "argon2"
    
    else:
        raise ValueError(f"Unsupported algorithm: {self.algorithm}")
Reference: password_hasher.py:21-38

Returns:
  • Hashed password string
  • Algorithm identifier (for future migrations)

Verifying Passwords

def verify(self, password: str, hashed_password: str) -> bool:
    """
    Verify a password against a hash
    
    Args:
        password: Plain-text password
        hashed_password: Stored hash
        
    Returns:
        bool: True if the password is valid
    """
    # Detect the algorithm from the hash format
    if hashed_password.startswith("$2b$") or hashed_password.startswith("$2a$"):
        # bcrypt
        try:
            return bcrypt.checkpw(password.encode(), hashed_password.encode())
        except ValueError:
            return False
    
    elif hashed_password.startswith("$argon2"):
        # argon2
        try:
            self.argon2_hasher.verify(hashed_password, password)
            return True
        except (VerifyMismatchError, InvalidHashError):
            return False
    
    else:
        # Unknown hash format
        return False
Reference: password_hasher.py:40-69

Key Features:
  • Auto-detects algorithm from hash format
  • Supports bcrypt ($2a$ and $2b$ prefixes)
  • Supports Argon2 ($argon2 prefix)
  • Graceful error handling
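
The detection step relies on the fact that both formats start with a self-describing prefix. Isolated from the hashing libraries, it can be sketched as a pure function. The detect_algorithm name is hypothetical, and the sample strings below are shortened placeholders, not real hashes.

```python
from typing import Optional

BCRYPT_PREFIXES = ("$2a$", "$2b$")


def detect_algorithm(hashed_password: str) -> Optional[str]:
    """Return the algorithm name implied by the hash prefix, or None if unknown."""
    if hashed_password.startswith(BCRYPT_PREFIXES):
        return "bcrypt"
    if hashed_password.startswith("$argon2"):
        return "argon2"
    return None


print(detect_algorithm("$2b$12$placeholder"))                       # bcrypt
print(detect_algorithm("$argon2id$v=19$m=102400,t=2,p=8$placeholder"))  # argon2
print(detect_algorithm("5f4dcc3b5aa765d61d8327deb882cf99"))          # None
```

An unknown prefix, such as a legacy unsalted digest, yields None, which matches verify returning False for unrecognized hashes.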

Algorithm Migration

def needs_rehash(self, hashed_password: str) -> bool:
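    """
    Check whether the hash should be regenerated (for algorithm migration)
    """
    if self.algorithm == "argon2":
        # argon2-cffi cannot parse a non-Argon2 hash (for example bcrypt),
        # so report it as needing a rehash instead of passing it on
        if not hashed_password.startswith("$argon2"):
            return True
        return self.argon2_hasher.check_needs_rehash(hashed_password)
    
    # bcrypt offers no equivalent check, so always return False
    return False
Reference: password_hasher.py:71-79

Use this to migrate from bcrypt to Argon2 when a user logs in: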
if hasher.verify(password, user.password) and hasher.needs_rehash(user.password):
    new_hash, _ = hasher.hash(password)
    user.password = new_hash
    user_repo.save(user)

Service Configuration

Dependency Injection

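from flask import Flask
from src.features.auth.infrastructure.services.security.jwt_handler import JWTService
from src.features.auth.infrastructure.services.security.password_hasher import PasswordHasher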

def configure_services(app: Flask):
    # JWT Service
    jwt_service = JWTService(
        secret_key=app.config['SECRET_KEY'],
        algorithm='HS256',
        issuer='soft-bee-api',
        audience='soft-bee-client'
    )
    
    # Password Hasher
    password_hasher = PasswordHasher(algorithm='argon2')
    
    return {
        'jwt_service': jwt_service,
        'password_hasher': password_hasher
    }

Environment Variables

# .env
SECRET_KEY=your-secret-key-here
JWT_ALGORITHM=HS256
JWT_ISSUER=soft-bee-api
JWT_AUDIENCE=soft-bee-client
ACCESS_TOKEN_EXPIRE_MINUTES=15
REFRESH_TOKEN_EXPIRE_DAYS=30
PASSWORD_HASH_ALGORITHM=argon2
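
The expiry variables are expressed in minutes and days, while the token methods take seconds. One way to bridge that, sketched with the standard library only, is a small settings loader. AuthSettings and load_auth_settings are hypothetical names, and the sketch assumes the .env file has already been loaded into the process environment by whatever tooling the project uses.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AuthSettings:
    secret_key: str
    jwt_algorithm: str
    jwt_issuer: Optional[str]
    jwt_audience: Optional[str]
    access_token_expires_in: int   # seconds
    refresh_token_expires_in: int  # seconds
    password_hash_algorithm: str


def load_auth_settings(env: Mapping[str, str] = os.environ) -> AuthSettings:
    """Read the variables listed above; defaults mirror the documented values."""
    return AuthSettings(
        secret_key=env["SECRET_KEY"],  # required: fail fast with KeyError if missing
        jwt_algorithm=env.get("JWT_ALGORITHM", "HS256"),
        jwt_issuer=env.get("JWT_ISSUER"),
        jwt_audience=env.get("JWT_AUDIENCE"),
        access_token_expires_in=int(env.get("ACCESS_TOKEN_EXPIRE_MINUTES", "15")) * 60,
        refresh_token_expires_in=int(env.get("REFRESH_TOKEN_EXPIRE_DAYS", "30")) * 86400,
        password_hash_algorithm=env.get("PASSWORD_HASH_ALGORITHM", "argon2"),
    )
```

The resulting values could be passed to JWTService and as the expires_in arguments, so the service configuration no longer hard-codes the algorithm, issuer, and audience.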

Best Practices

Security

  1. Secret Key Management
    • Never commit secrets to version control
    • Use environment variables or secret managers
    • Rotate keys periodically
  2. Token Expiration
    • Short-lived access tokens (15 minutes)
    • Long-lived refresh tokens (30 days)
    • Implement token revocation
  3. Password Hashing
    • Always use Argon2 or bcrypt
    • Never use MD5, SHA1, or plain SHA256
    • Configure appropriate cost factors
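
Token revocation is listed above but not implemented on this page. A minimal in-memory sketch is shown below, assuming each token carries a unique jti claim (the tokens documented here do not include one, so that would need to be added). The class name is hypothetical, and a multi-process deployment would need a shared store such as Redis instead of a dict.

```python
import time
from typing import Dict, Optional


class InMemoryTokenDenylist:
    """Hypothetical revocation store keyed by a token's jti claim."""

    def __init__(self) -> None:
        self._revoked: Dict[str, float] = {}  # jti -> exp timestamp

    def revoke(self, jti: str, exp: float) -> None:
        self._revoked[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        current = time.time() if now is None else now
        # Drop entries whose tokens have expired anyway, so the store stays small
        self._revoked = {j: e for j, e in self._revoked.items() if e > current}
        return jti in self._revoked
```

After verify_token succeeds, the caller would additionally reject the request if is_revoked(payload["jti"]) returns True.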

Error Handling

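try:
    payload = jwt_service.verify_token(token)
except Exception as e:
    # Unexpected failures only; invalid or expired tokens return None instead
    logger.error(f"Token verification failed: {e}")
    raise AuthenticationError("Token verification failed") from e

# Raised outside the try block so the broad except cannot swallow it
if not payload:
    raise AuthenticationError("Invalid token")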

Testing

def test_create_access_token():
    jwt_service = JWTService(secret_key="test-secret")
    token = jwt_service.create_access_token(
        data={"sub": "user123"},
        expires_in=60
    )
    
    payload = jwt_service.verify_token(token)
    assert payload["sub"] == "user123"
    assert payload["type"] == "access"
