AsyncSeq (Polyglot)¶

In [ ]:
#!import ../../lib/fsharp/Notebooks.dib
#!import ../../lib/fsharp/Testing.dib
In [ ]:
#r @"../../../../../../../.nuget/packages/fsharp.control.asyncseq/3.2.1/lib/netstandard2.1/FSharp.Control.AsyncSeq.dll"
#r @"../../../../../../../.nuget/packages/system.reactive/6.0.1-preview.1/lib/net6.0/System.Reactive.dll"
#r @"../../../../../../../.nuget/packages/system.reactive.linq/6.0.1-preview.1/lib/netstandard2.0/System.Reactive.Linq.dll"
In [ ]:
#!import ../../lib/fsharp/Common.fs
#!import ../../lib/fsharp/Async.fs
In [ ]:
#if !INTERACTIVE
open Lib
#endif
In [ ]:
open Common

subscribeEvent¶

In [ ]:
/// Exposes an `IEvent` as an `AsyncSeq`.
///
/// `event.AddHandler` / `event.RemoveHandler` are handed to Rx's `FromEventPattern` as the
/// add/remove actions; the `EventArgs` of every resulting event pattern is passed through
/// `map`, and the mapped observable is converted with `AsyncSeq.ofObservableBuffered`.
let inline subscribeEvent (event: IEvent<'H, 'A>) map =
    let eventPatterns =
        System.Reactive.Linq.Observable.FromEventPattern<'H, 'A>(event.AddHandler, event.RemoveHandler)
    let mappedArgs =
        System.Reactive.Linq.Observable.Select (eventPatterns, fun pattern -> map pattern.EventArgs)
    FSharp.Control.AsyncSeq.ofObservableBuffered mappedArgs
In [ ]:
//// test

/// Test double around an `Event<ErrorEventHandler, ErrorEventArgs>` that records, in order,
/// a label for every handler/subscription operation performed on it, so that tests can
/// assert on the resulting sequence via `Calls`.
type TestEvent () as self =
    /// Ordered log of recorded labels (oldest first).
    member val Calls = [] with get, set

    /// Underlying event; tests raise it directly through `Event.Trigger`.
    member val Event = Event<ErrorEventHandler, ErrorEventArgs> () with get

    /// Appends `text` to the end of `Calls`.
    member _.AddCall text =
        self.Calls <- self.Calls @ [ text ]

    /// `IEvent` view over `Event.Publish` that logs each operation before forwarding it.
    member _.EventInterface =
        { new IEvent<ErrorEventHandler, ErrorEventArgs> with
            member _.AddHandler handler =
                self.AddCall "AddHandler"
                self.Event.Publish.AddHandler handler

            member _.RemoveHandler handler =
                self.AddCall "RemoveHandler"
                self.Event.Publish.RemoveHandler handler

            member _.Subscribe observer =
                self.AddCall "IObservable.Subscribe"
                let disposable = self.Event.Publish.Subscribe observer
                // Wrap the real subscription so that its disposal is logged as well.
                new_disposable (fun () ->
                    self.AddCall "IObservable.Dispose"
                    disposable.Dispose ()
                )
        }

    /// Subscribes through `subscribeEvent`, mapping each event to its formatted exception
    /// text and logging that text as `TestEvent.Subscribe(...)`.
    member _.Subscribe () =
        subscribeEvent
            self.EventInterface
            (fun args ->
                let result = args.GetException () |> SpiralSm.format_exception
                self.AddCall $"TestEvent.Subscribe({result})"
                result
            )

    /// Consumes `subscription`, logging every element together with its index.
    member _.Iter subscription =
        subscription
        |> FSharp.Control.AsyncSeq.iteriAsync (fun i error -> async {
            self.AddCall $"TestEvent.Iter({i}: {error})"
        })

    /// Spins (yielding to the thread pool on each pass) until the most recent entry of
    /// `Calls` equals `text`.
    member _.WaitCall text = async {
        // `List.tryLast` instead of `List.last`: an empty log means "not there yet",
        // it must not throw ArgumentException.
        while (self.Calls |> List.tryLast) <> Some text do
            do! Async.SwitchToThreadPool ()
    }
In [ ]:
//// test

// Scenario: subscribe and iterate, but never trigger the event.
let testEvent = TestEvent ()

async {
    testEvent.AddCall "1"
    // Start consuming the subscription in a child computation.
    let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild
    // Do not continue until the handler attachment has been logged.
    do! testEvent.WaitCall "AddHandler"
    testEvent.AddCall "2"
    // No event is triggered in this cell; the expected log below contains no "3".
    do! child
    testEvent.AddCall "3"
}
// NOTE(review): `Async.runWithTimeout` is defined outside this cell; `None` presumably
// means the 300 timeout elapsed before the computation finished — confirm.
|> Async.runWithTimeout 300
|> _assertEqual None

// The handler is expected to have been removed by the time the run has ended.
testEvent.Calls
|> Seq.toList
|> _assertEqual [ "1"; "AddHandler"; "2"; "RemoveHandler" ]
00:00:01 d #1 Async.runWithTimeoutAsync / timeout: 300
<null>

["1"; "AddHandler"; "2"; "RemoveHandler"]

In [ ]:
//// test

// Scenario: a second observer is attached directly through `EventInterface.Subscribe`
// while the AsyncSeq subscription is being iterated; the event is never triggered.
let testEvent = TestEvent ()

