Advertisement
Guest User

Untitled

a guest
May 25th, 2025
19
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.39 KB | None | 0 0
  /// Subscribes the projection to `this.StreamName` and handles incoming events until the
  /// subscription is cancelled or resubscription is given up.
  ///
  /// `oneBasedCheckpoint` is stored in `currentCheckpoint`. `0UL` subscribes from
  /// `FromStream.Start`; any other value subscribes after position `oneBasedCheckpoint - 1UL`.
  ///
  /// `OperationCanceledException` and `ObjectDisposedException` (thrown directly or as the
  /// `InnerException` of an `AggregateException`) are logged and end the task normally.
  /// Any other exception increments `resubscriptionAttempt` and subscribes again from the
  /// position after the last handled event. Once `MaxResubscriptionAttempts` is reached,
  /// `hasFailed` and `error` are set and the exception is rethrown to the caller.
  member this.StartAsync(oneBasedCheckpoint) =
      task {
          currentCheckpoint <- oneBasedCheckpoint

          // Position the next subscription starts from. It is advanced after every handled
          // event so that a resubscription does not replay events already processed.
          let mutable checkpoint =
              if currentCheckpoint = 0UL then
                  FromStream.Start
              else
                  let startingCheckpoint = currentCheckpoint - 1UL
                  FromStream.After(startingCheckpoint)

          let rec subscribe () =
              task {
                  try
                      // `use` disposes the subscription before any handler below runs,
                      // so a retry always opens a fresh subscription.
                      use subscription =
                          client.SubscribeToStream(
                              this.StreamName,
                              checkpoint,
                              resolveLinkTos = true)

                      let asyncSubscription = AsyncSeq.ofAsyncEnum subscription.Messages

                      logger.LogInformation(
                          "Projection {Name} STARTED on stream {StreamName}.",
                          this.Name, this.StreamName)

                      do! asyncSubscription
                          |> AsyncSeq.iterAsync (fun message ->
                              async {
                                  match message with
                                  | :? StreamMessage.Event as evnt ->
                                      do! this.handleEvent<'TEvent> evnt.ResolvedEvent |> Async.AwaitTask
                                      checkpoint <- FromStream.After(evnt.ResolvedEvent.OriginalEventNumber)
                                      // A successfully handled event resets the retry counter.
                                      resubscriptionAttempt <- 0
                                  | _ -> ()
                              })

                  with
                  | :? AggregateException as ex when (ex.InnerException :? OperationCanceledException) ->
                      logger.LogInformation(
                          "Subscription {Name} on stream {StreamName} was CANCELED.",
                          this.Name, this.StreamName)
                  | :? OperationCanceledException ->
                      logger.LogInformation(
                          "Subscription {Name} on stream {StreamName} was CANCELED.",
                          this.Name, this.StreamName)

                  | :? AggregateException as ex when (ex.InnerException :? ObjectDisposedException) ->
                      logger.LogInformation(
                          "Subscription {Name} on stream {StreamName} was CANCELED by the user.",
                          this.Name, this.StreamName)
                  | :? ObjectDisposedException ->
                      logger.LogInformation(
                          "Subscription {Name} on stream {StreamName} was CANCELED by the user.",
                          this.Name, this.StreamName)

                  | ex ->
                      logger.LogError("Subscription DROPPED: {Exception}", ex)
                      resubscriptionAttempt <- resubscriptionAttempt + 1
                      if resubscriptionAttempt < MaxResubscriptionAttempts then
                          return! subscribe ()
                      else
                          hasFailed <- true
                          error <- ex.Message
                          logger.LogCritical(
                              ex,
                              "FAILED to RESUBSCRIBE Projection: {Name}-{StreamName}",
                              this.Name, this.StreamName)
                          // `reraise()` cannot be used inside a computation expression and
                          // `raise ex` would reset the stack trace, so rethrow via
                          // ExceptionDispatchInfo to keep the original trace.
                          System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw()
              }

          do! subscribe ()
      }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement