"""Google OAuth2 and Service Account authentication for PageSpeed Insights API."""

import json
import os
import time
from typing import Iterator, Optional

from google.auth.transport.requests import Request
from google.oauth2 import service_account
import requests


class GoogleAuthenticator:
    """Handles Google API authentication using service account credentials.

    The service-account key is read as a JSON string from the
    ``GOOGLE_SERVICE_ACCOUNT_JSON`` environment variable when the object is
    constructed. Any failure while loading or refreshing is reported on
    stdout and leaves ``credentials`` set to ``None``; it never raises.
    """

    def __init__(self):
        # Credentials object, or None when nothing is configured / setup failed.
        self.credentials = None
        self.setup_credentials()

    def setup_credentials(self):
        """Load service-account credentials from the environment and refresh them.

        Best effort by design: every exception is caught, printed, and
        results in ``self.credentials`` being reset to ``None``.
        """
        try:
            service_account_json = os.environ.get('GOOGLE_SERVICE_ACCOUNT_JSON')
            if not service_account_json:
                print("⚠️ GOOGLE_SERVICE_ACCOUNT_JSON não encontrado nas variáveis de ambiente")
                return

            service_account_info = json.loads(service_account_json)

            # No scopes are passed here (the original author's note: the
            # PageSpeed Insights API doesn't require special scopes).
            # NOTE(review): confirm the token endpoint accepts a scope-less
            # service-account request; if it does not, the refresh below
            # fails and this class silently ends up unauthenticated.
            self.credentials = service_account.Credentials.from_service_account_info(
                service_account_info
            )

            # Fetch an access token right away so problems surface at startup.
            self.credentials.refresh(Request())
        except Exception as e:
            print(f"❌ Erro ao configurar credenciais do Google: {e}")
            self.credentials = None

    def get_access_token(self) -> Optional[str]:
        """Return an access token, refreshing the credentials first if needed.

        Returns:
            The token string, or ``None`` when no credentials are loaded or
            the refresh fails (the failure is printed, not raised).
        """
        if not self.credentials:
            return None

        try:
            if not self.credentials.valid:
                self.credentials.refresh(Request())
            return self.credentials.token
        except Exception as e:
            print(f"❌ Erro ao obter token de acesso: {e}")
            return None

    def is_authenticated(self) -> bool:
        """Return True when a usable access token can be obtained.

        This goes through :meth:`get_access_token`, so credentials that are
        no longer valid are refreshed instead of being reported as
        unauthenticated. Checking ``credentials.valid`` alone made OAuth
        permanently unavailable once the first token stopped being valid,
        because nothing else would trigger a refresh.
        """
        return self.get_access_token() is not None


