OAuth token management in PayPulse Cloud is handled by oauth_utils.py and secretsmanager_utils.py in the common Lambda layer. The design keeps token handling centralized so that every Lambda function that needs Gmail access follows the same refresh-and-validate path.
Token storage pattern
Tokens are stored per user in AWS Secrets Manager using the key pattern:
    gmail/user/{user_id}
Here, {user_id} is the internal PayPulse user ID (a UUID prefixed with user_). Each secret is a JSON object containing the following fields:
| Field | Type | Description |
|---|---|---|
| access_token | string | Short-lived OAuth access token (1-hour lifetime) |
| refresh_token | string | Long-lived token used to acquire new access tokens |
| expires_at | string | ISO 8601 UTC timestamp of access token expiry |
| expires_in | integer | Token lifetime in seconds at time of issue |
| scope | string | Space-separated list of granted OAuth scopes |
| token_type | string | Always "Bearer" |
| created_at | string | ISO 8601 UTC timestamp of when the secret was written |
| google_user_id | string | Google’s unique identifier for the connected account |
| google_email | string | Google email address of the connected account |
| google_name | string | Display name from the Google account |
| google_verified_email | boolean | Whether Google has verified the email address |
The prepare_oauth_secret_data function in oauth_utils.py builds this structure before storage:
    def prepare_oauth_secret_data(
        access_token: str,
        refresh_token: Optional[str],
        expires_in: int,
        scope: str,
        google_user_info: Dict[str, str] = None
    ) -> Dict[str, Any]:
        import time
        from datetime import datetime, timedelta

        # Calculate expiration timestamp
        expires_at = datetime.utcnow() + timedelta(seconds=expires_in)

        oauth_data = {
            "access_token": access_token,
            "expires_at": expires_at.isoformat(),
            "expires_in": expires_in,
            "scope": scope,
            "token_type": "Bearer",
            "created_at": datetime.utcnow().isoformat()
        }

        if refresh_token:
            oauth_data.update({"refresh_token": refresh_token})

        # Add Google user information if provided
        if google_user_info:
            oauth_data.update(google_user_info)

        return oauth_data
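To make the storage layout concrete, here is a minimal sketch of a secret name and payload. The helpers secret_name and missing_fields are hypothetical and not part of the PayPulse codebase, and every value in example_secret is made up, including the scope string, which stands in for whatever scopes the app actually requests.

```python
from typing import Any, Dict, List

# Fields that prepare_oauth_secret_data always writes, per the function above.
ALWAYS_PRESENT = (
    "access_token", "expires_at", "expires_in",
    "scope", "token_type", "created_at",
)


def secret_name(user_id: str) -> str:
    """Hypothetical helper: build the secret name from the documented pattern."""
    return f"gmail/user/{user_id}"


def missing_fields(secret: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list always-written fields absent from a secret."""
    return [name for name in ALWAYS_PRESENT if name not in secret]


# Illustrative payload only; none of these values come from a real account.
example_secret = {
    "access_token": "ya29.example",
    "refresh_token": "1//example",
    "expires_at": "2024-01-01T13:00:00",
    "expires_in": 3600,
    "scope": "https://www.googleapis.com/auth/gmail.readonly",
    "token_type": "Bearer",
    "created_at": "2024-01-01T12:00:00",
    "google_email": "person@example.com",
}
```

A check like missing_fields is mainly useful in tests or when inspecting older records, since refresh_token and the Google profile fields are only written when they are supplied.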
Token expiry check
Before a token is used, is_token_expired compares the stored expires_at timestamp against the current UTC time:
    def is_token_expired(oauth_data: Dict[str, Any]) -> bool:
        if 'expires_at' not in oauth_data:
            return False

        from datetime import datetime
        expires_at = datetime.fromisoformat(oauth_data['expires_at'])
        return datetime.utcnow() >= expires_at
If expires_at is absent (for example, in older records), the function returns False, so the token is treated as not expired and the request is attempted with it.
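The check above compares naive datetimes, which works as long as every stored expires_at was produced by datetime.utcnow().isoformat(). As a sketch of a more defensive variant (not the project's implementation), the function below accepts both naive and offset-carrying timestamps and treats naive values as UTC. The name is_token_expired_aware and the now parameter are assumptions added for illustration. Note that datetime.utcnow() is deprecated as of Python 3.12, and comparing a naive datetime with an aware one raises TypeError.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def is_token_expired_aware(
    oauth_data: Dict[str, Any],
    now: Optional[datetime] = None,
) -> bool:
    """Hypothetical timezone-aware variant of is_token_expired."""
    raw = oauth_data.get("expires_at")
    if raw is None:
        # Mirror the documented behaviour: no timestamp means "not expired".
        return False

    expires_at = datetime.fromisoformat(raw)
    if expires_at.tzinfo is None:
        # Naive values are assumed to be UTC, as written by utcnow().isoformat().
        expires_at = expires_at.replace(tzinfo=timezone.utc)

    current = now if now is not None else datetime.now(timezone.utc)
    return current >= expires_at
```

Passing now explicitly keeps the function deterministic in tests; production callers would omit it.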
Automatic token refresh
Token refresh happens inside create_gmail_service in gmail_api_utils.py. The function checks whether the token has expired or will expire within the next 5 minutes, and refreshes proactively if so:
    # Check if token is expired or will expire soon, and refresh proactively
    should_refresh = False

    # Check expiration using the stored expires_at timestamp
    if expires_at:
        try:
            expiry_time = dateutil.parser.isoparse(expires_at).replace(tzinfo=None)
            current_time = datetime.utcnow()
            time_until_expiry = expiry_time - current_time

            # Refresh if expired or expiring within 5 minutes
            should_refresh = time_until_expiry < timedelta(minutes=5)
            logging.info(f"Token expires in {time_until_expiry.total_seconds():.0f} seconds")
        except Exception as e:
            logging.warning(f"Could not parse expires_at '{expires_at}': {e}. Checking credentials.expired")
            should_refresh = credentials.expired
    else:
        # Fallback to credentials.expired if no expires_at provided
        should_refresh = credentials.expired

    if should_refresh:
        logging.info("Access token expired or expiring soon, refreshing...")
        try:
            import google.auth.transport.requests
            request = google.auth.transport.requests.Request()
            credentials.refresh(request)

            # Update tokens in Secrets Manager
            new_expires_in = int((credentials.expiry - datetime.utcnow()).total_seconds()) if credentials.expiry else 3600
            update_oauth_tokens(
                user_id=user_id,
                access_token=credentials.token,
                refresh_token=credentials.refresh_token or refresh_token,
                expires_in=new_expires_in,
                region=region
            )
            logging.info("Access token refreshed and updated in Secrets Manager")
        except RefreshError as e:
            error_str = str(e)
            # Check if this is an expired/revoked refresh token
            if 'invalid_grant' in error_str and ('expired' in error_str or 'revoked' in error_str):
                logging.warning(f"Refresh token expired/revoked for user {user_id}. Clearing tokens.")
                from utils.secretsmanager_utils import delete_oauth_tokens
                try:
                    delete_oauth_tokens(user_id, region)
                except Exception as delete_error:
                    logging.error(f"Failed to clear expired tokens: {delete_error}")
                raise RefreshTokenExpiredError(
                    "Refresh token has expired or been revoked. Please re-connect your Gmail account."
                ) from e
            else:
                raise OAuthValidationError(f"Token refresh failed: {str(e)}") from e
After a successful refresh, update_oauth_tokens writes the new access_token and updated expires_at back to Secrets Manager while preserving the existing scope and Google user info.
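The body of update_oauth_tokens is not shown on this page, so the following is only a sketch of the merge it is described as performing: overwrite the token fields and leave everything else in the secret untouched. The function name merge_refreshed_tokens and the now parameter are hypothetical, and the real function also reads from and writes to Secrets Manager, which this sketch leaves out.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def merge_refreshed_tokens(
    existing: Dict[str, Any],
    access_token: str,
    expires_in: int,
    refresh_token: Optional[str] = None,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Hypothetical merge: update token fields, keep scope and Google info."""
    # Naive UTC, to match the format written by prepare_oauth_secret_data.
    current = now if now is not None else datetime.now(timezone.utc).replace(tzinfo=None)

    updated = dict(existing)  # copy so the caller's dict is not mutated
    updated["access_token"] = access_token
    updated["expires_in"] = expires_in
    updated["expires_at"] = (current + timedelta(seconds=expires_in)).isoformat()
    if refresh_token:
        # Only replace the refresh token when a new one was actually issued.
        updated["refresh_token"] = refresh_token
    return updated
```

Copying the existing secret first is what preserves scope, created_at, and the google_* fields without having to list them.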
Account switch detection
When the iOS app sends new tokens, the backend compares the google_user_id in the incoming token against the one already stored for the user:
    def validate_google_account_consistency(
        user_id: str,
        new_google_user_id: str,
        new_google_email: str,
        region: str
    ) -> Dict[str, Any]:
        try:
            from utils.secretsmanager_utils import get_oauth_tokens

            try:
                existing_oauth_data = get_oauth_tokens(user_id, region)
                existing_google_user_id = existing_oauth_data.get('google_user_id')
                existing_google_email = existing_oauth_data.get('google_email')

                if existing_google_user_id and existing_google_user_id != new_google_user_id:
                    return {
                        'is_account_switch': True,
                        'existing_email': existing_google_email,
                        'new_email': new_google_email,
                        'message': f"Switching from {existing_google_email} to {new_google_email}"
                    }
                else:
                    return {
                        'is_account_switch': False,
                        'existing_email': existing_google_email,
                        'new_email': new_google_email,
                        'message': "Same Google account or first connection"
                    }
            except Exception:
                # No existing OAuth data found - first connection
                return {
                    'is_account_switch': False,
                    'existing_email': None,
                    'new_email': new_google_email,
                    'message': "First Gmail connection"
                }
        except Exception as e:
            raise OAuthValidationError(
                f"Error validating Google account consistency: {str(e)}"
            ) from e
If is_account_switch is True, the new tokens still overwrite the old ones; the check is informational only and does not block the update. The store-tokens endpoint includes account_switch in its response so the iOS app can display a warning to the user.
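As a rough illustration of how the consistency result could be surfaced to the client, the sketch below maps it onto a response body. Only the account_switch key is mentioned in the text above; the function name build_store_tokens_response and the status and warning keys are assumptions made for this example, not the endpoint's documented contract.

```python
from typing import Any, Dict


def build_store_tokens_response(consistency: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical response builder for the store-tokens endpoint."""
    switched = bool(consistency.get("is_account_switch"))
    body: Dict[str, Any] = {
        "status": "stored",          # assumed key: tokens are stored either way
        "account_switch": switched,  # key named in the documentation
    }
    if switched:
        # Assumed key: pass the human-readable message through for display.
        body["warning"] = consistency.get("message")
    return body
```

Because the tokens are written regardless of the outcome, the response reports success in both cases and only adds the warning text when a switch was detected.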
Token validation flow
1. Retrieve tokens from Secrets Manager. Lambda calls get_oauth_tokens(user_id, region), which reads the gmail/user/{user_id} secret. If expires_at is in the past, the function adds is_expired: True to the returned dict.
2. Create Gmail service (refresh if needed). create_gmail_service is called with the stored tokens. It checks whether the token expires within 5 minutes and refreshes proactively using the refresh token.
3. Update Secrets Manager after refresh. If a refresh occurred, update_oauth_tokens writes the new access token and expiry timestamp back to Secrets Manager, preserving the existing Google user info and scope.
4. Use the access token. The Gmail API service object is returned and used to search for or download emails. The service holds valid credentials for the duration of the Lambda invocation.
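The decision made in the Create Gmail service step can be isolated as a pure function, which makes the 5-minute margin easy to test. This is a sketch rather than the project's code: needs_refresh, REFRESH_MARGIN, and the now parameter are hypothetical names, and it uses the standard library's datetime.fromisoformat in place of dateutil.parser.isoparse. It also converts offset-carrying timestamps to UTC before comparing, whereas the original drops the offset.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

REFRESH_MARGIN = timedelta(minutes=5)  # proactive window described in the text


def needs_refresh(
    expires_at: Optional[str],
    credentials_expired: bool,
    now: Optional[datetime] = None,
) -> bool:
    """Hypothetical helper: decide whether to refresh before calling Gmail."""
    if not expires_at:
        # No stored timestamp: fall back to the credentials object's own flag.
        return credentials_expired
    try:
        expiry = datetime.fromisoformat(expires_at)
    except ValueError:
        # Unparseable timestamp: same fallback as the missing case.
        return credentials_expired

    if expiry.tzinfo is not None:
        # Normalise aware timestamps to naive UTC before comparing.
        expiry = expiry.astimezone(timezone.utc).replace(tzinfo=None)

    current = now if now is not None else datetime.now(timezone.utc).replace(tzinfo=None)
    return expiry - current < REFRESH_MARGIN
```

A token with exactly five minutes left is not refreshed, because the comparison is strict, matching the `<` in the original check.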
If a refresh token has expired or been revoked by the user, create_gmail_service raises a RefreshTokenExpiredError and deletes the stale tokens from Secrets Manager. The user must reconnect their Gmail account from the iOS app.
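A caller can turn this failure into a response that tells the app to start the reconnect flow. The sketch below is one possible shape, not the project's handler: the exception class is a local stand-in for the one named above, and run_gmail_task, the create_service parameter, the 401 status, and the gmail_reconnect_required error code are all assumptions for illustration.

```python
from typing import Any, Callable, Dict


class RefreshTokenExpiredError(Exception):
    """Local stand-in for the exception raised by create_gmail_service."""


def run_gmail_task(
    user_id: str,
    create_service: Callable[[str], Any],
) -> Dict[str, Any]:
    """Hypothetical wrapper: map an expired refresh token to a client hint."""
    try:
        service = create_service(user_id)
    except RefreshTokenExpiredError as exc:
        # Stale tokens were already deleted; the user has to reconnect Gmail.
        return {
            "statusCode": 401,
            "error": "gmail_reconnect_required",
            "message": str(exc),
        }
    return {"statusCode": 200, "service": service}
```

Injecting create_service as a parameter keeps the wrapper testable without Google or AWS credentials.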