Initial runner

This commit is contained in:
Keroosha 2023-04-13 03:58:26 +03:00
parent b4658c4559
commit e7ecfe2849
9 changed files with 329 additions and 26 deletions

2
.gitignore vendored
View File

@ -563,3 +563,5 @@ dist
# End of https://www.toptal.com/developers/gitignore/api/csharp,fsharp,node,visualstudiocode
Keroosha.SilencerBot/token
Keroosha.SilencerBot/config.local.json

View File

@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@ -1,6 +1,7 @@
module Keroosha.SilencerBot.Database
open System
open System.Transactions
open FluentMigrator
open FluentMigrator.Runner;
open LinqToDB
@ -17,30 +18,45 @@ type JobState =
| [<MapValue("Executing")>] Executing = 2
| [<MapValue("UploadingResults")>] UploadingResults = 3
| [<MapValue("Done")>] Done = 4
type JsonJobContext = {
fileId: String
chatId: int64
savePath: String
stdout: String
stderr: String
}
[<CLIMutable>]
[<Table("Users")>]
[<NoComparison; NoEquality>]
type User = {
[<Column>] Id: Guid
[<Column>] TgId: int64
[<Column>] Name: string
[<PrimaryKey>] Id: Guid
[<Column>] TgId : int64
[<Column>] ChatId : int64
[<Column>] Name : string
}
[<CLIMutable>]
[<Table("UserJobs")>]
type UserJobs = {
[<Column>] Id: Guid
[<NoComparison; NoEquality>]
type UserJob = {
[<PrimaryKey>] Id: Guid
[<Column>] UserId: Guid
[<Column>] State: JobState
[<Column(DataType = DataType.BinaryJson)>] Context: string
[<Column>] WorkerId: Guid Nullable
[<Column(DataType = DataType.BinaryJson)>] Context: String
}
type DbContext(connectionString: string, provider: IDataProvider) =
type DbContext(connectionString: String, provider: IDataProvider) =
inherit DataConnection(provider, connectionString)
member this.Users = this.GetTable<User>();
member this.UserJobs = this.GetTable<UserJobs>();
member this.UserJobs = this.GetTable<UserJob>();
let migrateApp (connectionString: string) =
let migrateApp (connectionString: String) =
use serviceProvider =
ServiceCollection()
.AddFluentMigratorCore()
@ -64,12 +80,14 @@ type InitialMigration() =
this.Create.Table("Users")
.WithColumn("Id").AsGuid().PrimaryKey()
.WithColumn("TgId").AsInt64().NotNullable()
.WithColumn("ChatId").AsInt64().NotNullable()
.WithColumn("Name").AsString().NotNullable()
|> ignore
this.Create.Table("UserJobs")
.WithColumn("Id").AsGuid().PrimaryKey()
.WithColumn("UserId").AsGuid().ForeignKey("Users", "Id")
.WithColumn("State").AsString().NotNullable().WithDefaultValue("New")
.WithColumn("WorkerId").AsGuid().Nullable()
.WithColumn("Context").AsCustom("JSONB").NotNullable()
|> ignore
()

View File

@ -1,7 +1,10 @@
module Keroosha.SilencerBot.Env
open System
open System.Diagnostics
open System.IO
open System.Text
open System.Text.Json
open McMaster.Extensions.CommandLineUtils
open Serilog
[<RequireQualifiedAccess>]
@ -13,6 +16,9 @@ module Logging =
type public BotConfig = {
tempSavePath: string
connectionString: string
processingWorkerId: Guid
processorWorkingPath: String
useGPU: bool
}
let private readConfig = File.ReadAllText >> JsonSerializer.Deserialize<BotConfig>
@ -24,4 +30,48 @@ let public createConfig (name: string) =
ApplicationException("Missing config path env") |> raise
| path ->
Logging.logger.Information("Read config from env")
readConfig path
readConfig path
// http://fssnip.net/sw/1
// Modified to be non-blocking (async) as fck!
let runProc filename args startDir =
async {
let timer = Stopwatch.StartNew()
let procStartInfo =
ProcessStartInfo(
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
FileName = filename,
Arguments = ArgumentEscaper.EscapeAndConcatenate args
)
match startDir with | Some d -> procStartInfo.WorkingDirectory <- d | _ -> ()
let outputs = System.Collections.Generic.List<string>()
let errors = System.Collections.Generic.List<string>()
let outputHandler f (_sender:obj) (args:DataReceivedEventArgs) = f args.Data
let p = new Process(StartInfo = procStartInfo)
p.OutputDataReceived.AddHandler(DataReceivedEventHandler (outputHandler outputs.Add))
p.ErrorDataReceived.AddHandler(DataReceivedEventHandler (outputHandler errors.Add))
let started =
try
p.Start()
with | ex ->
ex.Data.Add("filename", filename)
reraise()
if not started then failwithf "Failed to start process %s" filename
Logging.logger.Information $"Started {p.ProcessName} with pid {p.Id}"
p.BeginOutputReadLine()
p.BeginErrorReadLine()
do! p.WaitForExitAsync() |> Async.AwaitTask
timer.Stop()
Logging.logger.Information $"Finished {filename} after {timer.ElapsedMilliseconds} milliseconds"
let joinA (x: String array) = String.Join ("\n", x)
let cleanOut l = l
|> Seq.filter (fun o -> String.IsNullOrEmpty o |> not)
|> Seq.toArray
|> joinA
return (cleanOut outputs, cleanOut errors)
}

View File

@ -6,22 +6,26 @@
</PropertyGroup>
<ItemGroup>
<Compile Include="Env.fs" />
<Compile Include="Database.fs" />
<Compile Include="Telegram.fs" />
<Compile Include="Env.fs" />
<Compile Include="Processing.fs" />
<Compile Include="Program.fs" />
<Content Include="config.example.json" />
<Content Include="config.local.json" />
<Content Include="token" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Funogram" Version="2.0.6" />
<PackageReference Include="Funogram.Telegram" Version="6.5.0.1" />
<PackageReference Include="linq2db.PostgreSQL" Version="5.1.1" />
<PackageReference Include="linq2db.PostgreSQL" Version="4.4.1" />
<PackageReference Include="FluentMigrator" Version="3.3.2" />
<PackageReference Include="FluentMigrator.Runner" Version="3.3.2" />
<PackageReference Include="FluentMigrator.Runner.Postgres" Version="3.3.2" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="System.Text.Json" Version="7.0.2" />
<PackageReference Include="McMaster.Extensions.CommandLineUtils" Version="4.0.2" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,152 @@
module Keroosha.SilencerBot.Processing
open System
open System.IO
open System.Net.Http
open System.Text.Json
open Funogram.Telegram
open Funogram.Telegram.Types
open Keroosha.SilencerBot.Database
open Keroosha.SilencerBot.Env
open LinqToDB
open Microsoft.FSharp.Control
module TgClient = Funogram.Tools.Api
let http = new HttpClient()
let inline private (>>=) a b = (a, b) |> async.Bind
let getContext (x: UserJob) = x.Context |> JsonSerializer.Deserialize<JsonJobContext>
let serializeContext (x: JsonJobContext) = x |> JsonSerializer.Serialize<JsonJobContext>
let downloadUrl token path = $"https://api.telegram.org/file/bot{token}/{path}"
let packAudio name stream =
{
InputMediaAudio.Media = InputFile.File(name, stream)
Thumb = None
Caption = None
ParseMode = None
CaptionEntities = None
Duration = None
Performer = None
Title = Some name
Type = "Audio"
} |> InputMedia.Audio
let failJob (x: UserJob, ctx: JsonJobContext) (errMessage: String) =
{ x with
State = JobState.Failed
Context = JsonSerializer.Serialize({ ctx with stderr = errMessage })
}
let downloadFile (url: String, filePath: String) =
task {
try
use file = File.OpenWrite(filePath)
use! request = http.GetStreamAsync(url)
do! file |> request.CopyToAsync
return Ok ()
with
| ex -> return Error ex.Message
} |> Async.AwaitTask
let private findJob (dbBuilder: unit -> DbContext, config: BotConfig) =
task {
use db = dbBuilder()
use! __ = db.BeginTransactionAsync()
let! jobInProgress = db.UserJobs.FirstOrDefaultAsync(fun x -> x.WorkerId = config.processingWorkerId)
match box jobInProgress with
| null ->
let! job = db.UserJobs.FirstOrDefaultAsync(fun x -> x.State <> JobState.Failed && x.State <> JobState.Done)
match box job with
| null -> return None
| _ ->
let jobWithWorkerId = { job with WorkerId = config.processingWorkerId }
let! __ = db.InsertOrReplaceAsync(jobWithWorkerId)
return Some jobWithWorkerId
| _ -> return Some jobInProgress
} |> Async.AwaitTask
let private updateJobState (dbBuilder: unit -> DbContext) (job: UserJob) =
task {
use db = dbBuilder()
use! __ = db.BeginTransactionAsync()
let! __ = db.InsertOrReplaceAsync job
return job
} |> Async.AwaitTask
let processNew (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) =
async {
Logging.logger.Information $"Accepted {job.Id} job"
return { job with State = JobState.Downloading }
}
let processDownload (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) =
async {
Logging.logger.Information $"Downloading {job.Id} job"
let ctx = getContext job
let! res = TgClient.makeRequestAsync botConfig <| Api.getFile ctx.fileId
match res with
| Ok x when x.FilePath.IsNone ->
return (job, ctx) |> failJob <| "file doesnt exist"
| Ok x ->
let url = downloadUrl botConfig.Token x.FilePath.Value
match! downloadFile (url, ctx.savePath) with
| Ok _ -> return { job with State = JobState.Executing }
| Error text -> return (job, ctx) |> failJob <| text
| Error x ->
return (job, ctx) |> failJob <| x.Description
}
let processExecuting (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) =
async {
Logging.logger.Information $"Processing {job.Id} job"
let ctx = getContext job
// let gpuFlag = if config.useGPU then "--gpu 0" else null
let args = ["inference.py"; "--input"; ctx.savePath; "--output_dir"; config.tempSavePath]
let! stdout, stderr = runProc $"/usr/bin/python" args (Some config.processorWorkingPath)
let ctxWithOutput = { ctx with stdout = stdout; stderr = stderr }
return { job with
State = JobState.UploadingResults
Context = serializeContext ctxWithOutput
}
}
let processUploading (job: UserJob, botConfig: Funogram.Types.BotConfig, config: BotConfig) =
async {
let ctx = getContext job
let cleanName = Path.GetFileNameWithoutExtension ctx.savePath
let withoutVocalsPath = Path.Combine(Path.GetDirectoryName ctx.savePath, $"{cleanName}_Instrumental.wav")
use f = File.OpenRead withoutVocalsPath
Logging.logger.Information $"Uploading results for {job.Id} job"
let media = InputFile.File (Path.GetFileName withoutVocalsPath, f)
let! res = TgClient.makeRequestAsync botConfig <| Api.sendAudio (ctx.chatId) (media) (0)
return { job with State = JobState.Done }
}
let rec processJob (dbBuilder: unit -> DbContext, botConfig: Funogram.Types.BotConfig, config: BotConfig) (job: UserJob) =
let updateAndContinue x = x |> updateJobState(dbBuilder) >>= processJob(dbBuilder, botConfig, config)
let args = (job, botConfig, config)
async {
match job.State with
| JobState.New -> do! processNew args >>= updateAndContinue
| JobState.Downloading -> do! processDownload args >>= updateAndContinue
| JobState.Executing -> do! processExecuting args >>= updateAndContinue
| JobState.UploadingResults -> do! processUploading args >>= updateAndContinue
| JobState.Done -> Logging.logger.Information $"Job {job.Id} done"
| JobState.Failed -> Logging.logger.Error $"Job {job.Id} failed"
()
}
let rec processingMain (dbBuilder: unit -> DbContext, appConfig: BotConfig, tgConfig: Funogram.Types.BotConfig) =
async {
try
match! findJob(dbBuilder, appConfig) with
| Some x -> do! (dbBuilder, tgConfig, appConfig) |> processJob <| x
| None -> ()
do! 150 |> Async.Sleep
do! (dbBuilder, appConfig, tgConfig) |> processingMain
with
| ex -> Logging.logger.Error $"{ex.Message}\n{ex.StackTrace}"
}

View File

@ -9,16 +9,17 @@ open Keroosha.SilencerBot.Telegram
let config = Env.createConfig "SILENCER_BOT_CONFIG_PATH"
let botConfig = Config.defaultConfig |> Config.withReadTokenFromFile
let ctxFactory = fun () -> Database.createContext <| config.connectionString
Database.migrateApp config.connectionString
let botInbox = createBotInbox <| (botConfig, ctxFactory)
let botInbox = createBotInbox <| (config, botConfig, ctxFactory)
let handleUpdate (ctx: UpdateContext) = resolveUpdate ctx |> botInbox.Post
Console.CancelKeyPress |> Event.add (fun _ -> Environment.Exit <| 0)
Processing.processingMain <| (ctxFactory, config, botConfig) |> Async.Start
async {
let! _ = Api.makeRequestAsync botConfig <| Api.deleteWebhookBase()
return! startBot botConfig handleUpdate None

View File

@ -1,35 +1,109 @@
module Keroosha.SilencerBot.Telegram
open System
open System.IO
open System.Text.Json
open Funogram.Telegram
open Funogram.Telegram.Bot
open Funogram.Telegram.Types
open Funogram.Types
open Keroosha.SilencerBot.Env
open LinqToDB
open Keroosha.SilencerBot.Database
open Microsoft.FSharp.Control
module TgClient = Funogram.Tools.Api
type VoiceRemoveArgs = {
fileId: string
filename: string
chatId: int64
}
type StartArgs = {
id: int64
chatId: int64
name: String
}
type Features =
| VoiceRemove of VoiceRemoveArgs
| Unknown
| Start of StartArgs
| Skip
let runDate = DateTime.UtcNow
let greetingText = "Привет, отправь мне вавку и я попробую убрать из нее голос"
let jobCreatedText = "Запустил задачу, обрабатываю"
let unknownUserText = "Прости, но мы с тобой не знакомы, отправь мне в ЛС /start"
let isVoiceRemoveAction(update: Update) =
update.Message.IsSome && update.Message.Value.Audio.IsSome
update.Message.IsSome &&
update.Message.Value.Audio.IsSome &&
update.Message.Value.From.IsSome &&
update.Message.Value.Audio.Value.FileName.IsSome
let isStartCommand (update: Update) =
update.Message.IsSome &&
update.Message.Value.Text.IsSome &&
update.Message.Value.From.IsSome &&
update.Message.Value.Text.Value.StartsWith "/start"
let resolveUpdate (ctx: UpdateContext) =
match ctx.Update with
| x when x.Message.IsSome && x.Message.Value.Date < runDate -> Skip
| x when isVoiceRemoveAction x ->
VoiceRemove { fileId = x.Message.Value.Audio.Value.FileId; chatId = x.Message.Value.Chat.Id }
| _ -> Unknown
VoiceRemove {
fileId = x.Message.Value.Audio.Value.FileId
chatId = x.Message.Value.From.Value.Id
filename = x.Message.Value.Audio.Value.FileName.Value
}
| x when isStartCommand x ->
Start {
id = x.Message.Value.From.Value.Id
chatId = x.Message.Value.Chat.Id
name = x.Message.Value.From.Value.FirstName
}
| _ -> Skip
let createBotInbox (cfg: BotConfig, db: unit -> DbContext) = MailboxProcessor.Start(fun (inbox) ->
let createBotInbox (cfg: BotConfig, botCfg: Funogram.Types.BotConfig, dbFactory: unit -> DbContext) = MailboxProcessor.Start(fun (inbox) ->
let rec loop () =
async {
match! inbox.Receive() with
| VoiceRemove x -> ()
| Unknown -> ()
try
match! inbox.Receive() with
| VoiceRemove x ->
let db = dbFactory()
let! user = db.Users.FirstOrDefaultAsync(fun u -> u.TgId = x.chatId) |> Async.AwaitTask
match box user with
| null ->
do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId unknownUserText |> Async.Ignore
()
| _ ->
let jobContext: JsonJobContext = {
stderr = ""
stdout = ""
chatId = x.chatId
fileId = x.fileId
savePath = Path.Combine(cfg.tempSavePath, x.filename)
}
let job: UserJob = { Id = Guid.NewGuid()
State = JobState.New
UserId = user.Id
Context = JsonSerializer.Serialize(jobContext)
WorkerId = Nullable()
}
do! db.InsertAsync(job) |> Async.AwaitTask |> Async.Ignore
do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId jobCreatedText |> Async.Ignore
()
| Start x ->
let db = dbFactory()
match! db.Users.AnyAsync(fun u -> u.TgId = x.id) |> Async.AwaitTask with
| true -> ()
| false ->
let user: User = { Id = Guid.NewGuid(); Name = x.name; TgId = x.id; ChatId = x.chatId }
do! db.InsertAsync(user) |> Async.AwaitTask |> Async.Ignore
do! TgClient.makeRequestAsync botCfg <| Api.sendMessage x.chatId greetingText |> Async.Ignore
| Skip -> ()
with
| ex -> Logging.logger.Error $"\n{ex.Message}\n{ex.StackTrace}"
return! loop ()
}
loop ()

View File

@ -1,4 +1,7 @@
{
"connectionString": "Server=127.0.0.1;User id=postgres;password=postgres;database=silencer-bot",
"tempSavePath": "/tmp"
"tempSavePath": "/tmp",
"processingWorkerId": "7632B698-FA2C-431C-90D4-E4562AEE0B0A",
"processorWorkingPath": "external/vocal-remover",
"useGPU": false
}