from fastapi import FastAPI, BackgroundTasks, Response
from yt_dlp import YoutubeDL
from uuid import uuid4, UUID
from pydantic import BaseModel

# In-memory job registry: maps the job's UUID to a dict describing the job
# ("state", "url", "savePath", and the latest progress payload once the
# download has reported at least once).
backgroundJobs = {}

app = FastAPI()


class DownloadModel(BaseModel):
    """Request body for ``POST /api/download``."""

    # URL handed to yt-dlp for downloading.
    url: str
    # Passed unchanged to yt-dlp as its "outtmpl" option.
    savePath: str


def report_state(id: UUID):
    """Build a yt-dlp progress hook that mirrors progress into ``backgroundJobs``.

    Args:
        id: Key of the job in ``backgroundJobs`` that the hook should update.

    Returns:
        A callable taking the progress dict supplied by yt-dlp.
    """

    def update(d):
        # The job can be removed through DELETE /api/clear while the download
        # is still running; in that case there is nothing left to update, and
        # raising KeyError from inside the hook would only disturb the download.
        job = backgroundJobs.get(id)
        if job is None:
            return
        job["state"] = d["status"]
        job["_youtube-dl_"] = d

    return update


def load_video(url: str, file_path: str, id: UUID):
    """Download ``url`` to ``file_path`` and track progress under job ``id``.

    Args:
        url: Video URL passed to ``YoutubeDL.download``.
        file_path: Value used for yt-dlp's "outtmpl" option.
        id: Key of the job in ``backgroundJobs`` to keep up to date.

    Raises:
        Exception: Whatever yt-dlp raises is re-raised after the job has been
            marked as failed.
    """
    opts = {
        "format": 'best[height<=480][ext=mp4]',
        "quiet": True,
        "outtmpl": file_path,
        "progress_hooks": [report_state(id)],
    }
    try:
        with YoutubeDL(opts) as ydl:
            ydl.download([url])
    except Exception:
        # Without this the job would keep reporting its last state forever
        # when the download fails before or between progress callbacks.
        job = backgroundJobs.get(id)
        if job is not None:
            job["state"] = "error"
        raise


@app.get("/api/info")
def info(url: str):
    """Return the metadata yt-dlp extracts for ``url`` without downloading it."""
    with YoutubeDL({}) as ydl:
        return ydl.extract_info(url, download=False)


@app.post("/api/download")
def download(background_tasks: BackgroundTasks, model: DownloadModel):
    """Register a new download job and schedule it as a background task.

    Returns:
        ``{"task": <job id>}``, where the id is the key to use with
        ``/api/status`` and ``/api/clear``.
    """
    id = uuid4()
    # NOTE(review): model.savePath comes straight from the request body and is
    # used as the output template without validation - confirm that clients
    # are trusted or restrict it to an allowed directory.
    backgroundJobs[id] = {
        "state": "created",
        "url": model.url,
        "savePath": model.savePath,
    }
    background_tasks.add_task(load_video, model.url, model.savePath, id)
    return {"task": id}


@app.get("/api/status")
def status(response: Response, id: UUID):
    """Return the stored job dict for ``id``, or a 404 payload if unknown."""
    job = backgroundJobs.get(id)
    if job is not None:
        return job
    response.status_code = 404
    return {"state": "not found"}


@app.delete("/api/clear")
def clear(response: Response, id: UUID):
    """Remove job ``id`` from the registry, or answer 404 if it is unknown."""
    # Job entries are always dicts, so None unambiguously means "no such job".
    if backgroundJobs.pop(id, None) is not None:
        return {"state": "done"}
    response.status_code = 404
    return {"state": "not found"}