408 lines
No EOL
16 KiB
FSharp
408 lines
No EOL
16 KiB
FSharp
module PublishHelperBot.YoutubeDl
|
||
|
||
open System
|
||
open System.Collections.Generic
|
||
open System.IO
|
||
open System.Net.Http
|
||
open System.Text
|
||
open System.Threading.Tasks
|
||
open Microsoft.FSharp.Core
|
||
open Newtonsoft.Json
|
||
open PublishHelperBot.Environment
|
||
open Telegram.Bot
|
||
open Telegram.Bot.Types.InputFiles
|
||
|
||
/// Telegram chat identifier, represented as a 64-bit integer.
type ChatId = int64
|
||
|
||
/// Request payload for creating a download job.
/// The record is serialized to JSON as-is, so the lower-case field names
/// are part of the wire format and must not be renamed.
type CreateYoutubeDlJob =
    { /// Address of the video to download.
      url: string
      /// File path passed to the service; the same path is later opened
      /// locally to read the downloaded video.
      savePath: string }
|
||
|
||
/// Successful response to a create-job request.
/// `task` is the identifier the remote service assigned to the job; it is
/// used afterwards to poll for status and to clean the job up.
type CreateYoutubeDlJobSuccess = { task: Guid }
|
||
|
||
/// Response body of the status and clear endpoints.
/// `state` is free text reported by the remote service; callers in this file
/// compare it case-insensitively against "Finished".
type YoutubeDlStateResponse = { state: string }
|
||
|
||
/// Failure description returned by the youtube-dl client.
/// `message` carries either an exception message or a generic network error text.
type YoutubeDlError = { message: string }
|
||
|
||
/// Outcome of a create-job request: the assigned task id, or an error description.
type CreateJobResult = Result<CreateYoutubeDlJobSuccess, YoutubeDlError>
|
||
|
||
/// Outcome of a job status query: the reported state, or an error description.
type CheckJobResult = Result<YoutubeDlStateResponse, YoutubeDlError>
|
||
|
||
/// Outcome of a job clean-up request: the reported state, or an error description.
type CleanJobResult = Result<YoutubeDlStateResponse, YoutubeDlError>
|
||
|
||
/// Asynchronous client for the remote youtube-dl HTTP service.
/// Every operation reports failure through the `Error` case of its result.
type IYoutubeDlClient =
    /// Asks the service to start downloading the given url to the given path.
    abstract CreateJob: CreateYoutubeDlJob -> Async<CreateJobResult>

    /// Queries the current state of the job identified by `externalId`.
    abstract CheckJob: externalId: Guid -> Async<CheckJobResult>

    /// Asks the service to clear the job identified by `externalId`.
    abstract CleanJob: externalId: Guid -> Async<CleanJobResult>
|
||
|
||
/// Settings needed to build a youtube-dl client.
type YoutubeDlClientConfig =
    { /// Root URL of the youtube-dl HTTP service; the client appends `api/...` to it.
      BaseUrl: string
      /// HTTP client used to issue every request.
      Client: HttpClient }
