"""FastAPI application that stores uploaded or downloaded media files.

Files are written below ``assets/images`` and ``assets/videos`` next to this
module, and one JSON record per file is kept in ``metadata.json``.
"""

import asyncio
import json
import mimetypes
import os
import re
import shutil
from contextlib import suppress
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

import yt_dlp
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

app = FastAPI()

# Configuration
BASE_DIR = Path(__file__).parent
ASSETS_DIR = BASE_DIR / "assets"
IMAGES_DIR = ASSETS_DIR / "images"
VIDEOS_DIR = ASSETS_DIR / "videos"
METADATA_FILE = BASE_DIR / "metadata.json"
STATIC_DIR = BASE_DIR / "static"

# Anything that is not a word character, a dot or a hyphen is replaced when
# sanitizing an uploaded file name.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^\w.\-]")

# Ensure directories exist
for directory in [IMAGES_DIR, VIDEOS_DIR, STATIC_DIR]:
    directory.mkdir(parents=True, exist_ok=True)

if not METADATA_FILE.exists():
    METADATA_FILE.write_text("[]")

# Mount static files
app.mount("/web", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")


def load_metadata() -> List[dict]:
    """Return the list of metadata records stored in ``METADATA_FILE``.

    A missing file, invalid JSON, or a JSON document that is not a list all
    yield an empty list so callers can always append to the result.
    """
    try:
        with open(METADATA_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, FileNotFoundError):
        return []
    return data if isinstance(data, list) else []


def save_metadata(metadata: List[dict]):
    """Write *metadata* to ``METADATA_FILE`` as indented JSON.

    The data is written to a sibling temporary file first and then moved into
    place, so an interrupted write cannot leave a truncated metadata file.
    """
    tmp_path = METADATA_FILE.with_name(METADATA_FILE.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=4)
    os.replace(tmp_path, METADATA_FILE)


def get_file_category(content_type: Optional[str]) -> str:
    """Map a MIME type to ``"images"``, ``"videos"`` or ``"others"``.

    A missing or empty content type is reported as ``"others"``. The
    comparison is case-insensitive.
    """
    if not content_type:
        return "others"
    normalized = content_type.lower()
    if normalized.startswith("image/"):
        return "images"
    if normalized.startswith("video/"):
        return "videos"
    return "others"


def sanitize_filename(filename: Optional[str]) -> str:
    """Return a file name that is safe to join onto a storage directory.

    Directory components are dropped (both ``/`` and ``\\`` are treated as
    separators), spaces become underscores, and every character other than
    word characters, dots and hyphens is replaced with an underscore. Names
    that end up empty or consist only of dots (``""``, ``"."``, ``".."``)
    fall back to ``"unnamed"``.
    """
    # Normalize Windows separators so basename() strips them on POSIX too.
    base = os.path.basename((filename or "").replace("\\", "/"))
    cleaned = _UNSAFE_FILENAME_CHARS.sub("_", base.replace(" ", "_"))
    if not cleaned.strip("."):
        return "unnamed"
    return cleaned


def _unique_path(target_dir: Path, filename: str) -> Path:
    """Return a path in *target_dir* for *filename* that does not exist yet.

    On a clash a timestamp is appended to the stem; if that name is taken as
    well (two uploads within the same second) a counter is added.
    """
    candidate = target_dir / filename
    if not candidate.exists():
        return candidate

    stem, ext = os.path.splitext(filename)
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    candidate = target_dir / f"{stem}_{timestamp}{ext}"
    counter = 1
    while candidate.exists():
        candidate = target_dir / f"{stem}_{timestamp}_{counter}{ext}"
        counter += 1
    return candidate


@app.post("/api/files/upload")
async def upload_file(
    file: UploadFile = File(...),
    description: Optional[str] = Form(None),
    uploader: Optional[str] = Form(None),
):
    """Store an uploaded image or video and record it in the metadata file.

    Returns the metadata record that was appended. Responds with 400 when the
    content type is neither ``image/*`` nor ``video/*`` and with 500 when the
    file cannot be written.
    """
    category = get_file_category(file.content_type)
    if category == "others":
        raise HTTPException(
            status_code=400,
            detail="Unsupported file type. Only images and videos are allowed.",
        )

    target_dir = IMAGES_DIR if category == "images" else VIDEOS_DIR
    file_path = _unique_path(target_dir, sanitize_filename(file.filename))
    filename = file_path.name

    try:
        with file_path.open("wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception as e:
        # Best effort: do not leave a half-written file behind.
        with suppress(OSError):
            file_path.unlink()
        raise HTTPException(status_code=500, detail=f"Could not save file: {str(e)}") from e

    metadata_entry = {
        "filename": filename,
        "type": file.content_type,
        "upload_date": datetime.now().isoformat(),
        "file_path": str(file_path.relative_to(BASE_DIR)),
        "description": description,
        "uploader": uploader,
    }

    metadata = load_metadata()
    metadata.append(metadata_entry)
    save_metadata(metadata)

    return metadata_entry


def _fetch_youtube_video(url: str) -> Tuple[dict, Path]:
    """Download *url* with yt-dlp into ``VIDEOS_DIR`` (blocking).

    Returns the yt-dlp info dict and the path of the downloaded file.
    """
    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',
        'outtmpl': str(VIDEOS_DIR / '%(title)s.%(ext)s'),
        'quiet': True,
        'no_warnings': True,
        # For a video URL that also names a playlist, fetch only the video.
        'noplaylist': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        # Prefer the path yt-dlp reports for the finished download; after a
        # video+audio merge its extension can differ from the template-based
        # name. Fall back to prepare_filename() when it is not reported.
        downloads = info.get("requested_downloads") or []
        final_path = downloads[0].get("filepath") if downloads else None
        return info, Path(final_path or ydl.prepare_filename(info))


@app.post("/api/files/youtube")
async def download_youtube(
    url: str = Form(...),
    description: Optional[str] = Form(None),
    uploader: Optional[str] = Form(None),
):
    """Download a video with yt-dlp and record it in the metadata file.

    Returns the metadata record that was appended. The description and
    uploader default to the values reported by yt-dlp. Any failure is
    reported as a 500 response.
    """
    try:
        # The download is blocking, so run it in the default executor instead
        # of stalling the event loop for its whole duration.
        loop = asyncio.get_running_loop()
        info, file_path = await loop.run_in_executor(None, _fetch_youtube_video, url)

        filename = file_path.name
        # Derive the MIME type from the actual extension; keep the previous
        # "video/mp4" value when the extension is unknown.
        media_type = mimetypes.guess_type(filename)[0] or "video/mp4"

        metadata_entry = {
            "filename": filename,
            "type": media_type,
            "upload_date": datetime.now().isoformat(),
            "file_path": str(file_path.relative_to(BASE_DIR)),
            "description": description or info.get('title'),
            "uploader": uploader or info.get('uploader'),
        }

        metadata = load_metadata()
        metadata.append(metadata_entry)
        save_metadata(metadata)

        return metadata_entry
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"YouTube download failed: {str(e)}") from e


@app.get("/api/files")
async def list_files():
    """Return every metadata record."""
    return load_metadata()
def _find_entry_index(metadata: List[dict], filename: str) -> Optional[int]:
    """Return the index of the first record whose "filename" is *filename*, or None."""
    return next(
        (i for i, item in enumerate(metadata) if item.get("filename") == filename),
        None,
    )


@app.get("/api/files/download")
async def download_file(filename: str, stream: bool = False):
    """Send a stored file to the client.

    With ``stream=False`` the response is an attachment named *filename*;
    with ``stream=True`` no download name is passed and the disposition type
    is ``inline``. Responds with 404 when *filename* has no metadata record
    or the recorded path is not a regular file.
    """
    metadata = load_metadata()
    index = _find_entry_index(metadata, filename)
    if index is None:
        raise HTTPException(status_code=404, detail="File not found in metadata")

    entry = metadata[index]
    relative_path = entry.get("file_path")
    # is_file() also rejects a record that points at a directory.
    if not relative_path or not (BASE_DIR / relative_path).is_file():
        raise HTTPException(status_code=404, detail="File not found on disk")

    # If stream is True, we use 'inline' so the browser can play it
    disposition = "inline" if stream else "attachment"

    return FileResponse(
        path=BASE_DIR / relative_path,
        filename=None if stream else filename,
        media_type=entry.get("type"),
        content_disposition_type=disposition,
    )


@app.delete("/api/files")
async def delete_file(filename: str):
    """Delete a stored file and its metadata record.

    A file that is already missing from disk is not an error; its record is
    still removed. Responds with 404 when *filename* has no metadata record
    and with 500 when the file exists but cannot be removed.
    """
    metadata = load_metadata()
    entry_index = _find_entry_index(metadata, filename)
    if entry_index is None:
        raise HTTPException(status_code=404, detail="File not found in metadata")

    relative_path = metadata[entry_index].get("file_path")

    # Delete from filesystem. Attempt the unlink directly rather than checking
    # exists() first, which would race with concurrent deletions.
    if relative_path:
        try:
            (BASE_DIR / relative_path).unlink()
        except FileNotFoundError:
            pass  # Already gone; the stale record is still removed below.
        except OSError as e:
            raise HTTPException(
                status_code=500,
                detail=f"Could not delete file from disk: {str(e)}",
            ) from e

    # Remove from metadata
    metadata.pop(entry_index)
    save_metadata(metadata)

    return {"message": f"File {filename} deleted successfully"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)