From e7ecfe28499a2ca434e6899dcc0f45ebdee9497d Mon Sep 17 00:00:00 2001 From: Keroosha Date: Thu, 13 Apr 2023 03:58:26 +0300 Subject: [PATCH] Initial runner --- .gitignore | 2 + .../.idea.Keroosha.SilencerBot/.idea/vcs.xml | 1 - Keroosha.SilencerBot/Database.fs | 36 +++-- Keroosha.SilencerBot/Env.fs | 52 +++++- .../Keroosha.SilencerBot.fsproj | 8 +- Keroosha.SilencerBot/Processing.fs | 152 ++++++++++++++++++ Keroosha.SilencerBot/Program.fs | 5 +- Keroosha.SilencerBot/Telegram.fs | 94 +++++++++-- Keroosha.SilencerBot/config.example.json | 5 +- 9 files changed, 329 insertions(+), 26 deletions(-) create mode 100644 Keroosha.SilencerBot/Processing.fs diff --git a/.gitignore b/.gitignore index a9bd15c..f1226f4 100644 --- a/.gitignore +++ b/.gitignore @@ -563,3 +563,5 @@ dist # End of https://www.toptal.com/developers/gitignore/api/csharp,fsharp,node,visualstudiocode Keroosha.SilencerBot/token + +Keroosha.SilencerBot/config.local.json diff --git a/.idea/.idea.Keroosha.SilencerBot/.idea/vcs.xml b/.idea/.idea.Keroosha.SilencerBot/.idea/vcs.xml index 288b36b..94a25f7 100644 --- a/.idea/.idea.Keroosha.SilencerBot/.idea/vcs.xml +++ b/.idea/.idea.Keroosha.SilencerBot/.idea/vcs.xml @@ -1,7 +1,6 @@ - \ No newline at end of file diff --git a/Keroosha.SilencerBot/Database.fs b/Keroosha.SilencerBot/Database.fs index d9af61a..c70bd8a 100644 --- a/Keroosha.SilencerBot/Database.fs +++ b/Keroosha.SilencerBot/Database.fs @@ -1,6 +1,7 @@ module Keroosha.SilencerBot.Database open System +open System.Transactions open FluentMigrator open FluentMigrator.Runner; open LinqToDB @@ -17,30 +18,45 @@ type JobState = | [] Executing = 2 | [] UploadingResults = 3 | [] Done = 4 + +type JsonJobContext = { + fileId: String + chatId: int64 + savePath: String + stdout: String + stderr: String +} + [] [] +[] type User = { - [] Id: Guid - [] TgId: int64 - [] Name: string +[] Id: Guid +[] TgId : int64 +[] ChatId : int64 +[] Name : string } + [] [] -type UserJobs = { - [] Id: Guid +[] +type UserJob = { + [] Id: Guid [] UserId: Guid [] State: JobState - [] Context: string + [] WorkerId: Guid Nullable + [] Context: String } -type DbContext(connectionString: string, provider: IDataProvider) = + +type DbContext(connectionString: String, provider: IDataProvider) = inherit DataConnection(provider, connectionString) member this.Users = this.GetTable(); - member this.UserJobs = this.GetTable(); + member this.UserJobs = this.GetTable(); -let migrateApp (connectionString: string) = +let migrateApp (connectionString: String) = use serviceProvider = ServiceCollection() .AddFluentMigratorCore() @@ -64,12 +80,14 @@ type InitialMigration() = this.Create.Table("Users") .WithColumn("Id").AsGuid().PrimaryKey() .WithColumn("TgId").AsInt64().NotNullable() + .WithColumn("ChatId").AsInt64().NotNullable() .WithColumn("Name").AsString().NotNullable() |> ignore this.Create.Table("UserJobs") .WithColumn("Id").AsGuid().PrimaryKey() .WithColumn("UserId").AsGuid().ForeignKey("Users", "Id") .WithColumn("State").AsString().NotNullable().WithDefaultValue("New") + .WithColumn("WorkerId").AsGuid().Nullable() .WithColumn("Context").AsCustom("JSONB").NotNullable() |> ignore () diff --git a/Keroosha.SilencerBot/Env.fs b/Keroosha.SilencerBot/Env.fs index 95c5a53..ba3a214 100644 --- a/Keroosha.SilencerBot/Env.fs +++ b/Keroosha.SilencerBot/Env.fs @@ -1,7 +1,10 @@ module Keroosha.SilencerBot.Env open System +open System.Diagnostics open System.IO +open System.Text open System.Text.Json +open McMaster.Extensions.CommandLineUtils open Serilog [] @@ -13,6 +16,9 @@ module Logging = type public BotConfig = { tempSavePath: string connectionString: string + processingWorkerId: Guid + processorWorkingPath: String + useGPU: bool } let private readConfig = File.ReadAllText >> JsonSerializer.Deserialize @@ -24,4 +30,48 @@ let public createConfig (name: string) = ApplicationException("Missing config path env") |> raise | path -> Logging.logger.Information("Read config from env") - readConfig path \ No newline at end of file + readConfig path + +// http://fssnip.net/sw/1 +// Modified to be non-blocking (async) as fck! +let runProc filename args startDir = + async { + let timer = Stopwatch.StartNew() + let procStartInfo = + ProcessStartInfo( + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false, + FileName = filename, + Arguments = ArgumentEscaper.EscapeAndConcatenate args + ) + + match startDir with | Some d -> procStartInfo.WorkingDirectory <- d | _ -> () + + let outputs = System.Collections.Generic.List() + let errors = System.Collections.Generic.List() + let outputHandler f (_sender:obj) (args:DataReceivedEventArgs) = f args.Data + let p = new Process(StartInfo = procStartInfo) + p.OutputDataReceived.AddHandler(DataReceivedEventHandler (outputHandler outputs.Add)) + p.ErrorDataReceived.AddHandler(DataReceivedEventHandler (outputHandler errors.Add)) + let started = + try + p.Start() + with | ex -> + ex.Data.Add("filename", filename) + reraise() + if not started then failwithf "Failed to start process %s" filename + Logging.logger.Information $"Started {p.ProcessName} with pid {p.Id}" + p.BeginOutputReadLine() + p.BeginErrorReadLine() + do! p.WaitForExitAsync() |> Async.AwaitTask + timer.Stop() + Logging.logger.Information $"Finished {filename} after {timer.ElapsedMilliseconds} milliseconds" + let joinA (x: String array) = String.Join ("\n", x) + let cleanOut l = l + |> Seq.filter (fun o -> String.IsNullOrEmpty o |> not) + |> Seq.toArray + |> joinA + + return (cleanOut outputs, cleanOut errors) + } \ No newline at end of file diff --git a/Keroosha.SilencerBot/Keroosha.SilencerBot.fsproj b/Keroosha.SilencerBot/Keroosha.SilencerBot.fsproj index 90dd9be..73b9c28 100644 --- a/Keroosha.SilencerBot/Keroosha.SilencerBot.fsproj +++ b/Keroosha.SilencerBot/Keroosha.SilencerBot.fsproj @@ -6,22 +6,26 @@ + - + + + - + + diff --git a/Keroosha.SilencerBot/Processing.fs b/Keroosha.SilencerBot/Processing.fs new file mode 100644 index 0000000..995f89e --- /dev/null +++ b/Keroosha.SilencerBot/Processing.fs @@ -0,0 +1,152 @@ +module Keroosha.SilencerBot.Processing + +open System +open System.IO +open System.Net.Http +open System.Text.Json +open Funogram.Telegram +open Funogram.Telegram.Types +open Keroosha.SilencerBot.Database +open Keroosha.SilencerBot.Env +open LinqToDB +open Microsoft.FSharp.Control + +module TgClient = Funogram.Tools.Api + +let http = new HttpClient() + +let inline private (>>=) a b = (a, b) |> async.Bind +let getContext (x: UserJob) = x.Context |> JsonSerializer.Deserialize +let serializeContext (x: JsonJobContext) = x |> JsonSerializer.Serialize +let downloadUrl token path = $"https://api.telegram.org/file/bot{token}/{path}" +let packAudio name stream = + { + InputMediaAudio.Media = InputFile.File(name, stream) + Thumb = None + Caption = None + ParseMode = None + CaptionEntities = None + Duration = None + Performer = None + Title = Some name + Type = "Audio" + } |> InputMedia.Audio + +let failJob (x: UserJob, ctx: JsonJobContext) (errMessage: String) = + { x with + State = JobState.Failed + Context = JsonSerializer.Serialize({ ctx with stderr = errMessage }) + } + +let downloadFile (url: String, filePath: String) = + task { + try + use file = File.OpenWrite(filePath) + use! request = http.GetStreamAsync(url) + do! file |> request.CopyToAsync + return Ok () + with + | ex -> return Error ex.Message + } |> Async.AwaitTask + +let private findJob (dbBuilder: unit -> DbContext, config: BotConfig) = + task { + use db = dbBuilder() + use! __ = db.BeginTransactionAsync() + let! jobInProgress = db.UserJobs.FirstOrDefaultAsync(fun x -> x.WorkerId = config.processingWorkerId) + match box jobInProgress with + | null -> + let! job = db.UserJobs.FirstOrDefaultAsync(fun x -> x.State <> JobState.Failed && x.State <> JobState.Done) + match box job with + | null -> return None + | _ -> + let jobWithWorkerId = { job with WorkerId = config.processingWorkerId } + let! __ = db.InsertOrReplaceAsync(jobWithWorkerId) + return Some jobWithWorkerId + | _ -> return Some jobInProgress + } |> Async.AwaitTask + +let private updateJobState (dbBuilder: unit -> DbContext) (job: UserJob) = + task { + use db = dbBuilder() + use! __ = db.BeginTransactionAsync() + let! __ = db.InsertOrReplaceAsync job + return job + } |> Async.AwaitTask + +let processNew (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) = + async { + Logging.logger.Information $"Accepted {job.Id} job" + return { job with State = JobState.Downloading } + } + +let processDownload (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) = + async { + Logging.logger.Information $"Downloading {job.Id} job" + let ctx = getContext job + let! res = TgClient.makeRequestAsync botConfig <| Api.getFile ctx.fileId + match res with + | Ok x when x.FilePath.IsNone -> + return (job, ctx) |> failJob <| "file doesnt exist" + | Ok x -> + let url = downloadUrl botConfig.Token x.FilePath.Value + match! downloadFile (url, ctx.savePath) with + | Ok _ -> return { job with State = JobState.Executing } + | Error text -> return (job, ctx) |> failJob <| text + | Error x -> + return (job, ctx) |> failJob <| x.Description + } + +let processExecuting (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) = + async { + Logging.logger.Information $"Processing {job.Id} job" + let ctx = getContext job + // let gpuFlag = if config.useGPU then "--gpu 0" else null + let args = ["inference.py"; "--input"; ctx.savePath; "--output_dir"; config.tempSavePath] + let! stdout, stderr = runProc $"/usr/bin/python" args (Some config.processorWorkingPath) + let ctxWithOutput = { ctx with stdout = stdout; stderr = stderr } + return { job with + State = JobState.UploadingResults + Context = serializeContext ctxWithOutput + } + } + +let processUploading (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) = + async { + let ctx = getContext job + let cleanName = Path.GetFileNameWithoutExtension ctx.savePath + let withoutVocalsPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instrumental.wav") + use f = File.OpenRead withoutVocalsPath + Logging.logger.Information $"Uploading results for {job.Id} job" + + let media = InputFile.File (Path.GetFileName withoutVocalsPath, f) + let! res = TgClient.makeRequestAsync botConfig <| Api.sendAudio (ctx.chatId) (media) (0) + return { job with State = JobState.Done } + } + + +let rec processJob (dbBuilder: unit -> DbContext, botConfig: Funogram.Types.BotConfig, config: BotConfig) (job: UserJob) = + let updateAndContinue x = x |> updateJobState(dbBuilder) >>= processJob(dbBuilder, botConfig, config) + let args = (job, botConfig, config) + async { + match job.State with + | JobState.New -> do! processNew args >>= updateAndContinue + | JobState.Downloading -> do! processDownload args >>= updateAndContinue + | JobState.Executing -> do! processExecuting args >>= updateAndContinue + | JobState.UploadingResults -> do! processUploading args >>= updateAndContinue + | JobState.Done -> Logging.logger.Information $"Job {job.Id} done" + | JobState.Failed -> Logging.logger.Error $"Job {job.Id} failed" + () + } + +let rec processingMain (dbBuilder: unit -> DbContext, appConfig: BotConfig, tgConfig: Funogram.Types.BotConfig) = + async { + try + match! findJob(dbBuilder, appConfig) with + | Some x -> do! (dbBuilder, tgConfig, appConfig) |> processJob <| x + | None -> () + do! 150 |> Async.Sleep + do! (dbBuilder, appConfig, tgConfig) |> processingMain + with + | ex -> Logging.logger.Error $"{ex.Message}\n{ex.StackTrace}" + } diff --git a/Keroosha.SilencerBot/Program.fs b/Keroosha.SilencerBot/Program.fs index 976c127..21f5fb7 100644 --- a/Keroosha.SilencerBot/Program.fs +++ b/Keroosha.SilencerBot/Program.fs @@ -9,16 +9,17 @@ open Keroosha.SilencerBot.Telegram let config = Env.createConfig "SILENCER_BOT_CONFIG_PATH" let botConfig = Config.defaultConfig |> Config.withReadTokenFromFile - let ctxFactory = fun () -> Database.createContext <| config.connectionString Database.migrateApp config.connectionString -let botInbox = createBotInbox <| (botConfig, ctxFactory) +let botInbox = createBotInbox <| (config, botConfig, ctxFactory) let handleUpdate (ctx: UpdateContext) = resolveUpdate ctx |> botInbox.Post Console.CancelKeyPress |> Event.add (fun _ -> Environment.Exit <| 0) +Processing.processingMain <| (ctxFactory, config, botConfig) |> Async.Start + async { let! _ = Api.makeRequestAsync botConfig <| Api.deleteWebhookBase() return! startBot botConfig handleUpdate None diff --git a/Keroosha.SilencerBot/Telegram.fs b/Keroosha.SilencerBot/Telegram.fs index 46467df..866a226 100644 --- a/Keroosha.SilencerBot/Telegram.fs +++ b/Keroosha.SilencerBot/Telegram.fs @@ -1,35 +1,109 @@ module Keroosha.SilencerBot.Telegram +open System +open System.IO +open System.Text.Json +open Funogram.Telegram open Funogram.Telegram.Bot open Funogram.Telegram.Types -open Funogram.Types +open Keroosha.SilencerBot.Env +open LinqToDB open Keroosha.SilencerBot.Database +open Microsoft.FSharp.Control +module TgClient = Funogram.Tools.Api type VoiceRemoveArgs = { fileId: string + filename: string chatId: int64 } +type StartArgs = { + id: int64 + chatId: int64 + name: String +} + type Features = | VoiceRemove of VoiceRemoveArgs - | Unknown + | Start of StartArgs + | Skip + +let runDate = DateTime.UtcNow + +let greetingText = "Привет, отправь мне вавку и я попробую убрать из нее голос" +let jobCreatedText = "Запустил задачу, обрабатываю" +let unknownUserText = "Прости, но мы с тобой не знакомы, отправь мне в ЛС /start" let isVoiceRemoveAction(update: Update) = - update.Message.IsSome && update.Message.Value.Audio.IsSome + update.Message.IsSome && + update.Message.Value.Audio.IsSome && + update.Message.Value.From.IsSome && + update.Message.Value.Audio.Value.FileName.IsSome + +let isStartCommand (update: Update) = + update.Message.IsSome && + update.Message.Value.Text.IsSome && + update.Message.Value.From.IsSome && + update.Message.Value.Text.Value.StartsWith "/start" let resolveUpdate (ctx: UpdateContext) = match ctx.Update with + | x when x.Message.IsSome && x.Message.Value.Date < runDate -> Skip | x when isVoiceRemoveAction x -> - VoiceRemove { fileId = x.Message.Value.Audio.Value.FileId; chatId = x.Message.Value.Chat.Id } - | _ -> Unknown + VoiceRemove { + fileId = x.Message.Value.Audio.Value.FileId + chatId = x.Message.Value.From.Value.Id + filename = x.Message.Value.Audio.Value.FileName.Value + } + | x when isStartCommand x -> + Start { + id = x.Message.Value.From.Value.Id + chatId = x.Message.Value.Chat.Id + name = x.Message.Value.From.Value.FirstName + } + | _ -> Skip -let createBotInbox (cfg: BotConfig, db: unit -> DbContext) = MailboxProcessor.Start(fun (inbox) -> +let createBotInbox (cfg: BotConfig, botCfg: Funogram.Types.BotConfig, dbFactory: unit -> DbContext) = MailboxProcessor.Start(fun (inbox) -> let rec loop () = async { - match! inbox.Receive() with - | VoiceRemove x -> () - | Unknown -> () - + try + match! inbox.Receive() with + | VoiceRemove x -> + let db = dbFactory() + let! user = db.Users.FirstOrDefaultAsync(fun u -> u.TgId = x.chatId) |> Async.AwaitTask + match box user with + | null -> + do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId unknownUserText |> Async.Ignore + () + | _ -> + let jobContext: JsonJobContext = { + stderr = "" + stdout = "" + chatId = x.chatId + fileId = x.fileId + savePath = Path.Combine(cfg.tempSavePath, x.filename) + } + let job: UserJob = { Id = Guid.NewGuid() + State = JobState.New + UserId = user.Id + Context = JsonSerializer.Serialize(jobContext) + WorkerId = Nullable() + } + do! db.InsertAsync(job) |> Async.AwaitTask |> Async.Ignore + do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId jobCreatedText |> Async.Ignore + () + | Start x -> + let db = dbFactory() + match! db.Users.AnyAsync(fun u -> u.TgId = x.id) |> Async.AwaitTask with + | true -> () + | false -> + let user: User = { Id = Guid.NewGuid(); Name = x.name; TgId = x.id; ChatId = x.chatId } + do! db.InsertAsync(user) |> Async.AwaitTask |> Async.Ignore + do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId greetingText |> Async.Ignore + | Skip -> () + with + | ex -> Logging.logger.Error $"\n{ex.Message}\n{ex.StackTrace}" return! loop () } loop () diff --git a/Keroosha.SilencerBot/config.example.json b/Keroosha.SilencerBot/config.example.json index cb82363..4f9dcca 100644 --- a/Keroosha.SilencerBot/config.example.json +++ b/Keroosha.SilencerBot/config.example.json @@ -1,4 +1,7 @@ { "connectionString": "Server=127.0.0.1;User id=postgres;password=postgres;database=silencer-bot", - "tempSavePath": "/tmp" + "tempSavePath": "/tmp", + "processingWorkerId": "7632B698-FA2C-431C-90D4-E4562AEE0B0A", + "processorWorkingPath": "external/vocal-remover", + "useGPU": false } \ No newline at end of file