publishhelperbot/PublishHelperBot/YoutubeDl.fs

325 lines
No EOL
14 KiB
FSharp

module PublishHelperBot.YoutubeDl
open System
open System.Collections.Generic
open System.IO
open System.Net.Http
open System.Text
open System.Text.Json
open System.Threading.Tasks
open Microsoft.FSharp.Core
open PublishHelperBot.Environment
open PublishHelperBot.Types
type YoutubeDlClientConfig = {
BaseUrl: string
Client: HttpClient
}
[<RequireQualifiedAccess>]
module YoutubeDlClient =
type private YoutubeDlClientActions = Create | Check | Delete
type private HttpMethods = GET | POST | DELETE
type private Msg =
| CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource<CreateJobResult>
| CheckJob of externalId: Guid * tcs: TaskCompletionSource<CheckJobResult>
| CleanJob of externalId: Guid * tcs: TaskCompletionSource<CleanJobResult>
let private getApiPrefix baseUrl = $"{baseUrl}api/"
let private resolvePath baseUrl (action: YoutubeDlClientActions) =
let apiPrefix = getApiPrefix baseUrl
match action with
| Create -> $"{apiPrefix}download"
| Check -> $"{apiPrefix}status"
| Delete -> $"{apiPrefix}clear"
let private doHttp
(method: HttpMethods)
(content: HttpContent)
(client: HttpClient)
(url: string): Async<Result<'TRes, YoutubeDlError>> =
task {
try
let! contentText = content.ReadAsStringAsync()
Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText)
let! res =
match method with
| POST -> client.PostAsync(url, content)
| GET -> client.GetAsync(url)
| DELETE -> client.DeleteAsync(url)
let! content = res.Content.ReadAsStringAsync()
Logging.logger.Information("Response from youtube api, response = {Response}", content)
return
match res.IsSuccessStatusCode with
| true ->
Logging.logger.Information("Response OK")
Ok (JsonSerializer.Deserialize<'TRes>(content))
| false ->
Logging.logger.Error("Response network error")
Error { Message = "Unknown network error" }
with ex ->
Logging.logger.Error(ex, "Youtube api error")
return Error { Message = ex.Message }
} |> Async.AwaitTask
let private createInbox(config: YoutubeDlClientConfig) = MailboxProcessor.Start(fun inbox ->
let rec loop() =
async {
match! inbox.Receive() with
| CreateJob(args, tcs) ->
Logging.logger.Information("Received youtube api create job")
async {
try
let json = args |> JsonSerializer.Serialize
Logging.logger.Information("Sending create job = {Job}", json)
use content = new StringContent(json, Encoding.UTF8, "application/json")
let! result =
(config.Client, resolvePath config.BaseUrl Create)
||> doHttp POST content
Logging.logger.Information("Received response from youtube api for create job")
tcs.SetResult(result)
with ex ->
Logging.logger.Error(ex, "Failed to create youtube api job")
tcs.SetResult(Error {
Message = ex.Message
})
} |> Async.Start
return! loop ()
| CheckJob (externalId, tcs) ->
Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId)
async {
try
let arg = [KeyValuePair("id", externalId.ToString())]
use content = new FormUrlEncodedContent(arg)
Logging.logger.Information("Sending youtube api check job")
let! query = content.ReadAsStringAsync() |> Async.AwaitTask
let! result =
(config.Client, $"{resolvePath config.BaseUrl Check}?{query}")
||> doHttp GET content
Logging.logger.Information("Received response from youtube api for check job")
tcs.SetResult(result)
with ex ->
Logging.logger.Error(ex, "Failed to check youtube api job")
tcs.SetResult(Error {
Message = ex.Message
})
} |> Async.Start
return! loop ()
| CleanJob(externalId, tcs) ->
Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId)
async {
try
let arg = [KeyValuePair("id", externalId.ToString())]
use content = new FormUrlEncodedContent(arg)
Logging.logger.Information("Sending youtube api clean job")
let! query = content.ReadAsStringAsync() |> Async.AwaitTask
let! result =
(config.Client, $"{resolvePath config.BaseUrl Delete}?{query}")
||> doHttp DELETE content
Logging.logger.Information("Received response from youtube api for clean job")
tcs.SetResult(result)
with ex ->
Logging.logger.Error(ex, "Failed to clean youtube api job")
tcs.SetResult(Error {
Message = ex.Message
})
} |> Async.Start
return! loop ()
}
loop ()
)
let createClient (config: YoutubeDlClientConfig) =
let inbox = createInbox(config)
{ new IYoutubeDlClient with
member this.CreateJob(args) =
let tcs = TaskCompletionSource<_>()
inbox.Post(CreateJob(args, tcs))
tcs.Task |> Async.AwaitTask
member this.CheckJob externalId =
let tcs = TaskCompletionSource<_>()
inbox.Post(CheckJob(externalId, tcs))
tcs.Task |> Async.AwaitTask
member this.CleanJob externalId =
let tcs = TaskCompletionSource<_>()
inbox.Post(CleanJob(externalId, tcs))
tcs.Task |> Async.AwaitTask }
[<RequireQualifiedAccess>]
module YoutubeDlService =
type private Msg =
| AddJob of url: string * savePath: string * TaskCompletionSource<Guid>
| CheckJob
type private JobState =
| Created
| Awaiting of Guid
type private YoutubeDlJob = {
InternalId: Guid
State: JobState
Url: string
SavePath: string
}
let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) =
async {
Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId)
let! result =
client.CreateJob {
Url = job.Url
SavePath = job.SavePath
}
match result with
| Ok task ->
Logging.logger.Information(
"Created job on youtube client = {job}, externalId = {externalId}",
job.InternalId,
task.Task)
let updated = {
job with
State = JobState.Awaiting task.Task
}
return Some updated
| Error e ->
Logging.logger.Error(
"Failed to create job on client = {job}, message = {message}",
job.InternalId,
e.Message)
return None
}
let private getCurrentJob
(jobQueue: Queue<YoutubeDlJob>)
(current: YoutubeDlJob option) =
match current with
| Some current ->
Some current
| None ->
match jobQueue.TryDequeue() with
| true, job ->
Some job
| _ ->
None
let private createServiceInbox
(youtubeDlClient: IYoutubeDlClient)
(tgService: ITgService) =
MailboxProcessor.Start(fun inbox ->
let postCheck() =
async {
do! Async.Sleep(TimeSpan.FromSeconds 5)
inbox.Post(CheckJob)
} |> Async.Start
let rec loop
(jobQueue: Queue<YoutubeDlJob>)
(current: YoutubeDlJob option) =
async {
match! inbox.Receive() with
| AddJob (url, savePath, tcs) ->
Logging.logger.Information("Adding new url = {url}", url)
let id = Guid.NewGuid()
let fullSavePath = $"{Path.Combine(savePath, Path.GetRandomFileName())}.mp4"
let job = {
InternalId = id
State = Created
Url = url
SavePath = fullSavePath
}
tcs.SetResult(id)
jobQueue.Enqueue(job)
if current.IsNone then
inbox.Post(CheckJob)
Logging.logger.Information(
"Added new job = {job}, url = {url}, path = {path}",
job.InternalId,
url,
job.SavePath)
return! loop jobQueue current
| CheckJob ->
let currentJob = getCurrentJob jobQueue current
match currentJob with
| Some job ->
Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State)
match job.State with
| Created ->
match! createJob youtubeDlClient job with
| Some job ->
postCheck()
return! loop jobQueue (Some job)
| None ->
postCheck()
return! loop jobQueue None
| Awaiting externalId ->
Logging.logger.Information("Checking job = {job}", externalId)
let! checkResult = youtubeDlClient.CheckJob(externalId)
match checkResult with
| Ok x when x.State.Equals("Finished", StringComparison.OrdinalIgnoreCase) ->
Logging.logger.Information("Sending post video from job = {job}", externalId)
let infoDict =
x.VideoInfo |> Option.bind (fun v -> v.InfoDict)
let args = {
Url = job.Url
SavePath = job.SavePath
ExternalId = externalId
Title = infoDict |> Option.bind (fun i -> i.Title)
Width = infoDict |> Option.bind (fun i -> i.Width)
Height = infoDict |> Option.bind (fun i -> i.Height)
}
tgService.PostVideo(args)
postCheck()
return! loop jobQueue None
| Error e ->
Logging.logger.Error(
"Failed to receive video from youtube client, job = {job}, message = {message}",
externalId,
e.Message)
Logging.logger.Information("Deleting file path = {path}", job.SavePath)
File.Delete(job.SavePath)
return! loop jobQueue None
| _ ->
Logging.logger.Information(
"Waiting for job to complete, job = {job}",
externalId)
postCheck()
return! loop jobQueue current
| None ->
return! loop jobQueue None
}
loop (Queue()) None
)
let createService youtubeDlClient tgService (savePath: string) =
let inbox = createServiceInbox youtubeDlClient tgService
{ new IYoutubeDlService with
member this.AddJob(url) =
let tcs = TaskCompletionSource<_>()
inbox.Post(AddJob(url, savePath, tcs))
tcs.Task |> Async.AwaitTask }