Files
fastapi_file_uploader/main.py
2026-02-19 11:00:53 +09:00

188 lines
5.9 KiB
Python

import json
import os
import shutil
import yt_dlp
from datetime import datetime
from pathlib import Path
from typing import List, Optional
import yt_dlp
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
app = FastAPI()
# Configuration
BASE_DIR = Path(__file__).parent
ASSETS_DIR = BASE_DIR / "assets"
IMAGES_DIR = ASSETS_DIR / "images"
VIDEOS_DIR = ASSETS_DIR / "videos"
METADATA_FILE = BASE_DIR / "metadata.json"
STATIC_DIR = BASE_DIR / "static"
# Ensure directories exist
for directory in [IMAGES_DIR, VIDEOS_DIR, STATIC_DIR]:
directory.mkdir(parents=True, exist_ok=True)
if not METADATA_FILE.exists():
METADATA_FILE.write_text("[]")
# Mount static files
app.mount("/web", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")
def load_metadata() -> List[dict]:
try:
with open(METADATA_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return []
def save_metadata(metadata: List[dict]):
with open(METADATA_FILE, "w") as f:
json.dump(metadata, f, indent=4)
def get_file_category(content_type: str) -> str:
if content_type.startswith("image/"):
return "images"
elif content_type.startswith("video/"):
return "videos"
return "others"
def sanitize_filename(filename: str) -> str:
# Basic sanitization: remove path traversal attempts and keep only alphanumeric/dots/underscores
return os.path.basename(filename).replace(" ", "_")
@app.post("/api/files/upload")
async def upload_file(
file: UploadFile = File(...),
description: Optional[str] = Form(None),
uploader: Optional[str] = Form(None)
):
category = get_file_category(file.content_type)
if category == "others":
raise HTTPException(status_code=400, detail="Unsupported file type. Only images and videos are allowed.")
filename = sanitize_filename(file.filename)
target_dir = IMAGES_DIR if category == "images" else VIDEOS_DIR
file_path = target_dir / filename
# Handle duplicate filenames by appending timestamp
if file_path.exists():
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
name, ext = os.path.splitext(filename)
filename = f"{name}_{timestamp}{ext}"
file_path = target_dir / filename
try:
with file_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Could not save file: {str(e)}")
metadata_entry = {
"filename": filename,
"type": file.content_type,
"upload_date": datetime.now().isoformat(),
"file_path": str(file_path.relative_to(BASE_DIR)),
"description": description,
"uploader": uploader
}
metadata = load_metadata()
metadata.append(metadata_entry)
save_metadata(metadata)
return metadata_entry
@app.post("/api/files/youtube")
async def download_youtube(
url: str = Form(...),
description: Optional[str] = Form(None),
uploader: Optional[str] = Form(None)
):
ydl_opts = {
'format': 'bestvideo+bestaudio/best',
'outtmpl': str(VIDEOS_DIR / '%(title)s.%(ext)s'),
'quiet': True,
'no_warnings': True,
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
file_path_str = ydl.prepare_filename(info)
file_path = Path(file_path_str)
filename = file_path.name
metadata_entry = {
"filename": filename,
"type": "video/mp4", # Simplified, yt-dlp might download different formats
"upload_date": datetime.now().isoformat(),
"file_path": str(file_path.relative_to(BASE_DIR)),
"description": description or info.get('title'),
"uploader": uploader or info.get('uploader')
}
metadata = load_metadata()
metadata.append(metadata_entry)
save_metadata(metadata)
return metadata_entry
except Exception as e:
raise HTTPException(status_code=500, detail=f"YouTube download failed: {str(e)}")
@app.get("/api/files")
async def list_files():
return load_metadata()
@app.get("/api/files/download")
async def download_file(filename: str, stream: bool = False):
metadata = load_metadata()
entry = next((item for item in metadata if item["filename"] == filename), None)
if not entry:
raise HTTPException(status_code=404, detail="File not found in metadata")
file_path = BASE_DIR / entry["file_path"]
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
# If stream is True, we use 'inline' so the browser can play it
disposition = "inline" if stream else "attachment"
return FileResponse(
path=file_path,
filename=filename if not stream else None,
media_type=entry["type"],
content_disposition_type=disposition
)
@app.delete("/api/files")
async def delete_file(filename: str):
metadata = load_metadata()
entry_index = next((i for i, item in enumerate(metadata) if item["filename"] == filename), None)
if entry_index is None:
raise HTTPException(status_code=404, detail="File not found in metadata")
entry = metadata[entry_index]
file_path = BASE_DIR / entry["file_path"]
# Delete from filesystem
try:
if file_path.exists():
file_path.unlink()
except Exception as e:
raise HTTPException(status_code=500, detail=f"Could not delete file from disk: {str(e)}")
# Remove from metadata
metadata.pop(entry_index)
save_metadata(metadata)
return {"message": f"File {filename} deleted successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)