Files
math-tables/app/main.py

462 lines
15 KiB
Python

#!/usr/bin/env python3
import json
import random
import io
import zipfile
import tempfile
from typing import List
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from fpdf import FPDF
import logging
# Import the functions from gen_op.py
from app.gen_op import generer_pdf_exercices_en_memoire
# Import S3 functions
from app.s3_utils import (
S3_BUCKET_NAME,
create_bucket_if_not_exists,
upload_to_s3,
download_from_s3,
list_objects_in_s3,
delete_from_s3,
)
# Custom filter to exclude health check logs
class HealthCheckFilter(logging.Filter):
    """Logging filter that drops records produced by health-check requests."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return ``False`` when the formatted message contains ``"GET /health "``.

        Every other record is let through (``True``).
        """
        message = record.getMessage()
        is_health_probe = "GET /health " in message
        return not is_health_probe
# Apply the filter to Uvicorn's access logger so that health-check requests
# are not written to the access log.
logging.getLogger("uvicorn.access").addFilter(HealthCheckFilter())

# Application object; the route decorators further down register on it.
app = FastAPI()
# Template loader pointed at the "app/templates" directory (a relative path).
templates = Jinja2Templates(directory="app/templates")

# Create bucket on startup. This runs at import time and is best-effort:
# any failure is only printed as a warning and the module keeps loading.
try:
    create_bucket_if_not_exists(S3_BUCKET_NAME)
except Exception as e:
    print(f"Warning: Could not create/check S3 bucket: {e}")
class MathExercisesPDF(FPDF):
    """PDF document whose page header is the exercise-sheet title."""

    def header(self):
        """Draw the bold, centered sheet title followed by vertical spacing.

        Per the fpdf2 documentation, ``header`` is the hook the library calls
        when a page is added; it is not meant to be called directly.
        """
        self.set_font("Helvetica", "B", 16)
        # The cursor position after the cell is given only through
        # new_x/new_y. The legacy positional ``ln`` flag is deprecated in
        # fpdf2: it emits a DeprecationWarning and overrides new_x/new_y.
        # The previous ``ln=1`` meant exactly LMARGIN/NEXT, so the layout is
        # unchanged.
        self.cell(
            0,
            10,
            "Exercices de Multiplication et Division",
            border=0,
            align="C",
            new_x="LMARGIN",
            new_y="NEXT",
        )
        self.ln(10)
class ExerciseRequest(BaseModel):
    """Request body of the ``/generate`` endpoint (multiplication/division sheet)."""

    # Smallest operand that may be drawn (inclusive bound of the table range).
    min_table: int
    # Largest operand that may be drawn (inclusive bound of the table range).
    max_table: int
    # Number of exercises to put on the sheet.
    num_exercises: int = 15
    # When True, only multiplication exercises are generated (no divisions).
    multiplication_only: bool = False
class OperationExerciseRequest(BaseModel):
    """Request body of the ``/generate-operations`` endpoint."""

    # Number of exercises to put on the sheet.
    num_exercises: int = 20
def _draw_exercise(min_table: int, max_table: int, multiplication_only: bool) -> tuple:
    """Draw one random exercise and return ``(operation_key, exercise_text)``.

    The key identifies the operation so the caller can detect duplicates.
    """
    a = random.randint(min_table, max_table)
    b = random.randint(min_table, max_table)
    wants_division = (
        not multiplication_only
        and random.choice(["multiplication", "division"]) == "division"
    )
    if wants_division:
        # Never divide by zero: only non-zero operands may serve as divisor.
        # If both operands are zero, fall through to a multiplication.
        divisors = [operand for operand in (a, b) if operand != 0]
        if divisors:
            divisor = random.choice(divisors)
            dividend = a * b
            return f"div_{dividend}_{divisor}", f"{dividend} : {divisor} = ____"
    return f"mult_{a}_{b}", f"{a} · {b} = ____"


def generate_exercises(
    min_table: int,
    max_table: int,
    num_exercises: int = 15,
    multiplication_only: bool = False,
) -> List[str]:
    """Generate a shuffled list of random multiplication/division exercises.

    Operands are drawn uniformly from ``[min_table, max_table]`` (inclusive).
    Duplicate operations are avoided for as long as the attempt budget lasts;
    if the range is too small to yield enough distinct operations, the list
    is topped up with exercises that may repeat earlier ones.

    Args:
        min_table: Smallest operand (inclusive).
        max_table: Largest operand (inclusive).
        num_exercises: Number of exercises to return.
        multiplication_only: When True, no division exercises are produced.

    Returns:
        Exactly ``num_exercises`` exercise strings (an empty list when
        ``num_exercises`` is zero or negative). Multiplications look like
        ``"a · b = ____"`` and divisions like ``"a*b : divisor = ____"``;
        the divisor is never zero.

    Raises:
        ValueError: Propagated from ``random.randint`` when
            ``min_table > max_table``.
    """
    exercises: List[str] = []
    used_operations: set = set()

    # Rejection sampling with a bounded number of draws so that a range with
    # too few distinct operations cannot loop forever.
    attempts = 0
    max_attempts = num_exercises * 10
    while len(exercises) < num_exercises and attempts < max_attempts:
        attempts += 1
        operation_key, exercise = _draw_exercise(
            min_table, max_table, multiplication_only
        )
        if operation_key not in used_operations:
            used_operations.add(operation_key)
            exercises.append(exercise)

    # Not enough unique operations were found: fill the remaining slots
    # without the uniqueness constraint.
    while len(exercises) < num_exercises:
        _, exercise = _draw_exercise(min_table, max_table, multiplication_only)
        exercises.append(exercise)

    return exercises
def _describe_exercise_sheet(
    min_table: int,
    max_table: int,
    num_exercises: int,
    multiplication_only: bool,
    timestamp: str,
) -> tuple:
    """Return ``(subtitle, pdf_filename)`` for the requested sheet settings."""
    if multiplication_only:
        if min_table == max_table:
            table_info = f"Tables de multiplication seulement pour {min_table}"
            pdf_filename = f"exercices_multiplication_seulement_table_{min_table}_{num_exercises}_exercices_{timestamp}.pdf"
        else:
            table_info = (
                f"Tables de multiplication seulement de {min_table} à {max_table}"
            )
            pdf_filename = f"exercices_multiplication_seulement_tables_{min_table}_a_{max_table}_{num_exercises}_exercices_{timestamp}.pdf"
    else:
        if min_table == max_table:
            table_info = f"Tables de multiplication et division pour {min_table}"
            pdf_filename = f"exercices_mathematiques_table_{min_table}_{num_exercises}_exercices_{timestamp}.pdf"
        else:
            table_info = (
                f"Tables de multiplication et division de {min_table} à {max_table}"
            )
            pdf_filename = f"exercices_mathematiques_tables_{min_table}_a_{max_table}_{num_exercises}_exercices_{timestamp}.pdf"
    return table_info, pdf_filename


def create_math_exercises_pdf(
    min_table: int,
    max_table: int,
    num_exercises: int = 15,
    multiplication_only: bool = False,
) -> str:
    """Build a three-column exercise sheet as a PDF and upload it to S3.

    Exercises are numbered from 1 to n and fill the columns top to bottom
    (column 1 first, then column 2, then column 3).

    Args:
        min_table: Smallest operand (inclusive).
        max_table: Largest operand (inclusive).
        num_exercises: Number of exercises on the sheet.
        multiplication_only: When True, the sheet contains no divisions.

    Returns:
        The file name under which the PDF was uploaded; it embeds the table
        range, the exercise count and a second-resolution timestamp.

    Raises:
        Exception: If ``upload_to_s3`` reports failure.
    """
    import datetime

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    table_info, pdf_filename = _describe_exercise_sheet(
        min_table, max_table, num_exercises, multiplication_only, timestamp
    )

    pdf = MathExercisesPDF()
    pdf.add_page()
    pdf.set_font("Helvetica", "", 12)
    # Cursor movement is expressed with new_x/new_y only; the positional
    # ``ln`` flag is deprecated in fpdf2 and would override these keywords.
    pdf.cell(0, 10, table_info, border=0, align="C", new_x="LMARGIN", new_y="NEXT")
    pdf.ln(5)

    exercises = generate_exercises(
        min_table, max_table, num_exercises, multiplication_only
    )

    # Lay the list out column-major in 3 columns. Sizing from the list that
    # was actually generated keeps numbering and layout consistent with it.
    column_count = 3
    total = len(exercises)
    rows = (total + column_count - 1) // column_count  # ceiling division
    for row in range(rows):
        for column in range(column_count):
            index = column * rows + row
            text = f"{index + 1}. {exercises[index]}" if index < total else ""
            is_last_column = column == column_count - 1
            # Empty cells are still emitted so that the columns stay aligned.
            pdf.cell(
                60,
                10,
                text,
                border=0,
                align="L",
                new_x="LMARGIN" if is_last_column else "RIGHT",
                new_y="NEXT" if is_last_column else "TOP",
            )

    # Render the PDF in memory.
    pdf_data = pdf.output()

    # Pass the content type explicitly, as create_operation_exercises_pdf does.
    upload_success = upload_to_s3(
        pdf_data, S3_BUCKET_NAME, pdf_filename, "application/pdf"
    )
    if not upload_success:
        raise Exception("Failed to upload PDF to S3")
    return pdf_filename
