Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -module(mapred).
- %% API
- -export([mapred/1, maptags/3, redtags/2,decoder/1,get_riak_hostport/1]).
- -record(hostport, {host, port}).
%% @doc Looks up the connection settings of the Riak node called Name.
%%
%% Reads the `riak_nodes' environment value of the `twitterminer'
%% application, finds the entry whose first element is Name and returns a
%% #hostport{} record filled from that entry's `host' and `port' pairs.
%% Crashes with a badmatch if the environment value, the node entry or
%% either of the two keys is missing.
get_riak_hostport(Name) ->
    {ok, RiakNodes} = application:get_env(twitterminer, riak_nodes),
    {Name, Settings} = lists:keyfind(Name, 1, RiakNodes),
    Host = keyfind(host, Settings),
    Port = keyfind(port, Settings),
    #hostport{host = Host, port = Port}.
%% Returns the value paired with Key in the list of {Key, Value} tuples
%% Pairs. Crashes with a badmatch when Key is absent (lists:keyfind/3
%% returns false) or when the matching tuple is not a 2-tuple.
keyfind(Key, Pairs) ->
    {Key, Value} = lists:keyfind(Key, 1, Pairs),
    Value.
%% @doc Runs the tag-counting map/reduce job over Keys.
%%
%% Connects to the Riak node configured under the name `riak1', runs
%% maptags/3 as the map phase (its output is not kept) and redtags/2 as the
%% reduce phase (its output is kept), and returns the single resulting dict
%% converted to a list of {Key, Count} pairs.
%%
%% Any unexpected reply from Riak still crashes with a badmatch, exactly as
%% before; the difference is that the client socket is now always stopped,
%% so repeated calls no longer leave one connection process behind each.
mapred(Keys) ->
    #hostport{host = Host, port = Port} = get_riak_hostport(riak1),
    {ok, Pid} = riakc_pb_socket:start(Host, Port),
    try
        {ok, [{1, [Result]}]} = riakc_pb_socket:mapred(
            Pid,
            Keys,
            [
                {map, {modfun, ?MODULE, maptags}, none, false},
                {reduce, {modfun, ?MODULE, redtags}, none, true}
            ]
        ),
        dict:to_list(Result)
    after
        %% start/2 does not link the client to the caller, so it has to be
        %% shut down explicitly on both the success and the crash path.
        riakc_pb_socket:stop(Pid)
    end.
%% @doc Map phase: tallies the value that decoder/1 finds in one Riak object.
%%
%% Returns a one-element list holding a dict that maps the found value to 1,
%% or an empty list when decoder/1 reports `not_found' or `invalid_tweet',
%% so such objects contribute nothing to the reduce phase.
%%
%% decoder/1 returns a tagged result, never a list, so using it as a
%% list-comprehension generator (as this function used to) raised
%% bad_generator on every object.
%% NOTE(review): counting the found value as a single key is an assumption
%% about the intended tally -- confirm against the expected job output.
maptags(RiakObject, _, _) -> %We don't care about keydata or the static argument
    case decoder(riak_object:get_value(RiakObject)) of
        {found, Value} ->
            [dict:from_list([{Value, 1}])];
        not_found ->
            [];
        {invalid_tweet, _} ->
            []
    end.
%% @doc Reduce phase: merges a list of count dicts into one.
%%
%% Input is a list of dicts; they are folded, left to right, into an
%% initially empty dict, and values stored under the same key are added
%% together. The merged dict is returned wrapped in a one-element list.
%% The second (static) argument is ignored.
redtags(Input, _) ->
    Add = fun(_Key, Amount1, Amount2) -> Amount1 + Amount2 end,
    MergeInto = fun(Counts, Totals) -> dict:merge(Add, Counts, Totals) end,
    [lists:foldl(MergeInto, dict:new(), Input)].
%% @doc Decodes the JSON binary B with jiffy and looks up its
%% <<"coordinates">> member.
%%
%% Returns:
%%   {found, Value}     - jiffy returned a {Members} tuple whose member list
%%                        has a <<"coordinates">> entry;
%%   not_found          - jiffy returned a {Members} tuple without one;
%%   {invalid_tweet, B} - jiffy returned any other term.
decoder(B) ->
    decoder_classify(jiffy:decode(B), B).

%% Classifies an already decoded term; Raw is the undecoded input, echoed
%% back in the invalid_tweet result.
decoder_classify({Members}, _Raw) ->
    case lists:keyfind(<<"coordinates">>, 1, Members) of
        false -> not_found;
        {_, Value} -> {found, Value}
    end;
decoder_classify(_Other, Raw) ->
    {invalid_tweet, Raw}.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement