Async (Polyglot)¶
In [ ]:
#!import ../../lib/fsharp/Notebooks.dib
#!import ../../lib/fsharp/Testing.dib
In [ ]:
#!import ../../lib/fsharp/Common.fs
In [ ]:
#if !INTERACTIVE
open Lib
#endif
In [ ]:
open Common
choice¶
In [ ]:
let inline choice asyncs = async {
let e = Event<_> ()
use cts = new System.Threading.CancellationTokenSource ()
let fn =
asyncs
|> Seq.map (fun a -> async {
let! x = a
e.Trigger x
})
|> Async.Parallel
|> Async.Ignore
Async.Start (fn, cts.Token)
let! result = Async.AwaitEvent e.Publish
cts.Cancel ()
return result
}
map¶
In [ ]:
let inline map fn a = async {
let! x = a
return fn x
}
runWithTimeoutChoiceAsync¶
In [ ]:
let inline runWithTimeoutChoiceAsync (timeout : int) fn =
let _locals () = $"timeout: {timeout} / {_locals ()}"
let timeoutTask = async {
do! Async.Sleep timeout
trace Debug (fun () -> "runWithTimeoutChoiceAsync") _locals
return None
}
let task = async {
try
let! result = fn
return Some result
with
| :? System.AggregateException as ex when
ex.InnerExceptions
|> Seq.exists (function :? System.Threading.Tasks.TaskCanceledException -> true | _ -> false)
->
trace Warning
(fun () -> "runWithTimeoutChoiceAsync")
(fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
return None
| ex ->
trace Critical
(fun () -> "runWithTimeoutChoiceAsync")
(fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
return None
}
[ timeoutTask; task ]
|> choice
In [ ]:
let inline runWithTimeoutChoice timeout fn =
fn
|> runWithTimeoutChoiceAsync timeout
|> Async.RunSynchronously
In [ ]:
//// test
Async.Sleep 120
|> runWithTimeoutChoice 10
|> _assertEqual None
00:00:00 d #1 runWithTimeoutChoiceAsync / timeout: 10
<null>
In [ ]:
//// test
Async.Sleep 10
|> runWithTimeoutChoice 60
|> _assertEqual (Some ())
Some ()
In [ ]:
//// test
async {
raise (exn "error")
}
|> runWithTimeoutChoice 60
|> _assertEqual None
00:00:00 c #2 runWithTimeoutChoiceAsync / ex: System.Exception: error / timeout: 60
<null>
catch¶
In [ ]:
let inline catch a =
a
|> Async.Catch
|> map (function
| Choice1Of2 result -> Ok result
| Choice2Of2 ex -> Error ex
)
runWithTimeoutAsync¶
In [ ]:
let inline runWithTimeoutAsync (timeout : int) fn = async {
let _locals () = $"timeout: {timeout} / {_locals ()}"
let! child = Async.StartChild (fn, timeout)
return!
child
|> catch
|> map (function
| Ok result -> Some result
| Error (:? System.TimeoutException as ex) ->
trace Debug (fun () -> $"Async.runWithTimeoutAsync") _locals
None
| Error ex ->
trace Critical (fun () -> $"Async.runWithTimeoutAsync** / ex: %A{ex}") _locals
None
)
}
In [ ]:
let inline runWithTimeout timeout fn =
fn
|> runWithTimeoutAsync timeout
|> Async.RunSynchronously
In [ ]:
//// test
Async.Sleep 60
|> runWithTimeout 10
|> _assertEqual None
00:00:00 d #3 Async.runWithTimeoutAsync / timeout: 10
<null>
In [ ]:
//// test
Async.Sleep 10
|> runWithTimeout 60
|> _assertEqual (Some ())
Some ()
In [ ]:
//// test
async {
raise (exn "error")
}
|> runWithTimeout 60
|> _assertEqual None
00:00:01 c #4 Async.runWithTimeoutAsync** / ex: System.Exception: error
at FSI_0036.it@4-119.Invoke(Unit unitVar)
at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 510
at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 112
--- End of stack trace from previous location ---
at Microsoft.FSharp.Control.AsyncResult`1.Commit() in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 454
at <StartupCode$FSharp-Core>.$Async.AwaitAndBindChildResult@1962-4.Invoke(Unit unitVar) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1964
at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 510
at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 112 / timeout: 60
<null>
runWithTimeoutStrict¶
In [ ]:
let inline runWithTimeoutStrict (timeout : int) fn =
let _locals () = $"timeout: {timeout} / {_locals ()}"
let timeoutTask = async {
do! Async.Sleep timeout
return None, _locals
}
let task = async {
try
return Async.RunSynchronously (fn, timeout) |> Some, _locals
with
| :? System.TimeoutException as ex ->
let _locals () = $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}"
return None, _locals
| ex ->
trace Critical
(fun () -> "Async.runWithTimeoutStrict / async error")
(fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
return raise ex
}
try
[| timeoutTask; task |]
|> Array.map Async.StartAsTask
|> System.Threading.Tasks.Task.WhenAny
|> fun task ->
match task.Result.Result with
| None, _locals ->
trace Debug (fun () -> "runWithTimeoutStrict") _locals
None
| result, _ -> result
with
| :? System.AggregateException as ex when
ex.InnerExceptions
|> Seq.exists (function :? System.Threading.Tasks.TaskCanceledException -> true | _ -> false)
->
trace Warning
(fun () -> "Async.runWithTimeoutStrict")
(fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
None
| ex ->
trace Critical
(fun () -> "Async.runWithTimeoutStrict / task error")
(fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}")
None
In [ ]:
//// test
Async.Sleep 60
|> runWithTimeoutStrict 10
|> _assertEqual None
00:00:01 d #5 runWithTimeoutStrict / ex: System.TimeoutException: The operation has timed out. / timeout: 10
<null>
In [ ]:
//// test
Async.Sleep 10
|> runWithTimeoutStrict 60
|> _assertEqual (Some ())
Some ()
In [ ]:
//// test
async {
raise (exn "error")
}
|> runWithTimeoutStrict 60
|> _assertEqual None
00:00:01 c #6 Async.runWithTimeoutStrict / async error / ex: System.Exception: error / timeout: 60 00:00:01 c #7 Async.runWithTimeoutStrict / task error / ex: System.AggregateException: One or more errors occurred. (error) / timeout: 60 <null>
awaitValueTask¶
In [ ]:
let inline awaitValueTaskUnit (task : System.Threading.Tasks.ValueTask) =
task.AsTask () |> Async.AwaitTask
let inline awaitValueTask (task : System.Threading.Tasks.ValueTask<_>) =
task.AsTask () |> Async.AwaitTask
init¶
In [ ]:
let inline init x = async {
return x
}
In [ ]:
//// test
init 1
|> Async.RunSynchronously
|> _assertEqual 1
1
withCancellationToken¶
In [ ]:
let inline withCancellationToken (ct : System.Threading.CancellationToken) fn =
Async.StartImmediateAsTask (fn, ct)
|> Async.AwaitTask
In [ ]:
//// test
let cts = new System.Threading.CancellationTokenSource ()
async {
let run = async {
do! Async.Sleep 100
return 1
}
let! child =
run
|> withCancellationToken cts.Token
|> catch
|> Async.StartChild
do! Async.Sleep 50
cts.Cancel ()
return! child
}
|> Async.RunSynchronously
|> Result.mapError _.Message
|> _assertEqual (Error ("A task was canceled."))
Error "A task was canceled."
retryAsync¶
In [ ]:
let inline retryAsync retries fn =
let rec loop retry lastError = async {
try
return!
if retry <= retries
then fn |> map Ok
else lastError |> Error |> init
with ex ->
trace Debug (fun () -> $"Async.retryAsync / retry: {retry}/{retries} / ex: {ex |> SpiralSm.format_exception}") _locals
do! Async.Sleep 30
return! loop (retry + 1) (ex |> SpiralSm.format_exception)
}
loop 1 "Async.retryAsync / invalid retries / retries: {retries}"
In [ ]:
//// test
let retry_fn_test = ref 0
async {
retry_fn_test.Value <- retry_fn_test.Value + 1
return retry_fn_test.Value
}
|> retryAsync 3
|> Async.RunSynchronously
|> _assertEqual (Ok 1)
Ok 1
In [ ]:
//// test
let retry_fn_test = ref 0
async {
return
if retry_fn_test.Value >= 2
then retry_fn_test.Value
else
retry_fn_test.Value <- retry_fn_test.Value + 1
failwith "test"
}
|> retryAsync 3
|> Async.RunSynchronously
|> _assertEqual (Ok 2)
00:00:01 d #8 Async.retryAsync / retry: 1/3 / ex: System.Exception: test 00:00:01 d #9 Async.retryAsync / retry: 2/3 / ex: System.Exception: test Ok 2
fold¶
In [ ]:
let fold folder state array =
let rec loop acc i =
async {
if i < Array.length array then
let! newAcc = folder acc array.[i]
return! loop newAcc (i + 1)
else
return acc
}
loop state 0