Full Stack Learning Hub

Comprehensive guides, cheat sheets, and code examples for full stack development.

OAuth2 and Token Management Guide


Table of Contents

  1. Introduction to OAuth2
  2. Client Credentials Flow
  3. Base64 Encoding for Credentials
  4. Token Acquisition and Storage
  5. Bearer Token Usage
  6. Token Lifecycle Management
  7. Error Handling
  8. Security Best Practices
  9. Complete Implementation Example
  10. Common Pitfalls and Solutions

Introduction to OAuth2

OAuth2 is an authorization framework that lets an application obtain limited access to a service or to a user's account without handling the account password. It works by delegating authentication to the service that hosts the account, which then issues the application an access token.

OAuth2 vs API Keys

API Key Authentication:

headers = {
    "X-API-Key": "your_api_key_here"
}
response = requests.get(url, headers=headers)

OAuth2 Token Authentication:

# Step 1: Obtain token
token = get_access_token()

# Step 2: Use token in requests
headers = {
    "Authorization": f"Bearer {token}"
}
response = requests.get(url, headers=headers)

When to Use OAuth2


Client Credentials Flow

The Client Credentials flow is used for machine-to-machine authentication without user interaction.

Flow Diagram

Your Application
      |
      | 1. POST credentials
      v
Token Endpoint (e.g., https://accounts.spotify.com/api/token)
      |
      | 2. Validate credentials
      | 3. Generate token
      |
      | 4. Return access token
      v
Your Application
      |
      | 5. Use token for API requests
      v
API Endpoint

Complete Implementation

import base64

import requests

def get_access_token(client_id, client_secret):
    """
    Obtain OAuth2 access token using client credentials flow.

    Args:
        client_id: Your application's client ID
        client_secret: Your application's client secret

    Returns:
        str: Access token, or None if request fails
    """
    # Token endpoint URL
    url = "https://accounts.spotify.com/api/token"

    # Combine credentials
    cred_string = f"{client_id}:{client_secret}"

    # Encode to base64
    byte_creds = cred_string.encode('utf-8')
    b64_string = base64.b64encode(byte_creds).decode('utf-8')

    # Prepare request
    headers = {
        "Authorization": f"Basic {b64_string}",
        "Content-Type": "application/x-www-form-urlencoded"
    }

    data = {
        "grant_type": "client_credentials"
    }

    # Make request
    try:
        response = requests.post(url, headers=headers, data=data, timeout=10)

        if response.status_code == 200:
            token_data = response.json()
            return token_data['access_token']
        else:
            print(f"Error: {response.status_code} - {response.text}")
            return None

    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None

Understanding Each Component

1. Credential String:

cred_string = f"{client_id}:{client_secret}"
# Example: "my_client_id:my_client_secret"

2. Authorization Header:

headers = {
    "Authorization": f"Basic {b64_string}"
}
# Example: "Basic bXlfY2xpZW50X2lkOm15X2NsaWVudF9zZWNyZXQ="

3. Grant Type:

data = {
    "grant_type": "client_credentials"  # Specifies the OAuth2 flow
}

4. Response Structure:

{
    "access_token": "BQC8K3Xr...",
    "token_type": "Bearer",
    "expires_in": 3600,
    "scope": "user-read-private user-read-email"
}
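
As a minimal sketch, a response like the one above could be converted into an absolute expiry time as soon as it is received. The `ParsedToken` type and the `parse_token_response` helper are hypothetical names used only for illustration, the sample payload is made up, and `scope` is treated as optional on the assumption that not every server returns it.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ParsedToken:
    """Illustrative container for the fields of a token response."""
    access_token: str
    token_type: str
    expires_at: float        # absolute Unix timestamp
    scope: Optional[str]     # None when the server omits it


def parse_token_response(token_data: Dict[str, Any], now: float) -> ParsedToken:
    """Convert a token response dict into a ParsedToken.

    `now` is the Unix time at which the response was received; passing it
    in explicitly keeps the function easy to test.
    """
    if "access_token" not in token_data:
        raise ValueError("Token response is missing 'access_token'")

    return ParsedToken(
        access_token=token_data["access_token"],
        token_type=token_data.get("token_type", "Bearer"),
        # Fall back to one hour if the server does not report a lifetime
        expires_at=now + float(token_data.get("expires_in", 3600)),
        scope=token_data.get("scope"),
    )


# Made-up payload for demonstration
sample = {"access_token": "example-token", "token_type": "Bearer", "expires_in": 3600}
parsed = parse_token_response(sample, now=1_000_000.0)
print(parsed.expires_at)
```

Storing an absolute timestamp rather than the raw `expires_in` value means later validity checks only need to compare against the current time.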

Base64 Encoding for Credentials

Base64 encoding converts binary data into ASCII text so that it can be carried in text-only fields such as HTTP headers. It is an encoding, not encryption.

Why Base64?

Implementation

import base64

def encode_credentials(client_id, client_secret):
    """Encode credentials in Base64 format for OAuth2."""

    # Step 1: Combine with colon separator
    cred_string = f"{client_id}:{client_secret}"

    # Step 2: Convert to bytes
    byte_creds = cred_string.encode('utf-8')

    # Step 3: Encode to base64
    b64_bytes = base64.b64encode(byte_creds)

    # Step 4: Convert back to string
    b64_string = b64_bytes.decode('utf-8')

    return b64_string

# Usage
encoded = encode_credentials("my_id", "my_secret")
print(encoded)  # "bXlfaWQ6bXlfc2VjcmV0"

Decoding (for demonstration only)

def decode_credentials(b64_string):
    """Decode base64 credentials (demonstration only)."""
    decoded_bytes = base64.b64decode(b64_string)
    decoded_string = decoded_bytes.decode('utf-8')
    return decoded_string

# This shows Base64 is a reversible encoding, NOT encryption
decoded = decode_credentials("bXlfaWQ6bXlfc2VjcmV0")
print(decoded)  # "my_id:my_secret"

Security Note: Always use HTTPS. Base64 encoding provides NO security on its own.


Token Acquisition and Storage

Basic Token Storage

import base64
import time

import requests

class TokenManager:
    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expiry = None

    def get_token(self):
        """Get valid access token (fetch new if needed)."""
        if self._is_token_valid():
            return self.access_token

        return self._fetch_new_token()

    def _is_token_valid(self):
        """Check if current token is still valid."""
        if not self.access_token or not self.token_expiry:
            return False

        import time
        return time.time() < self.token_expiry

    def _fetch_new_token(self):
        """Fetch a new access token from the server."""
        url = "https://accounts.spotify.com/api/token"

        # Encode credentials
        cred_string = f"{self.client_id}:{self.client_secret}"
        b64_string = base64.b64encode(
            cred_string.encode('utf-8')
        ).decode('utf-8')

        headers = {
            "Authorization": f"Basic {b64_string}",
            "Content-Type": "application/x-www-form-urlencoded"
        }

        data = {"grant_type": "client_credentials"}

        response = requests.post(url, headers=headers, data=data, timeout=10)

        if response.status_code == 200:
            token_data = response.json()
            self.access_token = token_data['access_token']

            # Set expiry (subtract 60 seconds for safety margin)
            import time
            self.token_expiry = time.time() + token_data['expires_in'] - 60

            return self.access_token
        else:
            raise Exception(f"Token fetch failed: {response.status_code}")

Usage

# Initialize once
token_manager = TokenManager(
    client_id="your_client_id",
    client_secret="your_client_secret"
)

# Use throughout application
token = token_manager.get_token()  # Fetches new token
token = token_manager.get_token()  # Returns cached token

Bearer Token Usage

Bearer tokens are included in the Authorization header of API requests.

Basic Usage

def make_authenticated_request(url, token):
    """Make API request with Bearer token."""
    headers = {
        "Authorization": f"Bearer {token}"
    }

    response = requests.get(url, headers=headers, timeout=10)

    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Request failed: {response.status_code}")

Complete API Client

class SpotifyClient:
    def __init__(self, client_id, client_secret):
        self.token_manager = TokenManager(client_id, client_secret)
        self.base_url = "https://api.spotify.com/v1"

    def _get_headers(self):
        """Get headers with current access token."""
        token = self.token_manager.get_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

    def search_track(self, query, limit=1):
        """Search for a track."""
        url = f"{self.base_url}/search"
        params = {
            "q": query,
            "type": "track",
            "limit": limit
        }

        response = requests.get(
            url,
            headers=self._get_headers(),
            params=params,
            timeout=10
        )

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 401:
            # Token expired, retry once with new token
            self.token_manager.access_token = None
            response = requests.get(
                url,
                headers=self._get_headers(),
                params=params,
                timeout=10
            )
            return response.json() if response.status_code == 200 else None
        else:
            raise Exception(f"Search failed: {response.status_code}")

    def get_artist(self, artist_id):
        """Get artist information."""
        url = f"{self.base_url}/artists/{artist_id}"
        response = requests.get(url, headers=self._get_headers(), timeout=10)

        if response.status_code == 200:
            return response.json()
        else:
            return None

Usage Example

# Initialize client
client = SpotifyClient("your_id", "your_secret")

# Search for tracks
results = client.search_track("Bohemian Rhapsody")
track = results['tracks']['items'][0]
print(f"Found: {track['name']} by {track['artists'][0]['name']}")

# Get artist details
artist = client.get_artist(track['artists'][0]['id'])
print(f"Followers: {artist['followers']['total']}")

Token Lifecycle Management

Token Expiry Handling

import base64
import time
from datetime import datetime

import requests

class AdvancedTokenManager:
    def __init__(self, client_id, client_secret, token_endpoint):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = token_endpoint

        self.access_token = None
        self.token_type = None
        self.expires_at = None
        self.scope = None

    def get_token(self):
        """Get valid token, refreshing if necessary."""
        if self._needs_refresh():
            self._refresh_token()

        return self.access_token

    def _needs_refresh(self):
        """Check if token needs refresh."""
        if not self.access_token:
            return True

        if not self.expires_at:
            return True

        # Refresh if less than 5 minutes remaining
        buffer_seconds = 300
        return time.time() >= (self.expires_at - buffer_seconds)

    def _refresh_token(self):
        """Fetch new access token."""
        cred_string = f"{self.client_id}:{self.client_secret}"
        b64_creds = base64.b64encode(cred_string.encode()).decode()

        headers = {
            "Authorization": f"Basic {b64_creds}",
            "Content-Type": "application/x-www-form-urlencoded"
        }

        data = {"grant_type": "client_credentials"}

        response = requests.post(
            self.token_endpoint,
            headers=headers,
            data=data,
            timeout=10
        )

        if response.status_code == 200:
            token_data = response.json()
            self._store_token(token_data)
        else:
            raise Exception(f"Token refresh failed: {response.status_code}")

    def _store_token(self, token_data):
        """Store token and metadata."""
        self.access_token = token_data['access_token']
        self.token_type = token_data.get('token_type', 'Bearer')
        self.scope = token_data.get('scope', '')

        # Calculate expiry time
        expires_in = token_data.get('expires_in', 3600)
        self.expires_at = time.time() + expires_in

    def get_token_info(self):
        """Get information about current token."""
        if not self.access_token or not self.expires_at:
            return "No token available"

        # Clamp at zero so an expired token never reports negative time left
        remaining = max(0, self.expires_at - time.time())

        return {
            "token_type": self.token_type,
            "scope": self.scope,
            "expires_in": int(remaining),
            "expires_at": datetime.fromtimestamp(self.expires_at).isoformat()
        }
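
To make the refresh buffer concrete, here is a small worked example of the same comparison used in `_needs_refresh`, written as a standalone function. The `needs_refresh` name and the timestamps are illustrative assumptions; the 300-second buffer matches the class above.

```python
def needs_refresh(expires_at, now, buffer_seconds=300):
    """Return True when there is no expiry recorded or the token is
    within `buffer_seconds` of expiring."""
    if expires_at is None:
        return True
    return now >= (expires_at - buffer_seconds)


# A token issued at t=0 with expires_in=3600 expires at t=3600
expires_at = 0 + 3600

checks = {
    "600 s left": needs_refresh(expires_at, now=3000),
    "300 s left": needs_refresh(expires_at, now=3300),
    "100 s left": needs_refresh(expires_at, now=3500),
    "no expiry stored": needs_refresh(None, now=0),
}

for label, result in checks.items():
    print(f"{label}: refresh={result}")
```

Only the first case keeps the cached token; once the remaining lifetime drops to the buffer or below, a new token is requested before the old one can fail mid-request.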

Error Handling

HTTP Status Codes

def handle_auth_response(response):
    """Handle authentication response with proper error handling."""

    if response.status_code == 200:
        return response.json()

    elif response.status_code == 400:
        # Bad Request - Invalid grant_type or malformed request
        error_data = response.json()
        raise ValueError(f"Bad Request: {error_data.get('error_description', 'Invalid request')}")

    elif response.status_code == 401:
        # Unauthorized - Invalid credentials
        raise PermissionError("Authentication failed: Invalid client ID or secret")

    elif response.status_code == 403:
        # Forbidden - Valid credentials but insufficient permissions
        raise PermissionError("Access forbidden: Insufficient permissions")

    elif response.status_code == 429:
        # Too Many Requests - Rate limited
        retry_after = response.headers.get('Retry-After', 60)
        raise Exception(f"Rate limited. Retry after {retry_after} seconds")

    elif response.status_code >= 500:
        # Server Error
        raise Exception(f"Server error: {response.status_code}")

    else:
        raise Exception(f"Unexpected error: {response.status_code} - {response.text}")
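
The `Retry-After` header read in the 429 branch can carry either a number of seconds or an HTTP date, so a caller that wants to wait the right amount of time may need to handle both forms. The following is a sketch under that assumption; `parse_retry_after` is a hypothetical helper, and the 60-second default mirrors the fallback used above.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, now=None, default=60.0):
    """Return how many seconds to wait for a Retry-After header value.

    Accepts delay-seconds ("120") or an HTTP date
    ("Wed, 21 Oct 2015 07:28:00 GMT"). Falls back to `default`
    when the header is missing or cannot be parsed.
    """
    if value is None:
        return default

    value = value.strip()
    if value.isdigit():
        return float(value)

    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError
        return default

    if retry_at.tzinfo is None:
        # Treat a date without zone information as UTC
        retry_at = retry_at.replace(tzinfo=timezone.utc)

    now = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())


print(parse_retry_after("120"))
print(parse_retry_after(None))
```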

Comprehensive Error Handling

import base64

import requests
from requests.exceptions import Timeout, ConnectionError, RequestException

def safe_token_request(client_id, client_secret, token_url):
    """Make token request with comprehensive error handling."""

    try:
        # Prepare request
        cred_string = f"{client_id}:{client_secret}"
        b64_creds = base64.b64encode(cred_string.encode()).decode()

        headers = {
            "Authorization": f"Basic {b64_creds}",
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {"grant_type": "client_credentials"}

        # Make request with timeout
        response = requests.post(
            token_url,
            headers=headers,
            data=data,
            timeout=10
        )

        # Handle response
        return handle_auth_response(response)

    except Timeout:
        raise Exception("Request timed out - check network connection")

    except ConnectionError:
        raise Exception("Connection failed - check network or endpoint URL")

    except ValueError as e:
        raise Exception(f"Invalid credentials or request: {e}")

    except PermissionError as e:
        raise Exception(f"Authorization error: {e}")

    except RequestException as e:
        raise Exception(f"Request failed: {e}")

Retry Logic with Exponential Backoff

import time
from functools import wraps

def retry_with_backoff(max_retries=3, backoff_factor=2):
    """Decorator for retry logic with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise

                    wait_time = backoff_factor ** attempt
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)

        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, backoff_factor=2)
def fetch_token(client_id, client_secret, token_url):
    """Fetch token with automatic retry."""
    return safe_token_request(client_id, client_secret, token_url)
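
The decorator above retries on every exception, including ones such as invalid credentials that will not succeed on a second attempt. One possible variation, sketched here, retries only failures marked as transient. `TransientError` and `retry_transient` are hypothetical names, and the `sleep` parameter is injected purely so the example can run without actually waiting.

```python
import time
from functools import wraps


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, 5xx, 429)."""


def retry_transient(max_retries=3, backoff_factor=2, sleep=time.sleep):
    """Retry only TransientError; any other exception propagates immediately."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except TransientError:
                    if attempt == max_retries - 1:
                        raise
                    # Waits grow as backoff_factor ** attempt: 1, 2, 4, ...
                    sleep(backoff_factor ** attempt)
        return wrapper
    return decorator


# Demonstration: a fake operation that fails twice, then succeeds.
# Wait times are recorded in a list instead of sleeping.
waits = []
calls = {"count": 0}


@retry_transient(max_retries=3, backoff_factor=2, sleep=waits.append)
def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary failure")
    return "token"


result = flaky_fetch()
print(result, waits)
```

In a real client, the code that inspects the HTTP response would decide which failures to raise as `TransientError`; that mapping is left out of this sketch.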

Security Best Practices

1. Credential Storage

Bad Practice:

# NEVER hardcode credentials
client_id = "abc123"
client_secret = "secret456"

Good Practice:

import os
from dotenv import load_dotenv

# Load from environment variables
load_dotenv()
client_id = os.getenv('SPOTIFY_CLIENT_ID')
client_secret = os.getenv('SPOTIFY_CLIENT_SECRET')

if not client_id or not client_secret:
    raise ValueError("Missing required credentials")

.env file:

SPOTIFY_CLIENT_ID=your_client_id_here
SPOTIFY_CLIENT_SECRET=your_secret_here

.gitignore:

.env
credentials.json
config/secrets.py

2. HTTPS Only

def validate_token_url(url):
    """Ensure token endpoint uses HTTPS."""
    if not url.startswith('https://'):
        raise ValueError("Token endpoint must use HTTPS")
    return url

# Usage
token_url = validate_token_url("https://accounts.spotify.com/api/token")
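
A prefix check such as `startswith('https://')` is case-sensitive and accepts a URL with no host. As an alternative sketch, the standard library's `urllib.parse.urlparse` can be used to inspect the scheme and host separately. `validate_https_url` is a hypothetical name, not part of the functions above.

```python
from urllib.parse import urlparse


def validate_https_url(url):
    """Ensure the URL has an https scheme and a non-empty host."""
    parsed = urlparse(url)

    # urlparse normalizes the scheme to lowercase
    if parsed.scheme != "https":
        raise ValueError("Endpoint must use HTTPS")

    if not parsed.hostname:
        raise ValueError("Endpoint URL has no host")

    return url


print(validate_https_url("https://accounts.spotify.com/api/token"))
```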

3. Token Storage Security

# Don't log tokens
logger.info(f"Token: {token}")  # BAD

# Don't store tokens in plain text files
with open('token.txt', 'w') as f:
    f.write(token)  # BAD

# Use secure storage (keyring example)
import keyring

# Store token securely
keyring.set_password("my_app", "access_token", token)

# Retrieve token
token = keyring.get_password("my_app", "access_token")
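
When some trace of a token is needed in logs for debugging, one option is to log a redacted form instead of the full value. This is only a sketch: `mask_token` is a hypothetical helper, the token value is made up, and how many characters are safe to reveal is a judgment call for your environment.

```python
import logging

logger = logging.getLogger(__name__)


def mask_token(token, visible=4):
    """Return a redacted form of a token that is safer to log.

    Shows only the first `visible` characters and the total length.
    Short tokens are fully masked.
    """
    if not token:
        return "<none>"
    if len(token) <= visible:
        return "*" * len(token)
    return f"{token[:visible]}... ({len(token)} chars)"


example_token = "BQC8K3XrABCDEFGH1234"  # made-up value
logger.info("Token acquired: %s", mask_token(example_token))
print(mask_token(example_token))
```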

4. Input Validation

def validate_credentials(client_id, client_secret):
    """Validate credential format."""
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("Client ID must be a non-empty string")

    if not isinstance(client_secret, str) or not client_secret:
        raise ValueError("Client secret must be a non-empty string")

    if len(client_id) < 10:
        raise ValueError("Client ID appears invalid (too short)")

    return True

5. Request Timeout

# Always set timeouts
response = requests.post(url, headers=headers, data=data, timeout=10)

# Or use session with default timeout
class TimeoutHTTPAdapter(requests.adapters.HTTPAdapter):
    def __init__(self, timeout=10, *args, **kwargs):
        self.timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        kwargs['timeout'] = kwargs.get('timeout') or self.timeout
        return super().send(request, **kwargs)

# Usage
session = requests.Session()
session.mount('https://', TimeoutHTTPAdapter(timeout=10))

6. Token Scoping

# Request only the scopes you need (scope names are provider-specific; these are examples)
data = {
    "grant_type": "client_credentials",
    "scope": "user-read-email user-read-private"  # Only what you need
}

Complete Implementation Example

import os
import base64
import time
import requests
from dotenv import load_dotenv
from typing import Optional, Dict, Any

class OAuth2Client:
    """Complete OAuth2 client with best practices."""

    def __init__(self, token_endpoint: str):
        # Load credentials from environment
        load_dotenv()
        self.client_id = os.getenv('CLIENT_ID')
        self.client_secret = os.getenv('CLIENT_SECRET')

        if not self.client_id or not self.client_secret:
            raise ValueError("Missing CLIENT_ID or CLIENT_SECRET")

        self.token_endpoint = self._validate_url(token_endpoint)

        # Token state
        self.access_token: Optional[str] = None
        self.token_type: str = "Bearer"
        self.expires_at: Optional[float] = None
        self.scope: Optional[str] = None

    @staticmethod
    def _validate_url(url: str) -> str:
        """Ensure URL uses HTTPS."""
        if not url.startswith('https://'):
            raise ValueError("Endpoint must use HTTPS")
        return url

    def get_token(self) -> str:
        """Get valid access token."""
        if self._is_token_valid():
            return self.access_token

        return self._fetch_new_token()

    def _is_token_valid(self) -> bool:
        """Check if current token is valid."""
        if not self.access_token or not self.expires_at:
            return False

        # Refresh if less than 5 minutes remaining
        buffer = 300
        return time.time() < (self.expires_at - buffer)

    def _fetch_new_token(self) -> str:
        """Fetch new access token."""
        # Encode credentials
        cred_string = f"{self.client_id}:{self.client_secret}"
        b64_creds = base64.b64encode(cred_string.encode()).decode()

        # Prepare request
        headers = {
            "Authorization": f"Basic {b64_creds}",
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {"grant_type": "client_credentials"}

        # Make request
        try:
            response = requests.post(
                self.token_endpoint,
                headers=headers,
                data=data,
                timeout=10
            )

            if response.status_code == 200:
                token_data = response.json()
                self._store_token(token_data)
                return self.access_token
            else:
                self._handle_error(response)

        except requests.exceptions.Timeout:
            raise Exception("Token request timed out")
        except requests.exceptions.ConnectionError:
            raise Exception("Connection failed")
        except requests.exceptions.RequestException as e:
            raise Exception(f"Request failed: {e}")

    def _store_token(self, token_data: Dict[str, Any]):
        """Store token and metadata."""
        self.access_token = token_data['access_token']
        self.token_type = token_data.get('token_type', 'Bearer')
        self.scope = token_data.get('scope', '')

        expires_in = token_data.get('expires_in', 3600)
        self.expires_at = time.time() + expires_in

    def _handle_error(self, response: requests.Response):
        """Handle error responses."""
        if response.status_code == 401:
            raise PermissionError("Invalid credentials")
        elif response.status_code == 429:
            retry_after = response.headers.get('Retry-After', 60)
            raise Exception(f"Rate limited. Retry after {retry_after}s")
        else:
            raise Exception(f"Error {response.status_code}: {response.text}")

    def make_request(self, url: str, method: str = 'GET', _retried: bool = False, **kwargs) -> Dict[str, Any]:
        """Make authenticated API request."""
        token = self.get_token()

        # Copy caller-supplied headers so the caller's dict is not mutated
        headers = dict(kwargs.get('headers') or {})
        headers['Authorization'] = f"{self.token_type} {token}"
        kwargs['headers'] = headers
        kwargs.setdefault('timeout', 10)

        response = requests.request(method, url, **kwargs)

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 401 and not _retried:
            # Token might be expired: discard it and retry exactly once
            self.access_token = None
            return self.make_request(url, method, _retried=True, **kwargs)
        else:
            raise Exception(f"Request failed: {response.status_code}")


# Usage Example
if __name__ == "__main__":
    # Initialize client
    client = OAuth2Client("https://accounts.spotify.com/api/token")

    # Make authenticated requests
    search_results = client.make_request(
        "https://api.spotify.com/v1/search",
        params={"q": "Thriller", "type": "track", "limit": 1}
    )

    track = search_results['tracks']['items'][0]
    print(f"Found: {track['name']} by {track['artists'][0]['name']}")

Common Pitfalls and Solutions

Pitfall 1: Fetching Token on Every Request

Problem:

def search_song(query):
    token = get_token()  # Fetches new token every time!
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

Solution:

# Use token manager that caches tokens
token_manager = TokenManager(client_id, client_secret)

def search_song(query):
    token = token_manager.get_token()  # Returns cached if valid
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

Pitfall 2: No Error Handling

Problem:

response = requests.post(url, headers=headers, data=data)
token = response.json()['access_token']  # Crashes if request fails!

Solution:

try:
    response = requests.post(url, headers=headers, data=data, timeout=10)
    response.raise_for_status()  # Raises HTTPError (a RequestException) on 4xx/5xx
    token = response.json()['access_token']
except requests.exceptions.RequestException as e:
    print(f"Error: {e}")
    token = None

Pitfall 3: Hardcoded Credentials

Problem:

client_id = "abc123"  # Committed to Git!
client_secret = "secret456"

Solution:

import os
from dotenv import load_dotenv

load_dotenv()
client_id = os.getenv('CLIENT_ID')
client_secret = os.getenv('CLIENT_SECRET')

Pitfall 4: No Request Timeout

Problem:

response = requests.post(url, headers=headers, data=data)  # Can hang indefinitely if the server never responds

Solution:

response = requests.post(url, headers=headers, data=data, timeout=10)

Pitfall 5: Ignoring Token Expiry

Problem:

token = get_token()
# Use token for hours without checking expiry

Solution:

class TokenManager:
    def get_token(self):
        if self._is_expired():
            self._refresh()
        return self.access_token
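
The outline above leaves `_is_expired` and `_refresh` undefined. A runnable sketch of the same idea follows, with the token fetch and the clock passed in so the behavior can be observed without a network call. `CachedToken`, `fake_fetch`, and the 60-second margin are illustrative assumptions rather than part of the classes shown earlier.

```python
import time


class CachedToken:
    """Caches a token and refetches it shortly before it expires.

    `fetch` is any callable returning (access_token, expires_in_seconds);
    `clock` returns the current time in seconds.
    """

    def __init__(self, fetch, clock=time.time, margin=60):
        self._fetch = fetch
        self._clock = clock
        self._margin = margin
        self._token = None
        self._expires_at = 0.0

    def _is_expired(self):
        if self._token is None:
            return True
        return self._clock() >= self._expires_at - self._margin

    def _refresh(self):
        token, expires_in = self._fetch()
        self._token = token
        self._expires_at = self._clock() + expires_in

    def get_token(self):
        if self._is_expired():
            self._refresh()
        return self._token


# Demonstration with a fake fetcher and a controllable clock
fetch_count = {"n": 0}
current_time = {"t": 0.0}


def fake_fetch():
    fetch_count["n"] += 1
    return f"token-{fetch_count['n']}", 3600


cache = CachedToken(fake_fetch, clock=lambda: current_time["t"])

first = cache.get_token()      # no token yet, so this fetches
second = cache.get_token()     # still valid, served from the cache
current_time["t"] = 3590.0     # now inside the 60-second margin
third = cache.get_token()      # treated as expired, fetches again

print(first, second, third, fetch_count["n"])
```

In a real client, `fetch` would wrap the token request and return the `access_token` and `expires_in` fields from the response.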

Pitfall 6: Using HTTP Instead of HTTPS

Problem:

url = "http://api.example.com/token"  # Insecure!

Solution:

url = "https://api.example.com/token"  # Always HTTPS
