AsyncSeq (Polyglot)¶
In [ ]:
#!import ../../lib/fsharp/Notebooks.dib
#!import ../../lib/fsharp/Testing.dib
In [ ]:
#r @"../../../../../../../.nuget/packages/fsharp.control.asyncseq/3.2.1/lib/netstandard2.1/FSharp.Control.AsyncSeq.dll"
#r @"../../../../../../../.nuget/packages/system.reactive/6.0.1-preview.1/lib/net6.0/System.Reactive.dll"
#r @"../../../../../../../.nuget/packages/system.reactive.linq/6.0.1-preview.1/lib/netstandard2.0/System.Reactive.Linq.dll"
In [ ]:
#!import ../../lib/fsharp/Common.fs
#!import ../../lib/fsharp/Async.fs
In [ ]:
#if !INTERACTIVE
open Lib
#endif
In [ ]:
open Common
subscribeEvent¶
In [ ]:
let inline subscribeEvent (event: IEvent<'H, 'A>) map =
let observable = System.Reactive.Linq.Observable.FromEventPattern<'H, 'A>(event.AddHandler, event.RemoveHandler)
System.Reactive.Linq.Observable.Select (observable, fun event -> map event.EventArgs)
|> FSharp.Control.AsyncSeq.ofObservableBuffered
In [ ]:
//// test
type TestEvent () as self =
member val Calls = [] with get, set
member val Event = Event<ErrorEventHandler, ErrorEventArgs> () with get
member _.AddCall text =
self.Calls <- self.Calls @ [ text ]
member _.EventInterface =
{ new IEvent<ErrorEventHandler, ErrorEventArgs> with
member _.AddHandler handler =
self.AddCall "AddHandler"
self.Event.Publish.AddHandler handler
member _.RemoveHandler handler =
self.AddCall "RemoveHandler"
self.Event.Publish.RemoveHandler handler
member _.Subscribe observer =
self.AddCall "IObservable.Subscribe"
let disposable = self.Event.Publish.Subscribe observer
new_disposable (fun () ->
self.AddCall "IObservable.Dispose"
disposable.Dispose ()
)
}
member _.Subscribe () =
subscribeEvent
self.EventInterface
(fun args ->
let result = args.GetException () |> SpiralSm.format_exception
self.AddCall $"TestEvent.Subscribe({result})"
result
)
member _.Iter subscription =
subscription
|> FSharp.Control.AsyncSeq.iteriAsync (fun i error -> async {
self.AddCall $"TestEvent.Iter({i}: {error})"
})
member _.WaitCall text = async {
while self.Calls |> List.last <> text do
do! Async.SwitchToThreadPool ()
}
In [ ]:
//// test
let testEvent = TestEvent ()
async {
testEvent.AddCall "1"
let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild
do! testEvent.WaitCall "AddHandler"
testEvent.AddCall "2"
do! child
testEvent.AddCall "3"
}
|> Async.runWithTimeout 300
|> _assertEqual None
testEvent.Calls
|> Seq.toList
|> _assertEqual [ "1"; "AddHandler"; "2"; "RemoveHandler" ]
00:00:01 d #1 Async.runWithTimeoutAsync / timeout: 300
<null>
["1"; "AddHandler"; "2"; "RemoveHandler"]
In [ ]:
//// test
let testEvent = TestEvent ()
async {
testEvent.AddCall "1"
let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild
do! testEvent.WaitCall "AddHandler"
testEvent.AddCall "2"
use _ = testEvent.EventInterface.Subscribe (fun args ->
testEvent.AddCall $"testEvent.EventInterface.Subscribe({args})"
)
testEvent.AddCall "3"
do! child
testEvent.AddCall "4"
}
|> Async.runWithTimeout 300
|> _assertEqual None
testEvent.Calls
|> _assertEqual [ "1"; "AddHandler"; "2"; "IObservable.Subscribe"; "3"; "RemoveHandler"; "IObservable.Dispose" ]
00:00:01 d #2 Async.runWithTimeoutAsync / timeout: 300
<null>
["1"; "AddHandler"; "2"; "IObservable.Subscribe"; "3"; "RemoveHandler"; "IObservable.Dispose"]
In [ ]:
//// test
let testEvent = TestEvent ()
async {
testEvent.AddCall "1"
let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild
do! testEvent.WaitCall "AddHandler"
testEvent.AddCall "2"
use _ = testEvent.EventInterface.Subscribe (fun args ->
async {
do! testEvent.WaitCall "TestEvent.Iter(0: System.Exception: error)"
testEvent.AddCall $"testEvent.EventInterface.Subscribe({args.GetException () |> SpiralSm.format_exception})"
}
|> Async.RunSynchronously
)
testEvent.AddCall "3"
testEvent.Event.Trigger (null, ErrorEventArgs (Exception "error"))
testEvent.AddCall "4"
do! child
testEvent.AddCall "5"
}
|> Async.runWithTimeout 300
|> _assertEqual None
testEvent.Calls
|> _assertEqual [
"1"
"AddHandler"
"2"
"IObservable.Subscribe"
"3"
"TestEvent.Subscribe(System.Exception: error)"
"TestEvent.Iter(0: System.Exception: error)"
"testEvent.EventInterface.Subscribe(System.Exception: error)"
"4"
"RemoveHandler"
"IObservable.Dispose"
]
00:00:02 d #3 Async.runWithTimeoutAsync / timeout: 300
<null>
["1"; "AddHandler"; "2"; "IObservable.Subscribe"; "3"; "TestEvent.Subscribe(System.Exception: error)";
"TestEvent.Iter(0: System.Exception: error)"; "testEvent.EventInterface.Subscribe(System.Exception: error)"; "4";
"RemoveHandler"; "IObservable.Dispose"]
subscribeToken¶
In [ ]:
let subscribeToken (token : System.Threading.CancellationToken) =
let tcs = new System.Threading.Tasks.TaskCompletionSource ()
System.Action tcs.SetResult |> token.Register |> ignore
let start = System.DateTime.Now.Ticks
FSharp.Control.AsyncSeq.unfoldAsync
(fun () -> async {
do! tcs.Task |> Async.AwaitTask
return Some (System.DateTime.Now.Ticks - start, ())
})
()
In [ ]:
//// test
let cts = new System.Threading.CancellationTokenSource ()
async {
let! child =
cts.Token
|> subscribeToken
|> FSharp.Control.AsyncSeq.tryFirst
|> Async.StartChild
do! Async.Sleep 100
cts.Cancel ()
return! child
}
|> Async.RunSynchronously
|> Option.get
|> fun x -> x > 900000
|> _assertEqual true
true