def create_operation_exercises_pdf(num_exercises: int = 20) -> str:
    """Generate an operations exercise sheet as a PDF and upload it to S3.

    Args:
        num_exercises: Number of exercises requested from the generator.

    Returns:
        The timestamped file name under which the PDF was uploaded.

    Raises:
        Exception: If ``upload_to_s3`` reports failure.
    """
    from datetime import datetime as _datetime

    # The second-resolution timestamp is part of the object name.
    stamp = _datetime.now().strftime("%Y%m%d_%H%M%S")
    pdf_filename = f"exercices_operations_{num_exercises}_exercices_{stamp}.pdf"

    # The PDF bytes are produced in memory by the gen_op module.
    document = generer_pdf_exercices_en_memoire(num_exercises)

    if upload_to_s3(document, S3_BUCKET_NAME, pdf_filename, "application/pdf"):
        return pdf_filename
    raise Exception("Failed to upload PDF to S3")
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the ``index.html`` template for the landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.post("/generate")
async def generate_exercises_endpoint(request: ExerciseRequest):
    """Validate the request, create the sheet and redirect to its download URL.

    Validation failures and any exception raised while building the PDF are
    reported as an ``{"error": ...}`` body rather than raised. On success the
    response is a 303 redirect to ``/download/<file name>``.
    """
    try:
        lowest = request.min_table
        highest = request.max_table
        count = request.num_exercises

        # Guard clauses, checked in this order: bounds, ordering, count.
        if min(lowest, highest) < 1:
            return {"error": "Les tables doivent être supérieures à 0"}
        if highest < lowest:
            return {
                "error": "La table minimale doit être inférieure ou égale à la table maximale"
            }
        if count < 1:
            return {"error": "Le nombre d'exercices doit être supérieur à 0"}

        pdf_filename = create_math_exercises_pdf(
            lowest, highest, count, request.multiplication_only
        )
        # 303 makes the client follow up with a GET, which starts the download.
        download_url = f"/download/{pdf_filename}"
        return RedirectResponse(url=download_url, status_code=303)
    except Exception as exc:
        reason = str(exc)
        return {"error": f"Erreur lors de la création du PDF: {reason}"}
@app.post("/generate-operations")
async def generate_operation_exercises_endpoint(request: OperationExerciseRequest):
    """Create an operations sheet and redirect to its download URL.

    A non-positive exercise count and any exception raised while building the
    PDF are reported as an ``{"error": ...}`` body. On success the response
    is a 303 redirect to ``/download/<file name>``.
    """
    try:
        count = request.num_exercises
        if count < 1:
            return {"error": "Le nombre d'exercices doit être supérieur à 0"}

        pdf_filename = create_operation_exercises_pdf(count)
        # 303 makes the client follow up with a GET, which starts the download.
        download_url = f"/download/{pdf_filename}"
        return RedirectResponse(url=download_url, status_code=303)
    except Exception as exc:
        reason = str(exc)
        return {"error": f"Erreur lors de la création du PDF: {reason}"}
@app.get("/download/{filename}")
async def download_pdf(filename: str):
    """Fetch ``filename`` from the S3 bucket and return it as a PDF attachment.

    When ``download_from_s3`` returns ``None`` an ``{"error": ...}`` body is
    returned instead.
    """
    payload = download_from_s3(S3_BUCKET_NAME, filename)
    if payload is not None:
        # Serve the bytes as an attachment under the requested file name.
        disposition = f'attachment; filename="{filename}"'
        return StreamingResponse(
            io.BytesIO(payload),
            media_type="application/pdf",
            headers={"Content-Disposition": disposition},
        )
    return {"error": "File not found"}
