481 lines
16 KiB
Python
481 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import random
|
|
import os
|
|
import io
|
|
import boto3
|
|
import zipfile
|
|
import tempfile
|
|
from botocore.exceptions import ClientError
|
|
from typing import List
|
|
from fastapi import FastAPI, Request, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel
|
|
from fpdf import FPDF
|
|
|
|
import logging
|
|
|
|
# Custom filter to exclude health check logs
|
|
class HealthCheckFilter(logging.Filter):
    """Logging filter that drops records produced by health-check requests."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False when the formatted message contains ``GET /health ``.

        Args:
            record: The log record being considered for emission.

        Returns:
            True to keep the record, False to suppress it.
        """
        message = record.getMessage()
        is_health_probe = "GET /health " in message
        return not is_health_probe
|
|
|
|
# Apply the filter to Uvicorn's access logger so that records matching
# HealthCheckFilter (messages containing "GET /health ") are not emitted
logging.getLogger("uvicorn.access").addFilter(HealthCheckFilter())

# FastAPI application instance; the route decorators in this module register on it
app = FastAPI()

# S3 Configuration
# Bucket name is read from the S3_BUCKET_NAME environment variable and
# falls back to "math-exercises" when the variable is not set
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "math-exercises")

# Jinja2 template loader pointed at the relative directory "app/templates"
# NOTE(review): presumably resolved against the process working directory - confirm
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
|
|
def get_s3_client():
    """Build a boto3 S3 client configured from environment variables.

    Reads S3_ACCESS_KEY, S3_SECRET_KEY and S3_HOST_BASE (used as the
    endpoint URL) from the environment.

    Returns:
        The client object returned by ``boto3.client("s3", ...)``.

    Raises:
        ValueError: If any of the three variables is missing or empty.
    """
    environment = os.environ
    access_key = environment.get("S3_ACCESS_KEY")
    secret_key = environment.get("S3_SECRET_KEY")
    endpoint = environment.get("S3_HOST_BASE")

    # Every setting must be present and non-empty before a client is built
    for setting in (access_key, secret_key, endpoint):
        if not setting:
            raise ValueError("S3 environment variables not properly set")

    return boto3.client(
        "s3",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        endpoint_url=endpoint,
        region_name="us-east-1",  # Required but unused for Infomaniak
    )
|
|
|
|
|
|
def create_bucket_if_not_exists(bucket_name):
    """Create the S3 bucket if a HEAD request reports that it does not exist.

    Args:
        bucket_name: Name of the bucket to check and, if missing, create.

    Raises:
        ClientError: If the existence check fails for a reason other than
            "not found", or if bucket creation itself fails.
        ValueError: Propagated from get_s3_client() when the S3 environment
            variables are not set.
    """
    s3_client = get_s3_client()

    try:
        s3_client.head_bucket(Bucket=bucket_name)
        return  # Bucket already exists, nothing to do
    except ClientError as e:
        # Compare the code as a string: it is not always numeric (e.g.
        # "AccessDenied"), and int() on such a value would raise ValueError
        # and hide the real ClientError.
        error_code = str(e.response.get("Error", {}).get("Code", ""))
        if error_code not in ("404", "NoSuchBucket", "NotFound"):
            # Some other error
            print(f"Error checking bucket: {e}")
            raise

    # Bucket doesn't exist, create it
    try:
        s3_client.create_bucket(Bucket=bucket_name)
        print(f"Bucket {bucket_name} created successfully")
    except ClientError as create_error:
        print(f"Error creating bucket: {create_error}")
        raise
|
|
|
|
|
|
# Make sure the bucket exists when the module is imported; any failure is
# reported as a warning and does not prevent the application from loading
try:
    create_bucket_if_not_exists(S3_BUCKET_NAME)
except Exception as startup_error:
    print(f"Warning: Could not create/check S3 bucket: {startup_error}")
|
|
|
|
|
|
def upload_to_s3(file_data, bucket_name, object_name, content_type="application/pdf"):
    """Store ``file_data`` in the given bucket under ``object_name``.

    Args:
        file_data: Payload passed as the ``Body`` of the put_object call.
        bucket_name: Destination bucket.
        object_name: Object key to write.
        content_type: Value sent as ``ContentType``.

    Returns:
        True when put_object succeeds, False when it raises ClientError
        (the error is printed).
    """
    client = get_s3_client()
    put_arguments = {
        "Bucket": bucket_name,
        "Key": object_name,
        "Body": file_data,
        "ContentType": content_type,
    }

    try:
        client.put_object(**put_arguments)
    except ClientError as upload_error:
        print(f"Error uploading to S3: {upload_error}")
        return False
    return True
|
|
|
|
|
|
def download_from_s3(bucket_name, object_name):
    """Fetch an object from the bucket and return its full content.

    Args:
        bucket_name: Bucket to read from.
        object_name: Object key to fetch.

    Returns:
        The result of reading the response ``Body``, or None when the
        request raises ClientError (the error is printed).
    """
    client = get_s3_client()

    try:
        s3_object = client.get_object(Bucket=bucket_name, Key=object_name)
        payload = s3_object["Body"].read()
    except ClientError as download_error:
        print(f"Error downloading from S3: {download_error}")
        return None
    return payload
|
|
|
|
|
|
def list_objects_in_s3(bucket_name):
    """List all PDF objects in the bucket, newest first.

    Uses the ``list_objects_v2`` paginator so every result page is
    collected; a single list_objects_v2 call returns only one page and
    would silently truncate large buckets.

    Args:
        bucket_name: Bucket to list.

    Returns:
        The object entries (as returned in ``Contents``) whose key ends with
        ".pdf", sorted by ``LastModified`` in descending order. An empty list
        is returned when the bucket has no PDFs or when listing raises
        ClientError (the error is printed).
    """
    s3_client = get_s3_client()
    pdf_files = []

    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket_name):
            # "Contents" is absent from a page that holds no objects
            pdf_files.extend(
                obj for obj in page.get("Contents", []) if obj["Key"].endswith(".pdf")
            )
    except ClientError as e:
        print(f"Error listing objects in S3: {e}")
        return []

    # Sort by last modified (newest first)
    pdf_files.sort(key=lambda obj: obj["LastModified"], reverse=True)
    return pdf_files
|
|
|
|
|
|
def delete_from_s3(bucket_name, object_name):
    """Remove one object from the bucket.

    Args:
        bucket_name: Bucket holding the object.
        object_name: Object key to delete.

    Returns:
        True when delete_object completes, False when it raises ClientError
        (the error is printed).
    """
    client = get_s3_client()

    try:
        client.delete_object(Bucket=bucket_name, Key=object_name)
    except ClientError as delete_error:
        print(f"Error deleting from S3: {delete_error}")
        return False
    return True
|
|
|
|
|
|
class MathExercisesPDF(FPDF):
    """FPDF document that renders the exercise-sheet title in its header."""

    def header(self):
        """Draw the centered bold title followed by a 10-unit vertical gap.

        This overrides the FPDF ``header`` hook; per the fpdf2 documentation
        the library calls it automatically when a page is added.
        """
        self.set_font("Helvetica", "B", 16)
        # Pass border/align by keyword and omit the deprecated positional
        # ``ln`` argument; new_x/new_y already express "go to the next line".
        self.cell(
            0,
            10,
            "Exercices de Multiplication et Division",
            border=0,
            align="C",
            new_x="LMARGIN",
            new_y="NEXT",
        )
        self.ln(10)
|
|
|
|
|
|
# Request body accepted by the POST /generate endpoint.
class ExerciseRequest(BaseModel):
    # Lower bound (inclusive) of the operand range used for the exercises
    min_table: int
    # Upper bound (inclusive) of the operand range used for the exercises
    max_table: int
    # Number of exercises to put on the sheet; defaults to 15
    num_exercises: int = 15
