Advertisement
Guest User

problem

a guest
Sep 6th, 2016
138
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.68 KB | None | 0 0
  defmodule Longpoll.Worker do
    @moduledoc """
    GenServer that registers with an HTTP status service, installs a status
    filter and polls the service for device status updates.

    On start the worker posts to `register.json`, then to `addfilter.json`, and
    finally schedules a first `poll.json` request. Poll results are folded into
    the `:system_objects` map held in the server state.

    The server state is a map with the keys `:system_objects`, `:subscriber_id`,
    `:filter_name`, `:data`, `:headers`, `:url`, `:options` and `:transation_n`
    (the misspelt key names are kept because `get/2` exposes them to callers).
    """

    use Amnesia
    use GenServer

    require Logger

    # NOTE(review): endpoint and credentials are hard-coded; they should
    # presumably come from application configuration rather than source code.
    @port "3080"
    @location "192.168.8.180"
    @user "root"
    @pass "somepass"
    @clientip "192.168.8.38"
    @sessionkey "yepthisissessionkey"

    @register_command "register.json"
    @filter_command "addfilter.json"
    @poll_command "poll.json"
    @default_filter "gis"

    # API

    @doc """
    Starts the worker registered under the module name.

    `system_objects` is the initial map of tracked objects.
    """
    def start_link(system_objects \\ Map.new()) do
      GenServer.start_link(__MODULE__, system_objects, name: __MODULE__)
    end

    @doc """
    Asynchronously adds `object` (a map with `:id`, `:type` and `:users`) to the
    tracked objects, merging `:users` with any entry already stored under `:id`.
    """
    def add(object) do
      GenServer.cast(__MODULE__, {:add, object})
    end

    @doc """
    Returns the value stored under the key `element` in the state of `pid`.
    """
    def get(pid, element) do
      GenServer.call(pid, {:get, element})
    end

    @doc """
    Asynchronously re-sends the filter registration with the next transaction
    number.
    """
    def add_filter() do
      GenServer.cast(__MODULE__, :add_filter)
    end

    @doc """
    Polls the remote service through `pid` (the registered worker by default)
    and returns the updated map of system objects.

    The call waits without a timeout because the HTTP request itself is issued
    with `recv_timeout: :infinity`.
    """
    def poll(pid \\ __MODULE__) do
      GenServer.call(pid, :poll, :infinity)
    end

    @doc """
    Returns the `{key, object}` pairs of the tracked objects whose key is a
    member of the `MapSet` `objects`.
    """
    def get_objects(pid, objects) do
      GenServer.call(pid, {:get_objects, objects})
    end

    @doc """
    Asynchronously removes `user` from the users of the object stored under `id`.
    """
    def remove_item(id, user) do
      GenServer.cast(__MODULE__, {:remove_item, id, user})
    end

    # CALLBACKS

    @impl true
    def init(system_objects) do
      options = [
        hackney: [basic_auth: {@user, @pass}],
        connect_timeout: 3_000_000,
        recv_timeout: :infinity,
        timeout: 3_000_000,
        # Responses are streamed back to this process and gathered by
        # collect_response/3.
        stream_to: self()
      ]

      url = "http://#{@location}:#{@port}/"

      headers = [
        {"Content-Type", "application/x-www-form-urlencoded"},
        {"Connection", "Keep-Alive"}
      ]

      data = %{
        "sessionkey" => @sessionkey,
        "username" => "poma",
        "clientip" => @clientip,
        "jsondata" => nil
      }

      registration = request(url <> @register_command, URI.encode_query(data), headers, options)
      subscriber_id = Map.get(registration, "SubscriberId")

      # The filter name echoed by the service is the one kept in the state.
      %{"FilterRef" => filter_name} =
        request(
          url <> @filter_command,
          filter_form(data, subscriber_id, @default_filter, 1),
          headers,
          options
        )

      schedule_work()

      {:ok,
       %{
         system_objects: system_objects,
         subscriber_id: subscriber_id,
         filter_name: filter_name,
         data: data,
         headers: headers,
         url: url,
         options: options,
         transation_n: 1
       }}
    end

    @impl true
    def handle_cast(:add_filter, state) do
      %{
        url: url,
        filter_name: filter_name,
        transation_n: last_transaction,
        options: options,
        headers: headers,
        data: data,
        subscriber_id: subscriber_id
      } = state

      transation_n = last_transaction + 1
      body = filter_form(data, subscriber_id, filter_name, transation_n)
      _response = request(url <> @filter_command, body, headers, options)

      {:noreply, %{state | transation_n: transation_n}}
    end

    # ADD
    def handle_cast({:add, object}, state) do
      %{id: id, type: type, users: users} = object
      system_objects = Map.get(state, :system_objects)

      entry =
        case Map.get(system_objects, id) do
          nil ->
            %{id: id, type: type, users: users}

          %{id: _, type: _, users: known_users} = existing ->
            # Update in place so other keys already stored on the entry survive.
            %{existing | id: id, type: type, users: MapSet.union(known_users, users)}
        end

      {:noreply, Map.put(state, :system_objects, Map.put(system_objects, id, entry))}
    end

    # REMOVE
    # NOTE(review): the intended semantics of remove_item/2 are not visible in
    # this file; this clause only drops `user` from the entry's user set so the
    # cast no longer crashes the server. Confirm against callers.
    def handle_cast({:remove_item, id, user}, state) do
      system_objects = Map.get(state, :system_objects)

      system_objects =
        case Map.get(system_objects, id) do
          %{users: users} = entry ->
            Map.put(system_objects, id, %{entry | users: MapSet.delete(users, user)})

          _ ->
            system_objects
        end

      {:noreply, Map.put(state, :system_objects, system_objects)}
    end

    # POLL
    @impl true
    def handle_info(:poll_request, state) do
      # NOTE(review): the poll result is only logged here, not merged into the
      # state (unlike handle_call(:poll, ...)); confirm whether that is intended.
      data = poll_request(state)
      Logger.debug(fn -> inspect(data) end)
      {:noreply, state}
    end

    # Without a catch-all clause any unexpected message would crash the server
    # with a FunctionClauseError.
    def handle_info(message, state) do
      Logger.info(fn -> "#{inspect(__MODULE__)} ignoring message: #{inspect(message)}" end)
      {:noreply, state}
    end

    @impl true
    def handle_call(:poll, _from, state) do
      response = poll_request(state)
      system_objects = add_response_to_state(response, Map.get(state, :system_objects))
      {:reply, system_objects, Map.put(state, :system_objects, system_objects)}
    end

    # GET
    def handle_call({:get, element}, _from, state) do
      {:reply, Map.get(state, element), state}
    end

    def handle_call({:get_objects, objects}, _from, state) do
      return_objects =
        state
        |> Map.get(:system_objects)
        |> Enum.filter(fn {key, _object} -> MapSet.member?(objects, key) end)

      {:reply, return_objects, state}
    end

    # HELPERS

    @doc """
    Collects the streamed parts of the asynchronous HTTPoison response `id`
    from the calling process' mailbox and returns the decoded JSON body.

    `data` is the body accumulated so far (start with `<<>>`). When `par` is a
    process other than the caller, the decoded response is also sent to it.

    Raises if the response status is not 200 or the request fails; otherwise
    the caller would wait forever for chunks that never arrive.
    """
    def collect_response(id, par, data) do
      receive do
        %HTTPoison.AsyncStatus{id: ^id, code: 200} ->
          collect_response(id, par, data)

        %HTTPoison.AsyncStatus{id: ^id, code: code} ->
          raise "unexpected HTTP status #{inspect(code)}"

        %HTTPoison.AsyncHeaders{id: ^id} ->
          collect_response(id, par, data)

        %HTTPoison.AsyncChunk{id: ^id, chunk: chunk} ->
          collect_response(id, par, data <> chunk)

        %HTTPoison.AsyncEnd{id: ^id} ->
          response = handle_response({:ok, %{status_code: 200, body: data}})

          # Sending to ourselves would leave a stray message in the mailbox of
          # the GenServer, so only forward to a different process.
          if par != self(), do: send(par, response)

          response

        %HTTPoison.Error{id: ^id} = error ->
          raise error
      end
    end

    @doc """
    Decodes the JSON body of a successful (status 200) response.
    """
    def handle_response({:ok, %{status_code: 200, body: data}}) do
      {:ok, response} = Poison.decode(data)
      response
    end

    @doc """
    Folds a list of status messages into `system_objects` and returns the
    updated map.

    Each message holds a `"StsAggregate"` map with a single status type whose
    value contains a `"Device"` (with `"@Type"` and `"Id"`) plus the status
    properties. Objects are keyed by `device_id <> device_type`; the properties
    are stored per status type under the object's `:statueses` key.
    """
    def add_response_to_state(resp, system_objects) when is_list(resp) do
      Enum.reduce(resp, system_objects, &merge_status/2)
    end

    @doc """
    Posts a poll request built from `state` and returns the decoded response.
    """
    def poll_request(state) do
      body =
        state
        |> Map.get(:data)
        |> Map.put("cookie", Map.get(state, :subscriber_id))
        |> URI.encode_query()

      request(
        Map.get(state, :url) <> @poll_command,
        body,
        Map.get(state, :headers),
        Map.get(state, :options)
      )
    end

    # Merges one status message into the map of system objects.
    defp merge_status(%{"StsAggregate" => aggregate}, system_objects) do
      [sts_type | _] = Map.keys(aggregate)
      {device, properties} = aggregate |> Map.fetch!(sts_type) |> Map.pop("Device")
      device_type = Map.get(device, "@Type")
      device_id = Map.get(device, "Id")
      key = device_id <> device_type

      entry =
        case Map.get(system_objects, key) do
          nil ->
            %{
              id: device_id,
              type: device_type,
              users: MapSet.new(),
              statueses: %{sts_type => properties}
            }

          existing ->
            # Store the properties only, exactly as for a newly seen device.
            Map.update(
              existing,
              :statueses,
              %{sts_type => properties},
              &Map.put(&1, sts_type, properties)
            )
        end

      Map.put(system_objects, key, entry)
    end

    # Posts `body` to `url` asynchronously and waits for the decoded response.
    defp request(url, body, headers, options) do
      %HTTPoison.AsyncResponse{id: id} = HTTPoison.post!(url, body, headers, options)
      collect_response(id, self(), <<>>)
    end

    # Builds the url-encoded form body of a filter registration request.
    defp filter_form(data, subscriber_id, filter_name, transaction_no) do
      data
      # Replace the "jsondata" placeholder instead of adding a second field.
      |> Map.put("jsondata", encode_filter_command(filter_name, transaction_no))
      |> Map.put("cumulative", 1)
      |> Map.put("cookie", subscriber_id)
      |> URI.encode_query()
    end

    # Encodes the CmdRegFilter message as JSON.
    defp encode_filter_command(filter_name, transaction_no) do
      {:ok, json} =
        Poison.encode(%{
          "Msg" => %{
            "CmdRegFilter" => %{
              "TransactionNo" => transaction_no,
              "FilterRef" => filter_name,
              "Filter" => %{
                "StsCamera" => 1,
                "StsLock" => 1,
                "StsMaintenance" => 1
              }
            }
          }
        })

      json
    end

    # Queues a poll request for this process.
    defp schedule_work() do
      send(self(), :poll_request)
    end
  end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement