Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- 1. Yaml configuration section
- <?xml version="1.0" encoding="utf-8"?>
- <akka.persistence>
- <hocon>
- <![CDATA[
- akka {
- persistence{
- query.journal.sql {
- max-buffer-size = 10000
- }
- journal {
- plugin = "akka.persistence.journal.sql-server"
- sql-server {
- class = "Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer"
- schema-name = dbo
- table-name = EventJournal
- auto-initialize = off
- event-adapters {
- json-adapter = "Nrk.Oddjob.Upload.PersistenceUtils+EventAdapter, Upload"
- }
- event-adapter-bindings {
- # to journal
- "System.Object, mscorlib" = json-adapter
- # from journal
- "Newtonsoft.Json.Linq.JObject, Newtonsoft.Json" = [json-adapter]
- }
- }
- }
- snapshot-store {
- plugin = "akka.persistence.snapshot-store.sql-server"
- sql-server {
- class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
- serializer = hyperion
- schema-name = dbo
- table-name = SnapshotStore
- auto-initialize = off
- }
- }
- }
- }
- ]]>
- </hocon>
- </akka.persistence>
- 2. Event adapter
- let private deserializeEvent (evt : obj) =
- let json = evt :?> Newtonsoft.Json.Linq.JObject
- let objectType = json.[eventType]
- let typ = if objectType = null then null else Type.GetType(objectType.ToString())
- match typ with
- | null -> box json
- | typ -> json.ToObject(typ)
- type EventAdapter(__ : Akka.Actor.ExtendedActorSystem) =
- interface Akka.Persistence.Journal.IEventAdapter with
- member __.Manifest(_ : obj) =
- let manifestType = typeof<Newtonsoft.Json.Linq.JObject>
- sprintf "%s,%s" manifestType.FullName <| manifestType.Assembly.GetName().Name
- member __.ToJournal(evt : obj) : obj =
- let jObject = Newtonsoft.Json.Linq.JObject.FromObject(evt)
- jObject.AddFirst(Newtonsoft.Json.Linq.JProperty(eventType, evt.GetType().FullName));
- Akka.Persistence.Journal.Tagged(box jObject, [| anyEventTag |]) :> obj
- member __.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
- if evt :? Newtonsoft.Json.Linq.JObject then
- Akka.Persistence.Journal.EventSequence.Single(deserializeEvent evt)
- else
- Akka.Persistence.Journal.EventSequence.Empty
- 3. Persistent actors templates that we use for most of our actors
- [<AutoOpen>]
- module PersistenceActorTemplates =
- open Akka
- open Akka.Persistence
- open Akkling
- open Akkling.Persistence
- open Nrk.Oddjob.Core
- open Nrk.Oddjob.Core.ActorUtils
- type TakeSnapshotCommand() = class end
- type PersistentActorContinuation<'memEvent, 'memState> =
- | Store of 'memEvent
- | StoreAll of 'memEvent list
- | Loop
- /// Only used in originAssetAssignmentActor for reasons explained there. Please don't use it.
- | LoopWithExtendedState of 'memState
- let [<Literal>] SnapshotFrequency = 100
- (*
- This function is used to create implementations of persisten actors.
- 'dbState and 'dbEvent live in database. They should be treated with care since it is hard to change them.
- 'memState and 'memEvent live in memory of the actor, and can be changed easily.
- State of the actor (both in db and memory) is constructed from incoming events.
- Notable funcitons:
- - handle: receives current state and incoming event, handles event and returns information for the persistent layer how to proceed
- - updateState: used when actor is restored from db (after restart or after storing a new event). Takes state and event and produces new state.
- *)
- let persistentActorTemplate<'memState, 'dbState, 'memEvent, 'dbEvent, 'cmd>
- (handle : 'memState -> 'cmd -> PersistentActorContinuation<'memEvent, 'memState>)
- (updateState : 'memState -> 'memEvent -> 'memState)
- (initialState : 'memState)
- (memState2db : 'memState -> 'dbState)
- (dbState2mem : 'dbState -> 'memState)
- (memEvent2db : 'memEvent -> 'dbEvent)
- (dbEvent2mem : 'dbEvent -> 'memEvent)
- (onRecoveryCompleted : 'memState -> unit)
- (mailbox: Eventsourced<_>) =
- let rec loop (memState : 'memState) eventCountSinceLastSnapshot =
- actor {
- let! (message : obj) = mailbox.Receive ()
- return!
- try
- match message with
- | :? 'cmd as cmd ->
- match handle memState cmd with
- | Store event -> memEvent2db event |> box |> Persist :> Effect<_>
- | StoreAll events -> events |> List.map (memEvent2db >> box) |> List.toSeq |> PersistAll :> Effect<_>
- | Loop -> loop memState eventCountSinceLastSnapshot
- | LoopWithExtendedState state -> loop state eventCountSinceLastSnapshot
- | :? TakeSnapshotCommand ->
- let snapshotStore = typed mailbox.SnapshotStore
- snapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), memState2db memState)
- loop memState eventCountSinceLastSnapshot
- // This message appears 1) after data has been persisted or 2) when restoring state from event stream
- | :? 'dbEvent as dbEvent ->
- let updated = updateState memState (dbEvent2mem dbEvent)
- let eventCountSinceLastSnapshot = eventCountSinceLastSnapshot + 1
- if eventCountSinceLastSnapshot = SnapshotFrequency then mailbox.Self <! box (TakeSnapshotCommand())
- loop updated eventCountSinceLastSnapshot
- // Restoring state from snapshot
- | :? SnapshotOffer as o ->
- let dbState = o.Snapshot :?> 'dbState
- loop (dbState2mem dbState) 0
- | :? PersistentLifecycleEvent as e ->
- match e with
- | ReplaySucceed ->
- logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
- loop memState eventCountSinceLastSnapshot
- | ReplayFailed (exn, message) ->
- let text = sprintf "ReplayFailed when replaying persistent events. Current state [%A]" memState
- logErrorWithExnf mailbox exn "ReplayFailed due to error when processing message %O [%A]" (message.GetType()) message
- failwith text
- | :? LifecycleEvent as e ->
- match e with
- | PreRestart (exn, message) ->
- match message with
- | null -> logErrorWithExnf mailbox exn "PreRestart due to error when processing <null> message"
- | _ -> logErrorWithExnf mailbox exn "PreRestart due to error when processing message %O [%A]" (message.GetType()) message
- loop memState eventCountSinceLastSnapshot
- | _ -> IgnoredMessage
- | :? Akka.Persistence.RecoveryCompleted ->
- logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
- onRecoveryCompleted memState
- loop memState eventCountSinceLastSnapshot
- | :? SaveSnapshotSuccess -> loop memState (eventCountSinceLastSnapshot - SnapshotFrequency)
- | :? SaveSnapshotFailure as e ->
- let text = sprintf "Error when saving snapshot. Metadata [%A]. Current state [%A]" e.Metadata memState
- logErrorWithExn mailbox e.Cause text
- loop memState eventCountSinceLastSnapshot
- | _ -> UnhandledMessage
- with exn ->
- logErrorWithExnf mailbox exn "Error processing persistent actor message %A" message
- reraise ()
- }
- loop initialState 0
- (*
- Simpler version of persistentActorTemplate. In this actor, there is no difference between state and event.
- State is the event and vice versa, they are of the same type.
- This means that each new event replaces the actor state completely.
- *)
- let basicPersistentActorTemplate<'memState, 'dbState, 'cmd>
- (handler : 'memState -> 'cmd -> PersistentActorContinuation<'memState, 'memState>)
- (initialState : 'memState)
- (memState2db : 'memState -> 'dbState)
- (dbState2mem : 'dbState -> 'memState)
- (onRecoveryCompleted : 'memState -> unit)
- (mailbox: Eventsourced<_>) =
- persistentActorTemplate handler (fun _ newState -> newState) initialState memState2db dbState2mem memState2db dbState2mem onRecoveryCompleted mailbox
- 4. Example of a persistent actor
- type StorageAssignment =
- { StorageId : string
- EdgeChar : string
- Timestamp : DateTimeOffset }
- static member FromPersistentType (db : PersistentTypes.AkamaiStorageAssignment) = { StorageAssignment.StorageId = db.StorageId; EdgeChar = db.EdgeChar; Timestamp = db.Timestamp }
- static member ToPersistentType (mem : StorageAssignment) = PersistentTypes.AkamaiStorageAssignment.Create(mem.StorageId, mem.EdgeChar, mem.Timestamp)
- static member Zero = StorageAssignment.FromPersistentType (PersistentTypes.AkamaiStorageAssignment.Zero())
- type StorageCommand =
- /// Assigns (overwriting previous assignment) storage details to a persistent actor
- | AssignStorage of StorageAssignment
- /// Checks current storage assignment and returns it to the sender if found, otherwise returns StorageAssignment.Zero
- | QueryStorage
- let akamaiStorageAssignmentActor (storagePicker:IActorRef<StoragePickerCommand>) (mailbox:Eventsourced<_>) =
- let random = Random()
- let makeEdgeChar () =
- match random.Next(3) with | 0 -> "a" | 1 -> "b" | _ -> "c"
- let handler state cmd =
- match cmd with
- | AssignStorage storage -> Store storage
- | QueryStorage ->
- match state with
- | storage when String.IsNullOrEmpty(storage.StorageId) ->
- let (storageId : string, _ : IActorRef<Message<SftpCommand>>) = storagePicker <? SelectStorage |> Async.RunSynchronously
- let edgeChar = makeEdgeChar ()
- let storage = { StorageAssignment.StorageId = storageId; EdgeChar = edgeChar; Timestamp = DateTimeOffset.Now }
- mailbox.Sender() <! storage
- Store storage
- | storage ->
- mailbox.Sender() <! storage
- Loop
- basicPersistentActorTemplate handler StorageAssignment.Zero StorageAssignment.ToPersistentType StorageAssignment.FromPersistentType (fun _ -> ()) mailbox
Add Comment
Please, Sign In to add comment