The OAuth 2.0 PKCE module implements authorization flows that use Proof Key for Code Exchange (PKCE). It covers code verifier generation, token management, and automatic token refresh.
OAuth2PKCEAuth Class
OAuth2 PKCE authentication handler for the X API.
Constructor
OAuth2PKCEAuth(
    base_url: str = "https://api.x.com",
    authorization_base_url: str = "https://x.com/i",
    client_id: str = None,
    client_secret: str = None,
    redirect_uri: str = None,
    token: Dict[str, Any] = None,
    scope: Union[str, List[str]] = None,
)
base_url
str
default:"https://api.x.com"
The base URL for the X API token endpoint
authorization_base_url
str
default:"https://x.com/i"
The base URL for OAuth2 authorization
client_id
str
default:"None"
The client ID for the X API
client_secret
str
default:"None"
The client secret for the X API (required for confidential clients)
redirect_uri
str
default:"None"
The redirect URI for OAuth2 authorization
token
Dict[str, Any]
default:"None"
An existing OAuth2 token dictionary (if available)
scope
Union[str, List[str]]
default:"None"
Space-separated string or list of strings for OAuth2 authorization scopes
Methods
get_authorization_url()
Get the authorization URL for the OAuth2 PKCE flow.
get_authorization_url(state: Optional[str] = None) -> str
state
Optional[str]
default:"None"
Optional state value, echoed back on the callback, used to guard against CSRF
Returns: The authorization URL string
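The `state` value is usually a random, unguessable string that the application stores before redirecting and compares with the value returned on the callback. The following is a minimal sketch using only the standard library; `new_state` and `state_matches` are hypothetical helper names and are not part of this module.

```python
import hmac
import secrets


def new_state() -> str:
    """Return a random, URL-safe state value (hypothetical helper)."""
    return secrets.token_urlsafe(32)


def state_matches(expected: str, received: str) -> bool:
    """Compare the stored state with the one returned on the callback."""
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(expected.encode(), received.encode())


state = new_state()
# Pass `state` to get_authorization_url(state=state), keep it in the user's
# session, and check it with state_matches() when the callback arrives.
```

Rejecting a callback whose state does not match is what gives the parameter its protective value; a fixed string such as "random_state" would not.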
exchange_code()
Exchange authorization code for tokens.
exchange_code(
    code: str,
    code_verifier: Optional[str] = None
) -> Dict[str, Any]
code
str
The authorization code from the callback
code_verifier
Optional[str]
default:"None"
Optional code verifier (uses the stored verifier if not provided)
Returns: The token dictionary containing access_token, token_type, expires_in, refresh_token, and scope
Raises:
ValueError - If code verifier is required but not available, or if token exchange fails
fetch_token()
Fetch token using authorization response URL (legacy method).
fetch_token(authorization_response: str) -> Dict[str, Any]
authorization_response
str
The full callback URL received after authorization
Returns: The token dictionary
Raises:
ValueError - If no authorization code found in callback URL
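For readers who prefer to parse the callback themselves and then call `exchange_code()`, the extraction step might look roughly like this. It is a sketch built on `urllib.parse`, not the module's own implementation; `extract_code` and its `expected_state` parameter are hypothetical.

```python
from typing import Optional
from urllib.parse import parse_qs, urlparse


def extract_code(callback_url: str, expected_state: Optional[str] = None) -> str:
    """Pull the authorization code out of a callback URL (illustrative only)."""
    params = parse_qs(urlparse(callback_url).query)
    # The authorization server reports failures through an `error` parameter.
    if "error" in params:
        raise ValueError(f"Authorization failed: {params['error'][0]}")
    # Only check the state when the caller supplied one to compare against.
    if expected_state is not None and params.get("state", [None])[0] != expected_state:
        raise ValueError("State mismatch in callback URL")
    if "code" not in params:
        raise ValueError("No authorization code found in callback URL")
    return params["code"][0]


code = extract_code("http://localhost:3000/callback?state=abc&code=xyz123", "abc")
```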
refresh_token()
Refresh the access token.
refresh_token() -> Dict[str, Any]
Returns: The refreshed token dictionary
Raises:
ValueError - If no token is available or no client secret is configured
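The client secret requirement is consistent with the usual OAuth 2.0 refresh request for confidential clients, where the client authenticates with HTTP Basic credentials. The sketch below only builds the headers and form body such a request would typically carry; it is an assumption about the general protocol shape, not a description of what this module sends, and `build_refresh_request` is a hypothetical helper.

```python
import base64
from typing import Dict, Tuple
from urllib.parse import urlencode


def build_refresh_request(
    refresh_token: str, client_id: str, client_secret: str
) -> Tuple[Dict[str, str], str]:
    """Build headers and form body for a refresh_token grant (illustrative only)."""
    # Confidential clients commonly authenticate with Basic client_id:client_secret.
    credentials = f"{client_id}:{client_secret}".encode()
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode(),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urlencode({"grant_type": "refresh_token", "refresh_token": refresh_token})
    return headers, body


headers, body = build_refresh_request("r1", "id", "secret")
```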
set_pkce_parameters()
Manually set PKCE parameters.
set_pkce_parameters(
    code_verifier: str,
    code_challenge: Optional[str] = None
)
code_verifier
str
The code verifier to use
code_challenge
Optional[str]
default:"None"
Optional code challenge (generated from the verifier if not provided)
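Under the S256 method defined in RFC 7636, the challenge is the unpadded base64url encoding of the SHA-256 digest of the verifier, which matches the manual example later in this page. The sketch below assumes S256 and shows the derivation with the RFC's length and character constraints; `derive_challenge` is a hypothetical helper, not a function exported by this module.

```python
import base64
import hashlib
import re

# RFC 7636: the verifier is 43-128 characters from the unreserved set.
_VERIFIER_RE = re.compile(r"[A-Za-z0-9\-._~]{43,128}")


def derive_challenge(code_verifier: str) -> str:
    """Derive an S256 code challenge from a code verifier (illustrative only)."""
    if not _VERIFIER_RE.fullmatch(code_verifier):
        raise ValueError("code_verifier must be 43-128 unreserved characters")
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    # base64url without the trailing '=' padding
    return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")


# Test vector from RFC 7636, Appendix B
verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
challenge = derive_challenge(verifier)
```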
get_code_verifier()
Get the current code verifier.
get_code_verifier() -> Optional[str]
Returns: The current code verifier, or None if not set
get_code_challenge()
Get the current code challenge.
get_code_challenge() -> Optional[str]
Returns: The current code challenge, or None if not set
is_token_expired()
Check if the token is expired.
is_token_expired() -> bool
Returns: True if the token is expired, False otherwise
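This page does not say how expiry is determined. One common approach is to convert the relative `expires_in` value into an absolute timestamp when the token is received and compare against it later. The sketch below illustrates that idea; the `expires_at` key, the `leeway` parameter, and both helper names are assumptions made for illustration.

```python
import time
from typing import Any, Dict, Optional


def with_expiry(token: Dict[str, Any], now: Optional[float] = None) -> Dict[str, Any]:
    """Return a copy of the token with an absolute `expires_at` timestamp."""
    now = time.time() if now is None else now
    stamped = dict(token)
    stamped["expires_at"] = now + float(token["expires_in"])
    return stamped


def looks_expired(
    token: Dict[str, Any], leeway: float = 60.0, now: Optional[float] = None
) -> bool:
    """Treat the token as expired `leeway` seconds early to absorb clock skew."""
    now = time.time() if now is None else now
    return now >= token["expires_at"] - leeway


token = with_expiry({"access_token": "abc", "expires_in": 7200}, now=1_000.0)
```

Stamping the token at receipt time matters because `expires_in` alone is relative: a stored token dictionary with `"expires_in": 7200` says nothing about when the two hours started.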
Properties
access_token
Get the current access token.
@property
access_token -> Optional[str]
Returns: The current access token, or None if no token exists
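Because the property can return None, callers that build their own requests may want to check it before use. A small sketch, assuming the token is sent as a standard Bearer credential; `bearer_headers` is a hypothetical helper and the value passed in would be `auth.access_token`.

```python
from typing import Dict, Optional


def bearer_headers(access_token: Optional[str]) -> Dict[str, str]:
    """Build an Authorization header, failing early when no token exists."""
    if not access_token:
        raise ValueError("No access token available")
    return {"Authorization": f"Bearer {access_token}"}


headers = bearer_headers("example_access_token")
```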
Example Usage
Public Client Flow
from xdk.oauth2_auth import OAuth2PKCEAuth
# Initialize OAuth2 PKCE for public client
auth = OAuth2PKCEAuth(
    client_id="your_client_id",
    redirect_uri="http://localhost:3000/callback",
    scope=["tweet.read", "tweet.write", "users.read"]
)
# Step 1: Get authorization URL
auth_url = auth.get_authorization_url(state="random_state")
print(f"Visit this URL to authorize: {auth_url}")
# Step 2: User authorizes and you get the code from callback
code = "authorization_code_from_callback"
# Step 3: Exchange code for access token
token = auth.exchange_code(code)
print(f"Access Token: {token['access_token']}")
print(f"Refresh Token: {token['refresh_token']}")
# Step 4: Check if token is expired and refresh if needed
# (refresh_token() is documented above to raise ValueError when no client secret is configured)
if auth.is_token_expired():
    token = auth.refresh_token()
    print(f"Refreshed Access Token: {token['access_token']}")
Confidential Client Flow
from xdk.oauth2_auth import OAuth2PKCEAuth
# Initialize OAuth2 PKCE for confidential client
auth = OAuth2PKCEAuth(
    client_id="your_client_id",
    client_secret="your_client_secret",  # Required for token refresh
    redirect_uri="http://localhost:3000/callback",
    scope="tweet.read tweet.write users.read"
)
# Get authorization URL
auth_url = auth.get_authorization_url()
print(f"Visit: {auth_url}")
# Exchange authorization code
token = auth.exchange_code("code_from_callback")
# Refresh token when expired
if auth.is_token_expired():
    token = auth.refresh_token()
Using Existing Token
from xdk.oauth2_auth import OAuth2PKCEAuth
# Initialize with existing token
existing_token = {
    "access_token": "your_access_token",
    "token_type": "bearer",
    "expires_in": 7200,
    "refresh_token": "your_refresh_token",
    "scope": "tweet.read tweet.write users.read"
}
auth = OAuth2PKCEAuth(
    client_id="your_client_id",
    client_secret="your_client_secret",
    token=existing_token
)
# Use the access token
print(f"Current token: {auth.access_token}")
# Refresh if needed
if auth.is_token_expired():
    token = auth.refresh_token()
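An existing token usually comes from storage written during an earlier session. The sketch below shows one way to save and reload a token dictionary as JSON so it can be passed as the `token` argument; `save_token` and `load_token` are hypothetical helpers, and the file location is up to the application.

```python
import json
import os
from pathlib import Path
from typing import Any, Dict, Optional


def save_token(token: Dict[str, Any], path: Path) -> None:
    """Write the token dictionary to disk as JSON."""
    path.write_text(json.dumps(token))
    # Restrict the file to the current user where the platform supports it.
    os.chmod(path, 0o600)


def load_token(path: Path) -> Optional[Dict[str, Any]]:
    """Return the stored token dictionary, or None if nothing was saved."""
    if not path.exists():
        return None
    return json.loads(path.read_text())
```

A typical pattern would be to call `save_token()` after `exchange_code()` and after each `refresh_token()`, then pass the result of `load_token()` to the constructor on the next run.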
Manual PKCE Parameters
from xdk.oauth2_auth import OAuth2PKCEAuth
import secrets
import hashlib
import base64
# Generate PKCE parameters manually
code_verifier = secrets.token_urlsafe(96)[:128]
code_challenge = base64.urlsafe_b64encode(
    hashlib.sha256(code_verifier.encode()).digest()
).decode().rstrip("=")
auth = OAuth2PKCEAuth(
    client_id="your_client_id",
    redirect_uri="http://localhost:3000/callback",
    scope=["tweet.read", "users.read"]
)
# Set PKCE parameters manually
auth.set_pkce_parameters(code_verifier, code_challenge)
# Get authorization URL
auth_url = auth.get_authorization_url()
# Exchange code using the manually set verifier
token = auth.exchange_code("code_from_callback")