325 lines
14 KiB
Forth
325 lines
14 KiB
Forth
module PublishHelperBot.YoutubeDl
|
|
|
|
open System
|
|
open System.Collections.Generic
|
|
open System.IO
|
|
open System.Net.Http
|
|
open System.Text
|
|
open System.Text.Json
|
|
open System.Threading.Tasks
|
|
open Microsoft.FSharp.Core
|
|
open PublishHelperBot.Environment
|
|
open PublishHelperBot.Types
|
|
|
|
type YoutubeDlClientConfig = {
|
|
BaseUrl: string
|
|
Client: HttpClient
|
|
}
|
|
|
|
[<RequireQualifiedAccess>]
|
|
module YoutubeDlClient =
|
|
type private YoutubeDlClientActions = Create | Check | Delete
|
|
|
|
type private HttpMethods = GET | POST | DELETE
|
|
|
|
type private Msg =
|
|
| CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource<CreateJobResult>
|
|
| CheckJob of externalId: Guid * tcs: TaskCompletionSource<CheckJobResult>
|
|
| CleanJob of externalId: Guid * tcs: TaskCompletionSource<CleanJobResult>
|
|
|
|
let private getApiPrefix baseUrl = $"{baseUrl}api/"
|
|
|
|
let private resolvePath baseUrl (action: YoutubeDlClientActions) =
|
|
let apiPrefix = getApiPrefix baseUrl
|
|
match action with
|
|
| Create -> $"{apiPrefix}download"
|
|
| Check -> $"{apiPrefix}status"
|
|
| Delete -> $"{apiPrefix}clear"
|
|
|
|
let private doHttp
|
|
(method: HttpMethods)
|
|
(content: HttpContent)
|
|
(client: HttpClient)
|
|
(url: string): Async<Result<'TRes, YoutubeDlError>> =
|
|
task {
|
|
try
|
|
let! contentText = content.ReadAsStringAsync()
|
|
Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText)
|
|
let! res =
|
|
match method with
|
|
| POST -> client.PostAsync(url, content)
|
|
| GET -> client.GetAsync(url)
|
|
| DELETE -> client.DeleteAsync(url)
|
|
|
|
let! content = res.Content.ReadAsStringAsync()
|
|
Logging.logger.Information("Response from youtube api, response = {Response}", content)
|
|
return
|
|
match res.IsSuccessStatusCode with
|
|
| true ->
|
|
Logging.logger.Information("Response OK")
|
|
Ok (JsonSerializer.Deserialize<'TRes>(content))
|
|
| false ->
|
|
Logging.logger.Error("Response network error")
|
|
Error { Message = "Unknown network error" }
|
|
with ex ->
|
|
Logging.logger.Error(ex, "Youtube api error")
|
|
return Error { Message = ex.Message }
|
|
} |> Async.AwaitTask
|
|
|
|
let private createInbox(config: YoutubeDlClientConfig) = MailboxProcessor.Start(fun inbox ->
|
|
let rec loop() =
|
|
async {
|
|
match! inbox.Receive() with
|
|
| CreateJob(args, tcs) ->
|
|
Logging.logger.Information("Received youtube api create job")
|
|
async {
|
|
try
|
|
let json = args |> JsonSerializer.Serialize
|
|
Logging.logger.Information("Sending create job = {Job}", json)
|
|
use content = new StringContent(json, Encoding.UTF8, "application/json")
|
|
let! result =
|
|
(config.Client, resolvePath config.BaseUrl Create)
|
|
||> doHttp POST content
|
|
Logging.logger.Information("Received response from youtube api for create job")
|
|
tcs.SetResult(result)
|
|
|
|
with ex ->
|
|
Logging.logger.Error(ex, "Failed to create youtube api job")
|
|
tcs.SetResult(Error {
|
|
Message = ex.Message
|
|
})
|
|
|
|
} |> Async.Start
|
|
|
|
return! loop ()
|
|
|
|
| CheckJob (externalId, tcs) ->
|
|
Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId)
|
|
async {
|
|
try
|
|
let arg = [KeyValuePair("id", externalId.ToString())]
|
|
use content = new FormUrlEncodedContent(arg)
|
|
Logging.logger.Information("Sending youtube api check job")
|
|
let! query = content.ReadAsStringAsync() |> Async.AwaitTask
|
|
let! result =
|
|
(config.Client, $"{resolvePath config.BaseUrl Check}?{query}")
|
|
||> doHttp GET content
|
|
Logging.logger.Information("Received response from youtube api for check job")
|
|
tcs.SetResult(result)
|
|
|
|
with ex ->
|
|
Logging.logger.Error(ex, "Failed to check youtube api job")
|
|
tcs.SetResult(Error {
|
|
Message = ex.Message
|
|
})
|
|
} |> Async.Start
|
|
|
|
return! loop ()
|
|
|
|
| CleanJob(externalId, tcs) ->
|
|
Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId)
|
|
async {
|
|
try
|
|
let arg = [KeyValuePair("id", externalId.ToString())]
|
|
use content = new FormUrlEncodedContent(arg)
|
|
Logging.logger.Information("Sending youtube api clean job")
|
|
let! query = content.ReadAsStringAsync() |> Async.AwaitTask
|
|
let! result =
|
|
(config.Client, $"{resolvePath config.BaseUrl Delete}?{query}")
|
|
||> doHttp DELETE content
|
|
Logging.logger.Information("Received response from youtube api for clean job")
|
|
tcs.SetResult(result)
|
|
|
|
with ex ->
|
|
Logging.logger.Error(ex, "Failed to clean youtube api job")
|
|
tcs.SetResult(Error {
|
|
Message = ex.Message
|
|
})
|
|
} |> Async.Start
|
|
|
|
return! loop ()
|
|
}
|
|
loop ()
|
|
)
|
|
|
|
let createClient (config: YoutubeDlClientConfig) =
|
|
let inbox = createInbox(config)
|
|
{ new IYoutubeDlClient with
|
|
member this.CreateJob(args) =
|
|
let tcs = TaskCompletionSource<_>()
|
|
inbox.Post(CreateJob(args, tcs))
|
|
tcs.Task |> Async.AwaitTask
|
|
|
|
member this.CheckJob externalId =
|
|
let tcs = TaskCompletionSource<_>()
|
|
inbox.Post(CheckJob(externalId, tcs))
|
|
tcs.Task |> Async.AwaitTask
|
|
|
|
member this.CleanJob externalId =
|
|
let tcs = TaskCompletionSource<_>()
|
|
inbox.Post(CleanJob(externalId, tcs))
|
|
tcs.Task |> Async.AwaitTask }
|
|
|
|
[<RequireQualifiedAccess>]
|
|
module YoutubeDlService =
|
|
type private Msg =
|
|
| AddJob of url: string * savePath: string * TaskCompletionSource<Guid>
|
|
| CheckJob
|
|
|
|
type private JobState =
|
|
| Created
|
|
| Awaiting of Guid
|
|
|
|
type private YoutubeDlJob = {
|
|
InternalId: Guid
|
|
State: JobState
|
|
Url: string
|
|
SavePath: string
|
|
}
|
|
|
|
let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) =
|
|
async {
|
|
Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId)
|
|
let! result =
|
|
client.CreateJob {
|
|
Url = job.Url
|
|
SavePath = job.SavePath
|
|
}
|
|
match result with
|
|
| Ok task ->
|
|
Logging.logger.Information(
|
|
"Created job on youtube client = {job}, externalId = {externalId}",
|
|
job.InternalId,
|
|
task.Task)
|
|
let updated = {
|
|
job with
|
|
State = JobState.Awaiting task.Task
|
|
}
|
|
return Some updated
|
|
|
|
| Error e ->
|
|
Logging.logger.Error(
|
|
"Failed to create job on client = {job}, message = {message}",
|
|
job.InternalId,
|
|
e.Message)
|
|
return None
|
|
}
|
|
|
|
let private getCurrentJob
|
|
(jobQueue: Queue<YoutubeDlJob>)
|
|
(current: YoutubeDlJob option) =
|
|
match current with
|
|
| Some current ->
|
|
Some current
|
|
|
|
| None ->
|
|
match jobQueue.TryDequeue() with
|
|
| true, job ->
|
|
Some job
|
|
| _ ->
|
|
None
|
|
|
|
let private createServiceInbox
|
|
(youtubeDlClient: IYoutubeDlClient)
|
|
(tgService: ITgService) =
|
|
|
|
MailboxProcessor.Start(fun inbox ->
|
|
let postCheck() =
|
|
async {
|
|
do! Async.Sleep(TimeSpan.FromSeconds 5)
|
|
inbox.Post(CheckJob)
|
|
} |> Async.Start
|
|
|
|
let rec loop
|
|
(jobQueue: Queue<YoutubeDlJob>)
|
|
(current: YoutubeDlJob option) =
|
|
async {
|
|
match! inbox.Receive() with
|
|
| AddJob (url, savePath, tcs) ->
|
|
Logging.logger.Information("Adding new url = {url}", url)
|
|
let id = Guid.NewGuid()
|
|
let fullSavePath = $"{Path.Combine(savePath, Path.GetRandomFileName())}.mp4"
|
|
let job = {
|
|
InternalId = id
|
|
State = Created
|
|
Url = url
|
|
SavePath = fullSavePath
|
|
}
|
|
tcs.SetResult(id)
|
|
jobQueue.Enqueue(job)
|
|
|
|
if current.IsNone then
|
|
inbox.Post(CheckJob)
|
|
|
|
Logging.logger.Information(
|
|
"Added new job = {job}, url = {url}, path = {path}",
|
|
job.InternalId,
|
|
url,
|
|
job.SavePath)
|
|
return! loop jobQueue current
|
|
|
|
| CheckJob ->
|
|
let currentJob = getCurrentJob jobQueue current
|
|
match currentJob with
|
|
| Some job ->
|
|
Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State)
|
|
match job.State with
|
|
| Created ->
|
|
match! createJob youtubeDlClient job with
|
|
| Some job ->
|
|
postCheck()
|
|
return! loop jobQueue (Some job)
|
|
|
|
| None ->
|
|
postCheck()
|
|
return! loop jobQueue None
|
|
|
|
| Awaiting externalId ->
|
|
Logging.logger.Information("Checking job = {job}", externalId)
|
|
let! checkResult = youtubeDlClient.CheckJob(externalId)
|
|
match checkResult with
|
|
| Ok x when x.State.Equals("Finished", StringComparison.OrdinalIgnoreCase) ->
|
|
Logging.logger.Information("Sending post video from job = {job}", externalId)
|
|
let infoDict =
|
|
x.VideoInfo |> Option.bind (fun v -> v.InfoDict)
|
|
let args = {
|
|
Url = job.Url
|
|
SavePath = job.SavePath
|
|
ExternalId = externalId
|
|
Title = infoDict |> Option.bind (fun i -> i.Title)
|
|
Width = infoDict |> Option.bind (fun i -> i.Width)
|
|
Height = infoDict |> Option.bind (fun i -> i.Height)
|
|
}
|
|
tgService.PostVideo(args)
|
|
postCheck()
|
|
return! loop jobQueue None
|
|
|
|
| Error e ->
|
|
Logging.logger.Error(
|
|
"Failed to receive video from youtube client, job = {job}, message = {message}",
|
|
externalId,
|
|
e.Message)
|
|
Logging.logger.Information("Deleting file path = {path}", job.SavePath)
|
|
File.Delete(job.SavePath)
|
|
return! loop jobQueue None
|
|
|
|
| _ ->
|
|
Logging.logger.Information(
|
|
"Waiting for job to complete, job = {job}",
|
|
externalId)
|
|
postCheck()
|
|
return! loop jobQueue current
|
|
|
|
| None ->
|
|
return! loop jobQueue None
|
|
}
|
|
loop (Queue()) None
|
|
)
|
|
|
|
let createService youtubeDlClient tgService (savePath: string) =
|
|
let inbox = createServiceInbox youtubeDlClient tgService
|
|
{ new IYoutubeDlService with
|
|
member this.AddJob(url) =
|
|
let tcs = TaskCompletionSource<_>()
|
|
inbox.Post(AddJob(url, savePath, tcs))
|
|
tcs.Task |> Async.AwaitTask } |