Spaces:
Sleeping
Sleeping
| """Google OAuth2 and Service Account authentication for PageSpeed Insights API.""" | |
| import json | |
| import os | |
| import time | |
| from typing import Optional | |
| from google.auth.transport.requests import Request | |
| from google.oauth2 import service_account | |
| import requests | |
class GoogleAuthenticator:
    """Obtain and hold Google service-account credentials.

    Credentials are loaded once at construction time from the
    ``GOOGLE_SERVICE_ACCOUNT_JSON`` environment variable. Any failure while
    loading leaves ``self.credentials`` set to ``None`` and is reported on
    stdout instead of being raised.
    """

    def __init__(self):
        # Start unauthenticated; setup_credentials() fills this in on success.
        self.credentials = None
        self.setup_credentials()

    def setup_credentials(self):
        """Build credentials from the environment and fetch an initial token.

        Reads the service-account key (a JSON string) from
        ``GOOGLE_SERVICE_ACCOUNT_JSON``. When the variable is missing or empty
        a warning is printed and ``self.credentials`` is left untouched. Any
        exception raised while parsing, building or refreshing is printed and
        resets ``self.credentials`` to ``None``.
        """
        try:
            raw_json = os.environ.get('GOOGLE_SERVICE_ACCOUNT_JSON')
            if not raw_json:
                print("⚠️ GOOGLE_SERVICE_ACCOUNT_JSON não encontrado nas variáveis de ambiente")
                return

            account_info = json.loads(raw_json)

            # No scopes are passed: the original author's note says the
            # PageSpeed Insights API doesn't require special scopes.
            # NOTE(review): confirm the token endpoint accepts a scope-less
            # refresh for this key; if it does not, the refresh below raises
            # and credentials end up as None.
            self.credentials = service_account.Credentials.from_service_account_info(
                account_info
            )

            # Refresh right away so an access token is available immediately.
            self.credentials.refresh(Request())
        except Exception as e:
            print(f"❌ Erro ao configurar credenciais do Google: {e!s}")
            self.credentials = None

    def get_access_token(self) -> Optional[str]:
        """Return the current access token, refreshing it first if invalid.

        Returns:
            The token string, or ``None`` when no credentials are loaded or
            the refresh raised (the error is printed, not propagated).
        """
        creds = self.credentials
        if not creds:
            return None

        try:
            if not creds.valid:
                creds.refresh(Request())
            return creds.token
        except Exception as e:
            print(f"❌ Erro ao obter token de acesso: {e!s}")
            return None

    def is_authenticated(self) -> bool:
        """Report whether credentials are loaded and currently valid."""
        creds = self.credentials
        if creds is None:
            return False
        return creds.valid
class PageSpeedAPI:
    """PageSpeed Insights API client using an API key and/or OAuth2 token.

    Authentication is attempted in order: the ``GOOGLE_API_KEY`` environment
    variable first, then a bearer token obtained from ``GoogleAuthenticator``.
    """

    def __init__(self):
        self.authenticator = GoogleAuthenticator()
        self.base_url = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"

    def analyze_url(self, url: str, strategy: str = 'mobile') -> Optional[dict]:
        """Run a PageSpeed Insights analysis for ``url``.

        Each attempt tries every authentication method in turn. A 401/403
        moves on to the next method; a 429 stops the attempt and triggers a
        backoff before the next one (up to three attempts in total).

        Args:
            url: Page to analyse.
            strategy: Value sent as the ``strategy`` query parameter
                (defaults to ``'mobile'``).

        Returns:
            The decoded JSON body of the first 200 response.

        Raises:
            Exception: when the rate limit persists through the last attempt,
                when the API answers with an error status, when the HTTP
                request itself fails, or when all attempts are used up.
        """
        max_retries = 3
        base_delay = 2

        # Request pieces that do not change between attempts.
        params = {
            'url': url,
            'strategy': strategy,
            'category': ['performance', 'accessibility', 'best-practices', 'seo'],
        }
        headers = {
            'Content-Type': 'application/json',
        }
        # Authentication methods in order of preference.
        auth_methods = [
            ('api_key', self._get_api_key_auth),
            ('oauth2', self._get_oauth2_auth),
        ]

        for attempt in range(max_retries):
            try:
                last_response = None
                for _auth_name, auth_method in auth_methods:
                    auth_headers, auth_params = auth_method()
                    response = requests.get(
                        self.base_url,
                        headers={**headers, **auth_headers},
                        params={**params, **auth_params},
                        timeout=60,
                    )
                    last_response = response

                    if response.status_code == 200:
                        return response.json()
                    if response.status_code == 429:
                        # Rate limited: stop trying auth methods, back off.
                        break
                    if response.status_code in (401, 403):
                        # Rejected credentials: try the next auth method.
                        continue
                    # Any other error status is raised immediately.
                    response.raise_for_status()

                # A requests.Response is falsy for every 4xx/5xx status, so
                # it must be compared with None explicitly; a plain truth
                # test would skip both branches below for error responses.
                if last_response is not None and last_response.status_code == 429:
                    if attempt < max_retries - 1:
                        # Exponential backoff plus a small linear increment.
                        delay = base_delay * (2 ** attempt) + (attempt * 0.5)
                        print(f"Rate limited. Retrying in {delay} seconds... (attempt {attempt + 1}/{max_retries})")
                        time.sleep(delay)
                        continue
                    raise Exception("Rate limit exceeded. PageSpeed API has strict limits. Please try again later or configure a Google API Key for higher limits.")

                # Every auth method was rejected (or returned a non-200
                # status): surface the last error instead of retrying blindly.
                if last_response is not None:
                    last_response.raise_for_status()

            except requests.exceptions.RequestException as e:
                # Decide on the real status code rather than searching the
                # message text, which also contains the analysed URL.
                failed_response = getattr(e, 'response', None)
                rate_limited = (
                    failed_response is not None
                    and failed_response.status_code == 429
                )
                if attempt < max_retries - 1 and rate_limited:
                    time.sleep(base_delay * (2 ** attempt))
                    continue
                raise Exception(f"Error calling PageSpeed API: {str(e)}") from e

        raise Exception("Max retries exceeded")

    def _get_api_key_auth(self) -> tuple[dict, dict]:
        """Return ``(headers, params)`` for API-key authentication.

        The key is read from ``GOOGLE_API_KEY``. Values of 100 characters or
        more are ignored so that a service-account JSON placed in that
        variable is not sent as a key. Returns two empty dicts when no usable
        key is set.
        """
        api_key = os.environ.get('GOOGLE_API_KEY')
        if api_key and len(api_key) < 100:
            return {}, {'key': api_key}
        return {}, {}

    def _get_oauth2_auth(self) -> tuple[dict, dict]:
        """Return ``(headers, params)`` for OAuth2 bearer authentication.

        ``get_access_token()`` is called directly because it refreshes an
        expired token itself; gating on ``is_authenticated()`` first would
        drop OAuth for good as soon as the initial token expires.
        """
        access_token = self.authenticator.get_access_token()
        if access_token:
            return {'Authorization': f'Bearer {access_token}'}, {}
        return {}, {}

    def _get_public_auth(self) -> tuple[dict, dict]:
        """Return empty ``(headers, params)`` for an unauthenticated call."""
        return {}, {}

    def is_available(self) -> bool:
        """Report whether at least one authentication method is usable.

        True when a usable ``GOOGLE_API_KEY`` is set or when the authenticator
        can supply an access token (refreshing an expired one if necessary).
        """
        _, key_params = self._get_api_key_auth()
        if key_params:
            return True
        return bool(self.authenticator.get_access_token())