Advertisement
Guest User

Untitled

a guest
Mar 28th, 2017
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.74 KB | None | 0 0
  1. require Logger
  2.  
  defmodule MinerJobWoker do
    @moduledoc """
    Small GenStage demo pipeline: one `JobProducer` stage feeding a
    configurable number of `Worker` consumer stages.
    """

    @doc """
    Starts the producer and `worker_count` consumers subscribed to it.

    `subscribe_options` is handed to `GenStage.sync_subscribe/2` for every
    consumer; any `:to` entry in it is overwritten with the producer's pid.

    Returns `{:ok, producer_pid}`. Every stage is started with
    `GenStage.start_link/2` from the calling process. Raises a `MatchError`
    if a stage fails to start or a subscription is rejected.
    """
    def start_link(worker_count, subscribe_options)
        when is_integer(worker_count) and worker_count >= 0 do
      # Start the producer
      {:ok, producer_pid} = GenStage.start_link(__MODULE__.JobProducer, :ok)

      # Override consumer subscribe options
      subscribe_options = Keyword.put(subscribe_options, :to, producer_pid)

      # Start the worker consumers
      start_workers(worker_count, subscribe_options)

      {:ok, producer_pid}
    end

    # Starts `remaining` consumers and subscribes each one to the producer.
    # Counts down instead of iterating `1..worker_count`, because that range
    # is decreasing (1, 0) when the count is 0 and would start two workers.
    defp start_workers(0, _subscribe_options), do: :ok

    defp start_workers(remaining, subscribe_options) do
      {:ok, consumer_pid} = GenStage.start_link(__MODULE__.Worker, :ok)

      # Match on the result so a rejected subscription fails loudly instead
      # of leaving an idle, unsubscribed worker behind.
      {:ok, _subscription} = GenStage.sync_subscribe(consumer_pid, subscribe_options)

      start_workers(remaining - 1, subscribe_options)
    end

    defmodule JobProducer do
      @moduledoc """
      Producer stage cycling through a fixed emit/idle pattern.

      The state is `{next_timer, n}`: `next_timer` is the reference of the
      pending `:poll` timer (or `nil`) and `n` counts the steps taken in the
      current cycle.
      """
      use GenStage

      # Delay, in milliseconds, before the producer polls itself again.
      @poll_interval 200

      @impl true
      def init(:ok), do: {:producer, {nil, 0}}

      # The demanded amount is ignored: each step emits either a fixed batch
      # or nothing, depending only on the step counter.
      @impl true
      def handle_demand(_demand, state), do: take_jobs(state)

      @impl true
      def handle_info(:poll, state), do: take_jobs(state)

      # Runs one step of the cycle: cancels any pending poll timer, prints the
      # step counter and returns the GenStage reply for that step.
      defp take_jobs({next_timer, n}) do
        cancel_pending_poll(next_timer)
        IO.inspect(["count", n])
        next_step(n)
      end

      # Steps 0..9 emit the events 1..10 and schedule nothing.
      defp next_step(n) when n < 10 do
        {:noreply, Enum.to_list(1..10), {nil, n + 1}}
      end

      # Steps 10..19 emit nothing and schedule the next poll.
      defp next_step(n) when n < 20 do
        {:noreply, [], {schedule_poll(), n + 1}}
      end

      # Step 20 (and, defensively, anything above it) restarts the cycle.
      defp next_step(_n) do
        {:noreply, [], {schedule_poll(), 0}}
      end

      defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)

      # The result of cancel_timer/1 is deliberately ignored; it is `false`
      # when the timer has already fired.
      defp cancel_pending_poll(nil), do: :ok
      defp cancel_pending_poll(timer), do: Process.cancel_timer(timer)
    end

    defmodule Worker do
      @moduledoc """
      Consumer stage that prints every job it receives.
      """
      use GenStage

      @impl true
      def init(:ok), do: {:consumer, nil}

      # Consumers emit no events; the state (always `nil` here) is passed
      # through unchanged.
      @impl true
      def handle_events(jobs, _from, state) do
        Enum.each(jobs, &IO.inspect/1)
        {:noreply, [], state}
      end
    end
  end
  75.  
  # Boot the demo pipeline with two workers, each subscribing with max_demand: 1.
  # The returned producer pid is not needed here, hence the underscore prefix.
  {:ok, _producer_pid} = MinerJobWoker.start_link(2, max_demand: 1)

  # Block the script process forever so the stages started above keep running.
  Process.sleep(:infinity)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement