pagespeedinsights / google_auth.py
Guilherme Favaron
Remove public API - require Google Cloud authentication
ed17ba2
"""Google OAuth2 and Service Account authentication for PageSpeed Insights API."""
import json
import os
import time
from typing import Optional
from google.auth.transport.requests import Request
from google.oauth2 import service_account
import requests
class GoogleAuthenticator:
    """Handle Google API authentication using service account credentials.

    The service account key is read as a JSON string from the
    ``GOOGLE_SERVICE_ACCOUNT_JSON`` environment variable when the instance is
    created. Setup is best-effort: on any failure a message is printed and
    ``credentials`` is left as ``None`` instead of raising.
    """

    # OAuth scopes requested for the access token. Without any scope the
    # service-account token request carries an empty ``scope`` claim and the
    # refresh below fails, leaving the authenticator permanently
    # unauthenticated.
    # NOTE(review): "openid" is the scope listed for runPagespeed in the
    # PageSpeed Insights v5 reference - confirm it against your project.
    SCOPES = ("openid",)

    def __init__(self):
        # Stays None until setup_credentials() succeeds.
        self.credentials = None
        self.setup_credentials()

    def setup_credentials(self):
        """Build service account credentials from the environment.

        Reads ``GOOGLE_SERVICE_ACCOUNT_JSON``, parses it as JSON, creates
        scoped credentials and refreshes them once to obtain an access token.
        Never raises: a missing variable prints a warning, any other failure
        prints the error and resets ``credentials`` to ``None``.
        """
        try:
            service_account_json = os.environ.get('GOOGLE_SERVICE_ACCOUNT_JSON')
            if service_account_json:
                service_account_info = json.loads(service_account_json)
                self.credentials = service_account.Credentials.from_service_account_info(
                    service_account_info,
                    scopes=list(self.SCOPES),
                )
                # Fetch an initial access token right away so a bad key is
                # detected at construction time.
                self.credentials.refresh(Request())
            else:
                print("⚠️ GOOGLE_SERVICE_ACCOUNT_JSON não encontrado nas variáveis de ambiente")
        except Exception as e:
            # Deliberate best-effort boundary: authentication problems are
            # reported, not raised, so callers can fall back to an API key.
            print(f"❌ Erro ao configurar credenciais do Google: {str(e)}")
            self.credentials = None

    def get_access_token(self) -> Optional[str]:
        """Return a usable access token, refreshing it first if necessary.

        Returns:
            The token string, or ``None`` when no credentials are configured
            or the refresh fails (the error is printed).
        """
        if not self.credentials:
            return None
        try:
            if not self.credentials.valid:
                self.credentials.refresh(Request())
            return self.credentials.token
        except Exception as e:
            print(f"❌ Erro ao obter token de acesso: {str(e)}")
            return None

    def is_authenticated(self) -> bool:
        """Report whether credentials exist and are valid right now.

        This is a pure state check and does not refresh: it returns ``False``
        when the credentials report themselves as not valid, even though
        ``get_access_token()`` would attempt a refresh in that case.
        """
        return self.credentials is not None and self.credentials.valid
class PageSpeedAPI:
    """PageSpeed Insights API client that always sends Google credentials.

    Two authentication methods are tried in order of preference: an API key
    from ``GOOGLE_API_KEY`` and an OAuth2 bearer token from the service
    account authenticator. Unauthenticated (public) requests are never sent.
    """

    def __init__(self):
        self.authenticator = GoogleAuthenticator()
        self.base_url = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"

    def analyze_url(self, url: str, strategy: str = 'mobile') -> Optional[dict]:
        """Analyze a URL with PageSpeed Insights, retrying when rate limited.

        Args:
            url: Page to analyze.
            strategy: Value sent as the ``strategy`` query parameter
                (defaults to ``'mobile'``).

        Returns:
            The decoded JSON body of the first HTTP 200 response.

        Raises:
            Exception: when no credentials are configured, when the API keeps
                answering 429 after all retries, or when the request fails
                (transport error or HTTP error status; the original
                ``requests`` exception is chained as the cause).
        """
        max_retries = 3
        base_delay = 2

        # Loop-invariant request pieces; a list value is sent as a repeated
        # query parameter by requests.
        params = {
            'url': url,
            'strategy': strategy,
            'category': ['performance', 'accessibility', 'best-practices', 'seo']
        }
        headers = {
            'Content-Type': 'application/json'
        }

        for attempt in range(max_retries):
            try:
                response = self._request_with_auth(headers, params)
                if response is not None:
                    if response.status_code == 200:
                        return response.json()
                    # 429 is handled below with backoff; every other error
                    # status (including 401/403 from the last auth method)
                    # is surfaced immediately instead of being retried.
                    if response.status_code != 429:
                        response.raise_for_status()
            except requests.exceptions.RequestException as e:
                # NOTE(review): the message of an HTTP error contains the
                # request URL, which includes the API key when one is used.
                raise Exception(f"Error calling PageSpeed API: {str(e)}") from e

            # Compare with None explicitly: a requests.Response is falsy for
            # every 4xx/5xx status, so a plain truthiness test would skip
            # exactly the error responses that must be handled here.
            if response is None:
                raise Exception(
                    "No Google authentication configured. "
                    "Set GOOGLE_API_KEY or GOOGLE_SERVICE_ACCOUNT_JSON."
                )

            if response.status_code == 429:
                if attempt < max_retries - 1:
                    # Exponential backoff plus a small linear increment
                    # (deterministic; there is no random jitter).
                    delay = base_delay * (2 ** attempt) + (attempt * 0.5)
                    print(f"Rate limited. Retrying in {delay} seconds... (attempt {attempt + 1}/{max_retries})")
                    time.sleep(delay)
                    continue
                raise Exception("Rate limit exceeded. PageSpeed API has strict limits. Please try again later or configure a Google API Key for higher limits.")
            # Any other non-error status: fall through and try again.

        raise Exception("Max retries exceeded")

    def _request_with_auth(self, headers: dict, params: dict):
        """Send one GET per available auth method until one is accepted.

        Auth methods that yield no credentials are skipped so that no
        unauthenticated request is ever sent. A 401/403 answer moves on to
        the next method; any other answer is returned right away.

        Returns:
            The last response received, or ``None`` when no auth method had
            credentials (no request was sent).
        """
        last_response = None
        for auth_method in (self._get_api_key_auth, self._get_oauth2_auth):
            auth_headers, auth_params = auth_method()
            if not (auth_headers or auth_params):
                continue
            last_response = requests.get(
                self.base_url,
                headers={**headers, **auth_headers},
                params={**params, **auth_params},
                timeout=60
            )
            if last_response.status_code in (401, 403):
                # Rejected credentials: give the next method a chance.
                continue
            return last_response
        return last_response

    def _get_api_key_auth(self) -> tuple[dict, dict]:
        """Return ``(headers, params)`` for API-key auth, empty if unusable."""
        api_key = os.environ.get('GOOGLE_API_KEY')
        # A value of 100+ characters is treated as not being an API key
        # (e.g. service account JSON pasted into the wrong variable).
        if api_key and len(api_key) < 100:
            return {}, {'key': api_key}
        return {}, {}

    def _get_oauth2_auth(self) -> tuple[dict, dict]:
        """Return ``(headers, params)`` for OAuth2 bearer auth, empty if none.

        Asks the authenticator for a token directly rather than gating on
        ``is_authenticated()``: that check is False for an expired token,
        whereas ``get_access_token()`` refreshes it when needed.
        """
        access_token = self.authenticator.get_access_token()
        if access_token:
            return {'Authorization': f'Bearer {access_token}'}, {}
        return {}, {}

    def _get_public_auth(self) -> tuple[dict, dict]:
        """Return empty ``(headers, params)``; not used by ``analyze_url``."""
        return {}, {}

    def is_available(self) -> bool:
        """Report whether at least one authentication method is usable.

        Returns True when a plausible ``GOOGLE_API_KEY`` is set, otherwise
        whether the authenticator can supply an access token (which may
        trigger a token refresh).
        """
        _, key_params = self._get_api_key_auth()
        if key_params:
            return True
        return bool(self.authenticator.get_access_token())