publishhelperbot/PublishHelperBot/YoutubeDl.fs

325 lines
14 KiB
Forth

module PublishHelperBot.YoutubeDl
open System
open System.Collections.Generic
open System.IO
open System.Net.Http
open System.Text
open System.Text.Json
open System.Threading.Tasks
open Microsoft.FSharp.Core
open PublishHelperBot.Environment
open PublishHelperBot.Types
type YoutubeDlClientConfig = {
BaseUrl: string
Client: HttpClient
}
[<RequireQualifiedAccess>]
module YoutubeDlClient =
type private YoutubeDlClientActions = Create | Check | Delete
type private HttpMethods = GET | POST | DELETE
type private Msg =
| CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource<CreateJobResult>
| CheckJob of externalId: Guid * tcs: TaskCompletionSource<CheckJobResult>
| CleanJob of externalId: Guid * tcs: TaskCompletionSource<CleanJobResult>
let private getApiPrefix baseUrl = $"{baseUrl}api/"
let private resolvePath baseUrl (action: YoutubeDlClientActions) =
let apiPrefix = getApiPrefix baseUrl
match action with
| Create -> $"{apiPrefix}download"
| Check -> $"{apiPrefix}status"
| Delete -> $"{apiPrefix}clear"
let private doHttp
(method: HttpMethods)
(content: HttpContent)
(client: HttpClient)
(url: string): Async<Result<'TRes, YoutubeDlError>> =
task {
try
let! contentText = content.ReadAsStringAsync()
Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText)
let! res =
match method with
| POST -> client.PostAsync(url, content)
| GET -> client.GetAsync(url)
| DELETE -> client.DeleteAsync(url)
let! content = res.Content.ReadAsStringAsync()
Logging.logger.Information("Response from youtube api, response = {Response}", content)
return
match res.IsSuccessStatusCode with
| true ->
Logging.logger.Information("Response OK")
Ok (JsonSerializer.Deserialize<'TRes>(content))
| false ->
Logging.logger.Error("Response network error")
Error { Message = "Unknown network error" }
with ex ->
Logging.logger.Error(ex, "Youtube api error")
return Error { Message = ex.Message }
} |> Async.AwaitTask
let private createInbox(config: YoutubeDlClientConfig) = MailboxProcessor.Start(fun inbox ->
let rec loop() =
async {
match! inbox.Receive() with
| CreateJob(args, tcs) ->
Logging.logger.Information("Received youtube api create job")
async {
try
let json = args |> JsonSerializer.Serialize
Logging.logger.Information("Sending create job = {Job}", json)
use content = new StringContent(json, Encoding.UTF8, "application/json")
let! result =
(config.Client, resolvePath config.BaseUrl Create)
||> doHttp POST content
Logging.logger.Information("Received response from youtube api for create job")
tcs.SetResult(result)
with ex ->
Logging.logger.Error(ex, "Failed to create youtube api job")
tcs.SetResult(Error {
Message = ex.Message
})
} |> Async.Start
return! loop ()
| CheckJob (externalId, tcs) ->
Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId)
async {
try
let arg = [KeyValuePair("id", externalId.ToString())]
use content = new FormUrlEncodedContent(arg)
Logging.logger.Information("Sending youtube api check job")
let! query = content.ReadAsStringAsync() |> Async.AwaitTask
let! result =
(config.Client, $"{resolvePath config.BaseUrl Check}?{query}")
||> doHttp GET content
Logging.logger.Information("Received response from youtube api for check job")
tcs.SetResult(result)
with ex ->
Logging.logger.Error(ex, "Failed to check youtube api job")
tcs.SetResult(Error {
Message = ex.Message
})
} |> Async.Start
return! loop ()
| CleanJob(externalId, tcs) ->
Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId)
async {
try
let arg = [KeyValuePair("id", externalId.ToString())]
use content = new FormUrlEncodedContent(arg)
Logging.logger.Information("Sending youtube api clean job")
let! query = content.ReadAsStringAsync() |> Async.AwaitTask
let! result =
(config.Client, $"{resolvePath config.BaseUrl Delete}?{query}")
||> doHttp DELETE content
Logging.logger.Information("Received response from youtube api for clean job")
tcs.SetResult(result)
with ex ->
Logging.logger.Error(ex, "Failed to clean youtube api job")
tcs.SetResult(Error {
Message = ex.Message
})
} |> Async.Start
return! loop ()
}
loop ()
)
let createClient (config: YoutubeDlClientConfig) =
let inbox = createInbox(config)
{ new IYoutubeDlClient with
member this.CreateJob(args) =
let tcs = TaskCompletionSource<_>()
inbox.Post(CreateJob(args, tcs))
tcs.Task |> Async.AwaitTask
member this.CheckJob externalId =
let tcs = TaskCompletionSource<_>()
inbox.Post(CheckJob(externalId, tcs))
tcs.Task |> Async.AwaitTask
member this.CleanJob externalId =
let tcs = TaskCompletionSource<_>()
inbox.Post(CleanJob(externalId, tcs))
tcs.Task |> Async.AwaitTask }
[<RequireQualifiedAccess>]
module YoutubeDlService =
type private Msg =
| AddJob of url: string * savePath: string * TaskCompletionSource<Guid>
| CheckJob
type private JobState =
| Created
| Awaiting of Guid
type private YoutubeDlJob = {
InternalId: Guid
State: JobState
Url: string
SavePath: string
}
let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) =
async {
Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId)
let! result =
client.CreateJob {
Url = job.Url
SavePath = job.SavePath
}
match result with
| Ok task ->
Logging.logger.Information(
"Created job on youtube client = {job}, externalId = {externalId}",
job.InternalId,
task.Task)
let updated = {
job with
State = JobState.Awaiting task.Task
}
return Some updated
| Error e ->
Logging.logger.Error(
"Failed to create job on client = {job}, message = {message}",
job.InternalId,
e.Message)
return None
}
let private getCurrentJob
(jobQueue: Queue<YoutubeDlJob>)
(current: YoutubeDlJob option) =
match current with
| Some current ->
Some current
| None ->
match jobQueue.TryDequeue() with
| true, job ->
Some job
| _ ->
None
let private createServiceInbox
(youtubeDlClient: IYoutubeDlClient)
(tgService: ITgService) =
MailboxProcessor.Start(fun inbox ->
let postCheck() =
async {
do! Async.Sleep(TimeSpan.FromSeconds 5)
inbox.Post(CheckJob)
} |> Async.Start
let rec loop
(jobQueue: Queue<YoutubeDlJob>)
(current: YoutubeDlJob option) =
async {
match! inbox.Receive() with
| AddJob (url, savePath, tcs) ->
Logging.logger.Information("Adding new url = {url}", url)
let id = Guid.NewGuid()
let fullSavePath = $"{Path.Combine(savePath, Path.GetRandomFileName())}.mp4"
let job = {
InternalId = id
State = Created
Url = url
SavePath = fullSavePath
}
tcs.SetResult(id)
jobQueue.Enqueue(job)
if current.IsNone then
inbox.Post(CheckJob)
Logging.logger.Information(
"Added new job = {job}, url = {url}, path = {path}",
job.InternalId,
url,
job.SavePath)
return! loop jobQueue current
| CheckJob ->
let currentJob = getCurrentJob jobQueue current
match currentJob with
| Some job ->
Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State)
match job.State with
| Created ->
match! createJob youtubeDlClient job with
| Some job ->
postCheck()
return! loop jobQueue (Some job)
| None ->
postCheck()
return! loop jobQueue None
| Awaiting externalId ->
Logging.logger.Information("Checking job = {job}", externalId)
let! checkResult = youtubeDlClient.CheckJob(externalId)
match checkResult with
| Ok x when x.State.Equals("Finished", StringComparison.OrdinalIgnoreCase) ->
Logging.logger.Information("Sending post video from job = {job}", externalId)
let infoDict =
x.VideoInfo |> Option.bind (fun v -> v.InfoDict)
let args = {
Url = job.Url
SavePath = job.SavePath
ExternalId = externalId
Title = infoDict |> Option.bind (fun i -> i.Title)
Width = infoDict |> Option.bind (fun i -> i.Width)
Height = infoDict |> Option.bind (fun i -> i.Height)
}
tgService.PostVideo(args)
postCheck()
return! loop jobQueue None
| Error e ->
Logging.logger.Error(
"Failed to receive video from youtube client, job = {job}, message = {message}",
externalId,
e.Message)
Logging.logger.Information("Deleting file path = {path}", job.SavePath)
File.Delete(job.SavePath)
return! loop jobQueue None
| _ ->
Logging.logger.Information(
"Waiting for job to complete, job = {job}",
externalId)
postCheck()
return! loop jobQueue current
| None ->
return! loop jobQueue None
}
loop (Queue()) None
)
let createService youtubeDlClient tgService (savePath: string) =
let inbox = createServiceInbox youtubeDlClient tgService
{ new IYoutubeDlService with
member this.AddJob(url) =
let tcs = TaskCompletionSource<_>()
inbox.Post(AddJob(url, savePath, tcs))
tcs.Task |> Async.AwaitTask }