28b1c80b50
Moved gen-op.py to app/gen_op.py and integrated it with the FastAPI application Added a new section in the UI for generating operation exercises Updated the PDF generation to work in memory and upload to S3 Bumped version to 1.0.9
523 lines
17 KiB
Python
523 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import random
|
|
import os
|
|
import io
|
|
import boto3
|
|
import zipfile
|
|
import tempfile
|
|
import importlib.util
|
|
from botocore.exceptions import ClientError
|
|
from typing import List
|
|
from fastapi import FastAPI, Request, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel
|
|
from fpdf import FPDF
|
|
|
|
import logging
|
|
|
|
# Import the functions from gen_op.py
|
|
from app.gen_op import generer_pdf_exercices_en_memoire
|
|
|
|
# Custom filter to exclude health check logs
|
|
class HealthCheckFilter(logging.Filter):
|
|
def filter(self, record: logging.LogRecord) -> bool:
|
|
# Exclude health check requests from logs
|
|
return "GET /health " not in record.getMessage()
|
|
|
|
# Apply the filter to Uvicorn's access logger
|
|
logging.getLogger("uvicorn.access").addFilter(HealthCheckFilter())
|
|
|
|
app = FastAPI()
|
|
|
|
# S3 Configuration
|
|
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "math-exercises")
|
|
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
|
|
def get_s3_client():
|
|
"""Create and return an S3 client using environment variables"""
|
|
s3_access_key = os.environ.get("S3_ACCESS_KEY")
|
|
s3_secret_key = os.environ.get("S3_SECRET_KEY")
|
|
s3_host_base = os.environ.get("S3_HOST_BASE")
|
|
|
|
if not all([s3_access_key, s3_secret_key, s3_host_base]):
|
|
raise ValueError("S3 environment variables not properly set")
|
|
|
|
s3 = boto3.client(
|
|
"s3",
|
|
aws_access_key_id=s3_access_key,
|
|
aws_secret_access_key=s3_secret_key,
|
|
endpoint_url=s3_host_base,
|
|
region_name="us-east-1", # Required but unused for Infomaniak
|
|
)
|
|
return s3
|
|
|
|
|
|
def create_bucket_if_not_exists(bucket_name):
|
|
"""Create S3 bucket if it doesn't exist"""
|
|
s3_client = get_s3_client()
|
|
|
|
try:
|
|
s3_client.head_bucket(Bucket=bucket_name)
|
|
except ClientError as e:
|
|
error_code = int(e.response["Error"]["Code"])
|
|
if error_code == 404:
|
|
# Bucket doesn't exist, create it
|
|
try:
|
|
s3_client.create_bucket(Bucket=bucket_name)
|
|
print(f"Bucket {bucket_name} created successfully")
|
|
except ClientError as create_error:
|
|
print(f"Error creating bucket: {create_error}")
|
|
raise
|
|
else:
|
|
# Some other error
|
|
print(f"Error checking bucket: {e}")
|
|
raise
|
|
|
|
|
|
# Create bucket on startup
|
|
try:
|
|
create_bucket_if_not_exists(S3_BUCKET_NAME)
|
|
except Exception as e:
|
|
print(f"Warning: Could not create/check S3 bucket: {e}")
|
|
|
|
|
|
def upload_to_s3(file_data, bucket_name, object_name, content_type="application/pdf"):
|
|
"""Upload file data to S3 bucket"""
|
|
s3_client = get_s3_client()
|
|
|
|
try:
|
|
s3_client.put_object(
|
|
Bucket=bucket_name,
|
|
Key=object_name,
|
|
Body=file_data,
|
|
ContentType=content_type,
|
|
)
|
|
return True
|
|
except ClientError as e:
|
|
print(f"Error uploading to S3: {e}")
|
|
return False
|
|
|
|
|
|
def download_from_s3(bucket_name, object_name):
|
|
"""Download file data from S3 bucket"""
|
|
s3_client = get_s3_client()
|
|
|
|
try:
|
|
response = s3_client.get_object(Bucket=bucket_name, Key=object_name)
|
|
return response["Body"].read()
|
|
except ClientError as e:
|
|
print(f"Error downloading from S3: {e}")
|
|
return None
|
|
|
|
|
|
def list_objects_in_s3(bucket_name):
|
|
"""List all objects in S3 bucket (sorted from newest to oldest)"""
|
|
s3_client = get_s3_client()
|
|
|
|
try:
|
|
response = s3_client.list_objects_v2(Bucket=bucket_name)
|
|
if "Contents" in response:
|
|
# Filter for PDF files only and sort by last modified (newest first)
|
|
pdf_files = [
|
|
obj for obj in response["Contents"] if obj["Key"].endswith(".pdf")
|
|
]
|
|
pdf_files.sort(key=lambda x: x["LastModified"], reverse=True)
|
|
return pdf_files
|
|
else:
|
|
return []
|
|
except ClientError as e:
|
|
print(f"Error listing objects in S3: {e}")
|
|
return []
|
|
|
|
|
|
def delete_from_s3(bucket_name, object_name):
|
|
"""Delete file from S3 bucket"""
|
|
s3_client = get_s3_client()
|
|
|
|
try:
|
|
s3_client.delete_object(Bucket=bucket_name, Key=object_name)
|
|
return True
|
|
except ClientError as e:
|
|
print(f"Error deleting from S3: {e}")
|
|
return False
|
|
|
|
|
|
class MathExercisesPDF(FPDF):
|
|
def header(self):
|
|
self.set_font("Helvetica", "B", 16)
|
|
self.cell(
|
|
0,
|
|
10,
|
|
"Exercices de Multiplication et Division",
|
|
0,
|
|
1,
|
|
"C",
|
|
new_x="LMARGIN",
|
|
new_y="NEXT",
|
|
)
|
|
self.ln(10)
|
|
|
|
|
|
class ExerciseRequest(BaseModel):
|
|
min_table: int
|
|
max_table: int
|
|
num_exercises: int = 15
|
|
|
|
class OperationExerciseRequest(BaseModel):
|
|
num_exercises: int = 20
|
|
|
|
|
|
def generate_exercises(
|
|
min_table: int, max_table: int, num_exercises: int = 15
|
|
) -> List[str]:
|
|
"""Génère des exercices de multiplication et division aléatoires mélangés sans doublons"""
|
|
exercises: List[str] = []
|
|
used_operations: set = set() # Pour éviter les doublons
|
|
|
|
# Générer le nombre exact d'exercices demandé
|
|
attempts = 0
|
|
max_attempts = num_exercises * 10 # Limite pour éviter une boucle infinie
|
|
|
|
while len(exercises) < num_exercises and attempts < max_attempts:
|
|
attempts += 1
|
|
|
|
# Choisir deux nombres aléatoires entre min_table et max_table
|
|
a = random.randint(min_table, max_table)
|
|
b = random.randint(min_table, max_table)
|
|
result = a * b
|
|
|
|
# Choisir aléatoirement le type d'exercice (seulement multiplication ou division)
|
|
exercise_type = random.choice(["multiplication", "division"])
|
|
|
|
if exercise_type == "multiplication":
|
|
# Exercice de multiplication
|
|
operation_key = f"mult_{a}_{b}" # Clé unique pour cette opération
|
|
if operation_key not in used_operations:
|
|
exercise = f"{a} · {b} = ____"
|
|
exercises.append(exercise)
|
|
used_operations.add(operation_key)
|
|
else: # division
|
|
# Exercice de division
|
|
divisor = random.choice([a, b])
|
|
operation_key = f"div_{result}_{divisor}" # Clé unique pour cette opération
|
|
if operation_key not in used_operations:
|
|
exercise = f"{result} : {divisor} = ____"
|
|
exercises.append(exercise)
|
|
used_operations.add(operation_key)
|
|
|
|
# Si nous n'avons pas pu générer suffisamment d'exercices uniques,
|
|
# compléter avec des variations
|
|
if len(exercises) < num_exercises:
|
|
remaining = num_exercises - len(exercises)
|
|
for _ in range(remaining):
|
|
# Générer des variations avec des nombres légèrement différents
|
|
a = random.randint(min_table, max_table)
|
|
b = random.randint(min_table, max_table)
|
|
result = a * b
|
|
|
|
# Choisir aléatoirement le type d'exercice
|
|
exercise_type = random.choice(["multiplication", "division"])
|
|
|
|
if exercise_type == "multiplication":
|
|
exercise = f"{a} · {b} = ____"
|
|
else: # division
|
|
divisor = random.choice([a, b])
|
|
exercise = f"{result} : {divisor} = ____"
|
|
|
|
exercises.append(exercise)
|
|
|
|
return exercises
|
|
|
|
|
|
def create_math_exercises_pdf(
|
|
min_table: int, max_table: int, num_exercises: int = 15
|
|
) -> str:
|
|
"""Crée un fichier PDF avec des exercices de mathématiques mélangés en 3 colonnes et l'upload sur S3"""
|
|
import datetime
|
|
|
|
# Add timestamp to filename
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
# Ajouter des informations sur la plage de tables
|
|
if min_table == max_table:
|
|
table_info = f"Tables de multiplication et division pour {min_table}"
|
|
pdf_filename = f"exercices_mathematiques_table_{min_table}_{num_exercises}_exercices_{timestamp}.pdf"
|
|
else:
|
|
table_info = (
|
|
f"Tables de multiplication et division de {min_table} à {max_table}"
|
|
)
|
|
pdf_filename = f"exercices_mathematiques_tables_{min_table}_a_{max_table}_{num_exercises}_exercices_{timestamp}.pdf"
|
|
|
|
pdf = MathExercisesPDF()
|
|
pdf.add_page()
|
|
pdf.set_font("Helvetica", "", 12)
|
|
|
|
pdf.cell(0, 10, table_info, 0, 1, "C", new_x="LMARGIN", new_y="NEXT")
|
|
pdf.ln(5)
|
|
|
|
# Générer les exercices
|
|
exercises = generate_exercises(min_table, max_table, num_exercises)
|
|
|
|
# Pas d'en-têtes de colonnes
|
|
|
|
# Répartir les exercices en 3 colonnes
|
|
num_per_column = (num_exercises + 2) // 3 # Arrondi vers le haut
|
|
col1_exercises = exercises[:num_per_column]
|
|
col2_exercises = exercises[num_per_column : num_per_column * 2]
|
|
col3_exercises = exercises[num_per_column * 2 :]
|
|
|
|
# Ajouter les exercices numérotés de 1 à n dans les colonnes
|
|
max_rows = max(len(col1_exercises), len(col2_exercises), len(col3_exercises))
|
|
|
|
for i in range(max_rows):
|
|
# Colonne 1
|
|
if i < len(col1_exercises):
|
|
exercise_num = i + 1
|
|
col1_text = f"{exercise_num}. {col1_exercises[i]}"
|
|
else:
|
|
col1_text = ""
|
|
|
|
# Colonne 2
|
|
if i < len(col2_exercises):
|
|
exercise_num = i + 1 + len(col1_exercises)
|
|
col2_text = f"{exercise_num}. {col2_exercises[i]}"
|
|
else:
|
|
col2_text = ""
|
|
|
|
# Colonne 3
|
|
if i < len(col3_exercises):
|
|
exercise_num = i + 1 + len(col1_exercises) + len(col2_exercises)
|
|
col3_text = f"{exercise_num}. {col3_exercises[i]}"
|
|
else:
|
|
col3_text = ""
|
|
|
|
# Ajouter la ligne
|
|
pdf.cell(60, 10, col1_text, 0, 0, "L", new_x="RIGHT", new_y="TOP")
|
|
pdf.cell(60, 10, col2_text, 0, 0, "L", new_x="RIGHT", new_y="TOP")
|
|
pdf.cell(60, 10, col3_text, 0, 1, "L", new_x="LMARGIN", new_y="NEXT")
|
|
|
|
# Sauvegarder le PDF en mémoire
|
|
pdf_data = pdf.output()
|
|
|
|
# Upload to S3
|
|
upload_success = upload_to_s3(pdf_data, S3_BUCKET_NAME, pdf_filename)
|
|
|
|
if not upload_success:
|
|
raise Exception("Failed to upload PDF to S3")
|
|
|
|
return pdf_filename
|
|
|
|
|
|
def create_operation_exercises_pdf(num_exercises: int = 20) -> str:
|
|
"""Crée un fichier PDF avec des exercices d'opérations et l'upload sur S3"""
|
|
import datetime
|
|
|
|
# Add timestamp to filename
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
pdf_filename = f"exercices_operations_{num_exercises}_exercices_{timestamp}.pdf"
|
|
|
|
# Generate PDF in memory using gen-op functions
|
|
pdf_data = generer_pdf_exercices_en_memoire(num_exercises)
|
|
|
|
# Upload to S3
|
|
upload_success = upload_to_s3(pdf_data, S3_BUCKET_NAME, pdf_filename, "application/pdf")
|
|
|
|
if not upload_success:
|
|
raise Exception("Failed to upload PDF to S3")
|
|
|
|
return pdf_filename
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def read_root(request: Request):
|
|
return templates.TemplateResponse("index.html", {"request": request})
|
|
|
|
|
|
@app.post("/generate")
|
|
async def generate_exercises_endpoint(request: ExerciseRequest):
|
|
try:
|
|
if request.min_table < 1 or request.max_table < 1:
|
|
return {"error": "Les tables doivent être supérieures à 0"}
|
|
if request.min_table > request.max_table:
|
|
return {
|
|
"error": "La table minimale doit être inférieure ou égale à la table maximale"
|
|
}
|
|
if request.num_exercises < 1:
|
|
return {"error": "Le nombre d'exercices doit être supérieur à 0"}
|
|
|
|
pdf_filename = create_math_exercises_pdf(
|
|
request.min_table, request.max_table, request.num_exercises
|
|
)
|
|
|
|
# Return redirect to automatically download the file
|
|
return RedirectResponse(url=f"/download/{pdf_filename}", status_code=303)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Erreur lors de la création du PDF: {str(e)}"}
|
|
|
|
|
|
@app.post("/generate-operations")
|
|
async def generate_operation_exercises_endpoint(request: OperationExerciseRequest):
|
|
try:
|
|
if request.num_exercises < 1:
|
|
return {"error": "Le nombre d'exercices doit être supérieur à 0"}
|
|
|
|
pdf_filename = create_operation_exercises_pdf(request.num_exercises)
|
|
|
|
# Return redirect to automatically download the file
|
|
return RedirectResponse(url=f"/download/{pdf_filename}", status_code=303)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Erreur lors de la création du PDF: {str(e)}"}
|
|
|
|
|
|
@app.get("/download/{filename}")
|
|
async def download_pdf(filename: str):
|
|
# Download file from S3
|
|
file_data = download_from_s3(S3_BUCKET_NAME, filename)
|
|
|
|
if file_data is None:
|
|
return {"error": "File not found"}
|
|
|
|
# Return streaming response with PDF data
|
|
return StreamingResponse(
|
|
io.BytesIO(file_data),
|
|
media_type="application/pdf",
|
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
|
)
|
|
|
|
|
|
@app.get("/list")
|
|
async def list_pdfs(page: int = 1, page_size: int = 10):
|
|
"""List PDF files in the S3 bucket with pagination (sorted from newest to oldest)"""
|
|
try:
|
|
# Validate page and page_size parameters
|
|
if page < 1:
|
|
page = 1
|
|
if page_size < 1 or page_size > 100:
|
|
page_size = 10
|
|
|
|
pdf_files = list_objects_in_s3(S3_BUCKET_NAME)
|
|
|
|
# Calculate pagination
|
|
total_files = len(pdf_files)
|
|
total_pages = (total_files + page_size - 1) // page_size # Ceiling division
|
|
|
|
# Slice the files for the current page
|
|
start_index = (page - 1) * page_size
|
|
end_index = start_index + page_size
|
|
paginated_files = pdf_files[start_index:end_index]
|
|
|
|
return {
|
|
"files": paginated_files,
|
|
"pagination": {
|
|
"current_page": page,
|
|
"page_size": page_size,
|
|
"total_files": total_files,
|
|
"total_pages": total_pages,
|
|
"has_next": page < total_pages,
|
|
"has_prev": page > 1,
|
|
},
|
|
}
|
|
except Exception as e:
|
|
return {"error": f"Error listing files: {str(e)}"}
|
|
|
|
|
|
@app.delete("/delete/{filename}")
|
|
async def delete_pdf(filename: str):
|
|
"""Delete a PDF file from S3 bucket"""
|
|
try:
|
|
# Decode URL encoded filename
|
|
import urllib.parse
|
|
|
|
decoded_filename = urllib.parse.unquote(filename)
|
|
|
|
success = delete_from_s3(S3_BUCKET_NAME, decoded_filename)
|
|
if success:
|
|
return {"message": f"Fichier {decoded_filename} supprimé avec succès"}
|
|
else:
|
|
return {
|
|
"error": f"Erreur lors de la suppression du fichier {decoded_filename}"
|
|
}
|
|
except Exception as e:
|
|
return {"error": f"Error deleting file: {str(e)}"}
|
|
|
|
|
|
@app.post("/bulk-delete")
|
|
async def bulk_delete(filenames: str = Form(...)):
|
|
"""Delete multiple PDF files from S3 bucket"""
|
|
try:
|
|
filename_list = json.loads(filenames)
|
|
deleted_files = []
|
|
errors = []
|
|
|
|
for filename in filename_list:
|
|
success = delete_from_s3(S3_BUCKET_NAME, filename)
|
|
if success:
|
|
deleted_files.append(filename)
|
|
else:
|
|
errors.append(filename)
|
|
|
|
if errors:
|
|
return {
|
|
"message": f"Fichiers supprimés: {len(deleted_files)}",
|
|
"errors": errors,
|
|
}
|
|
else:
|
|
return {"message": f"{len(deleted_files)} fichiers supprimés avec succès"}
|
|
except Exception as e:
|
|
return {"error": f"Error deleting files: {str(e)}"}
|
|
|
|
|
|
@app.post("/bulk-download")
|
|
async def bulk_download(filenames: str = Form(...)):
|
|
"""Download multiple PDF files as a zip archive"""
|
|
try:
|
|
# Parse the JSON string to get the list of filenames
|
|
filename_list = json.loads(filenames)
|
|
|
|
# Create a temporary zip file
|
|
with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_zip:
|
|
with zipfile.ZipFile(tmp_zip.name, "w") as zipf:
|
|
for filename in filename_list:
|
|
file_data = download_from_s3(S3_BUCKET_NAME, filename)
|
|
if file_data:
|
|
zipf.writestr(filename, file_data)
|
|
|
|
# Read the zip file
|
|
with open(tmp_zip.name, "rb") as f:
|
|
zip_data = f.read()
|
|
|
|
# Clean up temporary file
|
|
import os
|
|
|
|
os.unlink(tmp_zip.name)
|
|
|
|
# Return streaming response with zip data
|
|
zip_filename = f"math_exercises_{len(filename_list)}_files.zip"
|
|
return StreamingResponse(
|
|
io.BytesIO(zip_data),
|
|
media_type="application/zip",
|
|
headers={
|
|
"Content-Disposition": f'attachment; filename="{zip_filename}"'
|
|
},
|
|
)
|
|
except Exception as e:
|
|
return {"error": f"Error downloading files: {str(e)}"}
|
|
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
"""Health check endpoint that returns the status of the application"""
|
|
return {"status": "healthy"}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|