diff --git a/Keroosha.SilencerBot/Database.fs b/Keroosha.SilencerBot/Database.fs index c70bd8a..dc0915b 100644 --- a/Keroosha.SilencerBot/Database.fs +++ b/Keroosha.SilencerBot/Database.fs @@ -17,7 +17,8 @@ type JobState = | [] Downloading = 1 | [] Executing = 2 | [] UploadingResults = 3 - | [] Done = 4 + | [] CleanUp = 4 + | [] Done = 5 type JsonJobContext = { fileId: String diff --git a/Keroosha.SilencerBot/Processing.fs b/Keroosha.SilencerBot/Processing.fs index 4e7e499..b4aeac3 100644 --- a/Keroosha.SilencerBot/Processing.fs +++ b/Keroosha.SilencerBot/Processing.fs @@ -1,6 +1,7 @@ module Keroosha.SilencerBot.Processing open System +open System.Collections.Generic open System.IO open System.Net.Http open System.Text.Json @@ -9,6 +10,7 @@ open Funogram.Telegram.Types open Keroosha.SilencerBot.Database open Keroosha.SilencerBot.Env open LinqToDB +open Microsoft.FSharp.Collections open Microsoft.FSharp.Control module TgClient = Funogram.Tools.Api @@ -40,16 +42,19 @@ let downloadFile (url: String, filePath: String) = let private findJob (dbBuilder: unit -> DbContext, config: BotConfig) = task { use db = dbBuilder() - use! __ = db.BeginTransactionAsync() - let! jobInProgress = db.UserJobs.FirstOrDefaultAsync(fun x -> x.WorkerId = config.processingWorkerId) + use! trx = db.BeginTransactionAsync() + let! jobInProgress = db.UserJobs.FirstOrDefaultAsync( + fun x -> x.WorkerId = config.processingWorkerId && x.State <> JobState.Failed && x.State <> JobState.Done) match box jobInProgress with | null -> - let! job = db.UserJobs.FirstOrDefaultAsync(fun x -> x.State <> JobState.Failed && x.State <> JobState.Done) + let! job = db.UserJobs.FirstOrDefaultAsync( + fun x -> x.State <> JobState.Failed && x.State <> JobState.Done && not x.WorkerId.HasValue) match box job with | null -> return None | _ -> let jobWithWorkerId = { job with WorkerId = config.processingWorkerId } let! __ = db.InsertOrReplaceAsync(jobWithWorkerId) + do! trx.CommitAsync() return Some jobWithWorkerId | _ -> return Some jobInProgress } |> Async.AwaitTask @@ -57,8 +62,9 @@ let private findJob (dbBuilder: unit -> DbContext, config: BotConfig) = let private updateJobState (dbBuilder: unit -> DbContext) (job: UserJob) = task { use db = dbBuilder() - use! __ = db.BeginTransactionAsync() - let! __ = db.InsertOrReplaceAsync job + use! trx = db.BeginTransactionAsync() + let! __ = db.InsertOrReplaceAsync job + do! trx.CommitAsync() |> Async.AwaitTask return job } |> Async.AwaitTask @@ -89,8 +95,12 @@ let processExecuting (job: UserJob, botConfig: Funogram.Types.BotConfig, config: async { Logging.logger.Information $"Processing {job.Id} job" let ctx = getContext job - // let gpuFlag = if config.useGPU then "--gpu 0" else null - let args = ["inference.py"; "--input"; ctx.savePath; "--output_dir"; config.tempSavePath] + let gpuFlag = if config.useGPU then ["--gpu 0"] else [] + let args = List.collect id <| + [ + ["inference.py"; "--input"; ctx.savePath; "--output_dir"; config.tempSavePath] + gpuFlag + ] let! stdout, stderr = runProc $"/usr/bin/python" args (Some config.processorWorkingPath) let ctxWithOutput = { ctx with stdout = stdout; stderr = stderr } return { job with @@ -103,15 +113,36 @@ let processUploading (job: UserJob, botConfig: Funogram.Types.BotConfig, config: async { let ctx = getContext job let cleanName = Path.GetFileNameWithoutExtension ctx.savePath - let withoutVocalsPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instruments.wav") - use f = File.OpenRead withoutVocalsPath - Logging.logger.Information $"Uploading results for {job.Id} job" + let instrumentalPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instruments.wav") + let vocalsPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Vocals.wav") + use fInstrumental = File.OpenRead instrumentalPath + use fVocals = File.OpenRead vocalsPath - let media = InputFile.File (Path.GetFileName withoutVocalsPath, f) - let! res = TgClient.makeRequestAsync botConfig <| Api.sendAudio (ctx.chatId) (media) (0) - return { job with State = JobState.Done } + let media = [| + InputFile.File (Path.GetFileName instrumentalPath, fInstrumental) + InputFile.File (Path.GetFileName vocalsPath, fVocals) + |] + + Logging.logger.Information $"Uploading results for {job.Id} job" + // TODO Error handling! + let uploadMedia (x: InputFile) = TgClient.makeRequestAsync botConfig <| Api.sendAudio (ctx.chatId) (x) (0) + do! media |> Seq.map uploadMedia |> Async.Sequential |> Async.Ignore + return { job with State = JobState.CleanUp } } +let processCleanUp (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) = + async { + let ctx = getContext job + let cleanName = Path.GetFileNameWithoutExtension ctx.savePath + List.iter File.Delete <| [ + ctx.savePath + Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instruments.wav") + Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Vocals.wav") + ] + return { job with State = JobState.Done } + } + + let rec processJob (dbBuilder: unit -> DbContext, botConfig: Funogram.Types.BotConfig, config: BotConfig) (job: UserJob) = let updateAndContinue x = x |> updateJobState(dbBuilder) >>= processJob(dbBuilder, botConfig, config) @@ -122,8 +153,10 @@ let rec processJob (dbBuilder: unit -> DbContext, botConfig: Funogram.Types.BotC | JobState.Downloading -> do! processDownload args >>= updateAndContinue | JobState.Executing -> do! processExecuting args >>= updateAndContinue | JobState.UploadingResults -> do! processUploading args >>= updateAndContinue + | JobState.CleanUp -> do! processCleanUp args >>= updateAndContinue | JobState.Done -> Logging.logger.Information $"Job {job.Id} done" | JobState.Failed -> Logging.logger.Error $"Job {job.Id} failed" + | x -> do! failJob (job, getContext job) ($"Invalid state {x}") |> updateJobState(dbBuilder) |> Async.Ignore () } diff --git a/Keroosha.SilencerBot/Telegram.fs b/Keroosha.SilencerBot/Telegram.fs index 866a226..392fdbc 100644 --- a/Keroosha.SilencerBot/Telegram.fs +++ b/Keroosha.SilencerBot/Telegram.fs @@ -71,6 +71,7 @@ let createBotInbox (cfg: BotConfig, botCfg: Funogram.Types.BotConfig, dbFactory: match! inbox.Receive() with | VoiceRemove x -> let db = dbFactory() + use! trx = db.BeginTransactionAsync() |> Async.AwaitTask let! user = db.Users.FirstOrDefaultAsync(fun u -> u.TgId = x.chatId) |> Async.AwaitTask match box user with | null -> @@ -91,15 +92,18 @@ let createBotInbox (cfg: BotConfig, botCfg: Funogram.Types.BotConfig, dbFactory: WorkerId = Nullable() } do! db.InsertAsync(job) |> Async.AwaitTask |> Async.Ignore + do! trx.CommitAsync() |> Async.AwaitTask do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId jobCreatedText |> Async.Ignore () | Start x -> let db = dbFactory() + use! trx = Async.AwaitTask <| db.BeginTransactionAsync() match! db.Users.AnyAsync(fun u -> u.TgId = x.id) |> Async.AwaitTask with | true -> () | false -> let user: User = { Id = Guid.NewGuid(); Name = x.name; TgId = x.id; ChatId = x.chatId } do! db.InsertAsync(user) |> Async.AwaitTask |> Async.Ignore + do! trx.CommitAsync() |> Async.AwaitTask do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId greetingText |> Async.Ignore | Skip -> () with