Guest User

Untitled

a guest
May 24th, 2018
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.81 KB | None | 0 0
  1. 1. Yaml configuration section
  2.  
  3. <?xml version="1.0" encoding="utf-8"?>
  4. <akka.persistence>
  5. <hocon>
  6. <![CDATA[
  7. akka {
  8. persistence{
  9. query.journal.sql {
  10. max-buffer-size = 10000
  11. }
  12. journal {
  13. plugin = "akka.persistence.journal.sql-server"
  14. sql-server {
  15. class = "Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer"
  16. schema-name = dbo
  17. table-name = EventJournal
  18. auto-initialize = off
  19. event-adapters {
  20. json-adapter = "Nrk.Oddjob.Upload.PersistenceUtils+EventAdapter, Upload"
  21. }
  22. event-adapter-bindings {
  23. # to journal
  24. "System.Object, mscorlib" = json-adapter
  25. # from journal
  26. "Newtonsoft.Json.Linq.JObject, Newtonsoft.Json" = [json-adapter]
  27. }
  28. }
  29. }
  30. snapshot-store {
  31. plugin = "akka.persistence.snapshot-store.sql-server"
  32. sql-server {
  33. class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
  34. serializer = hyperion
  35. schema-name = dbo
  36. table-name = SnapshotStore
  37. auto-initialize = off
  38. }
  39. }
  40. }
  41. }
  42. ]]>
  43. </hocon>
  44. </akka.persistence>
  45.  
  46. 2. Event adapter
  47.  
  48. let private deserializeEvent (evt : obj) =
  49. let json = evt :?> Newtonsoft.Json.Linq.JObject
  50. let objectType = json.[eventType]
  51. let typ = if objectType = null then null else Type.GetType(objectType.ToString())
  52. match typ with
  53. | null -> box json
  54. | typ -> json.ToObject(typ)
  55.  
  56. type EventAdapter(__ : Akka.Actor.ExtendedActorSystem) =
  57.  
  58. interface Akka.Persistence.Journal.IEventAdapter with
  59.  
  60. member __.Manifest(_ : obj) =
  61. let manifestType = typeof<Newtonsoft.Json.Linq.JObject>
  62. sprintf "%s,%s" manifestType.FullName <| manifestType.Assembly.GetName().Name
  63.  
  64. member __.ToJournal(evt : obj) : obj =
  65. let jObject = Newtonsoft.Json.Linq.JObject.FromObject(evt)
  66. jObject.AddFirst(Newtonsoft.Json.Linq.JProperty(eventType, evt.GetType().FullName));
  67. Akka.Persistence.Journal.Tagged(box jObject, [| anyEventTag |]) :> obj
  68.  
  69. member __.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
  70. if evt :? Newtonsoft.Json.Linq.JObject then
  71. Akka.Persistence.Journal.EventSequence.Single(deserializeEvent evt)
  72. else
  73. Akka.Persistence.Journal.EventSequence.Empty
  74.  
  75. 3. Persistent actors templates that we use for most of our actors
  76.  
  77.  
  78. [<AutoOpen>]
  79. module PersistenceActorTemplates =
  80. open Akka
  81. open Akka.Persistence
  82. open Akkling
  83. open Akkling.Persistence
  84.  
  85. open Nrk.Oddjob.Core
  86. open Nrk.Oddjob.Core.ActorUtils
  87.  
  88. type TakeSnapshotCommand() = class end
  89.  
  90. type PersistentActorContinuation<'memEvent, 'memState> =
  91. | Store of 'memEvent
  92. | StoreAll of 'memEvent list
  93. | Loop
  94. /// Only used in originAssetAssignmentActor for reasons explained there. Please don't use it.
  95. | LoopWithExtendedState of 'memState
  96.  
  97. let [<Literal>] SnapshotFrequency = 100
  98.  
  99. (*
  100. This function is used to create implementations of persisten actors.
  101.  
  102. 'dbState and 'dbEvent live in database. They should be treated with care since it is hard to change them.
  103. 'memState and 'memEvent live in memory of the actor, and can be changed easily.
  104.  
  105. State of the actor (both in db and memory) is constructed from incoming events.
  106.  
  107. Notable funcitons:
  108. - handle: receives current state and incoming event, handles event and returns information for the persistent layer how to proceed
  109. - updateState: used when actor is restored from db (after restart or after storing a new event). Takes state and event and produces new state.
  110. *)
  111. let persistentActorTemplate<'memState, 'dbState, 'memEvent, 'dbEvent, 'cmd>
  112. (handle : 'memState -> 'cmd -> PersistentActorContinuation<'memEvent, 'memState>)
  113. (updateState : 'memState -> 'memEvent -> 'memState)
  114. (initialState : 'memState)
  115. (memState2db : 'memState -> 'dbState)
  116. (dbState2mem : 'dbState -> 'memState)
  117. (memEvent2db : 'memEvent -> 'dbEvent)
  118. (dbEvent2mem : 'dbEvent -> 'memEvent)
  119. (onRecoveryCompleted : 'memState -> unit)
  120. (mailbox: Eventsourced<_>) =
  121.  
  122. let rec loop (memState : 'memState) eventCountSinceLastSnapshot =
  123. actor {
  124. let! (message : obj) = mailbox.Receive ()
  125. return!
  126. try
  127. match message with
  128. | :? 'cmd as cmd ->
  129. match handle memState cmd with
  130. | Store event -> memEvent2db event |> box |> Persist :> Effect<_>
  131. | StoreAll events -> events |> List.map (memEvent2db >> box) |> List.toSeq |> PersistAll :> Effect<_>
  132. | Loop -> loop memState eventCountSinceLastSnapshot
  133. | LoopWithExtendedState state -> loop state eventCountSinceLastSnapshot
  134.  
  135. | :? TakeSnapshotCommand ->
  136. let snapshotStore = typed mailbox.SnapshotStore
  137. snapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), memState2db memState)
  138. loop memState eventCountSinceLastSnapshot
  139.  
  140. // This message appears 1) after data has been persisted or 2) when restoring state from event stream
  141. | :? 'dbEvent as dbEvent ->
  142. let updated = updateState memState (dbEvent2mem dbEvent)
  143. let eventCountSinceLastSnapshot = eventCountSinceLastSnapshot + 1
  144. if eventCountSinceLastSnapshot = SnapshotFrequency then mailbox.Self <! box (TakeSnapshotCommand())
  145. loop updated eventCountSinceLastSnapshot
  146.  
  147. // Restoring state from snapshot
  148. | :? SnapshotOffer as o ->
  149. let dbState = o.Snapshot :?> 'dbState
  150. loop (dbState2mem dbState) 0
  151.  
  152. | :? PersistentLifecycleEvent as e ->
  153. match e with
  154. | ReplaySucceed ->
  155. logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
  156. loop memState eventCountSinceLastSnapshot
  157. | ReplayFailed (exn, message) ->
  158. let text = sprintf "ReplayFailed when replaying persistent events. Current state [%A]" memState
  159. logErrorWithExnf mailbox exn "ReplayFailed due to error when processing message %O [%A]" (message.GetType()) message
  160. failwith text
  161.  
  162. | :? LifecycleEvent as e ->
  163. match e with
  164. | PreRestart (exn, message) ->
  165. match message with
  166. | null -> logErrorWithExnf mailbox exn "PreRestart due to error when processing <null> message"
  167. | _ -> logErrorWithExnf mailbox exn "PreRestart due to error when processing message %O [%A]" (message.GetType()) message
  168. loop memState eventCountSinceLastSnapshot
  169. | _ -> IgnoredMessage
  170.  
  171. | :? Akka.Persistence.RecoveryCompleted ->
  172. logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
  173. onRecoveryCompleted memState
  174. loop memState eventCountSinceLastSnapshot
  175.  
  176. | :? SaveSnapshotSuccess -> loop memState (eventCountSinceLastSnapshot - SnapshotFrequency)
  177.  
  178. | :? SaveSnapshotFailure as e ->
  179. let text = sprintf "Error when saving snapshot. Metadata [%A]. Current state [%A]" e.Metadata memState
  180. logErrorWithExn mailbox e.Cause text
  181. loop memState eventCountSinceLastSnapshot
  182.  
  183. | _ -> UnhandledMessage
  184. with exn ->
  185. logErrorWithExnf mailbox exn "Error processing persistent actor message %A" message
  186. reraise ()
  187. }
  188.  
  189. loop initialState 0
  190.  
  191. (*
  192. Simpler version of persistentActorTemplate. In this actor, there is no difference between state and event.
  193. State is the event and vice versa, they are of the same type.
  194. This means that each new event replaces the actor state completely.
  195. *)
  196. let basicPersistentActorTemplate<'memState, 'dbState, 'cmd>
  197. (handler : 'memState -> 'cmd -> PersistentActorContinuation<'memState, 'memState>)
  198. (initialState : 'memState)
  199. (memState2db : 'memState -> 'dbState)
  200. (dbState2mem : 'dbState -> 'memState)
  201. (onRecoveryCompleted : 'memState -> unit)
  202. (mailbox: Eventsourced<_>) =
  203.  
  204. persistentActorTemplate handler (fun _ newState -> newState) initialState memState2db dbState2mem memState2db dbState2mem onRecoveryCompleted mailbox
  205.  
  206. 4. Example of a persistent actor
  207.  
  208.  
  209. type StorageAssignment =
  210. { StorageId : string
  211. EdgeChar : string
  212. Timestamp : DateTimeOffset }
  213. static member FromPersistentType (db : PersistentTypes.AkamaiStorageAssignment) = { StorageAssignment.StorageId = db.StorageId; EdgeChar = db.EdgeChar; Timestamp = db.Timestamp }
  214. static member ToPersistentType (mem : StorageAssignment) = PersistentTypes.AkamaiStorageAssignment.Create(mem.StorageId, mem.EdgeChar, mem.Timestamp)
  215. static member Zero = StorageAssignment.FromPersistentType (PersistentTypes.AkamaiStorageAssignment.Zero())
  216.  
  217. type StorageCommand =
  218. /// Assigns (overwriting previous assignment) storage details to a persistent actor
  219. | AssignStorage of StorageAssignment
  220. /// Checks current storage assignment and returns it to the sender if found, otherwise returns StorageAssignment.Zero
  221. | QueryStorage
  222.  
  223.  
  224. let akamaiStorageAssignmentActor (storagePicker:IActorRef<StoragePickerCommand>) (mailbox:Eventsourced<_>) =
  225.  
  226. let random = Random()
  227.  
  228. let makeEdgeChar () =
  229. match random.Next(3) with | 0 -> "a" | 1 -> "b" | _ -> "c"
  230.  
  231. let handler state cmd =
  232. match cmd with
  233. | AssignStorage storage -> Store storage
  234. | QueryStorage ->
  235. match state with
  236. | storage when String.IsNullOrEmpty(storage.StorageId) ->
  237. let (storageId : string, _ : IActorRef<Message<SftpCommand>>) = storagePicker <? SelectStorage |> Async.RunSynchronously
  238. let edgeChar = makeEdgeChar ()
  239. let storage = { StorageAssignment.StorageId = storageId; EdgeChar = edgeChar; Timestamp = DateTimeOffset.Now }
  240. mailbox.Sender() <! storage
  241. Store storage
  242. | storage ->
  243. mailbox.Sender() <! storage
  244. Loop
  245.  
  246. basicPersistentActorTemplate handler StorageAssignment.Zero StorageAssignment.ToPersistentType StorageAssignment.FromPersistentType (fun _ -> ()) mailbox
Add Comment
Please, Sign In to add comment