Async (Polyglot)¶

In [ ]:
#!import ../../lib/fsharp/Notebooks.dib
#!import ../../lib/fsharp/Testing.dib
In [ ]:
#!import ../../lib/fsharp/Common.fs
In [ ]:
#if !INTERACTIVE
open Lib
#endif
In [ ]:
open Common

choice¶

In [ ]:
let inline choice asyncs = async {
    let e = Event<_> ()
    use cts = new System.Threading.CancellationTokenSource ()
    let fn =
        asyncs
        |> Seq.map (fun a -> async {
            let! x = a
            e.Trigger x
        })
        |> Async.Parallel
        |> Async.Ignore
    Async.Start (fn, cts.Token)
    let! result = Async.AwaitEvent e.Publish
    cts.Cancel ()
    return result
}

map¶

In [ ]:
let inline map fn a = async {
    let! x = a
    return fn x
}

runWithTimeoutChoiceAsync¶

In [ ]:
let inline runWithTimeoutChoiceAsync (timeout : int) fn =
    let _locals () = $"timeout: {timeout} / {_locals ()}"

    let timeoutTask = async {
        do! Async.Sleep timeout
        trace Debug (fun () -> "runWithTimeoutChoiceAsync") _locals
        return None
    }

    let task = async {
        try
            let! result = fn
            return Some result
        with
        | :? System.AggregateException as ex when
            ex.InnerExceptions
            |> Seq.exists (function :? System.Threading.Tasks.TaskCanceledException -> true | _ -> false)
            ->
            trace Warning
                (fun () -> "runWithTimeoutChoiceAsync")
                (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
            return None
        | ex ->
            trace Critical
                (fun () -> "runWithTimeoutChoiceAsync")
                (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
            return None
    }

    [ timeoutTask; task ]
    |> choice
In [ ]:
let inline runWithTimeoutChoice timeout fn =
    fn
    |> runWithTimeoutChoiceAsync timeout
    |> Async.RunSynchronously
In [ ]:
//// test

Async.Sleep 120
|> runWithTimeoutChoice 10
|> _assertEqual None
00:00:00 d #1 runWithTimeoutChoiceAsync / timeout: 10
<null>

In [ ]:
//// test

Async.Sleep 10
|> runWithTimeoutChoice 60
|> _assertEqual (Some ())
Some ()

In [ ]:
//// test

async {
    raise (exn "error")
}
|> runWithTimeoutChoice 60
|> _assertEqual None
00:00:00 c #2 runWithTimeoutChoiceAsync / ex: System.Exception: error / timeout: 60
<null>

catch¶

In [ ]:
let inline catch a =
    a
    |> Async.Catch
    |> map (function
        | Choice1Of2 result -> Ok result
        | Choice2Of2 ex -> Error ex
    )

runWithTimeoutAsync¶

In [ ]:
let inline runWithTimeoutAsync (timeout : int) fn = async {
    let _locals () = $"timeout: {timeout} / {_locals ()}"
    let! child = Async.StartChild (fn, timeout)
    return!
        child
        |> catch
        |> map (function
            | Ok result -> Some result
            | Error (:? System.TimeoutException as ex) ->
                trace Debug (fun () -> $"Async.runWithTimeoutAsync") _locals
                None
            | Error ex ->
                trace Critical (fun () -> $"Async.runWithTimeoutAsync** / ex: %A{ex}") _locals
                None
        )
}
In [ ]:
let inline runWithTimeout timeout fn =
    fn
    |> runWithTimeoutAsync timeout
    |> Async.RunSynchronously
In [ ]:
//// test

Async.Sleep 60
|> runWithTimeout 10
|> _assertEqual None
00:00:00 d #3 Async.runWithTimeoutAsync / timeout: 10
<null>

In [ ]:
//// test

Async.Sleep 10
|> runWithTimeout 60
|> _assertEqual (Some ())
Some ()

In [ ]:
//// test

async {
    raise (exn "error")
}
|> runWithTimeout 60
|> _assertEqual None
00:00:01 c #4 Async.runWithTimeoutAsync** / ex: System.Exception: error
   at FSI_0041.it@4-119.Invoke(Unit unitVar)
   at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 509
   at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 112
--- End of stack trace from previous location ---
   at Microsoft.FSharp.Control.AsyncResult`1.Commit() in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 453
   at <StartupCode$FSharp-Core>.$Async.AwaitAndBindChildResult@1948-4.Invoke(Unit unitVar) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1950
   at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 509
   at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 112 / timeout: 60
<null>

runWithTimeoutStrict¶

In [ ]:
let inline runWithTimeoutStrict (timeout : int) fn =
    let _locals () = $"timeout: {timeout} / {_locals ()}"

    let timeoutTask = async {
        do! Async.Sleep timeout
        return None, _locals
    }

    let task = async {
        try
            return Async.RunSynchronously (fn, timeout) |> Some, _locals
        with
        | :? System.TimeoutException as ex ->
            let _locals () = $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}"
            return None, _locals
        | ex ->
            trace Critical
                (fun () -> "Async.runWithTimeoutStrict / async error")
                (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
            return raise ex
    }

    try
        [| timeoutTask; task |]
        |> Array.map Async.StartAsTask
        |> System.Threading.Tasks.Task.WhenAny
        |> fun task ->
            match task.Result.Result with
            | None, _locals ->
                trace Debug (fun () -> "runWithTimeoutStrict") _locals
                None
            | result, _ -> result
    with
    | :? System.AggregateException as ex when
        ex.InnerExceptions
        |> Seq.exists (function :? System.Threading.Tasks.TaskCanceledException -> true | _ -> false)
        ->
        trace Warning
            (fun () -> "Async.runWithTimeoutStrict")
            (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
        None
    | ex ->
        trace Critical
            (fun () -> "Async.runWithTimeoutStrict / task error")
            (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
        None
In [ ]:
//// test

Async.Sleep 60
|> runWithTimeoutStrict 10
|> _assertEqual None
00:00:01 d #5 runWithTimeoutStrict / timeout: 10
<null>

In [ ]:
//// test

Async.Sleep 10
|> runWithTimeoutStrict 60
|> _assertEqual (Some ())
Some ()

In [ ]:
//// test

async {
    raise (exn "error")
}
|> runWithTimeoutStrict 60
|> _assertEqual None
00:00:01 c #6 Async.runWithTimeoutStrict / async error / ex: System.Exception: error / timeout: 60
00:00:01 c #7 Async.runWithTimeoutStrict / task error / ex: System.AggregateException: One or more errors occurred. (error) / timeout: 60
<null>

awaitValueTask¶

In [ ]:
let inline awaitValueTaskUnit (task : System.Threading.Tasks.ValueTask) =
    task.AsTask () |> Async.AwaitTask

let inline awaitValueTask (task : System.Threading.Tasks.ValueTask<_>) =
    task.AsTask () |> Async.AwaitTask

init¶

In [ ]:
let inline init x = async {
    return x
}
In [ ]:
//// test

init 1
|> Async.RunSynchronously
|> _assertEqual 1
1

withCancellationToken¶

In [ ]:
let inline withCancellationToken (ct : System.Threading.CancellationToken) fn =
    Async.StartImmediateAsTask (fn, ct)
    |> Async.AwaitTask
In [ ]:
//// test

let cts = new System.Threading.CancellationTokenSource ()

async {
    let run = async {
        do! Async.Sleep 100
        return 1
    }

    let! child =
        run
        |> withCancellationToken cts.Token
        |> catch
        |> Async.StartChild

    do! Async.Sleep 50
    cts.Cancel ()
    return! child
}
|> Async.RunSynchronously
|> Result.mapError _.Message
|> _assertEqual (Error ("A task was canceled."))
Error "A task was canceled."

retryAsync¶

In [ ]:
let inline retryAsync retries fn =
    let rec 루프 retry lastError = async {
        try
            return!
                if retry <= retries
                then fn |> map Ok
                else lastError |> Error |> init
        with ex ->
            trace Debug (fun () -> $"Async.retryAsync / retry: {retry}/{retries} / ex: {ex |> SpiralSm.format_exception}") _locals
            do! Async.Sleep 30
            return! 루프 (retry + 1) (ex |> SpiralSm.format_exception)
    }
    루프 1 "Async.retryAsync / invalid retries / retries: {retries}"
In [ ]:
//// test

let retry_fn_test = ref 0
async {
    retry_fn_test.Value <- retry_fn_test.Value + 1
    return retry_fn_test.Value
}
|> retryAsync 3
|> Async.RunSynchronously
|> _assertEqual (Ok 1)
Ok 1

In [ ]:
//// test

let retry_fn_test = ref 0
async {
    return
        if retry_fn_test.Value >= 2
        then retry_fn_test.Value
        else
            retry_fn_test.Value <- retry_fn_test.Value + 1
            failwith "test"
}
|> retryAsync 3
|> Async.RunSynchronously
|> _assertEqual (Ok 2)
00:00:01 d #8 Async.retryAsync / retry: 1/3 / ex: System.Exception: test
00:00:01 d #9 Async.retryAsync / retry: 2/3 / ex: System.Exception: test
Ok 2

fold¶

In [ ]:
let fold folder state array =
    let rec 루프 acc i =
        async {
            if i < Array.length array then
                let! newAcc = folder acc array.[i]
                return! 루프 newAcc (i + 1)
            else
                return acc
        }
    루프 state 0