Advertisement
zogzog

Untitled

Nov 27th, 2014
185
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Erlang 1.43 KB | None | 0 0
  1.  
  2. -module(mapred).
  3.  
  4. %% API
  5. -export([mapred/1, maptags/3, redtags/2,decoder/1,get_riak_hostport/1]).
  6. -record(hostport, {host, port}).
  7.  
  8. get_riak_hostport(Name) ->
  9.   {ok, Nodes} = application:get_env(twitterminer, riak_nodes),
  10.   {Name, Keys} = lists:keyfind(Name, 1, Nodes),
  11.   #hostport{host=keyfind(host, Keys),
  12.             port=keyfind(port, Keys)}.
  13.  
  14. keyfind(Key, L) ->
  15.   {Key, V} = lists:keyfind(Key, 1, L),
  16.   V.
  17.  
  18. mapred(Keys) ->
  19. RHP = get_riak_hostport(riak1),
  20.   {ok, Pid} = riakc_pb_socket:start(RHP#hostport.host, RHP#hostport.port),
  21.   {ok, [{1, [Result]}]} = riakc_pb_socket:mapred(
  22.     Pid,
  23.     Keys,
  24.     [
  25.       {map, {modfun, ?MODULE, maptags}, none, false},
  26.       {reduce, {modfun, ?MODULE, redtags}, none, true}
  27.     ]
  28.   ),
  29.   dict:to_list(Result).
  30.  
  31. maptags(RiakObject, _, _) ->  %We don't care about keydata or the static argument
  32.   [dict:from_list([{I, 1} || I <- decoder(riak_object:get_value(RiakObject))])].
  33.  
  34. redtags(Input, _) ->  %Once again we don't care about the static argument
  35.   [lists:foldl(
  36.     fun(Tag, Acc) ->
  37.       dict:merge(
  38.         fun(_, Amount1, Amount2) ->
  39.           Amount1 + Amount2
  40.         end,
  41.         Tag,
  42.         Acc
  43.       )
  44.     end,
  45.     dict:new(),
  46.     Input
  47.   )].
  48.  
  49. decoder(B) ->
  50.   case jiffy:decode(B) of
  51.     {L}->
  52.       case lists:keyfind(<<"coordinates">>, 1, L) of
  53.         {_, M} -> {found, M};
  54.         false -> not_found
  55.       end;
  56.     _->{invalid_tweet, B}
  57.   end.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement