From e921d15e04f998e2e10e7e43c90ec676ca7a0aca Mon Sep 17 00:00:00 2001
From: Vladislav Khapin <vkhapin@accesssoftek.com>
Date: Fri, 24 Feb 2023 07:19:45 +0400
Subject: [PATCH] move logic to mailboxes

---
 PublishHelperBot/Environment.fs           |  33 +-
 PublishHelperBot/Handlers.fs              |  39 +-
 PublishHelperBot/Program.fs               |  72 ++-
 PublishHelperBot/PublishHelperBot.fsproj  |   2 +-
 PublishHelperBot/YoutubeDl.fs             | 523 +++++++++++++++-------
 youtube-dl-api/youtube_dl_api/__init__.py |   3 +-
 6 files changed, 467 insertions(+), 205 deletions(-)

diff --git a/PublishHelperBot/Environment.fs b/PublishHelperBot/Environment.fs
index 0f3ec47..b0468cc 100644
--- a/PublishHelperBot/Environment.fs
+++ b/PublishHelperBot/Environment.fs
@@ -3,19 +3,30 @@ module PublishHelperBot.Environment
 open System
 open System.IO
 open Newtonsoft.Json
+open Serilog
+
+[<RequireQualifiedAccess>]
+module Logging =
+    let logger =
+        let config = LoggerConfiguration()
+        config.WriteTo.Console().CreateLogger()
 
 type public BotConfig = {
-  token: string
-  relayUrl: string
-  chanelId: int64
-  adminChatId: int64
-  YoutubeDlUrl: string
+    token: string
+    relayUrl: string
+    chanelId: int64
+    adminChatId: int64
+    YoutubeDlUrl: string
 }
 
-let private ReadConfig =
-  File.ReadAllText >> JsonConvert.DeserializeObject<BotConfig>
+let private readConfig =
+    File.ReadAllText >> JsonConvert.DeserializeObject<BotConfig>
 
-let public CreateConfig (name: string) =
-  match Environment.GetEnvironmentVariable(name) with
-  | null ->  raise <| ApplicationException("Missing config path env")
-  | path  -> ReadConfig <| path
\ No newline at end of file
+let public createConfig (name: string) =
+    match Environment.GetEnvironmentVariable(name) with
+    | null ->
+          Logging.logger.Error("Missing env")
+          ApplicationException("Missing config path env") |> raise
+    | path ->
+          Logging.logger.Information("Read config from env")
+          readConfig path
\ No newline at end of file
diff --git a/PublishHelperBot/Handlers.fs b/PublishHelperBot/Handlers.fs
index 3baedac..a0167de 100644
--- a/PublishHelperBot/Handlers.fs
+++ b/PublishHelperBot/Handlers.fs
@@ -9,22 +9,35 @@ open Telegram.Bot.Types
 open Telegram.Bot.Types.Enums
 
 type BaseHandlerArgs = Update * BotConfig
+
 type HandlerArgs = Update * BotConfig * ITelegramBotClient
+
 type HandlerRequirements = BaseHandlerArgs -> bool
+
 type Handler = HandlerArgs -> Task
+
 type Handler<'deps> = 'deps * HandlerArgs -> Task
 
 // Utils
 let UpdateIsAMessage (x: Update) = x.Type = UpdateType.Message
+
 let FromAdminChat (x: Message, c: BotConfig) = x.Chat.Id = c.adminChatId
-let HasReply (x: Message) = not(isNull x.ReplyToMessage) 
+
+let HasReply (x: Message) = not(isNull x.ReplyToMessage)
+
 let ExtractPhotoFromMessage (x: Message) = Array.map (fun (p: PhotoSize) -> p.FileId) x.Photo
+
 let HasText (x: Message) = not(isNull x.Text)
+
 let UrlsAsAlbumInputMedia (urls: string[]): IAlbumInputMedia[] =
   Array.map (fun (x: string) -> InputMediaPhoto(x)) urls
 
 // Post (Relay) command
-type RelayCaptionMode = WithAuthor | Anonymous | Unknown
+type RelayCaptionMode =
+    | WithAuthor
+    | Anonymous
+    | Unknown
+
 let RelaySupportedContent (x: Message) =
   match x.Type with
   | MessageType.Text -> true