|
|
|
|
|
|
def generate_exercises(
    min_table: int, max_table: int, num_exercises: int = 15
) -> List[str]:
    """Generate a random mix of multiplication and division exercises.

    Both operands are drawn uniformly from ``min_table..max_table``
    (inclusive). Exercises are kept unique for as long as the attempt
    budget (ten draws per requested exercise) lasts; if that is not enough
    to reach ``num_exercises``, the list is topped up with further random
    draws that may repeat earlier ones.

    Args:
        min_table: Smallest operand value.
        max_table: Largest operand value.
        num_exercises: Number of exercise strings to return.

    Returns:
        A list of strings such as ``"3 · 4 = ____"`` or ``"12 : 3 = ____"``.
    """

    def draw_exercise():
        """Draw one random exercise and return ``(dedup_key, text)``."""
        left = random.randint(min_table, max_table)
        right = random.randint(min_table, max_table)
        kind = random.choice(["multiplication", "division"])
        if kind == "multiplication":
            return f"mult_{left}_{right}", f"{left} · {right} = ____"
        # Division: the dividend is the product, the divisor one of the factors
        product = left * right
        divisor = random.choice([left, right])
        return f"div_{product}_{divisor}", f"{product} : {divisor} = ____"

    exercises: List[str] = []
    seen_keys: set = set()  # Keys of exercises already emitted

    # Phase 1: unique exercises only, bounded so a small range cannot loop forever
    attempt_budget = num_exercises * 10
    attempts_made = 0
    while len(exercises) < num_exercises and attempts_made < attempt_budget:
        attempts_made += 1
        key, text = draw_exercise()
        if key in seen_keys:
            continue
        seen_keys.add(key)
        exercises.append(text)

    # Phase 2: not enough unique exercises were found, fill up without deduplication
    while len(exercises) < num_exercises:
        _, text = draw_exercise()
        exercises.append(text)

    return exercises
|
|
|
|
|
|
def create_math_exercises_pdf(
    min_table: int, max_table: int, num_exercises: int = 15
) -> str:
    """Build a three-column exercise sheet as a PDF and upload it to S3.

    Args:
        min_table: Smallest operand value used for the exercises.
        max_table: Largest operand value used for the exercises.
        num_exercises: Number of exercises to place on the sheet.

    Returns:
        The object name (PDF filename) under which the sheet was uploaded
        to ``S3_BUCKET_NAME``.

    Raises:
        RuntimeError: If ``upload_to_s3`` reports a failed upload.
    """
    import datetime

    # Timestamp (second resolution) added to the filename
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    # Subtitle and filename describe the table range
    if min_table == max_table:
        table_info = f"Tables de multiplication et division pour {min_table}"
        pdf_filename = f"exercices_mathematiques_table_{min_table}_{num_exercises}_exercices_{timestamp}.pdf"
    else:
        table_info = (
            f"Tables de multiplication et division de {min_table} à {max_table}"
        )
        pdf_filename = f"exercices_mathematiques_tables_{min_table}_a_{max_table}_{num_exercises}_exercices_{timestamp}.pdf"

    pdf = MathExercisesPDF()
    pdf.add_page()
    pdf.set_font("Helvetica", "", 12)

    # Keyword arguments replace the deprecated positional ``ln`` parameter
    pdf.cell(0, 10, table_info, border=0, align="C", new_x="LMARGIN", new_y="NEXT")
    pdf.ln(5)

    exercises = generate_exercises(min_table, max_table, num_exercises)

    # Split the exercises into 3 columns (column size rounded up)
    num_per_column = (num_exercises + 2) // 3
    columns = [
        exercises[:num_per_column],
        exercises[num_per_column : num_per_column * 2],
        exercises[num_per_column * 2 :],
    ]

    # Number the exercises 1..n, running down column 1, then 2, then 3
    numbered_columns = []
    next_number = 1
    for column in columns:
        numbered_columns.append(
            [
                f"{number}. {text}"
                for number, text in enumerate(column, start=next_number)
            ]
        )
        next_number += len(column)

    max_rows = max(len(column) for column in numbered_columns)
    last_column = len(numbered_columns) - 1

    for row_index in range(max_rows):
        for column_index, column in enumerate(numbered_columns):
            # Shorter columns are padded with empty cells to keep alignment
            cell_text = column[row_index] if row_index < len(column) else ""
            ends_row = column_index == last_column
            pdf.cell(
                60,
                10,
                cell_text,
                border=0,
                align="L",
                # The last cell of a row moves the cursor to the next line
                new_x="LMARGIN" if ends_row else "RIGHT",
                new_y="NEXT" if ends_row else "TOP",
            )

    # Render the PDF in memory
    pdf_data = pdf.output()

    # Upload to S3
    upload_success = upload_to_s3(pdf_data, S3_BUCKET_NAME, pdf_filename)

    if not upload_success:
        raise RuntimeError("Failed to upload PDF to S3")

    return pdf_filename
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the ``index.html`` template for the landing page.

    Args:
        request: The incoming request, passed to the template context.
    """
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
|
@app.post("/generate")
async def generate_exercises_endpoint(request: ExerciseRequest):
    """Validate the request, generate the PDF and redirect to its download URL.

    Args:
        request: Table range and number of exercises to generate.

    Returns:
        A 303 redirect to ``/download/<filename>`` on success, otherwise a
        dict with an ``"error"`` message (validation failure or any
        exception raised while creating the PDF).
    """
    try:
        low = request.min_table
        high = request.max_table
        count = request.num_exercises

        # Checked in order; the first failing rule determines the message
        validation_rules = (
            (low < 1 or high < 1, "Les tables doivent être supérieures à 0"),
            (
                low > high,
                "La table minimale doit être inférieure ou égale à la table maximale",
            ),
            (count < 1, "Le nombre d'exercices doit être supérieur à 0"),
        )
        for violated, message in validation_rules:
            if violated:
                return {"error": message}

        generated_name = create_math_exercises_pdf(low, high, count)

        # Redirect so the client automatically downloads the new file
        download_url = f"/download/{generated_name}"
        return RedirectResponse(url=download_url, status_code=303)

    except Exception as failure:
        return {"error": f"Erreur lors de la création du PDF: {str(failure)}"}
|
|
|
|
|
|
@app.get("/download/{filename}")
async def download_pdf(filename: str):
    """Return one PDF from the S3 bucket as an attachment.

    Args:
        filename: Object key of the PDF in ``S3_BUCKET_NAME``.

    Returns:
        A streaming ``application/pdf`` response, or a dict with an
        ``"error"`` message when the object could not be downloaded.
    """
    pdf_bytes = download_from_s3(S3_BUCKET_NAME, filename)

    if pdf_bytes is not None:
        disposition = f'attachment; filename="{filename}"'
        return StreamingResponse(
            io.BytesIO(pdf_bytes),
            media_type="application/pdf",
            headers={"Content-Disposition": disposition},
        )

    return {"error": "File not found"}
|
|
|
|
|
|
@app.get("/list")
async def list_pdfs(page: int = 1, page_size: int = 10):
    """Return one page of the PDF listing, newest files first.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Files per page; values outside 1..100 fall back to 10.

    Returns:
        A dict with the ``"files"`` of the requested page and a
        ``"pagination"`` summary, or a dict with an ``"error"`` message if
        an exception occurs.
    """
    try:
        # Normalise out-of-range paging parameters instead of rejecting them
        page = max(page, 1)
        if not 1 <= page_size <= 100:
            page_size = 10

        all_pdfs = list_objects_in_s3(S3_BUCKET_NAME)

        file_count = len(all_pdfs)
        page_count = -(-file_count // page_size)  # Ceiling division

        first = (page - 1) * page_size
        page_items = all_pdfs[first : first + page_size]

        pagination = {
            "current_page": page,
            "page_size": page_size,
            "total_files": file_count,
            "total_pages": page_count,
            "has_next": page < page_count,
            "has_prev": page > 1,
        }
        return {"files": page_items, "pagination": pagination}
    except Exception as failure:
        return {"error": f"Error listing files: {str(failure)}"}
|
|
|
|
|
|
@app.delete("/delete/{filename}")
async def delete_pdf(filename: str):
    """Delete a PDF file from the S3 bucket.

    The path parameter is used as the object key as-is, exactly like
    ``download_pdf`` does. Per the ASGI specification the request path is
    already percent-decoded before routing, so decoding it a second time
    would corrupt keys that contain a literal ``%XX`` sequence.

    Args:
        filename: Object key of the PDF in ``S3_BUCKET_NAME``.

    Returns:
        A dict with a ``"message"`` on success, or an ``"error"`` message
        when the deletion fails or an exception occurs.
    """
    try:
        success = delete_from_s3(S3_BUCKET_NAME, filename)
        if success:
            return {"message": f"Fichier {filename} supprimé avec succès"}
        return {"error": f"Erreur lors de la suppression du fichier {filename}"}
    except Exception as e:
        return {"error": f"Error deleting file: {str(e)}"}
|
|
|
|
|
|
@app.post("/bulk-delete")
async def bulk_delete(filenames: str = Form(...)):
    """Delete multiple PDF files from the S3 bucket.

    Args:
        filenames: Form field holding a JSON array of object keys.

    Returns:
        A dict with a ``"message"`` summarising the deletions (plus an
        ``"errors"`` list of keys that could not be deleted), or a dict with
        an ``"error"`` message when the payload is invalid or an exception
        occurs.
    """
    try:
        filename_list = json.loads(filenames)

        # Reject anything that is not a list of strings: a bare JSON string
        # would otherwise be iterated character by character and a delete
        # would be issued for every character.
        if not isinstance(filename_list, list) or not all(
            isinstance(name, str) for name in filename_list
        ):
            return {
                "error": "Error deleting files: 'filenames' must be a JSON array of strings"
            }

        deleted_files = []
        errors = []

        for filename in filename_list:
            if delete_from_s3(S3_BUCKET_NAME, filename):
                deleted_files.append(filename)
            else:
                errors.append(filename)

        if errors:
            return {
                "message": f"Fichiers supprimés: {len(deleted_files)}",
                "errors": errors,
            }
        return {"message": f"{len(deleted_files)} fichiers supprimés avec succès"}
    except Exception as e:
        return {"error": f"Error deleting files: {str(e)}"}
|
|
|
|
|
|
@app.post("/bulk-download")
async def bulk_download(filenames: str = Form(...)):
    """Download multiple PDF files as a single zip archive.

    The archive is assembled in memory, so no temporary file is created and
    nothing is left on disk if a download or the zip step fails.

    Args:
        filenames: Form field holding a JSON array of object keys.

    Returns:
        A streaming ``application/zip`` response containing every requested
        file that could be downloaded, or a dict with an ``"error"`` message
        when the payload is invalid or an exception occurs.
    """
    try:
        # Parse the JSON string to get the list of filenames
        filename_list = json.loads(filenames)

        # Reject anything that is not a list of strings: a bare JSON string
        # would otherwise be iterated character by character.
        if not isinstance(filename_list, list) or not all(
            isinstance(name, str) for name in filename_list
        ):
            return {
                "error": "Error downloading files: 'filenames' must be a JSON array of strings"
            }

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w") as zipf:
            for filename in filename_list:
                file_data = download_from_s3(S3_BUCKET_NAME, filename)
                # Files that are missing (None) or empty are left out of the archive
                if file_data:
                    zipf.writestr(filename, file_data)

        # Rewind so the response streams the archive from its first byte
        zip_buffer.seek(0)

        zip_filename = f"math_exercises_{len(filename_list)}_files.zip"
        return StreamingResponse(
            zip_buffer,
            media_type="application/zip",
            headers={
                "Content-Disposition": f'attachment; filename="{zip_filename}"'
            },
        )
    except Exception as e:
        return {"error": f"Error downloading files: {str(e)}"}
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Report the application status; always answers ``{"status": "healthy"}``."""
    status_payload = {"status": "healthy"}
    return status_payload
|
|
|
|
|
|
# Script entry point: when this file is executed directly, serve the app
# with Uvicorn
if __name__ == "__main__":
    # Imported here so uvicorn is only required when running as a script
    import uvicorn

    # Bind to all interfaces on port 8000
    uvicorn.run(app, host="0.0.0.0", port=8000)
|