@app.get("/list")
async def list_pdfs(page: int = 1, page_size: int = 10):
    """Return one page of the objects listed in the S3 bucket.

    ``page`` values below 1 are treated as 1, and a ``page_size`` outside
    1..100 falls back to 10. The order of the files is whatever
    ``list_objects_in_s3`` returns (presumably newest first; verify against
    that helper). Any exception is reported as an ``{"error": ...}`` body.
    """
    try:
        # Normalise out-of-range paging parameters instead of rejecting them.
        page = max(page, 1)
        if not 1 <= page_size <= 100:
            page_size = 10

        all_files = list_objects_in_s3(S3_BUCKET_NAME)
        total_files = len(all_files)

        # Ceiling division: a partially filled last page still counts.
        full_pages, leftover = divmod(total_files, page_size)
        total_pages = full_pages + (1 if leftover else 0)

        offset = (page - 1) * page_size
        pagination = {
            "current_page": page,
            "page_size": page_size,
            "total_files": total_files,
            "total_pages": total_pages,
            "has_next": page < total_pages,
            "has_prev": page > 1,
        }
        return {
            "files": all_files[offset : offset + page_size],
            "pagination": pagination,
        }
    except Exception as exc:
        reason = str(exc)
        return {"error": f"Error listing files: {reason}"}
@app.delete("/delete/{filename}")
async def delete_pdf(filename: str):
    """Delete one file from the S3 bucket.

    The outcome is reported as a ``{"message": ...}`` body on success and as
    an ``{"error": ...}`` body on failure or exception.
    """
    try:
        from urllib.parse import unquote

        # NOTE(review): the path parameter is presumably already URL-decoded
        # by the framework, so this second decode may alter names containing
        # a literal '%'. Confirm against the client before removing it.
        target = unquote(filename)

        if delete_from_s3(S3_BUCKET_NAME, target):
            return {"message": f"Fichier {target} supprimé avec succès"}
        return {"error": f"Erreur lors de la suppression du fichier {target}"}
    except Exception as exc:
        reason = str(exc)
        return {"error": f"Error deleting file: {reason}"}
@app.post("/bulk-delete")
async def bulk_delete(filenames: str = Form(...)):
    """Delete several files from the S3 bucket in one request.

    ``filenames`` is a form field holding a JSON-encoded collection of file
    names. The response always contains a ``message``; an ``errors`` list
    with the names that could not be deleted is added when at least one
    deletion failed. Any exception is reported as an ``{"error": ...}`` body.
    """
    try:
        removed = []
        failed = []
        for name in json.loads(filenames):
            # Each name lands in exactly one of the two lists.
            bucket_for_name = removed if delete_from_s3(S3_BUCKET_NAME, name) else failed
            bucket_for_name.append(name)

        if not failed:
            return {"message": f"{len(removed)} fichiers supprimés avec succès"}
        return {
            "message": f"Fichiers supprimés: {len(removed)}",
            "errors": failed,
        }
    except Exception as exc:
        reason = str(exc)
        return {"error": f"Error deleting files: {reason}"}
@app.post("/bulk-download")
async def bulk_download(filenames: str = Form(...)):
    """Download several files from the S3 bucket as a single zip archive.

    ``filenames`` is a form field holding a JSON-encoded list of file names.
    Files for which ``download_from_s3`` returns nothing are left out of the
    archive. Any exception is reported as an ``{"error": ...}`` body.
    """
    try:
        # Parse the JSON string to get the list of filenames.
        filename_list = json.loads(filenames)

        # Build the archive in memory. The previous temp-file round trip
        # (delete=False followed by a manual unlink) left the file on disk
        # whenever a download or write raised before the cleanup line.
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w") as zipf:
            for filename in filename_list:
                file_data = download_from_s3(S3_BUCKET_NAME, filename)
                if file_data:
                    zipf.writestr(filename, file_data)
        # Closing the ZipFile wrote the central directory; rewind so the
        # response streams the archive from its first byte.
        zip_buffer.seek(0)

        zip_filename = f"math_exercises_{len(filename_list)}_files.zip"
        return StreamingResponse(
            zip_buffer,
            media_type="application/zip",
            headers={
                "Content-Disposition": f'attachment; filename="{zip_filename}"'
            },
        )
    except Exception as e:
        return {"error": f"Error downloading files: {str(e)}"}
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a fixed ``healthy`` status body."""
    status_body = {"status": "healthy"}
    return status_body
if __name__ == "__main__":
    # Direct-execution entry point: serve the app on all interfaces, port 8000.
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8000)