@@ -53,15 +66,15 @@ let public RelayMatch: HandlerRequirements = fun (u, c) ->
   RelaySupportedContent u.Message.ReplyToMessage &&
   not (RelayCaptionType u.Message.Text = RelayCaptionMode.Unknown)
 
-let public RelayHandler: Handler = fun (u, c, tg) ->
-  let reply = u.Message.ReplyToMessage
-  let channelId = c.chanelId
+let public RelayHandler: Handler = fun (update, config, tg) ->
+  let reply = update.Message.ReplyToMessage
+  let channelId = config.chanelId
   let author = $"{reply.From.FirstName} {reply.From.LastName}"
-  let captionMode = RelayCaptionType u.Message.Text
-  
+  let captionMode = RelayCaptionType update.Message.Text
+
   let photoMedia = lazy Array.get (ExtractPhotoFromMessage reply) 0
-  let caption = lazy RelayResolveCaption(captionMode, author, c.relayUrl)
-  
+  let caption = lazy RelayResolveCaption(captionMode, author, config.relayUrl)
+
   match reply.Type with
   | MessageType.Text -> tg.ForwardMessageAsync(channelId, reply.Chat.Id, reply.MessageId)
   | MessageType.Photo -> tg.SendPhotoAsync(channelId, photoMedia.Value, caption = caption.Value,
@@ -81,11 +94,9 @@ let public YoutubeRepostMatch: HandlerRequirements = fun (u, c) ->
   u.Message.Text.StartsWith YoutubeRepostMatchCmd &&
   u.Message.Text.Split(' ').Length = 2
 
-let public YoutubeRepostHandler: Handler<YoutubeDlBackgroundService> = fun (yt, (u, c, tg)) ->
+let public YoutubeRepostHandler: Handler<IYoutubeDlService> = fun (yt, (u, c, tg)) ->
   task {
       let trim (x: string) = x.Trim()
-      let! id = YoutubeRepostMatchCmd |> u.Message.Text.Split |> Array.last |> trim |> yt.EnqueueJob
+      let! id = YoutubeRepostMatchCmd |> u.Message.Text.Split |> Array.last |> trim |> yt.AddJob
       do! tg.SendTextMessageAsync(c.adminChatId, id.ToString()) |> Async.AwaitTask |> Async.Ignore
-  }
-
-  
\ No newline at end of file
+  }
\ No newline at end of file
diff --git a/PublishHelperBot/Program.fs b/PublishHelperBot/Program.fs
index 9e4d39e..a80148d 100644
--- a/PublishHelperBot/Program.fs
+++ b/PublishHelperBot/Program.fs
@@ -1,6 +1,4 @@
-// For more information see https://aka.ms/fsharp-console-apps
-
-open System
+open System
 open System.Net.Http
 open System.Threading
 open System.Threading.Tasks
@@ -12,31 +10,57 @@ open Telegram.Bot.Polling
 open Telegram.Bot.Types
 open Telegram.Bot.Types.Enums
 
-let CreateBot (config: BotConfig, http: HttpClient) = TelegramBotClient(config.token, http)
-let config = CreateConfig <| "SBPB_CONFIG_PATH";
-let botClient = CreateBot <| (config, new HttpClient())
-let YtService = YoutubeDlBackgroundService <|
-                (new HttpClient(), config.YoutubeDlUrl, botClient, config.chanelId, CancellationToken.None)
+let createBot (config: BotConfig, http: HttpClient) = TelegramBotClient(config.token, http)
+
+let config = createConfig "SBPB_CONFIG_PATH"
+
+let botClient = createBot (config, new HttpClient())
+
+let youtubeDlService =
+    let youtubeDlClient =
+        YoutubeDlClient.createClient {
+            Client = new HttpClient()
+            BaseUrl = config.YoutubeDlUrl
+        }
+
+    let tgService =
+        TgService.createService config.chanelId youtubeDlClient botClient
+
+    YoutubeDlService.createService youtubeDlClient tgService
 
 let startDate = DateTime.UtcNow
-let isObsoleteUpdate (u: Update) = u.Type = UpdateType.Message && u.Message.Date < startDate;
 
-let updateHandle (bc: ITelegramBotClient) (u: Update) (ct: CancellationToken): Task =
-  let tgCtx = (u, config, bc)
-  match u with
-  | _ when isObsoleteUpdate u -> Task.CompletedTask
-  | _ when RelayMatch <| (u, config) -> RelayHandler <| tgCtx
-  | _ when YoutubeRepostMatch <| (u, config) -> YoutubeRepostHandler <| (YtService, tgCtx)
-  | _ -> Task.CompletedTask
+let (|ObsoleteUpdate|RelayMatchUpdate|YoutubeRepostMatchUpdate|SkipUpdate|) (update: Update) =
+    let isObsoleteUpdate (update: Update) =
+        update.Type = UpdateType.Message && update.Message.Date < startDate
+    match update with
+    | _ when isObsoleteUpdate update -> ObsoleteUpdate
+    | _ when RelayMatch (update, config) -> RelayMatchUpdate
+    | _ when YoutubeRepostMatch (update, config) -> YoutubeRepostMatchUpdate
+    | _ -> SkipUpdate
 
-let handlePollingError (bc: ITelegramBotClient) (e: Exception) (t: CancellationToken) =
-  printfn $"{e.Message}\n{e.StackTrace}"
-  Task.CompletedTask
-  
-let receiverOptions = ReceiverOptions(AllowedUpdates = Array.zeroCreate<UpdateType> 0) 
+let updateHandle (bc: ITelegramBotClient) (update: Update) (ct: CancellationToken): Task =
+    let tgCtx = (update, config, bc)
+    match update with
+    | RelayMatchUpdate ->
+        Logging.logger.Information("RelayMatchUpdate")
+        RelayHandler tgCtx
+    | YoutubeRepostMatchUpdate ->
+        YoutubeRepostHandler <| (youtubeDlService, tgCtx)
+    | ObsoleteUpdate ->
+        Logging.logger.Information("Skipping obsolete update")
+        Task.CompletedTask
+    | SkipUpdate ->
+        Logging.logger.Information("Skipping update")
+        Task.CompletedTask
 
-botClient.StartReceiving(updateHandle,handlePollingError,receiverOptions)
-YtService.StartYoutubeDlService()
+let handlePollingError (_: ITelegramBotClient) (e: Exception) (_: CancellationToken) =
+    Logging.logger.Error(e, "Polling error")
+    Task.CompletedTask
 
-printf "Я родился"
+let receiverOptions = ReceiverOptions(AllowedUpdates = Array.zeroCreate<UpdateType> 0)
+
+Logging.logger.Information("Starting bot")
+botClient.StartReceiving(updateHandle, handlePollingError, receiverOptions)
+Logging.logger.Information("Я родился")
 Console.ReadKey() |> ignore
\ No newline at end of file
diff --git a/PublishHelperBot/PublishHelperBot.fsproj b/PublishHelperBot/PublishHelperBot.fsproj
index 78fdc6f..091d7e5 100644
--- a/PublishHelperBot/PublishHelperBot.fsproj
+++ b/PublishHelperBot/PublishHelperBot.fsproj
@@ -6,8 +6,8 @@
     </PropertyGroup>
 
     <ItemGroup>
+        <PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
         <PackageReference Include="Telegram.Bot" Version="18.0.0" />
-        <PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
     </ItemGroup>
     
     <ItemGroup>
diff --git a/PublishHelperBot/YoutubeDl.fs b/PublishHelperBot/YoutubeDl.fs
index fb91dfa..626b23f 100644
--- a/PublishHelperBot/YoutubeDl.fs
+++ b/PublishHelperBot/YoutubeDl.fs
@@ -5,173 +5,388 @@ open System.Collections.Generic
 open System.IO
 open System.Net.Http
 open System.Text
-open System.Threading
+open System.Threading.Tasks
 open Microsoft.FSharp.Core
 open Newtonsoft.Json
-open Nito.AsyncEx
+open PublishHelperBot.Environment
 open Telegram.Bot
 open Telegram.Bot.Types.InputFiles
 
-type public CreateYoutubeDLUrl = string
-type public ChatId = int64
-type public CreateYoutubeDLJob = {
-      url: string
-      savePath: string
-}
-type public CreateYoutubeDLJobSuccess = {
-  task: Guid
+type ChatId = int64
+
+type CreateYoutubeDlJob = {
+    url: string
+    savePath: string
 }
 
-type public YoutubeDlStateResponse = {
-  state: string
+type CreateYoutubeDlJobSuccess = {
+    task: Guid
 }
 
-type public YoutubeDlError = {
-  message: string
-}
-type public YoutubeDlJob<'A> = {
-  internalId: Guid
-  externalId: 'A
-  url: string
-  state: string
-  savePath: string
+type YoutubeDlStateResponse = {
+    state: string
 }
 
-type YoutubeDlClientActions = Create | Check | Delete
-type HttpMethods = GET | POST | DELETE
-type CreateJobResult = Result<CreateYoutubeDLJobSuccess, YoutubeDlError> Async
-type CheckJobResult = Result<YoutubeDlStateResponse, YoutubeDlError> Async
-type CleanJobResult = Result<YoutubeDlStateResponse, YoutubeDlError> Async
-type StartYoutubeDlServiceArgs = HttpClient * CreateYoutubeDLUrl * ITelegramBotClient * ChatId * CancellationToken
-type YoutubeDlJobWithId = YoutubeDlJob<Guid>
-type YoutubeDlJobWithoutId = YoutubeDlJob<unit>
-type YoutubeDlCurrentJob =
-  | Created of YoutubeDlJobWithoutId
-  | Awaiting of YoutubeDlJobWithId
-  | Downloaded of YoutubeDlJobWithId
-  | Done of YoutubeDlJobWithId
-  | None of unit
-  
-let inline (<!>) (lck: AsyncLock) f = async {
-  use! __ = Async.AwaitTask <| lck.LockAsync().AsTask()
-  return! f
+type YoutubeDlError = {
+    message: string
 }
 
-type YoutubeDlClient(baseUrl: string, client: HttpClient) =
-  let lock = AsyncLock()
-  let apiPrefix = $"{baseUrl}api/";
-  let ResolvePath(action: YoutubeDlClientActions) =
-    match action with
-    | Create -> $"{apiPrefix}download"
-    | Check -> $"{apiPrefix}status"
-    | Delete -> $"{apiPrefix}clear"
-  
-  let doHttp (url: string, method: HttpMethods, content: HttpContent): Result<'TRes, YoutubeDlError> Async = async {
-    try
-      let! res =
-        match method with
-        | POST -> client.PostAsync(url, content) |> Async.AwaitTask
-        | GET -> client.GetAsync(url) |> Async.AwaitTask
-        | DELETE -> client.DeleteAsync(url) |> Async.AwaitTask
-        
-      let! content = res.Content.ReadAsStringAsync() |> Async.AwaitTask
-      return
-        match res.IsSuccessStatusCode with
-        | true -> Ok (JsonConvert.DeserializeObject<'TRes> <| content)
-        | false -> Error { message = "Unknown network error" }
-    with
-      | ex -> return Error { message = ex.Message  }
-  }
-  
-  member this.CreateJob(model: CreateYoutubeDLJob): CreateJobResult = lock <!> async {
-      use content = new StringContent(JsonConvert.SerializeObject <| model, Encoding.UTF8, "application/json")
-      return! doHttp <| (ResolvePath Create, POST, content)
-  }
-  
-  member this.CheckJob(id: Guid): CheckJobResult = lock <!> async {
-    let arg = [KeyValuePair("id", id.ToString())]
-    use content = new FormUrlEncodedContent(arg)
-    let! query = content.ReadAsStringAsync() |> Async.AwaitTask
-    return! doHttp <| ($"{ResolvePath Check}?{query}", GET, content)
-  }
-  
-  member this.CleanJob(id: Guid): CleanJobResult = lock <!> async {
-    let arg = [KeyValuePair("id", id.ToString())]
-    use content = new FormUrlEncodedContent(arg)
-    let! query = content.ReadAsStringAsync() |> Async.AwaitTask
-    return! doHttp <| ($"{ResolvePath Delete}?{query}", DELETE, content)
-  }
+type YoutubeDlClientConfig = {
+    BaseUrl: string
+    Client: HttpClient
+}
 
-type YoutubeDlBackgroundService(requirements: StartYoutubeDlServiceArgs) =
-  let (http, url, tg, chatId, ct) = requirements
-  let lock = AsyncLock()
-  let mutable currentJob: YoutubeDlCurrentJob = None ()
-  let jobPool = Queue<YoutubeDlJobWithoutId>()
-  let ytClient = YoutubeDlClient <| (url, http)
-  let mapJobToApi (job: YoutubeDlJob<_>): CreateYoutubeDLJob = {
-    url = job.url
-    savePath = job.savePath
-  }
-  let attachExternalId (id: Guid, job: YoutubeDlJobWithoutId): YoutubeDlCurrentJob =
-    Awaiting { internalId = job.internalId; state = job.state; url = job.url; externalId = id; savePath = job.savePath }
-  
-  let tryAssignNewJob() = async {
-    let (result, job) = jobPool.TryDequeue()
-    match result with
-    | true -> currentJob <- Created job
-    | false -> currentJob <- None ()
-  }
-    
-  let uploadToYtDl(job: YoutubeDlJobWithoutId) = async {
-        match! ytClient.CreateJob <| mapJobToApi job with
-        | Ok x -> currentJob <- attachExternalId <| (x.task, job)
-        // TODO: Logging!
-        | Error _ -> currentJob <- None ()
-  }
+type CreateJobResult = Result<CreateYoutubeDlJobSuccess, YoutubeDlError>
 
-  let checkJob(job: YoutubeDlJobWithId) = async {
-    match! ytClient.CheckJob <| job.externalId with
-    | Ok x when x.state.Equals("Finished", StringComparison.OrdinalIgnoreCase) -> currentJob <- Downloaded job
-    | Error _ -> currentJob <- None ()
-    | _ -> ()
-    // That's take a while
-    do! Async.Sleep 5000
-  }
-  
-  let postVideo(job: YoutubeDlJobWithId) = async {
-    use file = File.OpenRead <| job.savePath
-    let input = InputOnlineFile(file, job.savePath)
-    let caption = $"Source: {job.url}"
-    do! tg.SendVideoAsync(chatId, input, caption = caption) |> Async.AwaitTask |> Async.Ignore
-    currentJob <- Done job
-  }
-  
-  let cleanUp(job: YoutubeDlJobWithId) = async {
-    File.Delete <| job.savePath
-    match! ytClient.CleanJob <| job.externalId with
-    | Ok _ -> currentJob <- None ()
-    | Error _ -> currentJob <- None ()
-  }
-  
-  let chooseAction() = lock <!> async {
-      do! match currentJob with
-          | None _ -> tryAssignNewJob()
-          | Created x -> x |> uploadToYtDl
-          | Awaiting x -> x |> checkJob
-          | Downloaded x -> x |> postVideo
-          | Done x -> x |> cleanUp
-  }
+type CheckJobResult = Result<YoutubeDlStateResponse, YoutubeDlError>
 
-  let rec loop () = async {
-    do! match ct.IsCancellationRequested with
-        | false -> chooseAction()
-        | true -> async { () }
-    do! Async.Sleep 150
-    return! loop()
-  }
-  member public this.StartYoutubeDlService() = loop() |> Async.Start
-  member public this.EnqueueJob(url: string) = lock <!> async {
-    let id = Guid.NewGuid()
-    jobPool.Enqueue({ internalId = id; externalId = (); state = "new"; url = url; savePath = $"{Path.GetTempFileName()}.mp4" })
-    return id
-  }
\ No newline at end of file
+type CleanJobResult = Result<YoutubeDlStateResponse, YoutubeDlError>
+
+type IYoutubeDlClient =
+    abstract member CreateJob: CreateYoutubeDlJob -> Async<CreateJobResult>
+    abstract member CheckJob: externalId: Guid -> Async<CheckJobResult>
+    abstract member CleanJob: externalId: Guid -> Async<CleanJobResult>
+
+[<RequireQualifiedAccess>]
+module YoutubeDlClient =
+
+    type private YoutubeDlClientActions = Create | Check | Delete
+
+    type private HttpMethods = GET | POST | DELETE
+
+    type private Msg =
+        | CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource<CreateJobResult>
+        | CheckJob of externalId: Guid * tcs: TaskCompletionSource<CheckJobResult>
+        | CleanJob of externalId: Guid * tcs: TaskCompletionSource<CleanJobResult>
+
+    let private getApiPrefix baseUrl = $"{baseUrl}api/"
+
+    let private resolvePath baseUrl (action: YoutubeDlClientActions) =
+        let apiPrefix = getApiPrefix baseUrl
+        match action with
+        | Create -> $"{apiPrefix}download"
+        | Check -> $"{apiPrefix}status"
+        | Delete -> $"{apiPrefix}clear"
+
+    let private doHttp
+        (method: HttpMethods)
+        (content: HttpContent)
+        (client: HttpClient)
+        (url: string): Async<Result<'TRes, YoutubeDlError>> =
+        task {
+            try
+                let! contentText = content.ReadAsStringAsync()
+                Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText)
+                let! res =
+                    match method with
+                    | POST -> client.PostAsync(url, content)
+                    | GET -> client.GetAsync(url)
+                    | DELETE -> client.DeleteAsync(url)
+
+                let! content = res.Content.ReadAsStringAsync()
+                Logging.logger.Information("Response from youtube api, response = {Response}", content)
+                return
+                    match res.IsSuccessStatusCode with
+                    | true ->
+                        Logging.logger.Information("Response OK")
+                        Ok (JsonConvert.DeserializeObject<'TRes>(content))
+                    | false ->
+                        Logging.logger.Error("Response network error")
+                        Error { message = "Unknown network error" }
+            with ex ->
+                Logging.logger.Error(ex, "Youtube api error")
+                return Error { message = ex.Message  }
+        } |> Async.AwaitTask
+
+    let private createInbox(config: YoutubeDlClientConfig) = MailboxProcessor.Start(fun inbox ->
+        let rec loop() =
+            async {
+                match! inbox.Receive() with
+                | CreateJob(args, tcs) ->
+                    Logging.logger.Information("Received youtube api create job")
+                    async {
+                        try
+                            let json = args |> JsonConvert.SerializeObject
+                            Logging.logger.Information("Sending create job = {Job}", json)
+                            use content = new StringContent(json, Encoding.UTF8, "application/json")
+                            let! result =
+                                (config.Client, resolvePath config.BaseUrl Create)
+                                ||> doHttp POST content
+                            Logging.logger.Information("Received response from youtube api for create job")
+                            tcs.SetResult(result)
+
+                        with ex ->
+                            Logging.logger.Error(ex, "Failed to create youtube api job")
+                            tcs.SetResult(Error {
+                                message = ex.Message
+                            })
+
+                    } |> Async.Start
+
+                    return! loop ()
+
+                | CheckJob (externalId, tcs) ->
+                    Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId)
+                    async {
+                        try
+                            let arg = [KeyValuePair("id", externalId.ToString())]
+                            use content = new FormUrlEncodedContent(arg)
+                            Logging.logger.Information("Sending youtube api check job")
+                            let! query = content.ReadAsStringAsync() |> Async.AwaitTask
+                            let! result =
+                                (config.Client, $"{resolvePath config.BaseUrl Check}?{query}")
+                                ||> doHttp GET content
+                            Logging.logger.Information("Received response from youtube api for check job")
+                            tcs.SetResult(result)
+
+                        with ex ->
+                            Logging.logger.Error(ex, "Failed to check youtube api job")
+                            tcs.SetResult(Error {
+                                message = ex.Message
+                            })
+                    } |> Async.Start
+
+                    return! loop ()
+
+                | CleanJob(externalId, tcs) ->
+                    Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId)
+                    async {
+                        try
+                            let arg = [KeyValuePair("id", externalId.ToString())]
+                            use content = new FormUrlEncodedContent(arg)
+                            Logging.logger.Information("Sending youtube api clean job")
+                            let! query = content.ReadAsStringAsync() |> Async.AwaitTask
+                            let! result =
+                                (config.Client, $"{resolvePath config.BaseUrl Delete}?{query}")
+                                ||> doHttp DELETE content
+                            Logging.logger.Information("Received response from youtube api for clean job")
+                            tcs.SetResult(result)
+
+                        with ex ->
+                            Logging.logger.Error(ex, "Failed to clean youtube api job")
+                            tcs.SetResult(Error {
+                                message = ex.Message
+                            })
+                    } |> Async.Start
+
+                    return! loop ()
+            }
+        loop ()
+    )
+
+    let createClient (config: YoutubeDlClientConfig) =
+        let inbox = createInbox(config)
+        { new IYoutubeDlClient with
+            member this.CreateJob(args) =
+                let tcs = TaskCompletionSource<_>()
+                inbox.Post(CreateJob(args, tcs))
+                tcs.Task |> Async.AwaitTask
+
+            member this.CheckJob externalId =
+                let tcs = TaskCompletionSource<_>()
+                inbox.Post(CheckJob(externalId, tcs))
+                tcs.Task |> Async.AwaitTask
+
+            member this.CleanJob externalId =
+                let tcs = TaskCompletionSource<_>()
+                inbox.Post(CleanJob(externalId, tcs))
+                tcs.Task |> Async.AwaitTask }
+
+type ITgService =
+    abstract member PostVideo: url: string * savePath: string * externalId: Guid -> unit
+
+[<RequireQualifiedAccess>]
+module TgService =
+    type private Msg =
+        | PostVideo of url: string * savePath: string * externalId: Guid
+
+    let private createInbox (channelId: ChatId) (youtubeDlClient: IYoutubeDlClient) (tg: ITelegramBotClient) =
+        MailboxProcessor.Start(fun inbox ->
+            let rec loop () =
+                async {
+                    match! inbox.Receive() with
+                    | PostVideo (url, savePath, externalId) ->
+                        try
+                            try
+                                Logging.logger.Information("Reading file path = {path}", savePath)
+                                use file = File.OpenRead(savePath)
+                                let input = InputOnlineFile(file, savePath)
+                                let caption = $"Source: {url}"
+                                Logging.logger.Information(
+                                    "Sending video to channel, channelId = {channelId}, caption = {caption}",
+                                    channelId,
+                                    caption)
+                                do! tg.SendVideoAsync(channelId, input, caption = caption) |> Async.AwaitTask |> Async.Ignore
+                            with ex ->
+                                Logging.logger.Error(ex, "Failed to send video")
+                        finally
+                            Logging.logger.Information("Deleting file path = {path}", savePath)
+                            File.Delete(savePath)
+
+                        match! youtubeDlClient.CleanJob(externalId) with
+                        | Ok _ -> ()
+                        | Error _ -> ()
+                        return! loop ()
+                }
+            loop ()
+        )
+
+    let createService chatId youtubeDlClient tg =
+        let inbox = createInbox chatId youtubeDlClient tg
+        { new ITgService with
+            member this.PostVideo(url, savePath, externalId) =
+                inbox.Post(PostVideo(url, savePath, externalId)) }
+
+type IYoutubeDlService =
+    abstract member AddJob: url: string -> Async<Guid>
+
+[<RequireQualifiedAccess>]
+module YoutubeDlService =
+    type private Msg =
+        | AddJob of url: string * TaskCompletionSource<Guid>
+        | CheckJob
+
+    type private JobState =
+        | Created
+        | Awaiting of Guid
+
+    type private YoutubeDlJob = {
+        InternalId: Guid
+        State: JobState
+        Url: string
+        SavePath: string
+    }
+
+    let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) =
+        async {
+            Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId)
+            let! result =
+                client.CreateJob {
+                    url = job.Url
+                    savePath = job.SavePath
+                }
+            match result with
+            | Ok task ->
+                Logging.logger.Information(
+                    "Created job on youtube client = {job}, externalId = {externalId}",
+                    job.InternalId,
+                    task.task)
+                let updated = {
+                    job with
+                        State = JobState.Awaiting task.task
+                }
+                return Some updated
+
+            | Error e ->
+                Logging.logger.Error(
+                    "Failed to create job on client = {job}, message = {message}",
+                    job.InternalId,
+                    e.message)
+                return None
+        }
+
+    let private getCurrentJob
+        (jobQueue: Queue<YoutubeDlJob>)
+        (current: YoutubeDlJob option) =
+        match current with
+        | Some current ->
+            Some current
+
+        | None ->
+            match jobQueue.TryDequeue() with
+            | true, job ->
+                Some job
+            | _ ->
+                None
+
+    let private createServiceInbox
+        (youtubeDlClient: IYoutubeDlClient)
+        (tgService: ITgService) =
+
+        MailboxProcessor.Start(fun inbox ->
+            let postCheck() =
+                async {
+                    do! Async.Sleep(TimeSpan.FromSeconds 5)
+                    inbox.Post(CheckJob)
+                } |> Async.Start
+
+            let rec loop
+                (jobQueue: Queue<YoutubeDlJob>)
+                (current: YoutubeDlJob option) =
+                async {
+                    match! inbox.Receive() with
+                    | AddJob (url, tcs) ->
+                        Logging.logger.Information("Adding new url = {url}", url)
+                        let id = Guid.NewGuid()
+                        let job = {
+                            InternalId = id
+                            State = Created
+                            Url = url
+                            SavePath = $"{Path.GetTempFileName()}.mp4"
+                        }
+                        tcs.SetResult(id)
+                        jobQueue.Enqueue(job)
+
+                        if current.IsNone then
+                            inbox.Post(CheckJob)
+
+                        Logging.logger.Information(
+                            "Added new job = {job}, url = {url}, path = {path}",
+                            job.InternalId,
+                            url,
+                            job.SavePath)
+                        return! loop jobQueue current
+
+                    | CheckJob ->
+                        let currentJob = getCurrentJob jobQueue current
+                        match currentJob with
+                        | Some job ->
+                            Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State)
+                            match job.State with
+                            | Created ->
+                                match! createJob youtubeDlClient job with
+                                | Some job ->
+                                    postCheck()
+                                    return! loop jobQueue (Some job)
+
+                                | None ->
+                                    postCheck()
+                                    return! loop jobQueue None
+
+                            | Awaiting externalId ->
+                                Logging.logger.Information("Checking job = {job}", externalId)
+                                let! task = youtubeDlClient.CheckJob(externalId)
+                                match task with
+                                | Ok x when x.state.Equals("Finished", StringComparison.OrdinalIgnoreCase) ->
+                                    Logging.logger.Information("Sending post video from job = {job}", externalId)
+                                    tgService.PostVideo(job.Url, job.SavePath, externalId)
+                                    return! loop jobQueue None
+
+                                | Error e ->
+                                    Logging.logger.Error(
+                                        "Failed to receive video from youtube client, job = {job}, message = {message}",
+                                        externalId,
+                                        e.message)
+                                    File.Delete(job.SavePath)
+                                    return! loop jobQueue None
+
+                                | _ ->
+                                    Logging.logger.Information(
+                                        "Waiting for job to complete, job = {job}",
+                                        externalId)
+                                    postCheck()
+                                    return! loop jobQueue current
+
+                        | None ->
+                            return! loop jobQueue None
+                }
+            loop (Queue()) None
+        )
+
+    let createService youtubeDlClient tgService =
+        let inbox = createServiceInbox youtubeDlClient tgService
+        { new IYoutubeDlService with
+            member this.AddJob(url) =
+                let tcs = TaskCompletionSource<_>()
+                inbox.Post(AddJob(url, tcs))
+                tcs.Task |> Async.AwaitTask}
\ No newline at end of file
diff --git a/youtube-dl-api/youtube_dl_api/__init__.py b/youtube-dl-api/youtube_dl_api/__init__.py
index 0189496..b4e0025 100644
--- a/youtube-dl-api/youtube_dl_api/__init__.py
+++ b/youtube-dl-api/youtube_dl_api/__init__.py
@@ -21,7 +21,8 @@ def report_state(id: str):
 
 def load_video(url: str, file_path: str, id: str):
     opts = {
-        "format": 'mp4',
+        "recode-video": "mp4",
+        "format": 'best[filesize<50M]',
         "quiet": True,
         "outtmpl": file_path,
         "progress_hooks": [report_state(id)]