|
||
|
||
[<RequireQualifiedAccess>]
|
||
module YoutubeDlClient =

    /// Operations exposed by the remote youtube-dl HTTP API.
    type private YoutubeDlClientActions = Create | Check | Delete

    /// HTTP verbs used by this client.
    type private HttpMethods = GET | POST | DELETE

    /// Messages handled by the client agent. Each message carries the
    /// completion source through which the caller receives the result.
    type private Msg =
        | CreateJob of CreateYoutubeDlJob * tcs: TaskCompletionSource<CreateJobResult>
        | CheckJob of externalId: Guid * tcs: TaskCompletionSource<CheckJobResult>
        | CleanJob of externalId: Guid * tcs: TaskCompletionSource<CleanJobResult>

    /// Builds the API root from the configured base URL.
    /// A missing trailing slash on a non-empty base URL is tolerated, so both
    /// "http://host" and "http://host/" yield "http://host/api/". An empty or
    /// null base URL still yields the relative prefix "api/".
    let private getApiPrefix (baseUrl: string) =
        if String.IsNullOrEmpty baseUrl || baseUrl.EndsWith("/", StringComparison.Ordinal) then
            $"{baseUrl}api/"
        else
            $"{baseUrl}/api/"

    /// Returns the endpoint URL for the given API action.
    let private resolvePath (baseUrl: string) (action: YoutubeDlClientActions) =
        let apiPrefix = getApiPrefix baseUrl
        match action with
        | Create -> $"{apiPrefix}download"
        | Check -> $"{apiPrefix}status"
        | Delete -> $"{apiPrefix}clear"

    /// Sends one HTTP request and deserializes a successful JSON response.
    ///
    /// `content` is always read for logging, but it is only transmitted for POST.
    /// Returns `Error` when the status code is not a success, when the body
    /// deserializes to null, or when any exception is raised while sending or
    /// parsing; exceptions never escape this function.
    let private doHttp
        (method: HttpMethods)
        (content: HttpContent)
        (client: HttpClient)
        (url: string): Async<Result<'TRes, YoutubeDlError>> =
        task {
            try
                let! contentText = content.ReadAsStringAsync()
                Logging.logger.Information("Sending request to youtube api, url = {Url}, content = {Content}", url, contentText)
                // Dispose the response once the body has been read.
                use! res =
                    match method with
                    | POST -> client.PostAsync(url, content)
                    | GET -> client.GetAsync(url)
                    | DELETE -> client.DeleteAsync(url)

                let! body = res.Content.ReadAsStringAsync()
                Logging.logger.Information("Response from youtube api, response = {Response}", body)

                if res.IsSuccessStatusCode then
                    Logging.logger.Information("Response OK")
                    let parsed = JsonConvert.DeserializeObject<'TRes>(body)
                    // An empty or literal `null` body deserializes to null; report
                    // it as an error instead of handing callers a null record.
                    if isNull (box parsed) then
                        Logging.logger.Error("Response body is empty")
                        return Error { message = "Empty response body" }
                    else
                        return Ok parsed
                else
                    Logging.logger.Error("Response network error")
                    return Error { message = "Unknown network error" }
            with ex ->
                Logging.logger.Error(ex, "Youtube api error")
                return Error { message = ex.Message }
        } |> Async.AwaitTask

    /// Starts `work` in the background and completes `tcs` with its outcome.
    /// Any exception is logged with `failureLog` and reported to the caller as
    /// an `Error`, so the awaiting side is always released.
    let private replyAsync
        (failureLog: string)
        (tcs: TaskCompletionSource<Result<'TRes, YoutubeDlError>>)
        (work: Async<Result<'TRes, YoutubeDlError>>) =
        async {
            try
                let! result = work
                tcs.SetResult(result)
            with ex ->
                Logging.logger.Error(ex, failureLog)
                tcs.SetResult(Error { message = ex.Message })
        } |> Async.Start

    /// Creates the agent that serves client requests. Each request is started
    /// in the background, so the agent goes back to receiving straight away.
    let private createInbox (config: YoutubeDlClientConfig) =
        MailboxProcessor.Start(fun inbox ->
            let rec loop () =
                async {
                    match! inbox.Receive() with
                    | CreateJob (args, tcs) ->
                        Logging.logger.Information("Received youtube api create job")
                        async {
                            let json = args |> JsonConvert.SerializeObject
                            Logging.logger.Information("Sending create job = {Job}", json)
                            use content = new StringContent(json, Encoding.UTF8, "application/json")
                            let! result =
                                (config.Client, resolvePath config.BaseUrl Create)
                                ||> doHttp POST content
                            Logging.logger.Information("Received response from youtube api for create job")
                            return result
                        }
                        |> replyAsync "Failed to create youtube api job" tcs

                        return! loop ()

                    | CheckJob (externalId, tcs) ->
                        Logging.logger.Information("Received youtube api check job, externalId = {id}", externalId)
                        async {
                            let arg = [KeyValuePair("id", externalId.ToString())]
                            // The form content is only used to build the URL-encoded query string.
                            use content = new FormUrlEncodedContent(arg)
                            Logging.logger.Information("Sending youtube api check job")
                            let! query = content.ReadAsStringAsync() |> Async.AwaitTask
                            let url = $"{resolvePath config.BaseUrl Check}?{query}"
                            let! result = doHttp GET content config.Client url
                            Logging.logger.Information("Received response from youtube api for check job")
                            return result
                        }
                        |> replyAsync "Failed to check youtube api job" tcs

                        return! loop ()

                    | CleanJob (externalId, tcs) ->
                        Logging.logger.Information("Received youtube api clean job, externalId = {id}", externalId)
                        async {
                            let arg = [KeyValuePair("id", externalId.ToString())]
                            // The form content is only used to build the URL-encoded query string.
                            use content = new FormUrlEncodedContent(arg)
                            Logging.logger.Information("Sending youtube api clean job")
                            let! query = content.ReadAsStringAsync() |> Async.AwaitTask
                            let url = $"{resolvePath config.BaseUrl Delete}?{query}"
                            let! result = doHttp DELETE content config.Client url
                            Logging.logger.Information("Received response from youtube api for clean job")
                            return result
                        }
                        |> replyAsync "Failed to clean youtube api job" tcs

                        return! loop ()
                }
            loop ()
        )

    /// Creates an `IYoutubeDlClient` backed by an agent. Each call posts a
    /// message to the agent and returns an async that completes when the
    /// corresponding HTTP request has finished.
    let createClient (config: YoutubeDlClientConfig) =
        let inbox = createInbox(config)
        { new IYoutubeDlClient with
            member this.CreateJob(args) =
                let tcs = TaskCompletionSource<_>()
                inbox.Post(CreateJob(args, tcs))
                tcs.Task |> Async.AwaitTask

            member this.CheckJob externalId =
                let tcs = TaskCompletionSource<_>()
                inbox.Post(CheckJob(externalId, tcs))
                tcs.Task |> Async.AwaitTask

            member this.CleanJob externalId =
                let tcs = TaskCompletionSource<_>()
                inbox.Post(CleanJob(externalId, tcs))
                tcs.Task |> Async.AwaitTask }
|
||
|
||
/// Dependencies and targets of the Telegram posting service.
type TgServiceConfig =
    { /// Bot client used to send videos and text messages.
      Client: ITelegramBotClient
      /// Chat that receives the downloaded videos.
      ChannelId: ChatId
      /// Chat that is notified when a video is too large to be posted.
      AdminChatId: ChatId
      /// Client used to clean the remote job up after posting.
      YoutubeDlClient: IYoutubeDlClient }
|
||
|
||
/// Posts downloaded videos to Telegram.
type ITgService =
    /// Requests that the file at `savePath` be posted with `url` as its source;
    /// `externalId` identifies the remote job to clean up afterwards.
    /// The implementation in `TgService.createService` only enqueues the
    /// request and returns immediately.
    abstract PostVideo: url: string * savePath: string * externalId: Guid -> unit
|
||
|
||
[<RequireQualifiedAccess>]
|
||
module TgService =

    /// Messages handled by the posting agent.
    type private Msg =
        | PostVideo of url: string * savePath: string * externalId: Guid

    /// Deletes the downloaded file, logging instead of raising on failure.
    /// An exception escaping the agent loop would terminate it and leave every
    /// later message unprocessed, so this must not throw.
    let private deleteDownloadedFile (path: string) =
        try
            Logging.logger.Information("Deleting file path = {path}", path)
            File.Delete(path)
        with ex ->
            Logging.logger.Error(ex, "Failed to delete file path = {path}", path)

    /// Creates the agent that posts videos one at a time.
    ///
    /// For each message it sends the file to the channel when it is smaller
    /// than 50 MiB, otherwise it notifies the admin chat. It then always deletes
    /// the local file and asks the youtube-dl client to clean the remote job.
    /// Failures at any step are logged and never stop the agent.
    let private createInbox (config: TgServiceConfig) =
        MailboxProcessor.Start(fun inbox ->
            let rec loop () =
                async {
                    match! inbox.Receive() with
                    | PostVideo (url, savePath, externalId) ->
                        try
                            try
                                Logging.logger.Information("Reading file path = {path}", savePath)
                                use file = File.OpenRead(savePath)
                                // Integer division: only files below 50 whole MiB are posted.
                                if (file.Length / 1024L / 1024L) < 50L then
                                    let input = InputOnlineFile(file, Path.GetRandomFileName())
                                    let caption = $"Source: {url}"
                                    Logging.logger.Information(
                                        "Sending video to channel, channelId = {channelId}, caption = {caption}",
                                        config.ChannelId,
                                        caption)
                                    do! config.Client.SendVideoAsync(
                                            config.ChannelId,
                                            input,
                                            caption = caption
                                        )
                                        |> Async.AwaitTask |> Async.Ignore
                                else
                                    do! config.Client.SendTextMessageAsync(config.AdminChatId, $"Да блять, видео вышло больше 50мб: {externalId}") |> Async.AwaitTask |> Async.Ignore
                            with ex ->
                                Logging.logger.Error(ex, "Failed to send video")
                        finally
                            // The file stream is already disposed here, so the file can be removed.
                            deleteDownloadedFile savePath

                        try
                            match! config.YoutubeDlClient.CleanJob(externalId) with
                            | Ok _ -> ()
                            | Error e ->
                                Logging.logger.Error(
                                    "Failed to clean youtube api job, externalId = {id}, message = {message}",
                                    externalId,
                                    e.message)
                        with ex ->
                            Logging.logger.Error(ex, "Failed to clean youtube api job, externalId = {id}", externalId)

                        return! loop ()
                }
            loop ()
        )

    /// Creates an `ITgService` whose `PostVideo` enqueues the request on the
    /// posting agent and returns without waiting for the upload.
    let createService config =
        let inbox = createInbox config
        { new ITgService with
            member this.PostVideo(url, savePath, externalId) =
                inbox.Post(PostVideo(url, savePath, externalId)) }
|
||
|
||
/// Schedules video downloads.
type IYoutubeDlService =
    /// Registers `url` for download and returns the identifier assigned to the new job.
    abstract AddJob: url: string -> Async<Guid>
|
||
|
||
[<RequireQualifiedAccess>]
|
||
module YoutubeDlService =

    /// Messages handled by the scheduling agent.
    type private Msg =
        | AddJob of url: string * TaskCompletionSource<Guid>
        | CheckJob

    /// Progress of a job: not yet submitted, or submitted and waiting for the
    /// remote job with the given external id.
    type private JobState =
        | Created
        | Awaiting of Guid

    type private YoutubeDlJob = {
        InternalId: Guid
        State: JobState
        Url: string
        SavePath: string
    }

    /// Returns a unique `.mp4` path in the temp directory for the given job.
    /// The path is only computed; nothing is created on disk, so no placeholder
    /// file is left behind (unlike `Path.GetTempFileName`, which creates one).
    let private newSavePath (jobId: Guid) =
        Path.Combine(Path.GetTempPath(), jobId.ToString("N") + ".mp4")

    /// Deletes a file, logging instead of raising on failure, so that a
    /// file-system error cannot terminate the agent loop.
    let private tryDeleteFile (path: string) =
        try
            File.Delete(path)
        with ex ->
            Logging.logger.Error(ex, "Failed to delete file path = {path}", path)

    /// Submits the job to the youtube-dl client.
    /// Returns the job moved to `Awaiting` with the external id on success, or
    /// `None` (after logging) when the client reports an error.
    let private createJob (client: IYoutubeDlClient) (job: YoutubeDlJob) =
        async {
            Logging.logger.Information("Sending create job to youtube client, job = {job}", job.InternalId)
            let! result =
                client.CreateJob {
                    url = job.Url
                    savePath = job.SavePath
                }
            match result with
            | Ok created ->
                Logging.logger.Information(
                    "Created job on youtube client = {job}, externalId = {externalId}",
                    job.InternalId,
                    created.task)
                return Some { job with State = JobState.Awaiting created.task }

            | Error e ->
                Logging.logger.Error(
                    "Failed to create job on client = {job}, message = {message}",
                    job.InternalId,
                    e.message)
                return None
        }

    /// Returns the job in progress, or dequeues the next one when there is none.
    let private getCurrentJob
        (jobQueue: Queue<YoutubeDlJob>)
        (current: YoutubeDlJob option) =
        match current with
        | Some _ -> current
        | None ->
            match jobQueue.TryDequeue() with
            | true, job -> Some job
            | _ -> None

    /// Creates the agent that processes download jobs one at a time.
    ///
    /// `AddJob` enqueues a job and triggers a check when nothing is in progress.
    /// `CheckJob` submits a newly dequeued job or polls the submitted one. While
    /// work may remain, another check is scheduled five seconds later; polling
    /// stops once no job is in progress and the queue is empty.
    let private createServiceInbox
        (youtubeDlClient: IYoutubeDlClient)
        (tgService: ITgService) =

        MailboxProcessor.Start(fun inbox ->
            // Schedules the next CheckJob without blocking the agent loop.
            let postCheck() =
                async {
                    do! Async.Sleep(TimeSpan.FromSeconds 5)
                    inbox.Post(CheckJob)
                } |> Async.Start

            let rec loop
                (jobQueue: Queue<YoutubeDlJob>)
                (current: YoutubeDlJob option) =
                async {
                    match! inbox.Receive() with
                    | AddJob (url, tcs) ->
                        Logging.logger.Information("Adding new url = {url}", url)
                        let id = Guid.NewGuid()
                        let job = {
                            InternalId = id
                            State = Created
                            Url = url
                            SavePath = newSavePath id
                        }
                        tcs.SetResult(id)
                        jobQueue.Enqueue(job)

                        if current.IsNone then
                            inbox.Post(CheckJob)

                        Logging.logger.Information(
                            "Added new job = {job}, url = {url}, path = {path}",
                            job.InternalId,
                            url,
                            job.SavePath)
                        return! loop jobQueue current

                    | CheckJob ->
                        match getCurrentJob jobQueue current with
                        | Some job ->
                            Logging.logger.Information("Checking job = {job}, state = {state}", job.InternalId, job.State)
                            match job.State with
                            | Created ->
                                match! createJob youtubeDlClient job with
                                | Some submitted ->
                                    postCheck()
                                    return! loop jobQueue (Some submitted)

                                | None ->
                                    // The job is dropped; keep polling so queued jobs are picked up.
                                    postCheck()
                                    return! loop jobQueue None

                            | Awaiting externalId ->
                                Logging.logger.Information("Checking job = {job}", externalId)
                                let! status = youtubeDlClient.CheckJob(externalId)
                                match status with
                                // Static String.Equals tolerates a null state in the response.
                                | Ok x when String.Equals(x.state, "Finished", StringComparison.OrdinalIgnoreCase) ->
                                    Logging.logger.Information("Sending post video from job = {job}", externalId)
                                    tgService.PostVideo(job.Url, job.SavePath, externalId)
                                    postCheck()
                                    return! loop jobQueue None

                                | Error e ->
                                    Logging.logger.Error(
                                        "Failed to receive video from youtube client, job = {job}, message = {message}",
                                        externalId,
                                        e.message)
                                    tryDeleteFile job.SavePath
                                    // Without this, jobs already in the queue would wait
                                    // until the next AddJob arrives.
                                    postCheck()
                                    return! loop jobQueue None

                                | _ ->
                                    Logging.logger.Information(
                                        "Waiting for job to complete, job = {job}",
                                        externalId)
                                    postCheck()
                                    return! loop jobQueue (Some job)

                        | None ->
                            // Nothing to do: stop polling until the next AddJob.
                            return! loop jobQueue None
                }
            loop (Queue()) None
        )

    /// Creates an `IYoutubeDlService` backed by the scheduling agent.
    /// `AddJob` completes with the new job's internal id as soon as the agent
    /// has queued it, not when the download finishes.
    let createService youtubeDlClient tgService =
        let inbox = createServiceInbox youtubeDlClient tgService
        { new IYoutubeDlService with
            member this.AddJob(url) =
                let tcs = TaskCompletionSource<_>()
                inbox.Post(AddJob(url, tcs))
                tcs.Task |> Async.AwaitTask }