Guest User

Untitled

a guest
Oct 3rd, 2018
234
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.56 KB | None | 0 0
  1. open System
  2. open Akka.FSharp
  3. open Akka
  4.  
  /// A range described by two strings, `low` and `high`.
  type CRange =
      | CRange of low: string * high: string

  /// Alias for `string`; used as the payload of `CError.CMessage`.
  type ErrorMessage = string
  7.  
  /// Error side of the `Result<_, CError>` values used in this file.
  type CError =
      /// Login failure; the string is an optional reason.
      | LoginFailed of string
      /// Time-out; carries the new range.
      | CTimeOut of CRange list
      /// A list of ranges together with an exception.
      | CException of CRange list * exn
      /// A list of ranges together with an error message.
      | CMessage of CRange list * ErrorMessage
  13.  
  14.  
  /// A task: a single-case union with seven named fields. `scheduler` replaces
  /// the `issuerRange` field for each dispatch and copies the other six.
  type CTask =
      | CTask of
          c: string * username: string * password: string * proxyAddress: string *
          cycle: DateTime * issuerRange: CRange list * filterTable: bool
  24.  
  /// Map from a string key to a function that takes a `CTask` and returns either
  /// an `int * string` pair or a `CError`. Not referenced elsewhere in the visible code.
  type DoWork = Map<string, (CTask -> Result<int * string, CError>)>
  26.  
  /// A string key paired with an integer count.
  type PartKeyCount =
      | PartKeyCount of Key: string * Count: int

  /// A list of strings (the group) paired with an integer count.
  type PartGroupCount =
      | PartGroupCount of Group: string list * Count: int
  29.  
  /// The actor system, named "ASystem", created from the loaded configuration.
  /// Every actor in this file is spawned into it.
  let system = Configuration.load () |> System.create "ASystem"
  31.  
  /// Scheduler actor body.
  ///
  /// On start-up it sends one task per worker, up to the number of range groups,
  /// and queues the remaining groups. Each reply from a worker frees that worker:
  /// if groups are still queued, the next one is sent to the worker that replied;
  /// otherwise the running count drops by one, and when the last running worker
  /// replies the actor system is terminated.
  ///
  /// Parameters:
  ///   actors  - worker actor references that will receive `CTask` messages.
  ///   task    - template task; only its range list is replaced per dispatch.
  ///   mailbox - this actor's mailbox; it receives `Result<int, CError>` replies.
  ///
  /// An `Error` reply is logged and then handled exactly like `Ok`. Previously
  /// an `Error` ended the message loop without updating the running count, so
  /// the system was never terminated and the caller waited forever.
  /// NOTE(review): a failed range is not re-queued, and the "new range" carried
  /// by `CTimeOut` is ignored — confirm whether a retry is wanted.
  let scheduler (actors: Actor.IActorRef list) task (mailbox: Actor<Result<int, CError>>) =
      // Copy of `task` with its range list (sixth field) replaced by `range`.
      let newTask task range =
          let (CTask(c, username, password, proxy, cycle, _, filter)) = task
          CTask(c, username, password, proxy, cycle, range, filter)

      // Print how the worker's task ended.
      let report name worker result =
          match result with
          | Ok _ -> printfn "finished one"
          | Error err -> printfn "%s: task on %A failed with %A" name worker err

      let rec loop (list: CRange list list) running = actor {
          let akkaName = mailbox.Self.Path.Name
          printfn "%s scheduler loop (Running: %d Todo:%d)" akkaName running list.Length
          let! m = mailbox.Receive ()
          let sender = mailbox.Sender ()
          printfn "%s received message %A from %A" akkaName m sender
          report akkaName sender m
          match list with
          | [] ->
              if running = 1 then
                  //Log.Information("....")
                  // NOTE(review): this sleep blocks the thread running this actor.
                  Threading.Thread.Sleep 5000 // Wait for 5 seconds
                  mailbox.Context.System.Terminate() |> ignore
              else
                  return! loop [] (running - 1)
          | x :: xs ->
              printfn "Finished one. Todo %d, running %d - %A. New task %A to %A" xs.Length running sender x sender
              // Tell with an explicit sender so the worker replies to this scheduler.
              sender.Tell(newTask task x, mailbox.Self)
              return! loop xs running
      }

      // The second tuple component is not used anywhere in this function.
      let groups = [(CRange ("A","A"), 1); (CRange ("B","B"), 1); (CRange ("C","C"), 1);
                    (CRange ("D","D"), 1); (CRange ("zzz","zzz"), 1)]
      let n = min actors.Length groups.Length

      // Initial dispatch: pair the first n workers with the first n groups.
      List.zip (List.truncate n actors) (List.truncate n groups)
      |> List.iter (fun (worker, (range, _)) ->
          worker.Tell(newTask task [range], mailbox.Self))

      let todo = groups |> List.skip n |> List.map (fun (x, _) -> [x])
      Console.WriteLine("Groups {0}; Running {1}; Todo: {2}", groups.Length, n, todo.Length)
      loop todo n
  69.  
  /// Worker actor body: for each `CTask` received, prints the message and its
  /// sender, replies to the sender with `Ok` of the current clock millisecond
  /// (typed as `Result<int, CError>`), prints a confirmation and waits for the
  /// next message.
  let processor (mailbox: Actor<CTask>) =
      let rec handleNext () =
          actor {
              let! job = mailbox.Receive ()
              let requester = mailbox.Sender ()
              let selfName = mailbox.Self.Path.Name
              printfn "* %s received message %A from %A" selfName job requester
              // The annotation pins the error type of the otherwise generic `Ok`.
              let reply: Result<int, CError> = Ok DateTime.Now.Millisecond
              requester <! reply
              printfn "* %s sent to %A." selfName requester
              return! handleNext ()
          }
      handleNext ()
  81.  
  /// Spawns `n` processor actors named "Processor1" .. "Processor<n>" in `system`
  /// and returns their references in that order.
  let spawnSystems n =
      [ for index in 1 .. n -> spawn system (sprintf "Processor%d" index) processor ]
  87.  
  /// Spawns `actorNumber` processors, pauses for one second, spawns the
  /// "Scheduler" actor over them with `task`, then blocks until the actor
  /// system has terminated.
  let startAkka task actorNumber =
      let workers = spawnSystems actorNumber
      Threading.Thread.Sleep 1000
      ignore (spawn system "Scheduler" (scheduler workers task))
      system.WhenTerminated.Wait()
  95.  
  /// Program entry point: builds a sample task, runs it through `startAkka`
  /// with two processors, and returns exit code 0 once that call returns.
  [<EntryPoint>]
  let main argv =
      let task =
          CTask(
              c = "priv",
              username = "username",
              password = "password",
              proxyAddress = "",
              cycle = DateTime(2020, 1, 1),
              issuerRange = [],
              filterTable = false
          )
      startAkka task 2
      0
Add Comment
Please, Sign In to add comment