Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require Logger
- defmodule MinerJobWoker do
-   @moduledoc """
-   Small GenStage demo: one `JobProducer` stage feeding a configurable number
-   of `Worker` consumer stages.
-   """
-   @doc """
-   Starts the producer and `worker_count` consumers subscribed to it.
-   `subscribe_options` is a keyword list handed to `GenStage.sync_subscribe/2`;
-   any `:to` entry in it is replaced with the pid of the freshly started producer.
-   Every stage is started with `GenStage.start_link/2` from the calling process.
-   Returns `{:ok, producer_pid}`. Raises a `MatchError` if a stage fails to
-   start or a subscription is rejected.
-   """
-   @spec start_link(non_neg_integer(), keyword()) :: {:ok, pid()}
-   def start_link(worker_count, subscribe_options)
-       when is_integer(worker_count) and worker_count >= 0 do
-     {:ok, producer_pid} = GenStage.start_link(__MODULE__.JobProducer, :ok)
-     # Callers cannot know the producer pid in advance, so :to is always overridden.
-     subscribe_options = Keyword.put(subscribe_options, :to, producer_pid)
-     start_workers(worker_count, subscribe_options)
-     {:ok, producer_pid}
-   end
-   # `1..0` is a decreasing range (it yields 1 and then 0), so a count of zero
-   # needs its own clause to start no workers at all.
-   defp start_workers(0, _subscribe_options), do: :ok
-   defp start_workers(worker_count, subscribe_options) do
-     Enum.each(1..worker_count, fn _ ->
-       {:ok, consumer_pid} = GenStage.start_link(__MODULE__.Worker, :ok)
-       # Fail fast on a rejected subscription instead of leaving an idle worker behind.
-       {:ok, _subscription} = GenStage.sync_subscribe(consumer_pid, subscribe_options)
-     end)
-   end
-   defmodule JobProducer do
-     @moduledoc """
-     Producer stage whose state is `{poll_timer, count}`.
-     `count` is the number of rounds handled so far. The first rounds emit a fixed
-     batch of jobs; later rounds emit nothing and schedule a `:poll` message to
-     try again; once the reset threshold is reached the counter starts over at 0.
-     """
-     use GenStage
-     # Jobs emitted on every emitting round.
-     @batch 1..10
-     # Rounds with a count below this value emit @batch.
-     @emit_rounds 10
-     # From this count on, the counter is reset to 0.
-     @reset_at 20
-     # Delay before the producer sends itself the next :poll message.
-     @poll_interval_ms 200
-     @impl true
-     def init(:ok), do: {:producer, {nil, 0}}
-     # The demanded amount is ignored: every round emits either the whole batch
-     # or nothing. NOTE(review): this relies on GenStage buffering events beyond
-     # the outstanding demand - confirm against the GenStage documentation.
-     @impl true
-     def handle_demand(_demand, state), do: take_jobs(state)
-     @impl true
-     def handle_info(:poll, state), do: take_jobs(state)
-     # Runs one round: cancels any pending poll timer, prints the current count,
-     # then emits or re-polls depending on that count.
-     defp take_jobs({poll_timer, count}) do
-       cancel_poll(poll_timer)
-       IO.inspect(["count", count])
-       next_step(count)
-     end
-     defp next_step(count) when count < @emit_rounds do
-       {:noreply, Enum.to_list(@batch), {nil, count + 1}}
-     end
-     defp next_step(count) when count < @reset_at do
-       {:noreply, [], {schedule_poll(), count + 1}}
-     end
-     # Catch-all: at (or, defensively, beyond) the reset threshold, start over.
-     defp next_step(_count) do
-       {:noreply, [], {schedule_poll(), 0}}
-     end
-     defp cancel_poll(nil), do: :ok
-     defp cancel_poll(poll_timer), do: Process.cancel_timer(poll_timer)
-     defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval_ms)
-   end
-   defmodule Worker do
-     @moduledoc """
-     Consumer stage that prints every job it receives and keeps no state.
-     """
-     use GenStage
-     @impl true
-     def init(:ok), do: {:consumer, nil}
-     @impl true
-     def handle_events(jobs, _from, _state) do
-       Enum.each(jobs, &IO.inspect/1)
-       {:noreply, [], nil}
-     end
-   end
- end
- # Script entry point: start one producer and two workers subscribed with max_demand: 1.
- # The producer pid is not needed afterwards, hence the underscore-prefixed binding.
- {:ok, _producer_pid} = MinerJobWoker.start_link(2, max_demand: 1)
- # Block this process forever so the script does not exit right after starting the stages.
- Process.sleep(:infinity)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement