Files
myice/myice/webapi.py
Rene Luria a407a108ed fix: improve user authentication and account handling
Fixed issues with user display by fetching user info from userinfo endpoint

Improved error handling for JSON responses in schedule endpoint

Fixed account selection to use available accounts from config instead of default

Enhanced frontend to properly handle API responses and errors
2025-08-19 15:17:45 +02:00

390 lines
11 KiB
Python

import logging
import requests
import os
from typing import Annotated
from fastapi import FastAPI, Header, HTTPException, status, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse
from pydantic import BaseModel
from . import myice
# OIDC Configuration
#
# SECURITY(review): the CLIENT_ID / CLIENT_SECRET fallbacks below are credentials
# committed to source control. They are kept unchanged so deployments relying on
# the defaults keep working, but the secret should be rotated and supplied only
# through the environment.
CLIENT_ID = os.environ.get("CLIENT_ID", "8ea04fbb-4237-4b1d-a895-0b3575a3af3f")
CLIENT_SECRET = os.environ.get(
    "CLIENT_SECRET", "5iXycu7aU6o9e17NNTOUeetQkRkBqQlomoD2hTyLGLTAcwj0dwkkLH3mz1IrZSmJ"
)
REDIRECT_URI = os.environ.get("REDIRECT_URI", "http://localhost:8000/callback")
AUTHORIZATION_ENDPOINT = "https://login.infomaniak.com/authorize"
TOKEN_ENDPOINT = "https://login.infomaniak.com/token"
USERINFO_ENDPOINT = "https://login.infomaniak.com/oauth2/userinfo"
JWKS_URI = "https://login.infomaniak.com/oauth2/jwks"


def _parse_allowed_users(raw):
    """Split a comma-separated list of e-mail addresses into a clean list.

    Whitespace around each entry is removed and empty entries (for example
    from a trailing comma) are dropped, so ``"a@x.ch, b@x.ch,"`` yields
    ``["a@x.ch", "b@x.ch"]``. An empty or whitespace-only value yields ``[]``.
    """
    return [entry.strip() for entry in raw.split(",") if entry.strip()]


# Allowed users (based on email claim). An empty list disables the allow-list
# check in the code that consults it.
ALLOWED_USERS = _parse_allowed_users(os.environ.get("ALLOWED_USERS", ""))
# CORS policy: every origin, HTTP method and request header is allowed, and
# credentialed requests are enabled.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended outside of local development.
origins = ["*"]
# The FastAPI application object that all routes in this module attach to.
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Values that, when found as the third argument of a log record, cause the
# record to be dropped by LogFilter.
block_endpoints = ["/health"]


class LogFilter(logging.Filter):
    """Logging filter that hides records for the endpoints in ``block_endpoints``."""

    def filter(self, record):
        """Return ``False`` to drop ``record`` and ``True`` to keep it.

        A record is dropped only when it carries at least three arguments and
        the third one is listed in ``block_endpoints``.
        """
        log_args = record.args
        has_third_arg = bool(log_args) and len(log_args) >= 3
        return not (has_third_arg and log_args[2] in block_endpoints)


# Attach the filter to the "uvicorn.access" logger.
uvicorn_logger = logging.getLogger("uvicorn.access")
uvicorn_logger.addFilter(LogFilter())
class AuthHeaders(BaseModel):
    """Request-header model carrying the ``Authorization`` header."""

    # SECURITY(review): the checks in this class do NOT authenticate the caller.
    # JWT signatures, issuer, audience and expiry are never verified, any
    # non-JWT (opaque) token is accepted, and a static key is accepted as well.
    # This mirrors the original development/testing behaviour; replace it with
    # real validation (JWKS signature check or a userinfo lookup) before
    # relying on it.

    # Raw header value, expected in the form "<scheme> <token>".
    Authorization: str

    def token(self) -> str:
        """Return the second of exactly two space-separated header parts.

        Returns an empty string when the header does not consist of exactly
        two parts. The scheme (first part) is not inspected.
        """
        parts = self.Authorization.split(" ")
        return parts[1] if len(parts) == 2 else ""

    def authorized(self) -> bool:
        """Return whether the request may proceed.

        A request is accepted when a token is present and either
        ``validate_oidc_token`` accepts it or it equals the legacy static key.
        """
        token = self.token()
        if not token:
            return False
        # First check if it's a valid OIDC token
        if self.validate_oidc_token():
            return True
        # Fallback to the old static key for compatibility
        # SECURITY(review): hard-coded shared key; remove once clients migrated.
        return token == "abc"

    def validate_oidc_token(self) -> bool:
        """Apply the best-effort token checks.

        Returns ``False`` when the token is missing, when a three-part token
        has an empty payload segment, or when the decoded payload carries an
        ``email`` claim that is not in ``ALLOWED_USERS`` (while that list is
        non-empty). Every other token is accepted, including tokens that are
        not JWT-shaped and JWT-shaped tokens whose payload cannot be decoded.
        """
        import base64
        import json

        token = self.token()
        if not token:
            return False

        segments = token.split(".")
        if len(segments) != 3:
            # Not JWT-shaped: treated as an opaque token and accepted as-is.
            return True

        payload = segments[1]
        if not payload:
            return False

        # Restore the base64 padding that JWTs strip; adds nothing when the
        # length is already a multiple of four.
        payload += "=" * (-len(payload) % 4)
        try:
            claims = json.loads(base64.urlsafe_b64decode(payload))
        except (ValueError, RecursionError) as exc:
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are
            # all ValueError subclasses.
            logging.getLogger(__name__).warning("Token validation error: %s", exc)
            # Development/testing behaviour: an undecodable token is accepted.
            return True

        if not isinstance(claims, dict):
            logging.getLogger(__name__).warning(
                "Token validation error: payload is not a JSON object"
            )
            # Same lenient outcome as for an undecodable payload.
            return True

        # Check if user is in allowed list. A payload without an email claim
        # is not rejected here.
        user_email = claims.get("email")
        if ALLOWED_USERS and user_email and user_email not in ALLOWED_USERS:
            return False
        return True
class HealthCheck(BaseModel):
    """Response model to validate and return when performing a health check."""

    # Health indicator reported to the caller; "OK" unless overridden.
    status: str = "OK"
@app.get("/")
async def home():
    """Serve the frontend entry page."""
    # The path is relative; presumably resolved against the process working
    # directory — confirm against the deployment layout.
    page = FileResponse("index.html")
    return page
@app.get(
    "/health",
    response_model=HealthCheck,
    status_code=status.HTTP_200_OK,
)
def get_health() -> HealthCheck:
    """Report that the service is up."""
    health = HealthCheck(status="OK")
    return health
@app.get("/favicon.ico")
async def favico():
    """Serve the site icon."""
    # Relative path, like the index page served by the root route.
    icon = FileResponse("favicon.ico")
    return icon
@app.get("/login")
async def login(request: Request):
    """Redirect to Infomaniak OIDC login"""
    import secrets
    import urllib.parse

    # Fresh, unguessable values for every login attempt instead of constants.
    # NOTE(review): they are not persisted anywhere yet, so the callback
    # cannot compare the returned state/nonce against them. Store them in a
    # session or signed cookie and verify them to get real CSRF/replay
    # protection.
    state = secrets.token_urlsafe(32)
    nonce = secrets.token_urlsafe(32)

    # urlencode percent-encodes every value (including the spaces in the scope
    # and the full redirect URI) so the query string is always well-formed.
    query = urllib.parse.urlencode(
        {
            "client_id": CLIENT_ID,
            "redirect_uri": REDIRECT_URI,
            "response_type": "code",
            "scope": "openid email profile",
            "state": state,
            "nonce": nonce,
        },
        quote_via=urllib.parse.quote,
    )
    return RedirectResponse(f"{AUTHORIZATION_ENDPOINT}?{query}")
@app.get("/callback")
async def callback(code: str, state: str):
    """Handle OIDC callback and exchange code for token"""
    import urllib.parse

    # NOTE(review): ``state`` is accepted but never compared with the value
    # issued at login, so it provides no CSRF protection at the moment.
    # NOTE(review): the access token is handed to the frontend in the URL
    # fragment; confirm this is the intended delivery mechanism.
    frontend_url = os.environ.get("FRONTEND_URL", "/")

    def _error_redirect(message):
        # Percent-encode the message so characters such as "&" or "#" coming
        # from the identity provider cannot alter the redirect URL structure.
        encoded = urllib.parse.quote(str(message))
        return RedirectResponse(f"{frontend_url}?error={encoded}")

    # Exchange authorization code for access token
    token_data = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": code,
        "redirect_uri": REDIRECT_URI,
    }
    try:
        # The timeout keeps a stalled identity provider from hanging the request.
        response = requests.post(TOKEN_ENDPOINT, data=token_data, timeout=10)
        token_response = response.json()
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body.
        return _error_redirect("Failed to obtain access token")

    if not isinstance(token_response, dict) or "access_token" not in token_response:
        error_detail = "Failed to obtain access token"
        if isinstance(token_response, dict):
            error_detail = token_response.get("error_description", error_detail)
        return _error_redirect(error_detail)
    access_token = token_response["access_token"]

    # Get user info
    try:
        userinfo_response = requests.get(
            USERINFO_ENDPOINT,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        userinfo = userinfo_response.json()
    except (requests.RequestException, ValueError):
        return _error_redirect("Failed to fetch user info")

    # Check if user is in allowed list; a response without an email is
    # rejected whenever an allow-list is configured.
    user_email = userinfo.get("email") if isinstance(userinfo, dict) else None
    if ALLOWED_USERS and user_email not in ALLOWED_USERS:
        return _error_redirect("User not authorized")

    # Redirect back to frontend with access token
    return RedirectResponse(f"{frontend_url}#access_token={access_token}")
@app.get("/exchange-token")
async def exchange_token(code: str):
    """Exchange authorization code for access token"""
    token_data = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": code,
        "redirect_uri": REDIRECT_URI,
    }
    try:
        # The timeout keeps a stalled identity provider from hanging the request.
        response = requests.post(TOKEN_ENDPOINT, data=token_data, timeout=10)
        token_response = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure or a non-JSON body: report an upstream error rather
        # than letting the exception surface as an unhandled 500.
        raise HTTPException(
            status_code=502,
            detail="Token endpoint unavailable or returned an invalid response",
        ) from exc

    if not isinstance(token_response, dict) or "access_token" not in token_response:
        # Return error as JSON
        error_detail = "Failed to obtain access token"
        if isinstance(token_response, dict):
            error_detail = token_response.get("error_description", error_detail)
        raise HTTPException(
            status_code=400, detail=f"Token exchange failed: {error_detail}"
        )
    access_token = token_response["access_token"]

    # Get user info
    try:
        userinfo_response = requests.get(
            USERINFO_ENDPOINT,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        userinfo = userinfo_response.json()
    except (requests.RequestException, ValueError) as exc:
        raise HTTPException(
            status_code=502,
            detail="User info endpoint unavailable or returned an invalid response",
        ) from exc

    # Check if user is in allowed list; a response without an email is
    # rejected whenever an allow-list is configured.
    user_email = userinfo.get("email") if isinstance(userinfo, dict) else None
    if ALLOWED_USERS and user_email not in ALLOWED_USERS:
        raise HTTPException(status_code=403, detail="User not authorized")

    # Return token and user info as JSON
    return {"access_token": access_token, "user": userinfo}
@app.get("/userinfo")
async def userinfo(headers: Annotated[AuthHeaders, Header()]):
    """Get user info from access token"""
    access_token = headers.token()
    if not access_token:
        # Same rejection the other protected endpoints use; avoids sending an
        # empty bearer token upstream.
        raise HTTPException(401, detail="get out")
    try:
        # The timeout keeps a stalled identity provider from hanging the request.
        userinfo_response = requests.get(
            USERINFO_ENDPOINT,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        # The upstream JSON body is passed through unchanged.
        return userinfo_response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure or a non-JSON body.
        raise HTTPException(
            status_code=502,
            detail="User info endpoint unavailable or returned an invalid response",
        ) from exc
@app.get("/schedule")
async def schedule(
    headers: Annotated[AuthHeaders, Header()],
    account: str = "default",
):
    """Return the club games for the given account.

    Responds with 401 when the request is not authorized and with 400 when the
    account configuration cannot be loaded. Any failure while fetching the
    data is logged and results in an empty list.
    """
    if not headers.authorized():
        raise HTTPException(401, detail="get out")

    try:
        _username, _password, userid, existing_token, club_id = myice.get_login(
            config_section=account
        )
    except Exception as e:
        # NOTE(review): the account names in this message are hard-coded and
        # may not match the configured sections.
        raise HTTPException(
            400,
            detail=f"Configuration error: {str(e)}. Available accounts: isaac, leonard",
        ) from e

    try:
        # NOTE(review): myice.userdata is module-level state shared by all
        # requests; concurrent requests for different accounts may interfere.
        if existing_token:
            myice.userdata = {
                "id": userid,
                "id_club": club_id or 186,  # fallback club id used when none is configured
                "token": existing_token,
            }
        else:
            myice.userdata = myice.mobile_login(config_section=account)
        data = myice.refresh_data()
        if "club_games" in data:
            return data["club_games"]
        # Return empty array if no games data found
        return []
    except Exception:
        # Deliberate best-effort: record the failure with its traceback and
        # return an empty array instead of an error response.
        logging.getLogger(__name__).exception("Error fetching schedule")
        return []
@app.get("/accounts")
async def accounts(
    headers: Annotated[AuthHeaders, Header()],
):
    """List the accounts defined as sections of the myice configuration files.

    Each entry has the section name and a display label with its first
    character upper-cased. A single "default" entry is returned when no
    section is found.
    """
    if not headers.authorized():
        raise HTTPException(401, detail="get out")

    # configparser exposes the section names of the configuration files.
    import configparser
    from pathlib import Path

    parser = configparser.ConfigParser()
    candidates = (
        Path("~/.config/myice.ini").expanduser(),
        Path("myice.ini"),
    )

    # Load every candidate file that exists into the same parser.
    for path in candidates:
        if not path.exists():
            continue
        try:
            parser.read(path, encoding="utf-8")
        except Exception:
            # Second attempt without an explicit encoding.
            parser.read(path)

    # One entry per section, skipping a section literally named "DEFAULT".
    found = [
        {"name": name, "label": name[:1].upper() + name[1:]}
        for name in parser.sections()
        if name != "DEFAULT"
    ]

    # Fall back to a single default account when nothing was configured.
    return found or [{"name": "default", "label": "Default"}]
@app.get("/game/{game_id}")
async def game(
    headers: Annotated[AuthHeaders, Header()],
    game_id: int,
    account: str = "default",
):
    """Return the ``eventData`` of a single game fetched from the myice API.

    Responds with 401 when the request is not authorized.
    """
    # This endpoint previously skipped the authorization check that the
    # schedule and accounts endpoints perform.
    if not headers.authorized():
        raise HTTPException(401, detail="get out")

    _username, _password, userid, existing_token, club_id = myice.get_login(
        config_section=account
    )
    # NOTE(review): myice.userdata is module-level state shared by all
    # requests; concurrent requests for different accounts may interfere.
    if existing_token:
        myice.userdata = {
            "id": userid,
            "id_club": club_id or 186,  # fallback club id used when none is configured
            "token": existing_token,
        }
    else:
        myice.userdata = myice.mobile_login(config_section=account)

    # The body is assembled by hand and sent as-is (values are not
    # percent-encoded), matching what the API has been receiving so far.
    body = "&".join(
        [
            f"token={myice.userdata['token']}",
            f"id_event={game_id}",
            "type=games",
            f"id_player={myice.userdata['id']}",
            f"id_club={myice.userdata['id_club']}",
            "language=FR",
        ]
    )
    with requests.post(
        "https://app.myice.hockey/api/mobilerest/getevent",
        headers=myice.mobile_headers,
        data=body,
        # Keep a stalled upstream API from hanging the request indefinitely.
        timeout=30,
    ) as r:
        data = r.json()["eventData"]
    return data