diff --git a/PublishHelperBot/Environment.fs b/PublishHelperBot/Environment.fs index 0f3ec47..b0468cc 100644 --- a/PublishHelperBot/Environment.fs +++ b/PublishHelperBot/Environment.fs @@ -3,19 +3,30 @@ module PublishHelperBot.Environment open System open System.IO open Newtonsoft.Json +open Serilog + +[] +module Logging = + let logger = + let config = LoggerConfiguration() + config.WriteTo.Console().CreateLogger() type public BotConfig = { - token: string - relayUrl: string - chanelId: int64 - adminChatId: int64 - YoutubeDlUrl: string + token: string + relayUrl: string + chanelId: int64 + adminChatId: int64 + YoutubeDlUrl: string } -let private ReadConfig = - File.ReadAllText >> JsonConvert.DeserializeObject +let private readConfig = + File.ReadAllText >> JsonConvert.DeserializeObject -let public CreateConfig (name: string) = - match Environment.GetEnvironmentVariable(name) with - | null -> raise <| ApplicationException("Missing config path env") - | path -> ReadConfig <| path \ No newline at end of file +let public createConfig (name: string) = + match Environment.GetEnvironmentVariable(name) with + | null -> + Logging.logger.Error("Missing env") + ApplicationException("Missing config path env") |> raise + | path -> + Logging.logger.Information("Read config from env") + readConfig path \ No newline at end of file diff --git a/PublishHelperBot/Handlers.fs b/PublishHelperBot/Handlers.fs index 3baedac..a0167de 100644 --- a/PublishHelperBot/Handlers.fs +++ b/PublishHelperBot/Handlers.fs @@ -9,22 +9,35 @@ open Telegram.Bot.Types open Telegram.Bot.Types.Enums type BaseHandlerArgs = Update * BotConfig + type HandlerArgs = Update * BotConfig * ITelegramBotClient + type HandlerRequirements = BaseHandlerArgs -> bool + type Handler = HandlerArgs -> Task + type Handler<'deps> = 'deps * HandlerArgs -> Task // Utils let UpdateIsAMessage (x: Update) = x.Type = UpdateType.Message + let FromAdminChat (x: Message, c: BotConfig) = x.Chat.Id = c.adminChatId -let HasReply (x: Message) = not(isNull x.ReplyToMessage) + +let HasReply (x: Message) = not(isNull x.ReplyToMessage) + let ExtractPhotoFromMessage (x: Message) = Array.map (fun (p: PhotoSize) -> p.FileId) x.Photo + let HasText (x: Message) = not(isNull x.Text) + let UrlsAsAlbumInputMedia (urls: string[]): IAlbumInputMedia[] = Array.map (fun (x: string) -> InputMediaPhoto(x)) urls // Post (Relay) command -type RelayCaptionMode = WithAuthor | Anonymous | Unknown +type RelayCaptionMode = + | WithAuthor + | Anonymous + | Unknown + let RelaySupportedContent (x: Message) = match x.Type with | MessageType.Text -> true @@ -53,15 +66,15 @@ let public RelayMatch: HandlerRequirements = fun (u, c) -> RelaySupportedContent u.Message.ReplyToMessage && not (RelayCaptionType u.Message.Text = RelayCaptionMode.Unknown) -let public RelayHandler: Handler = fun (u, c, tg) -> - let reply = u.Message.ReplyToMessage - let channelId = c.chanelId +let public RelayHandler: Handler = fun (update, config, tg) -> + let reply = update.Message.ReplyToMessage + let channelId = config.chanelId let author = $"{reply.From.FirstName} {reply.From.LastName}" - let captionMode = RelayCaptionType u.Message.Text - + let captionMode = RelayCaptionType update.Message.Text + let photoMedia = lazy Array.get (ExtractPhotoFromMessage reply) 0 - let caption = lazy RelayResolveCaption(captionMode, author, c.relayUrl) - + let caption = lazy RelayResolveCaption(captionMode, author, config.relayUrl) + match reply.Type with | MessageType.Text -> tg.ForwardMessageAsync(channelId, reply.Chat.Id, reply.MessageId) | MessageType.Photo -> tg.SendPhotoAsync(channelId, photoMedia.Value, caption = caption.Value, @@ -81,11 +94,9 @@ let public YoutubeRepostMatch: HandlerRequirements = fun (u, c) -> u.Message.Text.StartsWith YoutubeRepostMatchCmd && u.Message.Text.Split(' ').Length = 2 -let public YoutubeRepostHandler: Handler = fun (yt, (u, c, tg)) -> +let public YoutubeRepostHandler: Handler = fun (yt, (u, c, tg)) -> task { let trim (x: string) = x.Trim() - let! id = YoutubeRepostMatchCmd |> u.Message.Text.Split |> Array.last |> trim |> yt.EnqueueJob + let! id = YoutubeRepostMatchCmd |> u.Message.Text.Split |> Array.last |> trim |> yt.AddJob do! tg.SendTextMessageAsync(c.adminChatId, id.ToString()) |> Async.AwaitTask |> Async.Ignore - } - - \ No newline at end of file + } \ No newline at end of file diff --git a/PublishHelperBot/Program.fs b/PublishHelperBot/Program.fs index 9e4d39e..a80148d 100644 --- a/PublishHelperBot/Program.fs +++ b/PublishHelperBot/Program.fs @@ -1,6 +1,4 @@ -// For more information see https://aka.ms/fsharp-console-apps - -open System +open System open System.Net.Http open System.Threading open System.Threading.Tasks @@ -12,31 +10,57 @@ open Telegram.Bot.Polling open Telegram.Bot.Types open Telegram.Bot.Types.Enums -let CreateBot (config: BotConfig, http: HttpClient) = TelegramBotClient(config.token, http) -let config = CreateConfig <| "SBPB_CONFIG_PATH"; -let botClient = CreateBot <| (config, new HttpClient()) -let YtService = YoutubeDlBackgroundService <| - (new HttpClient(), config.YoutubeDlUrl, botClient, config.chanelId, CancellationToken.None) +let createBot (config: BotConfig, http: HttpClient) = TelegramBotClient(config.token, http) + +let config = createConfig "SBPB_CONFIG_PATH" + +let botClient = createBot (config, new HttpClient()) + +let youtubeDlService = + let youtubeDlClient = + YoutubeDlClient.createClient { + Client = new HttpClient() + BaseUrl = config.YoutubeDlUrl + } + + let tgService = + TgService.createService config.chanelId youtubeDlClient botClient + + YoutubeDlService.createService youtubeDlClient tgService let startDate = DateTime.UtcNow -let isObsoleteUpdate (u: Update) = u.Type = UpdateType.Message && u.Message.Date < startDate; -let updateHandle (bc: ITelegramBotClient) (u: Update) (ct: CancellationToken): Task = - let tgCtx = (u, config, bc) - match u with - | _ when isObsoleteUpdate u -> Task.CompletedTask - | _ when RelayMatch <| (u, config) -> RelayHandler <| tgCtx - | _ when YoutubeRepostMatch <| (u, config) -> YoutubeRepostHandler <| (YtService, tgCtx) - | _ -> Task.CompletedTask +let (|ObsoleteUpdate|RelayMatchUpdate|YoutubeRepostMatchUpdate|SkipUpdate|) (update: Update) = + let isObsoleteUpdate (update: Update) = + update.Type = UpdateType.Message && update.Message.Date < startDate + match update with + | _ when isObsoleteUpdate update -> ObsoleteUpdate + | _ when RelayMatch (update, config) -> RelayMatchUpdate + | _ when YoutubeRepostMatch (update, config) -> YoutubeRepostMatchUpdate + | _ -> SkipUpdate -let handlePollingError (bc: ITelegramBotClient) (e: Exception) (t: CancellationToken) = - printfn $"{e.Message}\n{e.StackTrace}" - Task.CompletedTask - -let receiverOptions = ReceiverOptions(AllowedUpdates = Array.zeroCreate 0) +let updateHandle (bc: ITelegramBotClient) (update: Update) (ct: CancellationToken): Task = + let tgCtx = (update, config, bc) + match update with + | RelayMatchUpdate -> + Logging.logger.Information("RelayMatchUpdate") + RelayHandler tgCtx + | YoutubeRepostMatchUpdate -> + YoutubeRepostHandler <| (youtubeDlService, tgCtx) + | ObsoleteUpdate -> + Logging.logger.Information("Skipping obsolete update") + Task.CompletedTask + | SkipUpdate -> + Logging.logger.Information("Skipping update") + Task.CompletedTask -botClient.StartReceiving(updateHandle,handlePollingError,receiverOptions) -YtService.StartYoutubeDlService() +let handlePollingError (_: ITelegramBotClient) (e: Exception) (_: CancellationToken) = + Logging.logger.Error(e, "Polling error") + Task.CompletedTask -printf "Я родился" +let receiverOptions = ReceiverOptions(AllowedUpdates = Array.zeroCreate 0) + +Logging.logger.Information("Starting bot") +botClient.StartReceiving(updateHandle, handlePollingError, receiverOptions) +Logging.logger.Information("Я родился") Console.ReadKey() |> ignore \ No newline at end of file diff --git a/PublishHelperBot/PublishHelperBot.fsproj b/PublishHelperBot/PublishHelperBot.fsproj index 78fdc6f..091d7e5 100644 --- a/PublishHelperBot/PublishHelperBot.fsproj +++ b/PublishHelperBot/PublishHelperBot.fsproj @@ -6,8 +6,8 @@ + - diff --git a/PublishHelperBot/YoutubeDl.fs b/PublishHelperBot/YoutubeDl.fs index fb91dfa..626b23f 100644 --- a/PublishHelperBot/YoutubeDl.fs +++ b/PublishHelperBot/YoutubeDl.fs @@ -5,173 +5,388 @@ open System.Collections.Generic open System.IO open System.Net.Http open System.Text -open System.Threading +open System.Threading.Tasks open Microsoft.FSharp.Core open Newtonsoft.Json -open Nito.AsyncEx +open PublishHelperBot.Environment open Telegram.Bot open Telegram.Bot.Types.InputFiles -type public CreateYoutubeDLUrl = string -type public ChatId = int64 -type public CreateYoutubeDLJob = { - url: string - savePath: string -} -type public CreateYoutubeDLJobSuccess = { - task: Guid +type ChatId = int64 + +type CreateYoutubeDlJob = { + url: string + savePath: string } -type public YoutubeDlStateResponse = { - state: string +type CreateYoutubeDlJobSuccess = { + task: Guid } -type public YoutubeDlError = { - message: string -} -type public YoutubeDlJob<'A> = { - internalId: Guid - externalId: 'A - url: string - state: string - savePath: string +type YoutubeDlStateResponse = { + state: string } -type YoutubeDlClientActions = Create | Check | Delete -type HttpMethods = GET | POST | DELETE -type CreateJobResult = Result Async -type CheckJobResult = Result Async -type CleanJobResult = Result Async -type StartYoutubeDlServiceArgs = HttpClient * CreateYoutubeDLUrl * ITelegramBotClient * ChatId * CancellationToken -type YoutubeDlJobWithId = YoutubeDlJob -type YoutubeDlJobWithoutId = YoutubeDlJob -type YoutubeDlCurrentJob = - | Created of YoutubeDlJobWithoutId - | Awaiting of YoutubeDlJobWithId - | Downloaded of YoutubeDlJobWithId - | Done of YoutubeDlJobWithId - | None of unit - -let inline () (lck: AsyncLock) f = async { - use! __ = Async.AwaitTask <| lck.LockAsync().AsTask() - return! f +type YoutubeDlError = { + message: string } -type YoutubeDlClient(baseUrl: string, client: HttpClient) = - let lock = AsyncLock() - let apiPrefix = $"{baseUrl}api/"; - let ResolvePath(action: YoutubeDlClientActions) = - match action with - | Create -> $"{apiPrefix}download" - | Check -> $"{apiPrefix}status" - | Delete -> $"{apiPrefix}clear" - - let doHttp (url: string, method: HttpMethods, content: HttpContent): Result<'TRes, YoutubeDlError> Async = async { - try - let! res = - match method with - | POST -> client.PostAsync(url, content) |> Async.AwaitTask - | GET -> client.GetAsync(url) |> Async.AwaitTask - | DELETE -> client.DeleteAsync(url) |> Async.AwaitTask - - let! content = res.Content.ReadAsStringAsync() |> Async.AwaitTask - return - match res.IsSuccessStatusCode with - | true -> Ok (JsonConvert.DeserializeObject<'TRes> <| content) - | false -> Error { message = "Unknown network error" } - with - | ex -> return Error { message = ex.Message } - } - - member this.CreateJob(model: CreateYoutubeDLJob): CreateJobResult = lock async { - use content = new StringContent(JsonConvert.SerializeObject <| model, Encoding.UTF8, "application/json") - return! doHttp <| (ResolvePath Create, POST, content) - } - - member this.CheckJob(id: Guid): CheckJobResult = lock async { - let arg = [KeyValuePair("id", id.ToString())] - use content = new FormUrlEncodedContent(arg) - let! query = content.ReadAsStringAsync() |> Async.AwaitTask - return! doHttp <| ($"{ResolvePath Check}?{query}", GET, content) - } - - member this.CleanJob(id: Guid): CleanJobResult = lock async { - let arg = [KeyValuePair("id", id.ToString())] - use content = new FormUrlEncodedContent(arg) - let! query = content.ReadAsStringAsync() |> Async.AwaitTask - return! doHttp <| ($"{ResolvePath Delete}?{query}", DELETE, content) - } +type YoutubeDlClientConfig = { + BaseUrl: string + Client: HttpClient +} -type YoutubeDlBackgroundService(requirements: StartYoutubeDlServiceArgs) = - let (http, url, tg, chatId, ct) = requirements - let lock = AsyncLock() - let mutable currentJob: YoutubeDlCurrentJob = None () - let jobPool = Queue() - let ytClient = YoutubeDlClient <| (url, http) - let mapJobToApi (job: YoutubeDlJob<_>): CreateYoutubeDLJob = { - url = job.url - savePath = job.savePath - } - let attachExternalId (id: Guid, job: YoutubeDlJobWithoutId): YoutubeDlCurrentJob = - Awaiting { internalId = job.internalId; state = job.state; url = job.url; externalId = id; savePath = job.savePath } - - let tryAssignNewJob() = async { - let (result, job) = jobPool.TryDequeue() - match result with - | true -> currentJob <- Created job - | false -> currentJob <- None () - } - - let uploadToYtDl(job: YoutubeDlJobWithoutId) = async { - match! ytClient.CreateJob <| mapJobToApi job with - | Ok x -> currentJob <- attachExternalId <| (x.task, job) - // TODO: Logging! - | Error _ -> currentJob <- None () - } +type CreateJobResult = Result - let checkJob(job: YoutubeDlJobWithId) = async { - match! ytClient.CheckJob <| job.externalId with - | Ok x when x.state.Equals("Finished", StringComparison.OrdinalIgnoreCase) -> currentJob <- Downloaded job - | Error _ -> currentJob <- None () - | _ -> () - // That's take a while - do! Async.Sleep 5000 - } - - let postVideo(job: YoutubeDlJobWithId) = async { - use file = File.OpenRead <| job.savePath - let input = InputOnlineFile(file, job.savePath) - let caption = $"Source: {job.url}" - do! tg.SendVideoAsync(chatId, input, caption = caption) |> Async.AwaitTask |> Async.Ignore - currentJob <- Done job - } - - let cleanUp(job: YoutubeDlJobWithId) = async { - File.Delete <| job.savePath - match! ytClient.CleanJob <| job.externalId with - | Ok _ -> currentJob <- None () - | Error _ -> currentJob <- None () - } - - let chooseAction() = lock async { - do! match currentJob with - | None _ -> tryAssignNewJob() - | Created x -> x |> uploadToYtDl - | Awaiting x -> x |> checkJob - | Downloaded x -> x |> postVideo - | Done x -> x |> cleanUp - } +type CheckJobResult = Result - let rec loop () = async { - do! match ct.IsCancellationRequested with - | false -> chooseAction() - | true -> async { () } - do! Async.Sleep 150 - return! loop() - } - member public this.StartYoutubeDlService() = loop() |> Async.Start - member public this.EnqueueJob(url: string) = lock async { - let id = Guid.NewGuid() - jobPool.Enqueue({ internalId = id; externalId = (); state = "new"; url = url; savePath = $"{Path.GetTempFileName()}.mp4" }) - return id - } \ No newline at end of file +type CleanJobResult = Result + +type IYoutubeDlClient = + abstract member CreateJob: CreateYoutubeDlJob -> Async + abstract member CheckJob: externalId: Guid -> Async + abstract member CleanJob: externalId: Guid -> Async + +[] +module YoutubeDlClient = + + type private YoutubeDlClientActions = Create | Check | Delete + + type private HttpMethods = GET | POST | DELETE + + type private Msg = + | CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource + | CheckJob of externalId: Guid * tcs: TaskCompletionSource + | CleanJob of externalId: Guid * tcs: TaskCompletionSource + + let private getApiPrefix baseUrl = $"{baseUrl}api/" + + let private resolvePath baseUrl (action: YoutubeDlClientActions) = + let apiPrefix = getApiPrefix baseUrl + match action with + | Create -> $"{apiPrefix}download" + | Check -> $"{apiPrefix}status" + | Delete -> $"{apiPrefix}clear" + + let private doHttp + (method: HttpMethods) + (content: HttpContent) + (client: HttpClient) + (url: string): Async> = + task { + try + let! contentText = content.ReadAsStringAsync() + Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText) + let! res = + match method with + | POST -> client.PostAsync(url, content) + | GET -> client.GetAsync(url) + | DELETE -> client.DeleteAsync(url) + + let! content = res.Content.ReadAsStringAsync() + Logging.logger.Information("Response from youtube api, response = {Response}", content) + return + match res.IsSuccessStatusCode with + | true -> + Logging.logger.Information("Response OK") + Ok (JsonConvert.DeserializeObject<'TRes>(content)) + | false -> + Logging.logger.Error("Response network error") + Error { message = "Unknown network error" } + with ex -> + Logging.logger.Error(ex, "Youtube api error") + return Error { message = ex.Message } + } |> Async.AwaitTask + + let private createInbox(config: YoutubeDlClientConfig) = MailboxProcessor.Start(fun inbox -> + let rec loop() = + async { + match! inbox.Receive() with + | CreateJob(args, tcs) -> + Logging.logger.Information("Received youtube api create job") + async { + try + let json = args |> JsonConvert.SerializeObject + Logging.logger.Information("Sending create job = {Job}", json) + use content = new StringContent(json, Encoding.UTF8, "application/json") + let! result = + (config.Client, resolvePath config.BaseUrl Create) + ||> doHttp POST content + Logging.logger.Information("Received response from youtube api for create job") + tcs.SetResult(result) + + with ex -> + Logging.logger.Error(ex, "Failed to create youtube api job") + tcs.SetResult(Error { + message = ex.Message + }) + + } |> Async.Start + + return! loop () + + | CheckJob (externalId, tcs) -> + Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId) + async { + try + let arg = [KeyValuePair("id", externalId.ToString())] + use content = new FormUrlEncodedContent(arg) + Logging.logger.Information("Sending youtube api check job") + let! query = content.ReadAsStringAsync() |> Async.AwaitTask + let! result = + (config.Client, $"{resolvePath config.BaseUrl Check}?{query}") + ||> doHttp GET content + Logging.logger.Information("Received response from youtube api for check job") + tcs.SetResult(result) + + with ex -> + Logging.logger.Error(ex, "Failed to check youtube api job") + tcs.SetResult(Error { + message = ex.Message + }) + } |> Async.Start + + return! loop () + + | CleanJob(externalId, tcs) -> + Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId) + async { + try + let arg = [KeyValuePair("id", externalId.ToString())] + use content = new FormUrlEncodedContent(arg) + Logging.logger.Information("Sending youtube api clean job") + let! query = content.ReadAsStringAsync() |> Async.AwaitTask + let! result = + (config.Client, $"{resolvePath config.BaseUrl Delete}?{query}") + ||> doHttp DELETE content + Logging.logger.Information("Received response from youtube api for clean job") + tcs.SetResult(result) + + with ex -> + Logging.logger.Error(ex, "Failed to clean youtube api job") + tcs.SetResult(Error { + message = ex.Message + }) + } |> Async.Start + + return! loop () + } + loop () + ) + + let createClient (config: YoutubeDlClientConfig) = + let inbox = createInbox(config) + { new IYoutubeDlClient with + member this.CreateJob(args) = + let tcs = TaskCompletionSource<_>() + inbox.Post(CreateJob(args, tcs)) + tcs.Task |> Async.AwaitTask + + member this.CheckJob externalId = + let tcs = TaskCompletionSource<_>() + inbox.Post(CheckJob(externalId, tcs)) + tcs.Task |> Async.AwaitTask + + member this.CleanJob externalId = + let tcs = TaskCompletionSource<_>() + inbox.Post(CleanJob(externalId, tcs)) + tcs.Task |> Async.AwaitTask } + +type ITgService = + abstract member PostVideo: url: string * savePath: string * externalId: Guid -> unit + +[] +module TgService = + type private Msg = + | PostVideo of url: string * savePath: string * externalId: Guid + + let private createInbox (channelId: ChatId) (youtubeDlClient: IYoutubeDlClient) (tg: ITelegramBotClient) = + MailboxProcessor.Start(fun inbox -> + let rec loop () = + async { + match! inbox.Receive() with + | PostVideo (url, savePath, externalId) -> + try + try + Logging.logger.Information("Reading file path = {path}", savePath) + use file = File.OpenRead(savePath) + let input = InputOnlineFile(file, savePath) + let caption = $"Source: {url}" + Logging.logger.Information( + "Sending video to channel, channelId = {channelId}, caption = {caption}", + channelId, + caption) + do! tg.SendVideoAsync(channelId, input, caption = caption) |> Async.AwaitTask |> Async.Ignore + with ex -> + Logging.logger.Error(ex, "Failed to send video") + finally + Logging.logger.Information("Deleting file path = {path}", savePath) + File.Delete(savePath) + + match! youtubeDlClient.CleanJob(externalId) with + | Ok _ -> () + | Error _ -> () + return! loop () + } + loop () + ) + + let createService chatId youtubeDlClient tg = + let inbox = createInbox chatId youtubeDlClient tg + { new ITgService with + member this.PostVideo(url, savePath, externalId) = + inbox.Post(PostVideo(url, savePath, externalId)) } + +type IYoutubeDlService = + abstract member AddJob: url: string -> Async + +[] +module YoutubeDlService = + type private Msg = + | AddJob of url: string * TaskCompletionSource + | CheckJob + + type private JobState = + | Created + | Awaiting of Guid + + type private YoutubeDlJob = { + InternalId: Guid + State: JobState + Url: string + SavePath: string + } + + let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) = + async { + Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId) + let! result = + client.CreateJob { + url = job.Url + savePath = job.SavePath + } + match result with + | Ok task -> + Logging.logger.Information( + "Created job on youtube client = {job}, externalId = {externalId}", + job.InternalId, + task.task) + let updated = { + job with + State = JobState.Awaiting task.task + } + return Some updated + + | Error e -> + Logging.logger.Error( + "Failed to create job on client = {job}, message = {message}", + job.InternalId, + e.message) + return None + } + + let private getCurrentJob + (jobQueue: Queue) + (current: YoutubeDlJob option) = + match current with + | Some current -> + Some current + + | None -> + match jobQueue.TryDequeue() with + | true, job -> + Some job + | _ -> + None + + let private createServiceInbox + (youtubeDlClient: IYoutubeDlClient) + (tgService: ITgService) = + + MailboxProcessor.Start(fun inbox -> + let postCheck() = + async { + do! Async.Sleep(TimeSpan.FromSeconds 5) + inbox.Post(CheckJob) + } |> Async.Start + + let rec loop + (jobQueue: Queue) + (current: YoutubeDlJob option) = + async { + match! inbox.Receive() with + | AddJob (url, tcs) -> + Logging.logger.Information("Adding new url = {url}", url) + let id = Guid.NewGuid() + let job = { + InternalId = id + State = Created + Url = url + SavePath = $"{Path.GetTempFileName()}.mp4" + } + tcs.SetResult(id) + jobQueue.Enqueue(job) + + if current.IsNone then + inbox.Post(CheckJob) + + Logging.logger.Information( + "Added new job = {job}, url = {url}, path = {path}", + job.InternalId, + url, + job.SavePath) + return! loop jobQueue current + + | CheckJob -> + let currentJob = getCurrentJob jobQueue current + match currentJob with + | Some job -> + Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State) + match job.State with + | Created -> + match! createJob youtubeDlClient job with + | Some job -> + postCheck() + return! loop jobQueue (Some job) + + | None -> + postCheck() + return! loop jobQueue None + + | Awaiting externalId -> + Logging.logger.Information("Checking job = {job}", externalId) + let! task = youtubeDlClient.CheckJob(externalId) + match task with + | Ok x when x.state.Equals("Finished", StringComparison.OrdinalIgnoreCase) -> + Logging.logger.Information("Sending post video from job = {job}", externalId) + tgService.PostVideo(job.Url, job.SavePath, externalId) + return! loop jobQueue None + + | Error e -> + Logging.logger.Error( + "Failed to receive video from youtube client, job = {job}, message = {message}", + externalId, + e.message) + File.Delete(job.SavePath) + return! loop jobQueue None + + | _ -> + Logging.logger.Information( + "Waiting for job to complete, job = {job}", + externalId) + postCheck() + return! loop jobQueue current + + | None -> + return! loop jobQueue None + } + loop (Queue()) None + ) + + let createService youtubeDlClient tgService = + let inbox = createServiceInbox youtubeDlClient tgService + { new IYoutubeDlService with + member this.AddJob(url) = + let tcs = TaskCompletionSource<_>() + inbox.Post(AddJob(url, tcs)) + tcs.Task |> Async.AwaitTask} \ No newline at end of file diff --git a/youtube-dl-api/youtube_dl_api/__init__.py b/youtube-dl-api/youtube_dl_api/__init__.py index 0189496..b4e0025 100644 --- a/youtube-dl-api/youtube_dl_api/__init__.py +++ b/youtube-dl-api/youtube_dl_api/__init__.py @@ -21,7 +21,8 @@ def report_state(id: str): def load_video(url: str, file_path: str, id: str): opts = { - "format": 'mp4', + "recode-video": "mp4", + "format": 'best[filesize<50M]', "quiet": True, "outtmpl": file_path, "progress_hooks": [report_state(id)]