class PageSpeedAPI:
    """PageSpeed Insights API client with OAuth2 authentication.

    Requests are authenticated with ``GOOGLE_API_KEY`` first and with the
    service-account bearer token second.
    """

    # Values this long or longer are rejected as API keys: the check exists
    # to ensure the variable holds an actual API key, not a JSON blob.
    _MAX_API_KEY_LENGTH = 100

    def __init__(self):
        self.authenticator = GoogleAuthenticator()
        self.base_url = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"

    def analyze_url(self, url: str, strategy: str = 'mobile') -> Optional[dict]:
        """Analyze a URL using PageSpeed Insights API with retry logic.

        Each attempt tries the authentication candidates in order. A 200
        response is returned immediately; 401/403 moves on to the next
        candidate; 429 ends the attempt and triggers a delayed retry; any
        other error status raises.

        Args:
            url: Page to analyze.
            strategy: Value sent as the ``strategy`` query parameter.

        Returns:
            The decoded JSON body of the first 200 response.

        Raises:
            Exception: when the rate limit persists through every attempt,
                when the HTTP call fails or ends in an error status, or
                when all attempts finish without a 200 response.
        """
        max_retries = 3
        base_delay = 2

        params = {
            'url': url,
            'strategy': strategy,
            'category': ['performance', 'accessibility', 'best-practices', 'seo'],
        }
        headers = {'Content-Type': 'application/json'}

        for attempt in range(max_retries):
            try:
                last_response = None

                for auth_headers, auth_params in self._iter_auth_candidates():
                    response = requests.get(
                        self.base_url,
                        headers={**headers, **auth_headers},
                        params={**params, **auth_params},
                        timeout=60,
                    )
                    last_response = response

                    if response.status_code == 200:
                        return response.json()

                    # Rate limited: another credential will not help, back off.
                    if response.status_code == 429:
                        break

                    # Unauthorized/forbidden: try the next auth candidate.
                    if response.status_code in (401, 403):
                        continue

                    # Any other error status is raised immediately.
                    response.raise_for_status()

                # Compare against None explicitly: a requests.Response is
                # falsy for every 4xx/5xx status, so a plain truthiness test
                # would skip exactly the error responses handled below.
                if last_response is not None and last_response.status_code == 429:
                    if attempt >= max_retries - 1:
                        raise Exception(
                            "Rate limit exceeded. PageSpeed API has strict limits. "
                            "Please try again later or configure a Google API Key for higher limits."
                        )

                    # Exponential backoff plus a small linear increment
                    # (deterministic; there is no random jitter here).
                    delay = base_delay * (2 ** attempt) + (attempt * 0.5)
                    print(f"Rate limited. Retrying in {delay} seconds... (attempt {attempt + 1}/{max_retries})")
                    time.sleep(delay)
                    continue

                # Every candidate was rejected (401/403): surface that error.
                if last_response is not None:
                    last_response.raise_for_status()

            except requests.exceptions.RequestException as e:
                # 429 is fully handled above from the response status, so no
                # message sniffing is needed; matching "429" in the error
                # text also matched URLs that merely contain those digits.
                raise Exception(f"Error calling PageSpeed API: {e}") from e

        raise Exception("Max retries exceeded")

    def _iter_auth_candidates(self) -> Iterator[tuple[dict, dict]]:
        """Yield ``(headers, params)`` pairs to try, in order of preference.

        Candidates are produced lazily so the OAuth token is only requested
        when the API key attempt did not succeed, and identical candidates
        are yielded once so the same request is not sent twice.

        NOTE(review): a method with no credentials yields ``({}, {})``,
        which results in an unauthenticated request. That matches the
        previous behaviour but seems to contradict the earlier decision to
        remove the public API; confirm whether such candidates should be
        skipped entirely.
        """
        already_yielded = []
        for get_auth in (self._get_api_key_auth, self._get_oauth2_auth):
            candidate = get_auth()
            if candidate in already_yielded:
                continue
            already_yielded.append(candidate)
            yield candidate

    def _read_api_key(self) -> Optional[str]:
        """Return ``GOOGLE_API_KEY`` when set and short enough, else ``None``."""
        api_key = os.environ.get('GOOGLE_API_KEY')
        if api_key and len(api_key) < self._MAX_API_KEY_LENGTH:
            return api_key
        return None

    def _get_api_key_auth(self) -> tuple[dict, dict]:
        """Get API key authentication headers and params."""
        api_key = self._read_api_key()
        if api_key is not None:
            return {}, {'key': api_key}
        return {}, {}

    def _get_oauth2_auth(self) -> tuple[dict, dict]:
        """Get OAuth2 authentication headers and params.

        The token is requested directly from the authenticator, which
        refreshes it when needed, so credentials that are no longer valid
        do not disable OAuth.
        """
        access_token = self.authenticator.get_access_token()
        if access_token:
            return {'Authorization': f'Bearer {access_token}'}, {}
        return {}, {}

    def _get_public_auth(self) -> tuple[dict, dict]:
        """Get public (no auth) headers and params."""
        return {}, {}

    def is_available(self) -> bool:
        """Check if the PageSpeed API is available with authentication.

        Returns True when a usable API key is set or when the service
        account can supply an access token. The key is checked first so no
        token refresh is attempted when a key is already present.
        """
        if self._read_api_key() is not None:
            return True
        return self.authenticator.is_authenticated()