Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -module(twitter.streaming).
- -author("KONDO Takahiro <heartery@gmail.com>").
- -export([behaviour_info/1]).
- -export([start_link/4, start_link/5, start/4, start/5]).
- -export([call/2, call/3, cast/2, reply/2]).
- -behaviour(gen_server).
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
- -include("object.hrl").
- -record(args, {user, password, method, params = [], format = record}).
- -record(state, {module, id, format, env}).
- behaviour_info(callbacks) ->
- [{init, 1},
- {handle_start, 2},
- {handle_info, 2},
- {terminate, 2}].
- start_link(Module, InitArgs, StreamingArgs, Options) ->
- boot(fun(Args) ->
- .gen_server:start_link(?MODULE, Args, Options)
- end, Module, InitArgs, StreamingArgs).
- start_link(ServerName, Module, InitArgs, StreamingArgs, Options) ->
- boot(fun(Args) ->
- .gen_server:start_link(ServerName, ?MODULE, Args, Options)
- end, Module, InitArgs, StreamingArgs).
- start(Module, InitArgs, StreamingArgs, Options) ->
- boot(fun(Args) ->
- .gen_server:start(?MODULE, Args, Options)
- end, Module, InitArgs, StreamingArgs).
- start(ServerName, Module, InitArgs, StreamingArgs, Options) ->
- boot(fun(Args) ->
- .gen_server:start(ServerName, ?MODULE, Args, Options)
- end, Module, InitArgs, StreamingArgs).
- call(ServerRef, Request) ->
- call(ServerRef, Request, infinity).
- call(ServerRef, Request, Timeout) ->
- try .gen_server:call(ServerRef, Request, Timeout) of
- Reply -> Reply
- catch
- exit:{noproc, _} -> {error, closed};
- exit:{timeout, _} -> {error, timeout}
- end.
- cast(ServerRef, Request) ->
- .gen_server:cast(ServerRef, Request).
- reply(Client, Reply) ->
- .gen_server:reply(Client, Reply).
- %%% callback functions
- init([Module, InitArgs, #args{user = User,
- password = Password,
- method = Method,
- params = Params,
- format = Format}]) ->
- RequestMethod = if Method == filter -> post; true -> get end,
- F = if Format == record -> json; true -> Format end,
- Url = api:stream_url("/statuses/" ++ atom_to_list(Method), F),
- case api:stream(RequestMethod, Url, Params, {basic, User, Password}) of
- {ok, Id} ->
- State = #state{module = Module, id = Id, format = Format},
- try Module:init(InitArgs) of
- {ok, Env} -> {ok, State#state{env = Env}};
- {ok, Env, Timeout} -> {ok, State#state{env = Env}, Timeout};
- Result -> Result
- catch
- _:Reason -> {stop, Reason}
- end;
- {error, Reason} ->
- {stop, Reason}
- end.
- handle_call(Request, From, State) ->
- sync_call(handle_call, [Request, From], State).
- handle_cast(Request, State) ->
- sync_call(handle_call, [Request], State).
- handle_info({http, {Id, stream_start, Headers}}, #state{id = Id} = State) ->
- async_call(handle_start, [Headers], State),
- {noreply, State};
- handle_info({http, {Id, stream_end, _}}, #state{id = Id} = State) ->
- {stop, disconnect, State};
- handle_info({http, {Id, stream, <<"\r\n">>}}, #state{id = Id} = State) ->
- {noreply, State};
- handle_info({http, {Id, stream, Part}}, #state{id = Id, format = record} = State) ->
- parse_and_call(Part, State),
- {noreply, State};
- handle_info({http, {Id, stream, Part}}, #state{id = Id} = State) ->
- async_call(handle_stream, [Part], State),
- {noreply, State};
- handle_info({http, {Id, {error, Reason}}}, #state{id = Id} = State) ->
- {stop, {http_error, Reason}, State};
- handle_info(Info, State) ->
- async_call(handle_info, [Info], State),
- {noreply, State}.
- terminate(Reason, #state{module = Module, id = Id, env = Env}) ->
- Module:terminate(Reason, Env),
- .http:cancel_request(Id).
- code_change(_, State, _) ->
- {ok, State}.
- %%% private functions
- boot(Fun, Module, InitArgs, StreamingArgs) ->
- Args = case .lists:foldl(fun args/2, #args{}, StreamingArgs) of
- #args{method = undefined, params = Params} = As ->
- L = length(.lists:filter(fun({Key, _}) ->
- Key == follow orelse
- Key == track
- end, Params)),
- As#args{method = if L > 0 -> filter; true -> sample end};
- As ->
- As
- end,
- case exported(Module, Args#args.format) of
- true -> Fun([Module, InitArgs, Args]);
- false -> {error, bad_callback}
- end.
- exported(Module, record) ->
- .erlang:function_exported(Module, handle_status, 2) andalso
- .erlang:function_exported(Module, handle_delete, 2) andalso
- .erlang:function_exported(Module, handle_limit, 2);
- exported(Module, _) ->
- .erlang:function_exported(Module, handle_stream, 2).
- args({user, User}, Args) -> Args#args{user = User};
- args({password, Password}, Args) -> Args#args{password = Password};
- args({method, Method}, Args) -> Args#args{method = Method};
- args(filter, Args) -> Args#args{method = filter};
- args(firehose, Args) -> Args#args{method = firehose};
- args(sample, Args) -> Args#args{method = sample};
- args({follow, UserIds}, #args{params = Params} = Args) ->
- Value = .string:join(.lists:map(fun integer_to_list/1, UserIds), ","),
- Args#args{params = [{follow, Value}|Params]};
- args({track, Keywords}, #args{params = Params} = Args) ->
- Args#args{params = [{track, .string:join(Keywords, ",")}|Params]};
- args({count, Count}, #args{params = Params} = Args) ->
- Args#args{params = [{count, Count}|Params]};
- args({format, Format}, Args) -> Args#args{format = Format};
- args(json, Args) -> Args#args{format = json};
- args(xml, Args) -> Args#args{format = xml}.
- sync_call(Callback, Args, #state{module = Module, env = Env} = State) ->
- try apply(Module, Callback, Args ++ [Env]) of
- {reply, Reply, NewEnv} ->
- {reply, Reply, State#state{env = NewEnv}};
- {reply, Reply, NewEnv, Timeout} ->
- {reply, Reply, State#state{env = NewEnv}, Timeout};
- {noreply, NewEnv} ->
- {noreply, State#state{env = NewEnv}};
- {noreply, NewEnv, Timeout} ->
- {noreply, State#state{env = NewEnv}, Timeout};
- {stop, Reason, Reply, NewEnv} ->
- {stop, Reason, Reply, State#state{env = NewEnv}};
- {stop, Reason, NewEnv} ->
- {stop, Reason, State#state{env = NewEnv}};
- Result ->
- Result
- catch
- error:undef ->
- {noreply, State}
- end.
- async_call(Callback, Args, #state{module = Module, env = Env}) ->
- spawn(Module, Callback, Args ++ [Env]).
- parse_and_call(Part, #state{module = Module, env = Env}) ->
- spawn(fun() ->
- case .rfc4627:decode(Part) of
- {ok, Json, _} ->
- {Callback, Record} = recognize(Json),
- Module:Callback(Record, Env);
- {error, _} ->
- Module:handle_info({bad_part, Part}, Env)
- end
- end).
- recognize({obj, [{"delete", {obj, [{"status", Delete}]}}]}) ->
- {handle_delete, object:to_delete(Delete)};
- recognize({obj, [{"limit", Limit}]}) ->
- {handle_limit, object:to_limit(Limit)};
- recognize(Status) ->
- {handle_status, object:to_status(Status)}.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement