move logic to mailboxes
This commit is contained in:
		
							parent
							
								
									d802845c0a
								
							
						
					
					
						commit
						e921d15e04
					
				
					 6 changed files with 467 additions and 205 deletions
				
			
		|  | @ -3,6 +3,13 @@ module PublishHelperBot.Environment | ||||||
| open System | open System | ||||||
| open System.IO | open System.IO | ||||||
| open Newtonsoft.Json | open Newtonsoft.Json | ||||||
|  | open Serilog | ||||||
|  | 
 | ||||||
|  | [<RequireQualifiedAccess>] | ||||||
|  | module Logging = | ||||||
|  |     let logger = | ||||||
|  |         let config = LoggerConfiguration() | ||||||
|  |         config.WriteTo.Console().CreateLogger() | ||||||
| 
 | 
 | ||||||
| type public BotConfig = { | type public BotConfig = { | ||||||
|     token: string |     token: string | ||||||
|  | @ -12,10 +19,14 @@ type public BotConfig = { | ||||||
|     YoutubeDlUrl: string |     YoutubeDlUrl: string | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| let private ReadConfig = | let private readConfig = | ||||||
|     File.ReadAllText >> JsonConvert.DeserializeObject<BotConfig> |     File.ReadAllText >> JsonConvert.DeserializeObject<BotConfig> | ||||||
| 
 | 
 | ||||||
| let public CreateConfig (name: string) = | let public createConfig (name: string) = | ||||||
|     match Environment.GetEnvironmentVariable(name) with |     match Environment.GetEnvironmentVariable(name) with | ||||||
|   | null ->  raise <| ApplicationException("Missing config path env") |     | null -> | ||||||
|   | path  -> ReadConfig <| path |           Logging.logger.Error("Missing env") | ||||||
|  |           ApplicationException("Missing config path env") |> raise | ||||||
|  |     | path -> | ||||||
|  |           Logging.logger.Information("Read config from env") | ||||||
|  |           readConfig path | ||||||
|  | @ -9,22 +9,35 @@ open Telegram.Bot.Types | ||||||
| open Telegram.Bot.Types.Enums | open Telegram.Bot.Types.Enums | ||||||
| 
 | 
 | ||||||
| type BaseHandlerArgs = Update * BotConfig | type BaseHandlerArgs = Update * BotConfig | ||||||
|  | 
 | ||||||
| type HandlerArgs = Update * BotConfig * ITelegramBotClient | type HandlerArgs = Update * BotConfig * ITelegramBotClient | ||||||
|  | 
 | ||||||
| type HandlerRequirements = BaseHandlerArgs -> bool | type HandlerRequirements = BaseHandlerArgs -> bool | ||||||
|  | 
 | ||||||
| type Handler = HandlerArgs -> Task | type Handler = HandlerArgs -> Task | ||||||
|  | 
 | ||||||
| type Handler<'deps> = 'deps * HandlerArgs -> Task | type Handler<'deps> = 'deps * HandlerArgs -> Task | ||||||
| 
 | 
 | ||||||
| // Utils | // Utils | ||||||
| let UpdateIsAMessage (x: Update) = x.Type = UpdateType.Message | let UpdateIsAMessage (x: Update) = x.Type = UpdateType.Message | ||||||
|  | 
 | ||||||
| let FromAdminChat (x: Message, c: BotConfig) = x.Chat.Id = c.adminChatId | let FromAdminChat (x: Message, c: BotConfig) = x.Chat.Id = c.adminChatId | ||||||
|  | 
 | ||||||
| let HasReply (x: Message) = not(isNull x.ReplyToMessage) | let HasReply (x: Message) = not(isNull x.ReplyToMessage) | ||||||
|  | 
 | ||||||
| let ExtractPhotoFromMessage (x: Message) = Array.map (fun (p: PhotoSize) -> p.FileId) x.Photo | let ExtractPhotoFromMessage (x: Message) = Array.map (fun (p: PhotoSize) -> p.FileId) x.Photo | ||||||
|  | 
 | ||||||
| let HasText (x: Message) = not(isNull x.Text) | let HasText (x: Message) = not(isNull x.Text) | ||||||
|  | 
 | ||||||
| let UrlsAsAlbumInputMedia (urls: string[]): IAlbumInputMedia[] = | let UrlsAsAlbumInputMedia (urls: string[]): IAlbumInputMedia[] = | ||||||
|   Array.map (fun (x: string) -> InputMediaPhoto(x)) urls |   Array.map (fun (x: string) -> InputMediaPhoto(x)) urls | ||||||
| 
 | 
 | ||||||
| // Post (Relay) command | // Post (Relay) command | ||||||
| type RelayCaptionMode = WithAuthor | Anonymous | Unknown | type RelayCaptionMode = | ||||||
|  |     | WithAuthor | ||||||
|  |     | Anonymous | ||||||
|  |     | Unknown | ||||||
|  | 
 | ||||||
| let RelaySupportedContent (x: Message) = | let RelaySupportedContent (x: Message) = | ||||||
|   match x.Type with |   match x.Type with | ||||||
|   | MessageType.Text -> true |   | MessageType.Text -> true | ||||||
|  | @ -53,14 +66,14 @@ let public RelayMatch: HandlerRequirements = fun (u, c) -> | ||||||
|   RelaySupportedContent u.Message.ReplyToMessage && |   RelaySupportedContent u.Message.ReplyToMessage && | ||||||
|   not (RelayCaptionType u.Message.Text = RelayCaptionMode.Unknown) |   not (RelayCaptionType u.Message.Text = RelayCaptionMode.Unknown) | ||||||
| 
 | 
 | ||||||
| let public RelayHandler: Handler = fun (u, c, tg) -> | let public RelayHandler: Handler = fun (update, config, tg) -> | ||||||
|   let reply = u.Message.ReplyToMessage |   let reply = update.Message.ReplyToMessage | ||||||
|   let channelId = c.chanelId |   let channelId = config.chanelId | ||||||
|   let author = $"{reply.From.FirstName} {reply.From.LastName}" |   let author = $"{reply.From.FirstName} {reply.From.LastName}" | ||||||
|   let captionMode = RelayCaptionType u.Message.Text |   let captionMode = RelayCaptionType update.Message.Text | ||||||
| 
 | 
 | ||||||
|   let photoMedia = lazy Array.get (ExtractPhotoFromMessage reply) 0 |   let photoMedia = lazy Array.get (ExtractPhotoFromMessage reply) 0 | ||||||
|   let caption = lazy RelayResolveCaption(captionMode, author, c.relayUrl) |   let caption = lazy RelayResolveCaption(captionMode, author, config.relayUrl) | ||||||
| 
 | 
 | ||||||
|   match reply.Type with |   match reply.Type with | ||||||
|   | MessageType.Text -> tg.ForwardMessageAsync(channelId, reply.Chat.Id, reply.MessageId) |   | MessageType.Text -> tg.ForwardMessageAsync(channelId, reply.Chat.Id, reply.MessageId) | ||||||
|  | @ -81,11 +94,9 @@ let public YoutubeRepostMatch: HandlerRequirements = fun (u, c) -> | ||||||
|   u.Message.Text.StartsWith YoutubeRepostMatchCmd && |   u.Message.Text.StartsWith YoutubeRepostMatchCmd && | ||||||
|   u.Message.Text.Split(' ').Length = 2 |   u.Message.Text.Split(' ').Length = 2 | ||||||
| 
 | 
 | ||||||
| let public YoutubeRepostHandler: Handler<YoutubeDlBackgroundService> = fun (yt, (u, c, tg)) -> | let public YoutubeRepostHandler: Handler<IYoutubeDlService> = fun (yt, (u, c, tg)) -> | ||||||
|   task { |   task { | ||||||
|       let trim (x: string) = x.Trim() |       let trim (x: string) = x.Trim() | ||||||
|       let! id = YoutubeRepostMatchCmd |> u.Message.Text.Split |> Array.last |> trim |> yt.EnqueueJob |       let! id = YoutubeRepostMatchCmd |> u.Message.Text.Split |> Array.last |> trim |> yt.AddJob | ||||||
|       do! tg.SendTextMessageAsync(c.adminChatId, id.ToString()) |> Async.AwaitTask |> Async.Ignore |       do! tg.SendTextMessageAsync(c.adminChatId, id.ToString()) |> Async.AwaitTask |> Async.Ignore | ||||||
|   } |   } | ||||||
| 
 |  | ||||||
|    |  | ||||||
|  | @ -1,6 +1,4 @@ | ||||||
| // For more information see https://aka.ms/fsharp-console-apps | open System | ||||||
| 
 |  | ||||||
| open System |  | ||||||
| open System.Net.Http | open System.Net.Http | ||||||
| open System.Threading | open System.Threading | ||||||
| open System.Threading.Tasks | open System.Threading.Tasks | ||||||
|  | @ -12,31 +10,57 @@ open Telegram.Bot.Polling | ||||||
| open Telegram.Bot.Types | open Telegram.Bot.Types | ||||||
| open Telegram.Bot.Types.Enums | open Telegram.Bot.Types.Enums | ||||||
| 
 | 
 | ||||||
| let CreateBot (config: BotConfig, http: HttpClient) = TelegramBotClient(config.token, http) | let createBot (config: BotConfig, http: HttpClient) = TelegramBotClient(config.token, http) | ||||||
| let config = CreateConfig <| "SBPB_CONFIG_PATH"; | 
 | ||||||
| let botClient = CreateBot <| (config, new HttpClient()) | let config = createConfig "SBPB_CONFIG_PATH" | ||||||
| let YtService = YoutubeDlBackgroundService <| | 
 | ||||||
|                 (new HttpClient(), config.YoutubeDlUrl, botClient, config.chanelId, CancellationToken.None) | let botClient = createBot (config, new HttpClient()) | ||||||
|  | 
 | ||||||
|  | let youtubeDlService = | ||||||
|  |     let youtubeDlClient = | ||||||
|  |         YoutubeDlClient.createClient { | ||||||
|  |             Client = new HttpClient() | ||||||
|  |             BaseUrl = config.YoutubeDlUrl | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     let tgService = | ||||||
|  |         TgService.createService config.chanelId youtubeDlClient botClient | ||||||
|  | 
 | ||||||
|  |     YoutubeDlService.createService youtubeDlClient tgService | ||||||
| 
 | 
 | ||||||
| let startDate = DateTime.UtcNow | let startDate = DateTime.UtcNow | ||||||
| let isObsoleteUpdate (u: Update) = u.Type = UpdateType.Message && u.Message.Date < startDate; |  | ||||||
| 
 | 
 | ||||||
| let updateHandle (bc: ITelegramBotClient) (u: Update) (ct: CancellationToken): Task = | let (|ObsoleteUpdate|RelayMatchUpdate|YoutubeRepostMatchUpdate|SkipUpdate|) (update: Update) = | ||||||
|   let tgCtx = (u, config, bc) |     let isObsoleteUpdate (update: Update) = | ||||||
|   match u with |         update.Type = UpdateType.Message && update.Message.Date < startDate | ||||||
|   | _ when isObsoleteUpdate u -> Task.CompletedTask |     match update with | ||||||
|   | _ when RelayMatch <| (u, config) -> RelayHandler <| tgCtx |     | _ when isObsoleteUpdate update -> ObsoleteUpdate | ||||||
|   | _ when YoutubeRepostMatch <| (u, config) -> YoutubeRepostHandler <| (YtService, tgCtx) |     | _ when RelayMatch (update, config) -> RelayMatchUpdate | ||||||
|   | _ -> Task.CompletedTask |     | _ when YoutubeRepostMatch (update, config) -> YoutubeRepostMatchUpdate | ||||||
|  |     | _ -> SkipUpdate | ||||||
| 
 | 
 | ||||||
| let handlePollingError (bc: ITelegramBotClient) (e: Exception) (t: CancellationToken) = | let updateHandle (bc: ITelegramBotClient) (update: Update) (ct: CancellationToken): Task = | ||||||
|   printfn $"{e.Message}\n{e.StackTrace}" |     let tgCtx = (update, config, bc) | ||||||
|  |     match update with | ||||||
|  |     | RelayMatchUpdate -> | ||||||
|  |         Logging.logger.Information("RelayMatchUpdate") | ||||||
|  |         RelayHandler tgCtx | ||||||
|  |     | YoutubeRepostMatchUpdate -> | ||||||
|  |         YoutubeRepostHandler <| (youtubeDlService, tgCtx) | ||||||
|  |     | ObsoleteUpdate -> | ||||||
|  |         Logging.logger.Information("Skipping obsolete update") | ||||||
|  |         Task.CompletedTask | ||||||
|  |     | SkipUpdate -> | ||||||
|  |         Logging.logger.Information("Skipping update") | ||||||
|  |         Task.CompletedTask | ||||||
|  | 
 | ||||||
|  | let handlePollingError (_: ITelegramBotClient) (e: Exception) (_: CancellationToken) = | ||||||
|  |     Logging.logger.Error(e, "Polling error") | ||||||
|     Task.CompletedTask |     Task.CompletedTask | ||||||
| 
 | 
 | ||||||
| let receiverOptions = ReceiverOptions(AllowedUpdates = Array.zeroCreate<UpdateType> 0) | let receiverOptions = ReceiverOptions(AllowedUpdates = Array.zeroCreate<UpdateType> 0) | ||||||
| 
 | 
 | ||||||
| botClient.StartReceiving(updateHandle,handlePollingError,receiverOptions) | Logging.logger.Information("Starting bot") | ||||||
| YtService.StartYoutubeDlService() | botClient.StartReceiving(updateHandle, handlePollingError, receiverOptions) | ||||||
| 
 | Logging.logger.Information("Я родился") | ||||||
| printf "Я родился" |  | ||||||
| Console.ReadKey() |> ignore | Console.ReadKey() |> ignore | ||||||
|  | @ -6,8 +6,8 @@ | ||||||
|     </PropertyGroup> |     </PropertyGroup> | ||||||
| 
 | 
 | ||||||
|     <ItemGroup> |     <ItemGroup> | ||||||
|  |         <PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" /> | ||||||
|         <PackageReference Include="Telegram.Bot" Version="18.0.0" /> |         <PackageReference Include="Telegram.Bot" Version="18.0.0" /> | ||||||
|         <PackageReference Include="Nito.AsyncEx" Version="5.1.2" /> |  | ||||||
|     </ItemGroup> |     </ItemGroup> | ||||||
|      |      | ||||||
|     <ItemGroup> |     <ItemGroup> | ||||||
|  |  | ||||||
|  | @ -5,173 +5,388 @@ open System.Collections.Generic | ||||||
| open System.IO | open System.IO | ||||||
| open System.Net.Http | open System.Net.Http | ||||||
| open System.Text | open System.Text | ||||||
| open System.Threading | open System.Threading.Tasks | ||||||
| open Microsoft.FSharp.Core | open Microsoft.FSharp.Core | ||||||
| open Newtonsoft.Json | open Newtonsoft.Json | ||||||
| open Nito.AsyncEx | open PublishHelperBot.Environment | ||||||
| open Telegram.Bot | open Telegram.Bot | ||||||
| open Telegram.Bot.Types.InputFiles | open Telegram.Bot.Types.InputFiles | ||||||
| 
 | 
 | ||||||
| type public CreateYoutubeDLUrl = string | type ChatId = int64 | ||||||
| type public ChatId = int64 | 
 | ||||||
| type public CreateYoutubeDLJob = { | type CreateYoutubeDlJob = { | ||||||
|     url: string |     url: string | ||||||
|     savePath: string |     savePath: string | ||||||
| } | } | ||||||
| type public CreateYoutubeDLJobSuccess = { | 
 | ||||||
|  | type CreateYoutubeDlJobSuccess = { | ||||||
|     task: Guid |     task: Guid | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type public YoutubeDlStateResponse = { | type YoutubeDlStateResponse = { | ||||||
|     state: string |     state: string | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type public YoutubeDlError = { | type YoutubeDlError = { | ||||||
|     message: string |     message: string | ||||||
| } | } | ||||||
| type public YoutubeDlJob<'A> = { | 
 | ||||||
|   internalId: Guid | type YoutubeDlClientConfig = { | ||||||
|   externalId: 'A |     BaseUrl: string | ||||||
|   url: string |     Client: HttpClient | ||||||
|   state: string |  | ||||||
|   savePath: string |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type YoutubeDlClientActions = Create | Check | Delete | type CreateJobResult = Result<CreateYoutubeDlJobSuccess, YoutubeDlError> | ||||||
| type HttpMethods = GET | POST | DELETE |  | ||||||
| type CreateJobResult = Result<CreateYoutubeDLJobSuccess, YoutubeDlError> Async |  | ||||||
| type CheckJobResult = Result<YoutubeDlStateResponse, YoutubeDlError> Async |  | ||||||
| type CleanJobResult = Result<YoutubeDlStateResponse, YoutubeDlError> Async |  | ||||||
| type StartYoutubeDlServiceArgs = HttpClient * CreateYoutubeDLUrl * ITelegramBotClient * ChatId * CancellationToken |  | ||||||
| type YoutubeDlJobWithId = YoutubeDlJob<Guid> |  | ||||||
| type YoutubeDlJobWithoutId = YoutubeDlJob<unit> |  | ||||||
| type YoutubeDlCurrentJob = |  | ||||||
|   | Created of YoutubeDlJobWithoutId |  | ||||||
|   | Awaiting of YoutubeDlJobWithId |  | ||||||
|   | Downloaded of YoutubeDlJobWithId |  | ||||||
|   | Done of YoutubeDlJobWithId |  | ||||||
|   | None of unit |  | ||||||
| 
 | 
 | ||||||
| let inline (<!>) (lck: AsyncLock) f = async { | type CheckJobResult = Result<YoutubeDlStateResponse, YoutubeDlError> | ||||||
|   use! __ = Async.AwaitTask <| lck.LockAsync().AsTask() |  | ||||||
|   return! f |  | ||||||
| } |  | ||||||
| 
 | 
 | ||||||
| type YoutubeDlClient(baseUrl: string, client: HttpClient) = | type CleanJobResult = Result<YoutubeDlStateResponse, YoutubeDlError> | ||||||
|   let lock = AsyncLock() | 
 | ||||||
|   let apiPrefix = $"{baseUrl}api/"; | type IYoutubeDlClient = | ||||||
|   let ResolvePath(action: YoutubeDlClientActions) = |     abstract member CreateJob: CreateYoutubeDlJob -> Async<CreateJobResult> | ||||||
|  |     abstract member CheckJob: externalId: Guid -> Async<CheckJobResult> | ||||||
|  |     abstract member CleanJob: externalId: Guid -> Async<CleanJobResult> | ||||||
|  | 
 | ||||||
|  | [<RequireQualifiedAccess>] | ||||||
|  | module YoutubeDlClient = | ||||||
|  | 
 | ||||||
|  |     type private YoutubeDlClientActions = Create | Check | Delete | ||||||
|  | 
 | ||||||
|  |     type private HttpMethods = GET | POST | DELETE | ||||||
|  | 
 | ||||||
|  |     type private Msg = | ||||||
|  |         | CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource<CreateJobResult> | ||||||
|  |         | CheckJob of externalId: Guid * tcs: TaskCompletionSource<CheckJobResult> | ||||||
|  |         | CleanJob of externalId: Guid * tcs: TaskCompletionSource<CleanJobResult> | ||||||
|  | 
 | ||||||
|  |     let private getApiPrefix baseUrl = $"{baseUrl}api/" | ||||||
|  | 
 | ||||||
|  |     let private resolvePath baseUrl (action: YoutubeDlClientActions) = | ||||||
|  |         let apiPrefix = getApiPrefix baseUrl | ||||||
|         match action with |         match action with | ||||||
|         | Create -> $"{apiPrefix}download" |         | Create -> $"{apiPrefix}download" | ||||||
|         | Check -> $"{apiPrefix}status" |         | Check -> $"{apiPrefix}status" | ||||||
|         | Delete -> $"{apiPrefix}clear" |         | Delete -> $"{apiPrefix}clear" | ||||||
| 
 | 
 | ||||||
|   let doHttp (url: string, method: HttpMethods, content: HttpContent): Result<'TRes, YoutubeDlError> Async = async { |     let private doHttp | ||||||
|  |         (method: HttpMethods) | ||||||
|  |         (content: HttpContent) | ||||||
|  |         (client: HttpClient) | ||||||
|  |         (url: string): Async<Result<'TRes, YoutubeDlError>> = | ||||||
|  |         task { | ||||||
|             try |             try | ||||||
|  |                 let! contentText = content.ReadAsStringAsync() | ||||||
|  |                 Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText) | ||||||
|                 let! res = |                 let! res = | ||||||
|                     match method with |                     match method with | ||||||
|         | POST -> client.PostAsync(url, content) |> Async.AwaitTask |                     | POST -> client.PostAsync(url, content) | ||||||
|         | GET -> client.GetAsync(url) |> Async.AwaitTask |                     | GET -> client.GetAsync(url) | ||||||
|         | DELETE -> client.DeleteAsync(url) |> Async.AwaitTask |                     | DELETE -> client.DeleteAsync(url) | ||||||
| 
 | 
 | ||||||
|       let! content = res.Content.ReadAsStringAsync() |> Async.AwaitTask |                 let! content = res.Content.ReadAsStringAsync() | ||||||
|  |                 Logging.logger.Information("Response from youtube api, response = {Response}", content) | ||||||
|                 return |                 return | ||||||
|                     match res.IsSuccessStatusCode with |                     match res.IsSuccessStatusCode with | ||||||
|         | true -> Ok (JsonConvert.DeserializeObject<'TRes> <| content) |                     | true -> | ||||||
|         | false -> Error { message = "Unknown network error" } |                         Logging.logger.Information("Response OK") | ||||||
|     with |                         Ok (JsonConvert.DeserializeObject<'TRes>(content)) | ||||||
|       | ex -> return Error { message = ex.Message  } |                     | false -> | ||||||
|   } |                         Logging.logger.Error("Response network error") | ||||||
|  |                         Error { message = "Unknown network error" } | ||||||
|  |             with ex -> | ||||||
|  |                 Logging.logger.Error(ex, "Youtube api error") | ||||||
|  |                 return Error { message = ex.Message  } | ||||||
|  |         } |> Async.AwaitTask | ||||||
| 
 | 
 | ||||||
|   member this.CreateJob(model: CreateYoutubeDLJob): CreateJobResult = lock <!> async { |     let private createInbox(config: YoutubeDlClientConfig) = MailboxProcessor.Start(fun inbox -> | ||||||
|       use content = new StringContent(JsonConvert.SerializeObject <| model, Encoding.UTF8, "application/json") |         let rec loop() = | ||||||
|       return! doHttp <| (ResolvePath Create, POST, content) |             async { | ||||||
|   } |                 match! inbox.Receive() with | ||||||
|  |                 | CreateJob(args, tcs) -> | ||||||
|  |                     Logging.logger.Information("Received youtube api create job") | ||||||
|  |                     async { | ||||||
|  |                         try | ||||||
|  |                             let json = args |> JsonConvert.SerializeObject | ||||||
|  |                             Logging.logger.Information("Sending create job = {Job}", json) | ||||||
|  |                             use content = new StringContent(json, Encoding.UTF8, "application/json") | ||||||
|  |                             let! result = | ||||||
|  |                                 (config.Client, resolvePath config.BaseUrl Create) | ||||||
|  |                                 ||> doHttp POST content | ||||||
|  |                             Logging.logger.Information("Received response from youtube api for create job") | ||||||
|  |                             tcs.SetResult(result) | ||||||
| 
 | 
 | ||||||
|   member this.CheckJob(id: Guid): CheckJobResult = lock <!> async { |                         with ex -> | ||||||
|     let arg = [KeyValuePair("id", id.ToString())] |                             Logging.logger.Error(ex, "Failed to create youtube api job") | ||||||
|  |                             tcs.SetResult(Error { | ||||||
|  |                                 message = ex.Message | ||||||
|  |                             }) | ||||||
|  | 
 | ||||||
|  |                     } |> Async.Start | ||||||
|  | 
 | ||||||
|  |                     return! loop () | ||||||
|  | 
 | ||||||
|  |                 | CheckJob (externalId, tcs) -> | ||||||
|  |                     Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId) | ||||||
|  |                     async { | ||||||
|  |                         try | ||||||
|  |                             let arg = [KeyValuePair("id", externalId.ToString())] | ||||||
|                             use content = new FormUrlEncodedContent(arg) |                             use content = new FormUrlEncodedContent(arg) | ||||||
|  |                             Logging.logger.Information("Sending youtube api check job") | ||||||
|                             let! query = content.ReadAsStringAsync() |> Async.AwaitTask |                             let! query = content.ReadAsStringAsync() |> Async.AwaitTask | ||||||
|     return! doHttp <| ($"{ResolvePath Check}?{query}", GET, content) |                             let! result = | ||||||
|   } |                                 (config.Client, $"{resolvePath config.BaseUrl Check}?{query}") | ||||||
|  |                                 ||> doHttp GET content | ||||||
|  |                             Logging.logger.Information("Received response from youtube api for check job") | ||||||
|  |                             tcs.SetResult(result) | ||||||
| 
 | 
 | ||||||
|   member this.CleanJob(id: Guid): CleanJobResult = lock <!> async { |                         with ex -> | ||||||
|     let arg = [KeyValuePair("id", id.ToString())] |                             Logging.logger.Error(ex, "Failed to check youtube api job") | ||||||
|  |                             tcs.SetResult(Error { | ||||||
|  |                                 message = ex.Message | ||||||
|  |                             }) | ||||||
|  |                     } |> Async.Start | ||||||
|  | 
 | ||||||
|  |                     return! loop () | ||||||
|  | 
 | ||||||
|  |                 | CleanJob(externalId, tcs) -> | ||||||
|  |                     Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId) | ||||||
|  |                     async { | ||||||
|  |                         try | ||||||
|  |                             let arg = [KeyValuePair("id", externalId.ToString())] | ||||||
|                             use content = new FormUrlEncodedContent(arg) |                             use content = new FormUrlEncodedContent(arg) | ||||||
|  |                             Logging.logger.Information("Sending youtube api clean job") | ||||||
|                             let! query = content.ReadAsStringAsync() |> Async.AwaitTask |                             let! query = content.ReadAsStringAsync() |> Async.AwaitTask | ||||||
|     return! doHttp <| ($"{ResolvePath Delete}?{query}", DELETE, content) |                             let! result = | ||||||
|  |                                 (config.Client, $"{resolvePath config.BaseUrl Delete}?{query}") | ||||||
|  |                                 ||> doHttp DELETE content | ||||||
|  |                             Logging.logger.Information("Received response from youtube api for clean job") | ||||||
|  |                             tcs.SetResult(result) | ||||||
|  | 
 | ||||||
|  |                         with ex -> | ||||||
|  |                             Logging.logger.Error(ex, "Failed to clean youtube api job") | ||||||
|  |                             tcs.SetResult(Error { | ||||||
|  |                                 message = ex.Message | ||||||
|  |                             }) | ||||||
|  |                     } |> Async.Start | ||||||
|  | 
 | ||||||
|  |                     return! loop () | ||||||
|  |             } | ||||||
|  |         loop () | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     let createClient (config: YoutubeDlClientConfig) = | ||||||
|  |         let inbox = createInbox(config) | ||||||
|  |         { new IYoutubeDlClient with | ||||||
|  |             member this.CreateJob(args) = | ||||||
|  |                 let tcs = TaskCompletionSource<_>() | ||||||
|  |                 inbox.Post(CreateJob(args, tcs)) | ||||||
|  |                 tcs.Task |> Async.AwaitTask | ||||||
|  | 
 | ||||||
|  |             member this.CheckJob externalId = | ||||||
|  |                 let tcs = TaskCompletionSource<_>() | ||||||
|  |                 inbox.Post(CheckJob(externalId, tcs)) | ||||||
|  |                 tcs.Task |> Async.AwaitTask | ||||||
|  | 
 | ||||||
|  |             member this.CleanJob externalId = | ||||||
|  |                 let tcs = TaskCompletionSource<_>() | ||||||
|  |                 inbox.Post(CleanJob(externalId, tcs)) | ||||||
|  |                 tcs.Task |> Async.AwaitTask } | ||||||
|  | 
 | ||||||
|  | type ITgService = | ||||||
|  |     abstract member PostVideo: url: string * savePath: string * externalId: Guid -> unit | ||||||
|  | 
 | ||||||
|  | [<RequireQualifiedAccess>] | ||||||
|  | module TgService = | ||||||
|  |     type private Msg = | ||||||
|  |         | PostVideo of url: string * savePath: string * externalId: Guid | ||||||
|  | 
 | ||||||
|  |     let private createInbox (channelId: ChatId) (youtubeDlClient: IYoutubeDlClient) (tg: ITelegramBotClient) = | ||||||
|  |         MailboxProcessor.Start(fun inbox -> | ||||||
|  |             let rec loop () = | ||||||
|  |                 async { | ||||||
|  |                     match! inbox.Receive() with | ||||||
|  |                     | PostVideo (url, savePath, externalId) -> | ||||||
|  |                         try | ||||||
|  |                             try | ||||||
|  |                                 Logging.logger.Information("Reading file path = {path}", savePath) | ||||||
|  |                                 use file = File.OpenRead(savePath) | ||||||
|  |                                 let input = InputOnlineFile(file, savePath) | ||||||
|  |                                 let caption = $"Source: {url}" | ||||||
|  |                                 Logging.logger.Information( | ||||||
|  |                                     "Sending video to channel, channelId = {channelId}, caption = {caption}", | ||||||
|  |                                     channelId, | ||||||
|  |                                     caption) | ||||||
|  |                                 do! tg.SendVideoAsync(channelId, input, caption = caption) |> Async.AwaitTask |> Async.Ignore | ||||||
|  |                             with ex -> | ||||||
|  |                                 Logging.logger.Error(ex, "Failed to send video") | ||||||
|  |                         finally | ||||||
|  |                             Logging.logger.Information("Deleting file path = {path}", savePath) | ||||||
|  |                             File.Delete(savePath) | ||||||
|  | 
 | ||||||
|  |                         match! youtubeDlClient.CleanJob(externalId) with | ||||||
|  |                         | Ok _ -> () | ||||||
|  |                         | Error _ -> () | ||||||
|  |                         return! loop () | ||||||
|  |                 } | ||||||
|  |             loop () | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     let createService chatId youtubeDlClient tg = | ||||||
|  |         let inbox = createInbox chatId youtubeDlClient tg | ||||||
|  |         { new ITgService with | ||||||
|  |             member this.PostVideo(url, savePath, externalId) = | ||||||
|  |                 inbox.Post(PostVideo(url, savePath, externalId)) } | ||||||
|  | 
 | ||||||
|  | type IYoutubeDlService = | ||||||
|  |     abstract member AddJob: url: string -> Async<Guid> | ||||||
|  | 
 | ||||||
|  | [<RequireQualifiedAccess>] | ||||||
|  | module YoutubeDlService = | ||||||
|  |     type private Msg = | ||||||
|  |         | AddJob of url: string * TaskCompletionSource<Guid> | ||||||
|  |         | CheckJob | ||||||
|  | 
 | ||||||
|  |     type private JobState = | ||||||
|  |         | Created | ||||||
|  |         | Awaiting of Guid | ||||||
|  | 
 | ||||||
|  |     type private YoutubeDlJob = { | ||||||
|  |         InternalId: Guid | ||||||
|  |         State: JobState | ||||||
|  |         Url: string | ||||||
|  |         SavePath: string | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| type YoutubeDlBackgroundService(requirements: StartYoutubeDlServiceArgs) = |     let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) = | ||||||
|   let (http, url, tg, chatId, ct) = requirements |         async { | ||||||
|   let lock = AsyncLock() |             Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId) | ||||||
|   let mutable currentJob: YoutubeDlCurrentJob = None () |             let! result = | ||||||
|   let jobPool = Queue<YoutubeDlJobWithoutId>() |                 client.CreateJob { | ||||||
|   let ytClient = YoutubeDlClient <| (url, http) |                     url = job.Url | ||||||
|   let mapJobToApi (job: YoutubeDlJob<_>): CreateYoutubeDLJob = { |                     savePath = job.SavePath | ||||||
|     url = job.url |  | ||||||
|     savePath = job.savePath |  | ||||||
|                 } |                 } | ||||||
|   let attachExternalId (id: Guid, job: YoutubeDlJobWithoutId): YoutubeDlCurrentJob = |  | ||||||
|     Awaiting { internalId = job.internalId; state = job.state; url = job.url; externalId = id; savePath = job.savePath } |  | ||||||
|    |  | ||||||
|   let tryAssignNewJob() = async { |  | ||||||
|     let (result, job) = jobPool.TryDequeue() |  | ||||||
|             match result with |             match result with | ||||||
|     | true -> currentJob <- Created job |             | Ok task -> | ||||||
|     | false -> currentJob <- None () |                 Logging.logger.Information( | ||||||
|  |                     "Created job on youtube client = {job}, externalId = {externalId}", | ||||||
|  |                     job.InternalId, | ||||||
|  |                     task.task) | ||||||
|  |                 let updated = { | ||||||
|  |                     job with | ||||||
|  |                         State = JobState.Awaiting task.task | ||||||
|  |                 } | ||||||
|  |                 return Some updated | ||||||
|  | 
 | ||||||
|  |             | Error e -> | ||||||
|  |                 Logging.logger.Error( | ||||||
|  |                     "Failed to create job on client = {job}, message = {message}", | ||||||
|  |                     job.InternalId, | ||||||
|  |                     e.message) | ||||||
|  |                 return None | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|   let uploadToYtDl(job: YoutubeDlJobWithoutId) = async { |     let private getCurrentJob | ||||||
|         match! ytClient.CreateJob <| mapJobToApi job with |         (jobQueue: Queue<YoutubeDlJob>) | ||||||
|         | Ok x -> currentJob <- attachExternalId <| (x.task, job) |         (current: YoutubeDlJob option) = | ||||||
|         // TODO: Logging! |         match current with | ||||||
|         | Error _ -> currentJob <- None () |         | Some current -> | ||||||
|   } |             Some current | ||||||
| 
 | 
 | ||||||
|   let checkJob(job: YoutubeDlJobWithId) = async { |         | None -> | ||||||
|     match! ytClient.CheckJob <| job.externalId with |             match jobQueue.TryDequeue() with | ||||||
|     | Ok x when x.state.Equals("Finished", StringComparison.OrdinalIgnoreCase) -> currentJob <- Downloaded job |             | true, job -> | ||||||
|     | Error _ -> currentJob <- None () |                 Some job | ||||||
|     | _ -> () |             | _ -> | ||||||
|     // That's take a while |                 None | ||||||
|     do! Async.Sleep 5000 |  | ||||||
|   } |  | ||||||
| 
 | 
 | ||||||
|   let postVideo(job: YoutubeDlJobWithId) = async { |     let private createServiceInbox | ||||||
|     use file = File.OpenRead <| job.savePath |         (youtubeDlClient: IYoutubeDlClient) | ||||||
|     let input = InputOnlineFile(file, job.savePath) |         (tgService: ITgService) = | ||||||
|     let caption = $"Source: {job.url}" |  | ||||||
|     do! tg.SendVideoAsync(chatId, input, caption = caption) |> Async.AwaitTask |> Async.Ignore |  | ||||||
|     currentJob <- Done job |  | ||||||
|   } |  | ||||||
| 
 | 
 | ||||||
|   let cleanUp(job: YoutubeDlJobWithId) = async { |         MailboxProcessor.Start(fun inbox -> | ||||||
|     File.Delete <| job.savePath |             let postCheck() = | ||||||
|     match! ytClient.CleanJob <| job.externalId with |                 async { | ||||||
|     | Ok _ -> currentJob <- None () |                     do! Async.Sleep(TimeSpan.FromSeconds 5) | ||||||
|     | Error _ -> currentJob <- None () |                     inbox.Post(CheckJob) | ||||||
|   } |                 } |> Async.Start | ||||||
| 
 | 
 | ||||||
|   let chooseAction() = lock <!> async { |             let rec loop | ||||||
|       do! match currentJob with |                 (jobQueue: Queue<YoutubeDlJob>) | ||||||
|           | None _ -> tryAssignNewJob() |                 (current: YoutubeDlJob option) = | ||||||
|           | Created x -> x |> uploadToYtDl |                 async { | ||||||
|           | Awaiting x -> x |> checkJob |                     match! inbox.Receive() with | ||||||
|           | Downloaded x -> x |> postVideo |                     | AddJob (url, tcs) -> | ||||||
|           | Done x -> x |> cleanUp |                         Logging.logger.Information("Adding new url = {url}", url) | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   let rec loop () = async { |  | ||||||
|     do! match ct.IsCancellationRequested with |  | ||||||
|         | false -> chooseAction() |  | ||||||
|         | true -> async { () } |  | ||||||
|     do! Async.Sleep 150 |  | ||||||
|     return! loop() |  | ||||||
|   } |  | ||||||
|   member public this.StartYoutubeDlService() = loop() |> Async.Start |  | ||||||
|   member public this.EnqueueJob(url: string) = lock <!> async { |  | ||||||
|                         let id = Guid.NewGuid() |                         let id = Guid.NewGuid() | ||||||
|     jobPool.Enqueue({ internalId = id; externalId = (); state = "new"; url = url; savePath = $"{Path.GetTempFileName()}.mp4" }) |                         let job = { | ||||||
|     return id |                             InternalId = id | ||||||
|  |                             State = Created | ||||||
|  |                             Url = url | ||||||
|  |                             SavePath = $"{Path.GetTempFileName()}.mp4" | ||||||
|                         } |                         } | ||||||
|  |                         tcs.SetResult(id) | ||||||
|  |                         jobQueue.Enqueue(job) | ||||||
|  | 
 | ||||||
|  |                         if current.IsNone then | ||||||
|  |                             inbox.Post(CheckJob) | ||||||
|  | 
 | ||||||
|  |                         Logging.logger.Information( | ||||||
|  |                             "Added new job = {job}, url = {url}, path = {path}", | ||||||
|  |                             job.InternalId, | ||||||
|  |                             url, | ||||||
|  |                             job.SavePath) | ||||||
|  |                         return! loop jobQueue current | ||||||
|  | 
 | ||||||
|  |                     | CheckJob -> | ||||||
|  |                         let currentJob = getCurrentJob jobQueue current | ||||||
|  |                         match currentJob with | ||||||
|  |                         | Some job -> | ||||||
|  |                             Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State) | ||||||
|  |                             match job.State with | ||||||
|  |                             | Created -> | ||||||
|  |                                 match! createJob youtubeDlClient job with | ||||||
|  |                                 | Some job -> | ||||||
|  |                                     postCheck() | ||||||
|  |                                     return! loop jobQueue (Some job) | ||||||
|  | 
 | ||||||
|  |                                 | None -> | ||||||
|  |                                     postCheck() | ||||||
|  |                                     return! loop jobQueue None | ||||||
|  | 
 | ||||||
|  |                             | Awaiting externalId -> | ||||||
|  |                                 Logging.logger.Information("Checking job = {job}", externalId) | ||||||
|  |                                 let! task = youtubeDlClient.CheckJob(externalId) | ||||||
|  |                                 match task with | ||||||
|  |                                 | Ok x when x.state.Equals("Finished", StringComparison.OrdinalIgnoreCase) -> | ||||||
|  |                                     Logging.logger.Information("Sending post video from job = {job}", externalId) | ||||||
|  |                                     tgService.PostVideo(job.Url, job.SavePath, externalId) | ||||||
|  |                                     return! loop jobQueue None | ||||||
|  | 
 | ||||||
|  |                                 | Error e -> | ||||||
|  |                                     Logging.logger.Error( | ||||||
|  |                                         "Failed to receive video from youtube client, job = {job}, message = {message}", | ||||||
|  |                                         externalId, | ||||||
|  |                                         e.message) | ||||||
|  |                                     File.Delete(job.SavePath) | ||||||
|  |                                     return! loop jobQueue None | ||||||
|  | 
 | ||||||
|  |                                 | _ -> | ||||||
|  |                                     Logging.logger.Information( | ||||||
|  |                                         "Waiting for job to complete, job = {job}", | ||||||
|  |                                         externalId) | ||||||
|  |                                     postCheck() | ||||||
|  |                                     return! loop jobQueue current | ||||||
|  | 
 | ||||||
|  |                         | None -> | ||||||
|  |                             return! loop jobQueue None | ||||||
|  |                 } | ||||||
|  |             loop (Queue()) None | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     let createService youtubeDlClient tgService = | ||||||
|  |         let inbox = createServiceInbox youtubeDlClient tgService | ||||||
|  |         { new IYoutubeDlService with | ||||||
|  |             member this.AddJob(url) = | ||||||
|  |                 let tcs = TaskCompletionSource<_>() | ||||||
|  |                 inbox.Post(AddJob(url, tcs)) | ||||||
|  |                 tcs.Task |> Async.AwaitTask} | ||||||
|  | @ -21,7 +21,8 @@ def report_state(id: str): | ||||||
| 
 | 
 | ||||||
| def load_video(url: str, file_path: str, id: str): | def load_video(url: str, file_path: str, id: str): | ||||||
|     opts = { |     opts = { | ||||||
|         "format": 'mp4', |         "recode-video": "mp4", | ||||||
|  |         "format": 'best[filesize<50M]', | ||||||
|         "quiet": True, |         "quiet": True, | ||||||
|         "outtmpl": file_path, |         "outtmpl": file_path, | ||||||
|         "progress_hooks": [report_state(id)] |         "progress_hooks": [report_state(id)] | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue