Advertisement
Guest User

Untitled

a guest
Apr 7th, 2018
565
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.46 KB | None | 0 0
  1. -module(twitter.streaming).
  2. -author("KONDO Takahiro <heartery@gmail.com>").
  3.  
  4. -export([behaviour_info/1]).
  5. -export([start_link/4, start_link/5, start/4, start/5]).
  6. -export([call/2, call/3, cast/2, reply/2]).
  7.  
  8. -behaviour(gen_server).
  9. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  10. terminate/2, code_change/3]).
  11.  
  12. -include("object.hrl").
  13.  
  14. -record(args, {user, password, method, params = [], format = record}).
  15. -record(state, {module, id, format, env}).
  16.  
  %% @doc Behaviour descriptor for callback modules of this behaviour.
  %%
  %% `callbacks' lists the functions every callback module must export
  %% regardless of the stream format. The format-specific callbacks
  %% (`handle_status/2', `handle_delete/2', `handle_limit/2' or
  %% `handle_stream/2') are verified separately by exported/2 at start time.
  %%
  %% Any other key yields `undefined' (the usual OTP convention) instead of
  %% raising `function_clause'.
  behaviour_info(callbacks) ->
      [{init, 1},
       {handle_start, 2},
       {handle_info, 2},
       {terminate, 2}];
  behaviour_info(_Other) ->
      undefined.
  22.  
  %% @doc Starts an unregistered streaming server linked to the caller.
  %%
  %% `StreamingArgs' is folded into the connection settings by boot/4, which
  %% also returns `{error, bad_callback}' when `Module' lacks the callbacks
  %% required for the chosen format. Otherwise the result is whatever
  %% `gen_server:start_link/3' returns.
  start_link(Module, InitArgs, StreamingArgs, Options) ->
      Starter = fun(ServerArgs) ->
                        .gen_server:start_link(?MODULE, ServerArgs, Options)
                end,
      boot(Starter, Module, InitArgs, StreamingArgs).
  27.  
  %% @doc Same as start_link/4, but registers the server as `ServerName'
  %% (passed unchanged to `gen_server:start_link/4').
  start_link(ServerName, Module, InitArgs, StreamingArgs, Options) ->
      Starter = fun(ServerArgs) ->
                        .gen_server:start_link(ServerName, ?MODULE, ServerArgs, Options)
                end,
      boot(Starter, Module, InitArgs, StreamingArgs).
  32.  
  %% @doc Starts an unregistered streaming server without linking it to the
  %% caller. Arguments and results are otherwise those of start_link/4.
  start(Module, InitArgs, StreamingArgs, Options) ->
      Starter = fun(ServerArgs) ->
                        .gen_server:start(?MODULE, ServerArgs, Options)
                end,
      boot(Starter, Module, InitArgs, StreamingArgs).
  37.  
  %% @doc Same as start/4, but registers the server as `ServerName'
  %% (passed unchanged to `gen_server:start/4').
  start(ServerName, Module, InitArgs, StreamingArgs, Options) ->
      Starter = fun(ServerArgs) ->
                        .gen_server:start(ServerName, ?MODULE, ServerArgs, Options)
                end,
      boot(Starter, Module, InitArgs, StreamingArgs).
  42.  
  %% @doc Synchronous request to a streaming server with no time limit;
  %% shorthand for call/3 with an `infinity' timeout.
  call(Server, Msg) ->
      call(Server, Msg, infinity).
  45.  
  %% @doc Synchronous request to a streaming server.
  %%
  %% Returns the server's reply. Two exit reasons raised by
  %% `gen_server:call/3' are translated into return values:
  %% `noproc' becomes `{error, closed}' and `timeout' becomes
  %% `{error, timeout}'. Every other exit propagates to the caller.
  call(ServerRef, Request, Timeout) ->
      try
          .gen_server:call(ServerRef, Request, Timeout)
      catch
          exit:{timeout, _Where} -> {error, timeout};
          exit:{noproc, _Where} -> {error, closed}
      end.
  53.  
  %% @doc Asynchronous request to a streaming server; thin wrapper around
  %% `gen_server:cast/2' whose result is returned unchanged.
  cast(Server, Msg) ->
      .gen_server:cast(Server, Msg).
  56.  
  %% @doc Sends `Reply' to a client whose call was answered with `noreply';
  %% thin wrapper around `gen_server:reply/2'.
  reply(To, Answer) ->
      .gen_server:reply(To, Answer).
  59.  
  60. %%% callback functions
  61.  
  %% @doc gen_server callback: opens the stream, then initialises the
  %% callback module.
  %%
  %% The request goes to "/statuses/<Method>" and uses POST for the `filter'
  %% method and GET otherwise. The `record' format is requested from the API
  %% as `json'.
  %%
  %% Returns `{stop, Reason}' when the stream cannot be opened. If the stream
  %% opens but `Module:init/1' raises or returns anything other than
  %% `{ok, Env}' / `{ok, Env, Timeout}', the already opened request is
  %% cancelled before the result is handed back. terminate/2 is not run for a
  %% failed init, so without this the request would be left open.
  init([Module, InitArgs, #args{user = User,
                                password = Password,
                                method = Method,
                                params = Params,
                                format = Format}]) ->
      RequestMethod = if Method == filter -> post; true -> get end,
      F = if Format == record -> json; true -> Format end,
      Url = api:stream_url("/statuses/" ++ atom_to_list(Method), F),
      case api:stream(RequestMethod, Url, Params, {basic, User, Password}) of
          {ok, Id} ->
              State = #state{module = Module, id = Id, format = Format},
              init_callback(Module, InitArgs, State);
          {error, Reason} ->
              {stop, Reason}
      end.

  %% Runs Module:init/1 and stores its environment in the server state.
  %% Cancels the open request on every path that does not start the server.
  init_callback(Module, InitArgs, #state{id = Id} = State) ->
      try Module:init(InitArgs) of
          {ok, Env} ->
              {ok, State#state{env = Env}};
          {ok, Env, Timeout} ->
              {ok, State#state{env = Env}, Timeout};
          Result ->
              %% e.g. {stop, Reason} or ignore: the server will not start.
              .http:cancel_request(Id),
              Result
      catch
          _:Reason ->
              .http:cancel_request(Id),
              {stop, Reason}
      end.
  83.  
  %% @doc gen_server callback: forwards a synchronous request to the callback
  %% module as `Module:handle_call(Request, From, Env)' via sync_call/3.
  handle_call(Msg, Client, State) ->
      sync_call(handle_call, [Msg, Client], State).
  86.  
  %% @doc gen_server callback: forwards an asynchronous request to the
  %% callback module as `Module:handle_cast(Request, Env)' via sync_call/3.
  %%
  %% This previously dispatched to `handle_call' with a single request
  %% argument, so the callback module's `handle_cast/2' was never invoked.
  handle_cast(Request, State) ->
      sync_call(handle_cast, [Request], State).
  89.  
  %% @doc gen_server callback: routes messages of the open HTTP stream and
  %% hands everything else to the callback module.
  %%
  %% Only messages whose request id equals the id stored in the state are
  %% treated as stream events. Any other message, including stream messages
  %% with a different id or an unknown event tag, is passed to
  %% `Module:handle_info/2' in a spawned process.
  handle_info({http, {Id, Event, Data}} = Info, #state{id = Id} = State) ->
      http_event(Event, Data, Info, State);
  handle_info({http, {Id, {error, Reason}}}, #state{id = Id} = State) ->
      {stop, {http_error, Reason}, State};
  handle_info(Info, State) ->
      forward_info(Info, State).

  %% Handles one three-element stream event belonging to this server.
  http_event(stream_start, Headers, _Info, State) ->
      async_call(handle_start, [Headers], State),
      {noreply, State};
  http_event(stream_end, _Data, _Info, State) ->
      {stop, disconnect, State};
  http_event(stream, <<"\r\n">>, _Info, State) ->
      %% A bare CRLF chunk carries no payload; ignore it.
      {noreply, State};
  http_event(stream, Part, _Info, #state{format = record} = State) ->
      parse_and_call(Part, State),
      {noreply, State};
  http_event(stream, Part, _Info, State) ->
      async_call(handle_stream, [Part], State),
      {noreply, State};
  http_event(_Event, _Data, Info, State) ->
      forward_info(Info, State).

  %% Passes a message untouched to Module:handle_info/2 (asynchronously).
  forward_info(Info, State) ->
      async_call(handle_info, [Info], State),
      {noreply, State}.
  114.  
  %% @doc gen_server callback: lets the callback module clean up, then
  %% cancels the streaming HTTP request.
  %%
  %% The cancellation sits in an `after' section so the request is released
  %% even when `Module:terminate/2' raises; the exception still propagates
  %% afterwards. The function now evaluates to the result of
  %% `Module:terminate/2' rather than that of the cancellation.
  terminate(Reason, #state{module = Module, id = Id, env = Env}) ->
      try
          Module:terminate(Reason, Env)
      after
          .http:cancel_request(Id)
      end.
  118.  
  %% @doc gen_server callback: the state needs no conversion on code upgrade,
  %% so it is returned unchanged.
  code_change(_OldVsn, State, _Extra) ->
      {ok, State}.
  121.  
  122. %%% private functions
  123.  
  %% Builds the streaming settings from `StreamingArgs', checks that `Module'
  %% exports the callbacks required for the chosen format, and then calls
  %% `Fun' with the argument list that init/1 expects.
  %%
  %% Returns the result of `Fun', or `{error, bad_callback}' when the export
  %% check fails.
  boot(Fun, Module, InitArgs, StreamingArgs) ->
      Args = default_method(.lists:foldl(fun args/2, #args{}, StreamingArgs)),
      case exported(Module, Args#args.format) of
          true -> Fun([Module, InitArgs, Args]);
          false -> {error, bad_callback}
      end.

  %% Picks the API method when the caller did not name one: `filter' if a
  %% `follow' or `track' parameter is present, `sample' otherwise.
  default_method(#args{method = undefined, params = Params} = Args) ->
      IsFilterParam = fun({Key, _}) ->
                              Key == follow orelse Key == track
                      end,
      %% lists:any/2 stops at the first hit instead of building a filtered
      %% list only to measure its length.
      case .lists:any(IsFilterParam, Params) of
          true -> Args#args{method = filter};
          false -> Args#args{method = sample}
      end;
  default_method(Args) ->
      Args.
  139.  
  %% Tells whether `Module' exports the callbacks needed for a stream format:
  %% `handle_status/2', `handle_delete/2' and `handle_limit/2' for `record',
  %% `handle_stream/2' for every other format.
  %%
  %% `erlang:function_exported/3' answers false for a module that has not been
  %% loaded yet, so the module is loaded first. Otherwise a valid callback
  %% module that nothing has touched so far would be rejected.
  exported(Module, record) ->
      is_loadable(Module) andalso
          .erlang:function_exported(Module, handle_status, 2) andalso
          .erlang:function_exported(Module, handle_delete, 2) andalso
          .erlang:function_exported(Module, handle_limit, 2);

  exported(Module, _) ->
      is_loadable(Module) andalso
          .erlang:function_exported(Module, handle_stream, 2).

  %% True when `Module' is loaded or could be loaded now.
  is_loadable(Module) ->
      case .code:ensure_loaded(Module) of
          {module, _} -> true;
          {error, _} -> false
      end.
  147.  
  %% Fold step used by boot/4: applies one streaming option to the `#args{}'
  %% accumulator.
  %%
  %% Accepted options:
  %%   {user, U}, {password, P}    - credentials
  %%   {method, M}                 - API method; the bare atoms `filter',
  %%                                 `firehose' and `sample' are shorthands
  %%   {format, F}                 - stream format; the bare atoms `json' and
  %%                                 `xml' are shorthands
  %%   {follow, Ids}               - integers, stored comma-joined as a param
  %%   {track, Keywords}           - strings, stored comma-joined as a param
  %%   {count, N}                  - stored as given as a param
  %% Anything else fails with `function_clause'.
  args({user, User}, Args) ->
      Args#args{user = User};
  args({password, Password}, Args) ->
      Args#args{password = Password};
  args({method, Method}, Args) ->
      Args#args{method = Method};
  args({format, Format}, Args) ->
      Args#args{format = Format};
  args({follow, UserIds}, #args{params = Params} = Args) ->
      Ids = .lists:map(fun integer_to_list/1, UserIds),
      Follow = {follow, .string:join(Ids, ",")},
      Args#args{params = [Follow | Params]};
  args({track, Keywords}, #args{params = Params} = Args) ->
      Track = {track, .string:join(Keywords, ",")},
      Args#args{params = [Track | Params]};
  args({count, Count}, #args{params = Params} = Args) ->
      Args#args{params = [{count, Count} | Params]};
  args(Method, Args) when Method =:= filter;
                          Method =:= firehose;
                          Method =:= sample ->
      Args#args{method = Method};
  args(Format, Args) when Format =:= json;
                          Format =:= xml ->
      Args#args{format = Format}.
  169.  
  %% Invokes an optional callback of the callback module inside the server
  %% process and translates its result into a gen_server return value.
  %%
  %% The callback receives `Args' followed by the current environment. In
  %% every recognised result shape the returned environment replaces the one
  %% held in the server state; any other result is passed through unchanged.
  %%
  %% A callback the module does not export is skipped and answered with
  %% `{noreply, State}'. For a skipped `handle_call' the client gets no
  %% reply. The export is checked up front rather than by catching
  %% `error:undef', which also swallowed `undef' errors raised deep inside an
  %% implemented callback and so hid genuine bugs.
  sync_call(Callback, Args, #state{module = Module, env = Env} = State) ->
      CallArgs = Args ++ [Env],
      case .erlang:function_exported(Module, Callback, length(CallArgs)) of
          true ->
              wrap_result(apply(Module, Callback, CallArgs), State);
          false ->
              {noreply, State}
      end.

  %% Swaps the callback's environment into the server state.
  wrap_result({reply, Reply, NewEnv}, State) ->
      {reply, Reply, State#state{env = NewEnv}};
  wrap_result({reply, Reply, NewEnv, Timeout}, State) ->
      {reply, Reply, State#state{env = NewEnv}, Timeout};
  wrap_result({noreply, NewEnv}, State) ->
      {noreply, State#state{env = NewEnv}};
  wrap_result({noreply, NewEnv, Timeout}, State) ->
      {noreply, State#state{env = NewEnv}, Timeout};
  wrap_result({stop, Reason, Reply, NewEnv}, State) ->
      {stop, Reason, Reply, State#state{env = NewEnv}};
  wrap_result({stop, Reason, NewEnv}, State) ->
      {stop, Reason, State#state{env = NewEnv}};
  wrap_result(Result, _State) ->
      Result.
  190.  
  %% Runs a callback of the callback module in a freshly spawned process,
  %% passing `Args' followed by the current environment. The callback's
  %% result is not used; the pid of the new process is returned.
  async_call(Callback, Args, #state{module = Module, env = Env}) ->
      CallArgs = Args ++ [Env],
      spawn(Module, Callback, CallArgs).
  193.  
  %% Decodes one stream chunk as JSON in a spawned process and hands the
  %% resulting record to the matching callback (see recognize/1). A chunk
  %% that fails to decode is reported as
  %% `Module:handle_info({bad_part, Part}, Env)'.
  parse_and_call(Part, #state{module = Module, env = Env}) ->
      spawn(fun() -> decode_and_dispatch(Part, Module, Env) end).

  %% Body of the worker process started by parse_and_call/2.
  decode_and_dispatch(Part, Module, Env) ->
      case .rfc4627:decode(Part) of
          {ok, Json, _Rest} ->
              {Callback, Record} = recognize(Json),
              Module:Callback(Record, Env);
          {error, _Reason} ->
              Module:handle_info({bad_part, Part}, Env)
      end.
  204.  
  %% Classifies a decoded JSON term and converts it to a record, returning
  %% `{CallbackName, Record}':
  %%   an object holding only "delete" -> "status"  => handle_delete
  %%   an object holding only "limit"               => handle_limit
  %%   anything else                                => handle_status
  recognize(Json) ->
      case Json of
          {obj, [{"delete", {obj, [{"status", Delete}]}}]} ->
              {handle_delete, object:to_delete(Delete)};
          {obj, [{"limit", Limit}]} ->
              {handle_limit, object:to_limit(Limit)};
          _ ->
              {handle_status, object:to_status(Json)}
      end.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement