Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// Starts the projection's subscription on `this.StreamName` and keeps it
/// running, resubscribing after unexpected drops.
///
/// `oneBasedCheckpoint`: 0UL starts from `FromStream.Start`; any other value N
/// starts from `FromStream.After(N - 1)`.
///
/// The returned task completes when the message sequence ends or when the
/// subscription is cancelled/disposed. It faults with the last error once
/// `resubscriptionAttempt` reaches `MaxResubscriptionAttempts`; that counter is
/// reset to 0 after every successfully handled event.
member this.StartAsync(oneBasedCheckpoint) =
    task {
        currentCheckpoint <- oneBasedCheckpoint

        // Advanced by the message handler below, so a resubscription resumes
        // after the last event that was handled.
        let mutable checkpoint =
            if currentCheckpoint = 0UL then
                FromStream.Start
            else
                let startingCheckpoint = currentCheckpoint - 1UL
                FromStream.After(startingCheckpoint)

        // A loop is used instead of `return!` recursion: tasks have no async
        // tail calls, and because the attempt counter is reset on every handled
        // event the recursion depth would otherwise be unbounded.
        let mutable keepSubscribing = true

        while keepSubscribing do
            try
                use subscription =
                    client.SubscribeToStream(
                        this.StreamName,
                        checkpoint,
                        resolveLinkTos = true)

                let asyncSubscription = AsyncSeq.ofAsyncEnum subscription.Messages

                logger.LogInformation(
                    "Projection {Name} STARTED on stream {StreamName}.",
                    this.Name, this.StreamName)

                do! asyncSubscription
                    |> AsyncSeq.iterAsync (fun message ->
                        async {
                            match message with
                            | :? StreamMessage.Event as evnt ->
                                do! this.handleEvent<'TEvent> evnt.ResolvedEvent |> Async.AwaitTask
                                checkpoint <- FromStream.After(evnt.ResolvedEvent.OriginalEventNumber)
                                // A handled event proves the subscription is healthy again.
                                resubscriptionAttempt <- 0
                            | _ -> ()
                        })

                // The message sequence ended without an error: nothing to resubscribe to.
                keepSubscribing <- false
            with
            | :? AggregateException as ex when (ex.InnerException :? OperationCanceledException) ->
                logger.LogInformation(
                    "Subscription {Name} on stream {StreamName} was CANCELED.",
                    this.Name, this.StreamName)
                keepSubscribing <- false
            | :? OperationCanceledException ->
                logger.LogInformation(
                    "Subscription {Name} on stream {StreamName} was CANCELED.",
                    this.Name, this.StreamName)
                keepSubscribing <- false
            | :? AggregateException as ex when (ex.InnerException :? ObjectDisposedException) ->
                logger.LogInformation(
                    "Subscription {Name} on stream {StreamName} was CANCELED by the user.",
                    this.Name, this.StreamName)
                keepSubscribing <- false
            | :? ObjectDisposedException ->
                logger.LogInformation(
                    "Subscription {Name} on stream {StreamName} was CANCELED by the user.",
                    this.Name, this.StreamName)
                keepSubscribing <- false
            | ex ->
                logger.LogError("Subscription DROPPED: {Exception}", ex)
                resubscriptionAttempt <- resubscriptionAttempt + 1

                // Below the limit we simply fall through and the loop resubscribes
                // from the current checkpoint.
                if resubscriptionAttempt >= MaxResubscriptionAttempts then
                    hasFailed <- true
                    error <- ex.Message
                    logger.LogCritical(ex, "FAILED to RESUBSCRIBE Projection: {Name}-{StreamName}",
                                       this.Name, this.StreamName)
                    // Rethrow without resetting the original stack trace (`raise ex` would reset it).
                    System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw()
    }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement