publishhelperbot/youtube-dl-api/youtube_dl_api/__init__.py
2023-02-26 20:08:42 +04:00

67 lines
1.6 KiB
Python

from fastapi import FastAPI, BackgroundTasks, Response
from yt_dlp import YoutubeDL
from uuid import uuid4, UUID
from pydantic import BaseModel
backgroundJobs = {}
app = FastAPI()
class DownloadModel(BaseModel):
url: str
savePath: str
def report_state(id: str):
def update(d):
backgroundJobs[id]["state"] = d["status"]
backgroundJobs[id]["_youtube-dl_"] = d
return update
def load_video(url: str, file_path: str, id: str):
try:
opts = {
"format": 'best[height<=480][ext=mp4]/bestvideo+bestaudio[ext=mp4]',
"quiet": True,
"outtmpl": file_path,
"progress_hooks": [report_state(id)]
}
with YoutubeDL(opts) as ydl:
ydl.download([url])
except Exception:
del backgroundJobs[id]
@app.get("/api/info")
def info(url: str):
with YoutubeDL({}) as ydl:
return ydl.extract_info(url, False)
@app.post("/api/download")
def download(background_tasks: BackgroundTasks, model: DownloadModel):
id = uuid4()
backgroundJobs[id] = {"state": "created", "url": model.url, "savePath": model.savePath}
background_tasks.add_task(load_video, model.url, model.savePath, id)
return {"task": id}
@app.get("/api/status")
def status(response: Response, id: UUID):
if id in backgroundJobs.keys():
return backgroundJobs[id]
response.status_code = 404
return {"state": "not found"}
@app.delete("/api/clear")
def clear(response: Response, id: UUID):
if id in backgroundJobs.keys():
del backgroundJobs[id]
return {"state": "done"}
response.status_code = 404
return {"state": "not found"}