module PublishHelperBot.YoutubeDl open System open System.Collections.Generic open System.IO open System.Net.Http open System.Text open System.Text.Json open System.Threading.Tasks open Microsoft.FSharp.Core open PublishHelperBot.Environment open PublishHelperBot.Types type YoutubeDlClientConfig = { BaseUrl: string Client: HttpClient } [] module YoutubeDlClient = type private YoutubeDlClientActions = Create | Check | Delete type private HttpMethods = GET | POST | DELETE type private Msg = | CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource | CheckJob of externalId: Guid * tcs: TaskCompletionSource | CleanJob of externalId: Guid * tcs: TaskCompletionSource let private getApiPrefix baseUrl = $"{baseUrl}api/" let private resolvePath baseUrl (action: YoutubeDlClientActions) = let apiPrefix = getApiPrefix baseUrl match action with | Create -> $"{apiPrefix}download" | Check -> $"{apiPrefix}status" | Delete -> $"{apiPrefix}clear" let private doHttp (method: HttpMethods) (content: HttpContent) (client: HttpClient) (url: string): Async> = task { try let! contentText = content.ReadAsStringAsync() Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText) let! res = match method with | POST -> client.PostAsync(url, content) | GET -> client.GetAsync(url) | DELETE -> client.DeleteAsync(url) let! content = res.Content.ReadAsStringAsync() Logging.logger.Information("Response from youtube api, response = {Response}", content) return match res.IsSuccessStatusCode with | true -> Logging.logger.Information("Response OK") Ok (JsonSerializer.Deserialize<'TRes>(content)) | false -> Logging.logger.Error("Response network error") Error { Message = "Unknown network error" } with ex -> Logging.logger.Error(ex, "Youtube api error") return Error { Message = ex.Message } } |> Async.AwaitTask let private createInbox(config: YoutubeDlClientConfig) = MailboxProcessor.Start(fun inbox -> let rec loop() = async { match! inbox.Receive() with | CreateJob(args, tcs) -> Logging.logger.Information("Received youtube api create job") async { try let json = args |> JsonSerializer.Serialize Logging.logger.Information("Sending create job = {Job}", json) use content = new StringContent(json, Encoding.UTF8, "application/json") let! result = (config.Client, resolvePath config.BaseUrl Create) ||> doHttp POST content Logging.logger.Information("Received response from youtube api for create job") tcs.SetResult(result) with ex -> Logging.logger.Error(ex, "Failed to create youtube api job") tcs.SetResult(Error { Message = ex.Message }) } |> Async.Start return! loop () | CheckJob (externalId, tcs) -> Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId) async { try let arg = [KeyValuePair("id", externalId.ToString())] use content = new FormUrlEncodedContent(arg) Logging.logger.Information("Sending youtube api check job") let! query = content.ReadAsStringAsync() |> Async.AwaitTask let! result = (config.Client, $"{resolvePath config.BaseUrl Check}?{query}") ||> doHttp GET content Logging.logger.Information("Received response from youtube api for check job") tcs.SetResult(result) with ex -> Logging.logger.Error(ex, "Failed to check youtube api job") tcs.SetResult(Error { Message = ex.Message }) } |> Async.Start return! loop () | CleanJob(externalId, tcs) -> Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId) async { try let arg = [KeyValuePair("id", externalId.ToString())] use content = new FormUrlEncodedContent(arg) Logging.logger.Information("Sending youtube api clean job") let! query = content.ReadAsStringAsync() |> Async.AwaitTask let! result = (config.Client, $"{resolvePath config.BaseUrl Delete}?{query}") ||> doHttp DELETE content Logging.logger.Information("Received response from youtube api for clean job") tcs.SetResult(result) with ex -> Logging.logger.Error(ex, "Failed to clean youtube api job") tcs.SetResult(Error { Message = ex.Message }) } |> Async.Start return! loop () } loop () ) let createClient (config: YoutubeDlClientConfig) = let inbox = createInbox(config) { new IYoutubeDlClient with member this.CreateJob(args) = let tcs = TaskCompletionSource<_>() inbox.Post(CreateJob(args, tcs)) tcs.Task |> Async.AwaitTask member this.CheckJob externalId = let tcs = TaskCompletionSource<_>() inbox.Post(CheckJob(externalId, tcs)) tcs.Task |> Async.AwaitTask member this.CleanJob externalId = let tcs = TaskCompletionSource<_>() inbox.Post(CleanJob(externalId, tcs)) tcs.Task |> Async.AwaitTask } [] module YoutubeDlService = type private Msg = | AddJob of url: string * TaskCompletionSource | CheckJob type private JobState = | Created | Awaiting of Guid type private YoutubeDlJob = { InternalId: Guid State: JobState Url: string SavePath: string } let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) = async { Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId) let! result = client.CreateJob { Url = job.Url SavePath = job.SavePath } match result with | Ok task -> Logging.logger.Information( "Created job on youtube client = {job}, externalId = {externalId}", job.InternalId, task.Task) let updated = { job with State = JobState.Awaiting task.Task } return Some updated | Error e -> Logging.logger.Error( "Failed to create job on client = {job}, message = {message}", job.InternalId, e.Message) return None } let private getCurrentJob (jobQueue: Queue) (current: YoutubeDlJob option) = match current with | Some current -> Some current | None -> match jobQueue.TryDequeue() with | true, job -> Some job | _ -> None let private createServiceInbox (youtubeDlClient: IYoutubeDlClient) (tgService: ITgService) = MailboxProcessor.Start(fun inbox -> let postCheck() = async { do! Async.Sleep(TimeSpan.FromSeconds 5) inbox.Post(CheckJob) } |> Async.Start let rec loop (jobQueue: Queue) (current: YoutubeDlJob option) = async { match! inbox.Receive() with | AddJob (url, tcs) -> Logging.logger.Information("Adding new url = {url}", url) let id = Guid.NewGuid() let job = { InternalId = id State = Created Url = url SavePath = $"{Path.GetTempFileName()}.mp4" } tcs.SetResult(id) jobQueue.Enqueue(job) if current.IsNone then inbox.Post(CheckJob) Logging.logger.Information( "Added new job = {job}, url = {url}, path = {path}", job.InternalId, url, job.SavePath) return! loop jobQueue current | CheckJob -> let currentJob = getCurrentJob jobQueue current match currentJob with | Some job -> Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State) match job.State with | Created -> match! createJob youtubeDlClient job with | Some job -> postCheck() return! loop jobQueue (Some job) | None -> postCheck() return! loop jobQueue None | Awaiting externalId -> Logging.logger.Information("Checking job = {job}", externalId) let! checkResult = youtubeDlClient.CheckJob(externalId) match checkResult with | Ok x when x.State.Equals("Finished", StringComparison.OrdinalIgnoreCase) -> Logging.logger.Information("Sending post video from job = {job}", externalId) let infoDict = x.VideoInfo |> Option.bind (fun v -> v.InfoDict) let args = { Url = job.Url SavePath = job.SavePath ExternalId = externalId Title = infoDict |> Option.bind (fun i -> i.Title) Width = infoDict |> Option.bind (fun i -> i.Width) Height = infoDict |> Option.bind (fun i -> i.Height) } tgService.PostVideo(args) postCheck() return! loop jobQueue None | Error e -> Logging.logger.Error( "Failed to receive video from youtube client, job = {job}, message = {message}", externalId, e.Message) Logging.logger.Information("Deleting file path = {path}", job.SavePath) File.Delete(job.SavePath) return! loop jobQueue None | _ -> Logging.logger.Information( "Waiting for job to complete, job = {job}", externalId) postCheck() return! loop jobQueue current | None -> return! loop jobQueue None } loop (Queue()) None ) let createService youtubeDlClient tgService = let inbox = createServiceInbox youtubeDlClient tgService { new IYoutubeDlService with member this.AddJob(url) = let tcs = TaskCompletionSource<_>() inbox.Post(AddJob(url, tcs)) tcs.Task |> Async.AwaitTask }