Guest User

Untitled

a guest
Jul 22nd, 2018
145
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.87 KB | None | 0 0
  1. %%
  2. %% Simple pool for gen_servers that only use :call
  3. %% @author RJ <rj@metabrew.com>
  4. %%
  5. -module(gen_server_call_pool).
  6.  
  7. -behaviour(gen_server).
  8. -include("irc.hrl").
  9.  
  10. %% --------------------------------------------------------------------
  11. %% External exports
  12. -export([start_link/3, stats/1, forcetimeout/1]).
  13.  
  14. %% gen_server callbacks
  15. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  16.  
  17. -define(ENABLE_DEBUG, false). % gates the debug path in handle_call/3 that calls the worker with a 1 ms timeout
  18. -define(QWAIT_WARNING, 100). % in ms: queue wait above which a dequeued job is logged
  19. -define(QSIZE_WARNING, 25). % queue length above which a dequeued job is logged
  20.  
  %% Pool server state. The field order defines the record's tuple layout
  %% and is therefore kept exactly as it was.
  -record(state, {
      ready,  % worker pids that are idle and can take a job
      busy,   % worker pids currently handling a job
      work,   % FIFO queue of #job{} records waiting for a worker
      name,   % name the pool gen_server is registered under
      m,      % module of the worker start function
      f,      % name of the worker start function
      a       % argument list applied to the worker start function
  }).
  28.  
  29.  
  %% A call that arrived while no worker was ready and is waiting in the
  %% work queue.
  -record(job, {
      request,  % the caller's gen_server request term, forwarded untouched
      from,     % gen_server `From' handle used to reply to the caller later
      dob       % timestamp taken when the job entered the work queue
  }).
  34.  
  %% @doc Start the pool server and register it locally as `Name'.
  %% `{M,F,A}' is the worker start spec and `Num' the number of workers;
  %% both are handed to init/1 unchanged. Returns whatever
  %% gen_server:start_link/4 returns.
  start_link(Name, {_M,_F,_A} = WorkerSpec, Num) ->
      InitArgs = [Name, WorkerSpec, Num],
      gen_server:start_link({local, Name}, ?MODULE, InitArgs, []).
  37.  
  %% @doc Ask the pool registered as `Name' for its current statistics.
  %% The request is tagged with ?MODULE so it is answered by the pool
  %% itself rather than forwarded to a worker.
  stats(Name) ->
      StatsRequest = {?MODULE, stats},
      gen_server:call(Name, StatsRequest).
  40.  
  %% @doc Debug aid: asynchronously tell the pool registered as `Name' to
  %% set its `dotimeout' flag. Always returns `ok' (gen_server:cast/2).
  forcetimeout(Name) ->
      Msg = forcetimeout,
      gen_server:cast(Name, Msg).
  42.  
  43. %% --------------------------------------------------------------------
  44.  
  %% gen_server init callback.
  %% Enables exit trapping, then starts `Num' workers by applying M:F to the
  %% argument list A; every start must return `{ok, Pid}' or init crashes.
  %% All workers begin in the ready list, with nothing busy and an empty
  %% work queue.
  init([Name, {M,F,A}, Num]) ->
      process_flag(trap_exit, true),
      ?INFO("Starting DB workers..",[]),
      StartWorker = fun() ->
          {ok, WorkerPid} = erlang:apply(M, F, A),
          WorkerPid
      end,
      Workers = [StartWorker() || _ <- lists:seq(1, Num)],
      ?INFO("Started ~B [~w] workers", [length(Workers), M]),
      InitialState = #state{
          ready = Workers,
          busy  = [],
          work  = queue:new(),
          name  = Name,
          m = M, f = F, a = A
      },
      {ok, InitialState}.
  58.  
  59.  
  %% gen_server call callback.
  %%
  %%  * `{?MODULE, stats}' is answered directly with a proplist holding
  %%    num_busy, num_ready, jobs_queued and current_wait (milliseconds the
  %%    oldest queued job has been waiting, 0 when the queue is empty).
  %%  * Any other request is forwarded to a ready worker through a helper
  %%    process, which later casts `{send_reply, Worker, From, Reply}' back
  %%    to the pool; the caller is answered from handle_cast/2.
  %%  * If no worker is ready, the request is queued as a #job{}.
  handle_call({?MODULE,stats}, _From, State) ->
      %% queue:peek/1 yields `empty' or `{value, Item}'. queue:out/1 yields
      %% `{{value, Item}, Q}', so matching `{value, _}' against it crashed
      %% with case_clause whenever the queue was non-empty.
      WaitMs = case queue:peek(State#state.work) of
          empty ->
              0;
          {value, #job{dob=Dob}} ->
              timer:now_diff(erlang:timestamp(), Dob)/1000
      end,
      Stats = [
          {num_busy, length(State#state.busy)},
          {num_ready, length(State#state.ready)},
          {jobs_queued, queue:len(State#state.work)},
          {current_wait, WaitMs}
      ],
      {reply, Stats, State};

  %% job arrives when a worker is available immediately
  handle_call(Request, From, State=#state{ready=[Worker|NewReady], busy=Busy, name=Name}) ->
      %% The process dictionary is per process: the `dotimeout' flag is set in
      %% the pool process (forcetimeout cast), so it has to be read here and
      %% captured by the closure. Reading it inside the helper always gave
      %% `undefined'.
      ForceTimeout = ?ENABLE_DEBUG == true andalso get(dotimeout) == true,
      CallWorker = fun() ->
          Reply = case ForceTimeout of
              true ->
                  gen_server:call(Worker, Request, 1);
              false ->
                  gen_server:call(Worker, Request)
          end,
          %% this sends the reply to 'From' and puts the worker back in the ready list:
          gen_server:cast(Name, {send_reply, Worker, From, Reply})
      end,
      %% Deliberately not linked: the pool traps exits and its 'EXIT' handler
      %% is meant for worker pids, so helper exits must not reach it.
      spawn(CallWorker),
      {noreply, State#state{ready=NewReady, busy=[Worker|Busy]}};

  %% job arrives, no available workers, add to queue
  handle_call(Request, From, State=#state{ready=[], work=Work}) ->
      Job = #job{request=Request, from=From, dob=erlang:timestamp()},
      {noreply, State#state{work = queue:in(Job, Work)}}.
  97.  
  %% gen_server cast callback.
  %%
  %%  * `forcetimeout' sets the `dotimeout' flag in the pool's process
  %%    dictionary (debug aid).
  %%  * `{send_reply, Worker, To, Reply}' is sent by a helper process once a
  %%    worker has answered: the reply is relayed to the original caller, the
  %%    worker is returned to the ready list and the oldest queued job, if
  %%    any, is dispatched.
  handle_cast(forcetimeout, State) ->
      put(dotimeout, true),
      {noreply, State};

  handle_cast({send_reply, Worker, To, Reply}, State=#state{ready=Ready, busy=Busy}) ->
      gen_server:reply(To, Reply),
      Released = case lists:member(Worker, Busy) of
          true ->
              State#state{ready=[Worker|Ready], busy=lists:delete(Worker, Busy)};
          false ->
              %% The worker's 'EXIT' was handled before this cast arrived, so
              %% it has already been removed and replaced. Re-adding the dead
              %% pid to the ready list would lose the next job sent to it.
              State
      end,
      dispatch_next_job(Released).

  %% Hand the oldest queued job to a ready worker. Leaves the queue untouched
  %% when no worker is ready, so the job keeps its place and its timestamp.
  dispatch_next_job(State=#state{ready=[]}) ->
      {noreply, State};
  dispatch_next_job(State=#state{work=Work}) ->
      case queue:out(Work) of
          {empty, _} ->
              {noreply, State};
          {{value, #job{request=JobRequest, from=JobFrom, dob=Dob}}, NewWork} ->
              T = timer:now_diff(erlang:timestamp(), Dob)/1000,
              %% length of the queue before this job was taken out
              QSize = queue:len(Work),
              case QSize > ?QSIZE_WARNING orelse T > ?QWAIT_WARNING of
                  true ->
                      ?INFO("Queued job started, wait time: ~p ms, Qsize: ~p", [T, QSize]);
                  false ->
                      ok
              end,
              %% ready is non-empty here, so this takes the dispatch clause of
              %% handle_call/3 and returns {noreply, NewState}
              handle_call(JobRequest, JobFrom, State#state{work=NewWork})
      end.
  123.  
  %% gen_server info callback.
  %%
  %%  * `{'EXIT', Pid, Reason}' from a pool worker: the pid is dropped from
  %%    the ready/busy lists, a replacement is started with M:F applied to A
  %%    (must return `{ok, Pid}', otherwise the pool crashes), and the oldest
  %%    queued job, if any, is handed to the replacement.
  %%  * `'EXIT'' from a pid the pool does not track: logged and ignored, so
  %%    unrelated exits no longer grow the pool by one worker each.
  %%  * Anything else: logged and dropped instead of crashing the server
  %%    with function_clause.
  handle_info({'EXIT', Pid, Reason}, State = #state{ready=Ready, busy=Busy, m=M, f=F, a=A}) ->
      case lists:member(Pid, Ready) orelse lists:member(Pid, Busy) of
          true ->
              ?WARN("WORKER CRASH: ~p ~p ready:~p busy:~p",[Pid, Reason, Ready, Busy]),
              % spawn a new worker to replace the crashed one
              {ok, W} = erlang:apply(M,F,A),
              ?INFO("Added new worker to pool to replace crashed: ~p", [W]),
              NewState = State#state{ready=[W|lists:delete(Pid, Ready)],
                                     busy=lists:delete(Pid, Busy)},
              %% Queued jobs are otherwise only dispatched when a send_reply
              %% cast arrives; the crashed worker's helper never sends one, so
              %% give the replacement the oldest queued job right away.
              case queue:out(NewState#state.work) of
                  {empty, _} ->
                      {noreply, NewState};
                  {{value, #job{request=JobRequest, from=JobFrom}}, NewWork} ->
                      %% ready is non-empty, so handle_call/3 dispatches and
                      %% returns {noreply, _}
                      handle_call(JobRequest, JobFrom, NewState#state{work=NewWork})
              end;
          false ->
              ?WARN("Ignoring EXIT from non-worker process ~p: ~p", [Pid, Reason]),
              {noreply, State}
      end;

  handle_info(Msg, State) ->
      ?WARN("Ignoring unexpected message: ~p", [Msg]),
      {noreply, State}.
  134.  
  %% gen_server terminate callback: performs no cleanup of its own and
  %% ignores both the shutdown reason and the final state.
  terminate(_Why, _PoolState) ->
      ok.
  137.  
  %% gen_server code_change callback: no state migration is needed, so the
  %% state is returned unchanged.
  code_change(_FromVsn, PoolState, _Opts) ->
      Unchanged = PoolState,
      {ok, Unchanged}.
Add Comment
Please, Sign In to add comment