feat: add pagination to file listing and improve UI
This commit is contained in:
+144
-1
@@ -3,9 +3,11 @@ import random
|
||||
import os
|
||||
import io
|
||||
import boto3
|
||||
import zipfile
|
||||
import tempfile
|
||||
from botocore.exceptions import ClientError
|
||||
from typing import List
|
||||
from fastapi import FastAPI, Request, Response
|
||||
from fastapi import FastAPI, Request, Response, Form
|
||||
from fastapi.responses import HTMLResponse, StreamingResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from pydantic import BaseModel
|
||||
@@ -91,6 +93,34 @@ def download_from_s3(bucket_name, object_name):
|
||||
print(f"Error downloading from S3: {e}")
|
||||
return None
|
||||
|
||||
def list_objects_in_s3(bucket_name):
|
||||
"""List all objects in S3 bucket (sorted from newest to oldest)"""
|
||||
s3_client = get_s3_client()
|
||||
|
||||
try:
|
||||
response = s3_client.list_objects_v2(Bucket=bucket_name)
|
||||
if 'Contents' in response:
|
||||
# Filter for PDF files only and sort by last modified (newest first)
|
||||
pdf_files = [obj for obj in response['Contents'] if obj['Key'].endswith('.pdf')]
|
||||
pdf_files.sort(key=lambda x: x['LastModified'], reverse=True)
|
||||
return pdf_files
|
||||
else:
|
||||
return []
|
||||
except ClientError as e:
|
||||
print(f"Error listing objects in S3: {e}")
|
||||
return []
|
||||
|
||||
def delete_from_s3(bucket_name, object_name):
|
||||
"""Delete file from S3 bucket"""
|
||||
s3_client = get_s3_client()
|
||||
|
||||
try:
|
||||
s3_client.delete_object(Bucket=bucket_name, Key=object_name)
|
||||
return True
|
||||
except ClientError as e:
|
||||
print(f"Error deleting from S3: {e}")
|
||||
return False
|
||||
|
||||
class MathExercisesPDF(FPDF):
|
||||
def header(self):
|
||||
self.set_font('Helvetica', 'B', 16)
|
||||
@@ -283,6 +313,119 @@ async def download_pdf(filename: str):
|
||||
headers={'Content-Disposition': f'attachment; filename="{filename}"'}
|
||||
)
|
||||
|
||||
@app.get("/list")
|
||||
async def list_pdfs(page: int = 1, page_size: int = 10):
|
||||
"""List PDF files in the S3 bucket with pagination (sorted from newest to oldest)"""
|
||||
try:
|
||||
# Validate page and page_size parameters
|
||||
if page < 1:
|
||||
page = 1
|
||||
if page_size < 1 or page_size > 100:
|
||||
page_size = 10
|
||||
|
||||
pdf_files = list_objects_in_s3(S3_BUCKET_NAME)
|
||||
|
||||
# Calculate pagination
|
||||
total_files = len(pdf_files)
|
||||
total_pages = (total_files + page_size - 1) // page_size # Ceiling division
|
||||
|
||||
# Slice the files for the current page
|
||||
start_index = (page - 1) * page_size
|
||||
end_index = start_index + page_size
|
||||
paginated_files = pdf_files[start_index:end_index]
|
||||
|
||||
return {
|
||||
"files": paginated_files,
|
||||
"pagination": {
|
||||
"current_page": page,
|
||||
"page_size": page_size,
|
||||
"total_files": total_files,
|
||||
"total_pages": total_pages,
|
||||
"has_next": page < total_pages,
|
||||
"has_prev": page > 1
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": f"Error listing files: {str(e)}"}
|
||||
|
||||
@app.delete("/delete/{filename}")
|
||||
async def delete_pdf(filename: str):
|
||||
"""Delete a PDF file from S3 bucket"""
|
||||
try:
|
||||
# Decode URL encoded filename
|
||||
import urllib.parse
|
||||
decoded_filename = urllib.parse.unquote(filename)
|
||||
|
||||
success = delete_from_s3(S3_BUCKET_NAME, decoded_filename)
|
||||
if success:
|
||||
return {"message": f"Fichier {decoded_filename} supprimé avec succès"}
|
||||
else:
|
||||
return {"error": f"Erreur lors de la suppression du fichier {decoded_filename}"}
|
||||
except Exception as e:
|
||||
return {"error": f"Error deleting file: {str(e)}"}
|
||||
|
||||
@app.post("/bulk-delete")
|
||||
async def bulk_delete(filenames: str = Form(...)):
|
||||
"""Delete multiple PDF files from S3 bucket"""
|
||||
try:
|
||||
import json
|
||||
filename_list = json.loads(filenames)
|
||||
deleted_files = []
|
||||
errors = []
|
||||
|
||||
for filename in filename_list:
|
||||
success = delete_from_s3(S3_BUCKET_NAME, filename)
|
||||
if success:
|
||||
deleted_files.append(filename)
|
||||
else:
|
||||
errors.append(filename)
|
||||
|
||||
if errors:
|
||||
return {"message": f"Fichiers supprimés: {len(deleted_files)}", "errors": errors}
|
||||
else:
|
||||
return {"message": f"{len(deleted_files)} fichiers supprimés avec succès"}
|
||||
except Exception as e:
|
||||
return {"error": f"Error deleting files: {str(e)}"}
|
||||
|
||||
from fastapi import Form
|
||||
|
||||
@app.post("/bulk-download")
|
||||
async def bulk_download(filenames: str = Form(...)):
|
||||
"""Download multiple PDF files as a zip archive"""
|
||||
try:
|
||||
import zipfile
|
||||
import tempfile
|
||||
import json
|
||||
|
||||
# Parse the JSON string to get the list of filenames
|
||||
filename_list = json.loads(filenames)
|
||||
|
||||
# Create a temporary zip file
|
||||
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp_zip:
|
||||
with zipfile.ZipFile(tmp_zip.name, 'w') as zipf:
|
||||
for filename in filename_list:
|
||||
file_data = download_from_s3(S3_BUCKET_NAME, filename)
|
||||
if file_data:
|
||||
zipf.writestr(filename, file_data)
|
||||
|
||||
# Read the zip file
|
||||
with open(tmp_zip.name, 'rb') as f:
|
||||
zip_data = f.read()
|
||||
|
||||
# Clean up temporary file
|
||||
import os
|
||||
os.unlink(tmp_zip.name)
|
||||
|
||||
# Return streaming response with zip data
|
||||
zip_filename = f"math_exercises_{len(filename_list)}_files.zip"
|
||||
return StreamingResponse(
|
||||
io.BytesIO(zip_data),
|
||||
media_type='application/zip',
|
||||
headers={'Content-Disposition': f'attachment; filename="{zip_filename}"'}
|
||||
)
|
||||
except Exception as e:
|
||||
return {"error": f"Error downloading files: {str(e)}"}
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user