async {
    testEvent.AddCall "1"
    // Start consuming the subscription in a child computation.
    let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild
    // Do not continue until the handler attachment has been logged.
    do! testEvent.WaitCall "AddHandler"
    testEvent.AddCall "2"
    // `use` ties the extra observer's lifetime to this async scope; its disposal is
    // logged as "IObservable.Dispose".
    use _ = testEvent.EventInterface.Subscribe (fun args ->
        testEvent.AddCall $"testEvent.EventInterface.Subscribe({args})"
    )
    testEvent.AddCall "3"
    // No event is triggered in this cell; the expected log below contains no "4".
    do! child
    testEvent.AddCall "4"
}
// NOTE(review): `Async.runWithTimeout` is defined outside this cell; `None` presumably
// means the 300 timeout elapsed before the computation finished — confirm.
|> Async.runWithTimeout 300
|> _assertEqual None

// Expected order: the handler removal is logged before the extra observer's disposal.
testEvent.Calls
|> _assertEqual [ "1"; "AddHandler"; "2"; "IObservable.Subscribe"; "3"; "RemoveHandler"; "IObservable.Dispose" ]
00:00:01 d #2 Async.runWithTimeoutAsync / timeout: 300
<null>

["1"; "AddHandler"; "2"; "IObservable.Subscribe"; "3"; "RemoveHandler"; "IObservable.Dispose"]

In [ ]:
//// test

// Scenario: the event is triggered once while both the AsyncSeq subscription and a
// directly attached, blocking observer are active.
let testEvent = TestEvent ()

async {
    testEvent.AddCall "1"
    // Start consuming the subscription in a child computation.
    let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild
    // Do not continue until the handler attachment has been logged.
    do! testEvent.WaitCall "AddHandler"
    testEvent.AddCall "2"
    // The extra observer blocks (Async.RunSynchronously) until the iteration has logged
    // element 0, and only then logs its own entry.
    use _ = testEvent.EventInterface.Subscribe (fun args ->
        async {
            do! testEvent.WaitCall "TestEvent.Iter(0: System.Exception: error)"
            testEvent.AddCall $"testEvent.EventInterface.Subscribe({args.GetException () |> SpiralSm.format_exception})"
        }
        |> Async.RunSynchronously
    )
    testEvent.AddCall "3"
    // Raise the event once; per the expected log, "4" is recorded only after the
    // blocking observer above has logged its entry.
    testEvent.Event.Trigger (null, ErrorEventArgs (Exception "error"))
    testEvent.AddCall "4"
    // The expected log below contains no "5".
    do! child
    testEvent.AddCall "5"
}
// NOTE(review): `Async.runWithTimeout` is defined outside this cell; `None` presumably
// means the 300 timeout elapsed before the computation finished — confirm.
|> Async.runWithTimeout 300
|> _assertEqual None

// Expected order: map callback, iteration of element 0, blocking observer, then cleanup.
testEvent.Calls
|> _assertEqual [
    "1"
    "AddHandler"
    "2"
    "IObservable.Subscribe"
    "3"
    "TestEvent.Subscribe(System.Exception: error)"
    "TestEvent.Iter(0: System.Exception: error)"
    "testEvent.EventInterface.Subscribe(System.Exception: error)"
    "4"
    "RemoveHandler"
    "IObservable.Dispose"
]
00:00:01 d #3 Async.runWithTimeoutAsync / timeout: 300
<null>

["1"; "AddHandler"; "2"; "IObservable.Subscribe"; "3"; "TestEvent.Subscribe(System.Exception: error)";
 "TestEvent.Iter(0: System.Exception: error)"; "testEvent.EventInterface.Subscribe(System.Exception: error)"; "4";
 "RemoveHandler"; "IObservable.Dispose"]

subscribeToken¶

In [ ]:
/// Builds an `AsyncSeq` that produces nothing until `token` is cancelled.
///
/// Every element is the time elapsed since `subscribeToken` was called, expressed in
/// 100 ns ticks (`TimeSpan.Ticks`). The generator never returns `None` and the awaited
/// task stays completed after cancellation, so the sequence keeps yielding from then on;
/// consumers are expected to take only what they need (e.g. `AsyncSeq.tryFirst`).
let subscribeToken (token : System.Threading.CancellationToken) =
    let tcs = new System.Threading.Tasks.TaskCompletionSource ()
    // NOTE(review): the registration returned by `token.Register` is discarded and never
    // disposed, so it lives as long as the token's source when no cancellation occurs.
    System.Action tcs.SetResult |> token.Register |> ignore
    // Stopwatch is monotonic; wall-clock `DateTime.Now` can jump (DST, clock adjustments)
    // and would then yield wrong or even negative elapsed values.
    let stopwatch = System.Diagnostics.Stopwatch.StartNew ()
    FSharp.Control.AsyncSeq.unfoldAsync
        (fun () -> async {
            // Completes only once the token's cancellation callback has run.
            do! tcs.Task |> Async.AwaitTask
            return Some (stopwatch.Elapsed.Ticks, ())
        })
        ()
In [ ]:
//// test

// Cancels the token about 100 ms after subscribing and checks that the first element
// reports more than 900000 elapsed ticks.
let cts = new System.Threading.CancellationTokenSource ()

let firstElapsedTicks =
    async {
        // Start waiting for the first element before the token is cancelled.
        let! pendingFirst =
            subscribeToken cts.Token
            |> FSharp.Control.AsyncSeq.tryFirst
            |> Async.StartChild

        do! Async.Sleep 100
        cts.Cancel ()
        return! pendingFirst
    }
    |> Async.RunSynchronously
    |> Option.get

(firstElapsedTicks > 900000L)
|> _assertEqual true
true