Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- open System
- open Akka.FSharp
- open Akka
/// A range described by two string bounds, `low` and `high`.
/// How the bounds are interpreted (inclusive, ordering, ...) is not shown in this file.
type CRange =
    | CRange of low: string * high: string

/// Error text; a plain alias for `string`.
type ErrorMessage = string
/// Failure cases used as the error side of the `Result` values exchanged in this file.
type CError =
    /// Login failed; the string is an optional reason.
    | LoginFailed of string
    /// Timeout; carries a new range list.
    | CTimeOut of CRange list
    /// An exception was raised; carries a range list together with the exception.
    | CException of CRange list * exn
    /// A textual error; carries a range list together with the message.
    | CMessage of CRange list * ErrorMessage
/// One unit of work handed to a processor actor.
///
/// Fields, in order:
///   c            - string tag (NOTE(review): meaning not evident here; `main` passes "priv")
///   username     - account name
///   password     - account password
///   proxyAddress - proxy address as a string (`main` passes an empty string)
///   cycle        - a `DateTime` associated with the task
///   issuerRange  - the ranges assigned to this task; the scheduler replaces this
///                  field every time it dispatches a copy of the task
///   filterTable  - boolean flag (NOTE(review): semantics not shown in this file)
type CTask =
    | CTask of
        c: string *
        username: string *
        password: string *
        proxyAddress: string *
        cycle: DateTime *
        issuerRange: CRange list *
        filterTable: bool
/// Table from a string key to a function that takes a `CTask` and yields either
/// an `int * string` pair or a `CError`. Not referenced by the other code visible
/// in this file.
type DoWork = Map<string, (CTask -> Result<int * string, CError>)>
/// A string key paired with an integer count.
type PartKeyCount =
    | PartKeyCount of Key: string * Count: int

/// A list of group names paired with an integer count.
type PartGroupCount =
    | PartGroupCount of Group: string list * Count: int
/// The actor system shared by every actor in this file, named "ASystem" and
/// built from the configuration returned by `Configuration.load`.
let system = System.create "ASystem" (Configuration.load ())
/// Scheduler actor body.
///
/// Hands one range to each worker in `actors` (up to the number of available
/// ranges), then gives a worker its next range every time that worker reports
/// back. When nothing is left to hand out and the last running worker has
/// reported, the whole actor system is terminated.
///
/// Parameters:
///   actors  - worker actor references that receive `CTask` messages
///   task    - template task; only its `issuerRange` field is replaced per dispatch
///   mailbox - this actor's mailbox; replies arrive as `Result<int, CError>`
///
/// An `Error` reply is logged and then treated like a completion, so a failing
/// worker can neither freeze the scheduler nor keep the system from terminating.
let scheduler (actors: Actor.IActorRef list) task (mailbox: Actor<Result<int, CError>>) =
    /// Copy of `template` whose range list is replaced by `range`.
    let newTask template range =
        let (CTask(c, username, password, proxy, cycle, _, filter)) = template
        CTask(c, username, password, proxy, cycle, range, filter)

    /// Log what a worker reported.
    let reportOutcome name (outcome: Result<int, CError>) =
        match outcome with
        | Ok _ -> printfn "finished one"
        | Error err -> printfn "%s: task failed with %A" name err

    /// Wait five seconds (blocking), then ask the actor system to terminate.
    let shutdown () =
        //Log.Information("....")
        Threading.Thread.Sleep 5000 // Wait for 5 seconds
        mailbox.Context.System.Terminate() |> ignore

    // `pending` holds the range lists not yet handed out; `running` counts the
    // workers that still owe a reply.
    let rec loop (pending: CRange list list) running = actor {
        let akkaName = mailbox.Self.Path.Name
        printfn "%s scheduler loop (Running: %d Todo:%d)" akkaName running pending.Length
        if running <= 0 then
            // Nothing was dispatched (e.g. no workers were supplied), so no reply
            // will ever arrive; stop instead of waiting forever.
            printfn "%s has no running workers; shutting down" akkaName
            shutdown ()
        else
            let! m = mailbox.Receive ()
            let sender = mailbox.Sender ()
            printfn "%s received message %A from %A" akkaName m sender
            reportOutcome akkaName m
            match pending with
            | [] ->
                if running = 1 then
                    // The last outstanding worker has reported.
                    shutdown ()
                else
                    return! loop [] (running - 1)
            | next :: rest ->
                printfn "Finished one. Todo %d, running %d - %A. New task %A to %A" rest.Length running sender next sender
                // The worker that just reported is free again: give it the next range.
                sender.Tell(newTask task next, mailbox.Self)
                return! loop rest running
    }

    // Hard-coded work list; the integer paired with each range is not used.
    let groups = [(CRange ("A","A"), 1); (CRange ("B","B"), 1); (CRange ("C","C"), 1);
                  (CRange ("D","D"), 1); (CRange ("zzz","zzz"), 1)]
    let ranges = groups |> List.map fst
    let n = min actors.Length ranges.Length
    let firstBatch, remaining = List.splitAt n ranges

    // Initial dispatch: one range per worker.
    List.zip (List.truncate n actors) firstBatch
    |> List.iter (fun (worker, range) -> worker.Tell(newTask task [range], mailbox.Self))

    let todo = remaining |> List.map (fun range -> [range])
    Console.WriteLine("Groups {0}; Running {1}; Todo: {2}", groups.Length, n, todo.Length)
    loop todo n
/// Worker actor body.
///
/// For every `CTask` received it logs the message and replies to the sender
/// with `Ok` carrying the millisecond component of the current time; no other
/// work is done on the task.
let processor (mailbox: Actor<CTask>) =
    let rec loop () = actor {
        let! m = mailbox.Receive ()
        let sender = mailbox.Sender ()
        let akkaName = mailbox.Self.Path.Name
        printfn "* %s received message %A from %A" akkaName m sender
        // A type annotation pins the error type of the reply; no upcast is needed
        // because `Result<int, CError>` is already the exact type being sent.
        let reply : Result<int, CError> = Ok DateTime.Now.Millisecond
        sender <! reply
        printfn "* %s sent to %A." akkaName sender
        return! loop ()
    }
    loop ()
/// Spawn `n` processor actors in `system`, named "Processor1" .. "Processor<n>",
/// and return their references in that order. Yields an empty list when `n < 1`.
let spawnSystems n =
    [ for index in 1 .. n -> spawn system (sprintf "Processor%d" index) processor ]
/// Spawn `actorNumber` processors, pause for one second, start the "Scheduler"
/// actor over them with `task` as the template, and block the calling thread
/// until the actor system has terminated.
let startAkka task actorNumber =
    let workers = spawnSystems actorNumber
    Threading.Thread.Sleep 1000
    spawn system "Scheduler" (scheduler workers task) |> ignore
    system.WhenTerminated.Wait()
/// Program entry point: builds a sample task and runs it on two processor
/// actors. Command-line arguments are not used. Returns exit code 0.
[<EntryPoint>]
let main argv =
    let task =
        CTask("priv", "username", "password", "", DateTime(2020, 1, 1), [], false)
    startAkka task 2
    0
Add Comment
Please, Sign In to add comment