This commit is contained in:
Keroosha 2023-04-13 20:21:54 +03:00
parent 64f4086370
commit f9feb1f0e7
3 changed files with 52 additions and 14 deletions

View File

@ -17,7 +17,8 @@ type JobState =
| [<MapValue("Downloading")>] Downloading = 1 | [<MapValue("Downloading")>] Downloading = 1
| [<MapValue("Executing")>] Executing = 2 | [<MapValue("Executing")>] Executing = 2
| [<MapValue("UploadingResults")>] UploadingResults = 3 | [<MapValue("UploadingResults")>] UploadingResults = 3
| [<MapValue("Done")>] Done = 4 | [<MapValue("CleanUp")>] CleanUp = 4
| [<MapValue("Done")>] Done = 5
type JsonJobContext = { type JsonJobContext = {
fileId: String fileId: String

View File

@ -1,6 +1,7 @@
module Keroosha.SilencerBot.Processing module Keroosha.SilencerBot.Processing
open System open System
open System.Collections.Generic
open System.IO open System.IO
open System.Net.Http open System.Net.Http
open System.Text.Json open System.Text.Json
@ -9,6 +10,7 @@ open Funogram.Telegram.Types
open Keroosha.SilencerBot.Database open Keroosha.SilencerBot.Database
open Keroosha.SilencerBot.Env open Keroosha.SilencerBot.Env
open LinqToDB open LinqToDB
open Microsoft.FSharp.Collections
open Microsoft.FSharp.Control open Microsoft.FSharp.Control
module TgClient = Funogram.Tools.Api module TgClient = Funogram.Tools.Api
@ -40,16 +42,19 @@ let downloadFile (url: String, filePath: String) =
let private findJob (dbBuilder: unit -> DbContext, config: BotConfig) = let private findJob (dbBuilder: unit -> DbContext, config: BotConfig) =
task { task {
use db = dbBuilder() use db = dbBuilder()
use! __ = db.BeginTransactionAsync() use! trx = db.BeginTransactionAsync()
let! jobInProgress = db.UserJobs.FirstOrDefaultAsync(fun x -> x.WorkerId = config.processingWorkerId) let! jobInProgress = db.UserJobs.FirstOrDefaultAsync(
fun x -> x.WorkerId = config.processingWorkerId && x.State <> JobState.Failed && x.State <> JobState.Done)
match box jobInProgress with match box jobInProgress with
| null -> | null ->
let! job = db.UserJobs.FirstOrDefaultAsync(fun x -> x.State <> JobState.Failed && x.State <> JobState.Done) let! job = db.UserJobs.FirstOrDefaultAsync(
fun x -> x.State <> JobState.Failed && x.State <> JobState.Done && not x.WorkerId.HasValue)
match box job with match box job with
| null -> return None | null -> return None
| _ -> | _ ->
let jobWithWorkerId = { job with WorkerId = config.processingWorkerId } let jobWithWorkerId = { job with WorkerId = config.processingWorkerId }
let! __ = db.InsertOrReplaceAsync(jobWithWorkerId) let! __ = db.InsertOrReplaceAsync(jobWithWorkerId)
do! trx.CommitAsync()
return Some jobWithWorkerId return Some jobWithWorkerId
| _ -> return Some jobInProgress | _ -> return Some jobInProgress
} |> Async.AwaitTask } |> Async.AwaitTask
@ -57,8 +62,9 @@ let private findJob (dbBuilder: unit -> DbContext, config: BotConfig) =
let private updateJobState (dbBuilder: unit -> DbContext) (job: UserJob) = let private updateJobState (dbBuilder: unit -> DbContext) (job: UserJob) =
task { task {
use db = dbBuilder() use db = dbBuilder()
use! __ = db.BeginTransactionAsync() use! trx = db.BeginTransactionAsync()
let! __ = db.InsertOrReplaceAsync job let! __ = db.InsertOrReplaceAsync job
do! trx.CommitAsync() |> Async.AwaitTask
return job return job
} |> Async.AwaitTask } |> Async.AwaitTask
@ -89,8 +95,12 @@ let processExecuting (job: UserJob, botConfig: Funogram.Types.BotConfig, config:
async { async {
Logging.logger.Information $"Processing {job.Id} job" Logging.logger.Information $"Processing {job.Id} job"
let ctx = getContext job let ctx = getContext job
// let gpuFlag = if config.useGPU then "--gpu 0" else null let gpuFlag = if config.useGPU then ["--gpu 0"] else []
let args = ["inference.py"; "--input"; ctx.savePath; "--output_dir"; config.tempSavePath] let args = List.collect id <|
[
["inference.py"; "--input"; ctx.savePath; "--output_dir"; config.tempSavePath]
gpuFlag
]
let! stdout, stderr = runProc $"/usr/bin/python" args (Some config.processorWorkingPath) let! stdout, stderr = runProc $"/usr/bin/python" args (Some config.processorWorkingPath)
let ctxWithOutput = { ctx with stdout = stdout; stderr = stderr } let ctxWithOutput = { ctx with stdout = stdout; stderr = stderr }
return { job with return { job with
@ -103,15 +113,36 @@ let processUploading (job: UserJob, botConfig: Funogram.Types.BotConfig, config:
async { async {
let ctx = getContext job let ctx = getContext job
let cleanName = Path.GetFileNameWithoutExtension ctx.savePath let cleanName = Path.GetFileNameWithoutExtension ctx.savePath
let withoutVocalsPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instruments.wav") let instrumentalPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instruments.wav")
use f = File.OpenRead withoutVocalsPath let vocalsPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Vocals.wav")
Logging.logger.Information $"Uploading results for {job.Id} job" use fInstrumental = File.OpenRead instrumentalPath
use fVocals = File.OpenRead vocalsPath
let media = InputFile.File (Path.GetFileName withoutVocalsPath, f) let media = [|
let! res = TgClient.makeRequestAsync botConfig <| Api.sendAudio (ctx.chatId) (media) (0) InputFile.File (Path.GetFileName instrumentalPath, fInstrumental)
return { job with State = JobState.Done } InputFile.File (Path.GetFileName vocalsPath, fVocals)
|]
Logging.logger.Information $"Uploading results for {job.Id} job"
// TODO Error handling!
let uploadMedia (x: InputFile) = TgClient.makeRequestAsync botConfig <| Api.sendAudio (ctx.chatId) (x) (0)
do! media |> Seq.map uploadMedia |> Async.Sequential |> Async.Ignore
return { job with State = JobState.CleanUp }
} }
let processCleanUp (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) =
async {
let ctx = getContext job
let cleanName = Path.GetFileNameWithoutExtension ctx.savePath
List.iter File.Delete <| [
ctx.savePath
Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instruments.wav")
Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Vocals.wav")
]
return { job with State = JobState.Done }
}
let rec processJob (dbBuilder: unit -> DbContext, botConfig: Funogram.Types.BotConfig, config: BotConfig) (job: UserJob) = let rec processJob (dbBuilder: unit -> DbContext, botConfig: Funogram.Types.BotConfig, config: BotConfig) (job: UserJob) =
let updateAndContinue x = x |> updateJobState(dbBuilder) >>= processJob(dbBuilder, botConfig, config) let updateAndContinue x = x |> updateJobState(dbBuilder) >>= processJob(dbBuilder, botConfig, config)
@ -122,8 +153,10 @@ let rec processJob (dbBuilder: unit -> DbContext, botConfig: Funogram.Types.BotC
| JobState.Downloading -> do! processDownload args >>= updateAndContinue | JobState.Downloading -> do! processDownload args >>= updateAndContinue
| JobState.Executing -> do! processExecuting args >>= updateAndContinue | JobState.Executing -> do! processExecuting args >>= updateAndContinue
| JobState.UploadingResults -> do! processUploading args >>= updateAndContinue | JobState.UploadingResults -> do! processUploading args >>= updateAndContinue
| JobState.CleanUp -> do! processCleanUp args >>= updateAndContinue
| JobState.Done -> Logging.logger.Information $"Job {job.Id} done" | JobState.Done -> Logging.logger.Information $"Job {job.Id} done"
| JobState.Failed -> Logging.logger.Error $"Job {job.Id} failed" | JobState.Failed -> Logging.logger.Error $"Job {job.Id} failed"
| x -> do! failJob (job, getContext job) ($"Invalid state {x}") |> updateJobState(dbBuilder) |> Async.Ignore
() ()
} }

View File

@ -71,6 +71,7 @@ let createBotInbox (cfg: BotConfig, botCfg: Funogram.Types.BotConfig, dbFactory:
match! inbox.Receive() with match! inbox.Receive() with
| VoiceRemove x -> | VoiceRemove x ->
let db = dbFactory() let db = dbFactory()
use! trx = db.BeginTransactionAsync() |> Async.AwaitTask
let! user = db.Users.FirstOrDefaultAsync(fun u -> u.TgId = x.chatId) |> Async.AwaitTask let! user = db.Users.FirstOrDefaultAsync(fun u -> u.TgId = x.chatId) |> Async.AwaitTask
match box user with match box user with
| null -> | null ->
@ -91,15 +92,18 @@ let createBotInbox (cfg: BotConfig, botCfg: Funogram.Types.BotConfig, dbFactory:
WorkerId = Nullable() WorkerId = Nullable()
} }
do! db.InsertAsync(job) |> Async.AwaitTask |> Async.Ignore do! db.InsertAsync(job) |> Async.AwaitTask |> Async.Ignore
do! trx.CommitAsync() |> Async.AwaitTask
do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId jobCreatedText |> Async.Ignore do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId jobCreatedText |> Async.Ignore
() ()
| Start x -> | Start x ->
let db = dbFactory() let db = dbFactory()
use! trx = Async.AwaitTask <| db.BeginTransactionAsync()
match! db.Users.AnyAsync(fun u -> u.TgId = x.id) |> Async.AwaitTask with match! db.Users.AnyAsync(fun u -> u.TgId = x.id) |> Async.AwaitTask with
| true -> () | true -> ()
| false -> | false ->
let user: User = { Id = Guid.NewGuid(); Name = x.name; TgId = x.id; ChatId = x.chatId } let user: User = { Id = Guid.NewGuid(); Name = x.name; TgId = x.id; ChatId = x.chatId }
do! db.InsertAsync(user) |> Async.AwaitTask |> Async.Ignore do! db.InsertAsync(user) |> Async.AwaitTask |> Async.Ignore
do! trx.CommitAsync() |> Async.AwaitTask
do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId greetingText |> Async.Ignore do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId greetingText |> Async.Ignore
| Skip -> () | Skip -> ()
with with