Guest User

Untitled

a guest
Jul 18th, 2018
106
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.18 KB | None | 0 0
  1. require "resolv"
  2. require_relative "../util/choria"
  3. require_relative "../util/natswrapper"
  4.  
  5. module MCollective
  6. module Connector
  7. class Nats < Base
  8. attr_reader :connection
  9.  
  # Prepares connector state and logs which NATS client library is loaded
  #
  # Stores the MCollective configuration singleton, an empty subscription
  # list and a fresh NATS wrapper. Nothing is started here.
  #
  # NOTE(review): @subscriptions is not read by any method visible in this file
  def initialize
    @config = Config.instance
    @subscriptions = []
    @connection = Util::NatsWrapper.new

    library_versions = [NATS::IO::VERSION, NATS::IO::PROTOCOL]

    Log.info("Choria NATS.io connector using pure ruby nats/io/client %s with protocol version %s" % library_versions)
  end
  17.  
  # Reports whether the wrapped NATS connection is currently connected
  #
  # Delegates to the connection wrapper.
  #
  # @return [Boolean]
  def connected?
    connection.connected?
  end
  24.  
  # The server the connection wrapper reports being connected to
  #
  # @return [String,nil]
  def connected_server
    connection.connected_server
  end
  31.  
  # Connection statistics as reported by the connection wrapper
  #
  # @return [Hash]
  def stats
    connection.stats
  end
  38.  
  # Version of the client library as reported by the connection wrapper
  #
  # @return [String]
  def client_version
    connection.client_version
  end
  45.  
  # Flavour of the client library as reported by the connection wrapper
  #
  # @return [String]
  def client_flavour
    connection.client_flavour
  end
  52.  
  # Options the connection wrapper reports as active for the NATS client
  #
  # @return [Hash]
  def active_options
    connection.active_options
  end
  59.  
  # Starts the NATS connection; does nothing when it was already started
  #
  # Reconnect attempts are unlimited (-1) with a 1 second wait and the
  # connection is named after the configured identity. TLS uses the Choria
  # SSL context unless the $choria_unsafe_disable_nats_tls global is set.
  #
  # @return [nil]
  # @raise [StandardError] when SSL files are not readable (presumably raised by choria.check_ssl_setup)
  def connect
    if connection && connection.started?
      Log.debug("Already connection, not re-initializing connection")
      return
    end

    options = {
      :max_reconnect_attempts => -1,
      :reconnect_time_wait => 1,
      :dont_randomize_servers => !choria.randomize_middleware_servers?,
      :name => @config.identity,
      :tls => {:context => choria.ssl_context}
    }

    # the SSL context is always built above; the TLS settings are only dropped afterwards
    if $choria_unsafe_disable_nats_tls # rubocop:disable Style/GlobalVars
      Log.warn("Disabling TLS in NATS connector, this is not a production supported setup")
      options.delete(:tls)
    end

    uris = server_list

    # with no servers resolved the :servers key is simply not passed on
    unless uris.empty?
      Log.debug("Connecting to servers: %s" % uris.join(", "))
      options[:servers] = uris
    end

    choria.check_ssl_setup
    connection.start(options)

    nil
  end
  98.  
  # Stops the NATS connection by asking the connection wrapper to stop
  def disconnect
    connection.stop
  end
  103.  
  # Creates the middleware headers needed for a given message
  #
  # Requests get a "reply-to" header: the one set on the message, or a freshly
  # made reply target otherwise. When the message carries a "seen-by" header
  # the hop taken through this connector is recorded: requests start a new
  # trail, replies extend a copy of the trail found on the original request.
  #
  # The request's own headers are never modified, so calling this repeatedly
  # for the same reply yields the same result.
  #
  # @param msg [Message]
  # @return [Hash]
  def headers_for(msg)
    # mc_sender is only passed because M::Message incorrectly assumed it is a required
    # part of messages when it is just an internal of the stomp based connectors that bled out
    headers = {
      "mc_sender" => @config.identity
    }

    headers["seen-by"] = [] if msg.headers.include?("seen-by")

    if [:request, :direct_request].include?(msg.type)
      # a new request without a reply-to gets a reply target that the daemon will subscribe to
      headers["reply-to"] = msg.reply_to || make_target(msg.agent, :reply, msg.collective)

      if msg.headers.include?("seen-by")
        headers["seen-by"] << [@config.identity, connected_server.to_s]
      end
    elsif msg.type == :reply
      request_headers = msg.request.headers

      if request_headers.include?("seen-by")
        # copy every hop so appending our server does not alter the received request
        trail = Array(request_headers["seen-by"]).map(&:dup)

        # the last hop was recorded when the request was received, complete it with the server we reply via
        trail.last << connected_server.to_s unless trail.empty?

        headers["seen-by"] = trail
      end
    end

    headers
  end
  140.  
  # Builds the target structure a message should be published to
  #
  # Replies go to the "reply-to" header of the request being answered, requests
  # and direct requests go to a name made by make_target. The headers from
  # headers_for are merged in.
  #
  # @example data
  #
  #   {
  #     :name => "nats.name",
  #     :headers => { headers... }
  #   }
  #
  # @param msg [Message]
  # @param identity [String,nil] override identity
  # @return [Hash]
  # @raise [StandardError] for replies without a reply-to header and for unsupported message types
  def target_for(msg, identity=nil)
    name =
      case msg.type
      when :reply
        reply_to = msg.request.headers["reply-to"]
        raise("Do not know how to reply, no reply-to header has been set on message %s" % msg.requestid) unless reply_to

        reply_to
      when :request, :direct_request
        make_target(msg.agent, msg.type, msg.collective, identity)
      else
        raise("Don't now how to create a target for message type %s" % msg.type)
      end

    {:name => name, :headers => {}.merge(headers_for(msg))}
  end
  173.  
  # The pid of the running process
  #
  # @note kept as a method mainly so tests can override it
  # @return [Integer]
  def current_pid
    Process.pid
  end
  181.  
  # Builds the NATS target name for an agent, message type and collective
  #
  # Reply targets embed the identity, process pid and client request sequence.
  # Broadcasts and requests address an agent within the collective, while
  # directed messages address a single node by identity.
  #
  # @param agent [String] agent name
  # @param type [:directed, :broadcast, :reply, :request, :direct_request]
  # @param collective [String] target collective name
  # @param identity [String,nil] identity for the request, else node configured identity
  # @return [String] target name
  # @raise [StandardError] on invalid input
  def make_target(agent, type, collective, identity=nil)
    supported = [:directed, :broadcast, :reply, :request, :direct_request]
    raise("Unknown target type %s" % type) unless supported.include?(type)

    collectives = @config.collectives
    unless collectives.include?(collective)
      raise("Unknown collective '%s' known collectives are '%s'" % [collective, collectives.join(", ")])
    end

    identity ||= @config.identity

    if type == :reply
      "%s.reply.%s.%d.%d" % [collective, identity, current_pid, Client.request_sequence]
    elsif [:broadcast, :request].include?(type)
      "%s.broadcast.agent.%s" % [collective, agent]
    else
      # only :direct_request and :directed remain after the validation above
      "%s.node.%s" % [collective, identity]
    end
  end
  208.  
  # Publishes a message to the middleware
  #
  # The message is base64 encoded in place first. It is then handed to one of
  # four publishers, chosen by whether Choria is federated and whether the
  # message is a direct request.
  #
  # @param msg [Message]
  def publish(msg)
    msg.base64_encode!

    if choria.federated?
      return publish_federated_directed(msg) if msg.type == :direct_request

      publish_federated_broadcast(msg)
    else
      return publish_connected_directed(msg) if msg.type == :direct_request

      publish_connected_broadcast(msg)
    end
  end
  221.  
  # Publish a directed request via a Federation Broker
  #
  # The discovered hosts are split into batches of at most 200. Each batch
  # becomes one transport message whose federation header lists the per-node
  # targets, and every message is published once per federation collective.
  #
  # Batching uses Enumerable#each_slice from core Ruby rather than
  # Array#in_groups_of, which is not part of core Ruby and has to be supplied
  # by an extension loaded elsewhere.
  #
  # @param msg [Message]
  def publish_federated_directed(msg)
    hosts = msg.discovered_hosts

    # headers are identical for every batch, so they are taken from the target of the first host
    target = target_for(msg, hosts[0])

    messages = hosts.each_slice(200).map do |nodes|
      node_targets = nodes.compact.map do |node|
        target_for(msg, node)[:name]
      end

      data = {
        "protocol" => "choria:transport:1",
        "data" => msg.payload,
        "headers" => {
          "federation" => {
            "target" => node_targets,
            "req" => msg.requestid
          }
        }.merge(target[:headers])
      }

      JSON.dump(data)
    end

    choria.federation_collectives.each do |network|
      network_target = "choria.federation.%s.federation" % network

      messages.each do |data|
        Log.debug("Sending a federated direct message via NATS target '%s' for message type %s" % [network_target, msg.type])

        connection.publish(network_target, data, target[:headers]["reply-to"])
      end
    end
  end
  258.  
  # Publish a directed request to a connected collective
  #
  # Every discovered host gets its own transport message, published to the
  # target made for that host.
  #
  # @param msg [Message]
  def publish_connected_directed(msg)
    msg.discovered_hosts.each do |host|
      destination = target_for(msg, host)
      headers = destination[:headers]

      envelope = {
        "protocol" => "choria:transport:1",
        "data" => msg.payload,
        "headers" => headers
      }

      Log.debug("Sending a direct message to %s via NATS target '%s' for message type %s" % [host, destination.inspect, msg.type])

      connection.publish(destination[:name], envelope.to_json, headers["reply-to"])
    end
  end
  276.  
  # Publish a broadcast message via a Federation Broker
  #
  # One transport message is built. Its federation header names the real
  # broadcast target, and the same message is published once per federation
  # collective.
  #
  # @param msg [Message]
  def publish_federated_broadcast(msg)
    target = target_for(msg)
    headers = target[:headers]

    payload = JSON.dump(
      {
        "protocol" => "choria:transport:1",
        "data" => msg.payload,
        "headers" => {
          "federation" => {
            "target" => [target[:name]],
            "req" => msg.requestid
          }
        }.merge(headers)
      }
    )

    choria.federation_collectives.each do |network|
      # the target is repointed at the federation subject so the log line shows where it is sent
      target[:name] = "choria.federation.%s.federation" % network

      Log.debug("Sending a federated broadcast message to NATS target '%s' for message type %s" % [target.inspect, msg.type])

      connection.publish(target[:name], payload, headers["reply-to"])
    end
  end
  303.  
  # Publish a broadcast message to a connected collective
  #
  # When the message answers a request that carried a "federation" header,
  # that header is copied onto the outgoing headers.
  #
  # @param msg [Message]
  def publish_connected_broadcast(msg)
    target = target_for(msg)
    headers = target[:headers]

    data = {
      "protocol" => "choria:transport:1",
      "data" => msg.payload,
      "headers" => headers
    }

    # a request is only present when replying
    request = msg.request
    if request && request.headers.include?("federation")
      headers["federation"] = request.headers["federation"]
    end

    Log.debug("Sending a broadcast message to NATS target '%s' for message type %s" % [target.inspect, msg.type])

    connection.publish(target[:name], JSON.dump(data), headers["reply-to"])
  end
  326.  
  # Unsubscribes from the target made for an agent
  #
  # @see make_target
  # @param agent [String] agent name
  # @param type [:reply, :broadcast, :request, :direct_request, :directed] type of message the subscription was for
  # @param collective [String] the collective the subscription was for
  # @return [void]
  def unsubscribe(agent, type, collective)
    subject = make_target(agent, type, collective)

    Log.debug("Unsubscribing from %s" % subject)
    connection.unsubscribe(subject)
  end
  340.  
  # Subscribes to the target made for an agent
  #
  # @see make_target
  # @param agent [String] agent name
  # @param type [:reply, :broadcast, :request, :direct_request, :directed] type of message you want a subscription for
  # @param collective [String] the collective to subscribe for
  # @return [void]
  def subscribe(agent, type, collective)
    subject = make_target(agent, type, collective)

    connection.subscribe(subject)
  end
  353.  
  # Receives a message from the middleware
  #
  # Payloads that are not JSON are logged and skipped. The same happens for
  # JSON that is not an object with a "headers" object, since such data cannot
  # be a transport message and would otherwise fail when the headers are read.
  #
  # When the received headers carry "seen-by", the hop through this connector
  # is appended as [connected server, identity].
  #
  # @note blocks until a usable message is received
  # @return [Message]
  def receive
    msg = nil

    until msg
      Log.debug("Waiting for a message from NATS")

      received = connection.receive

      begin
        parsed = JSON.parse(received)
      rescue StandardError
        # StandardError rather than only the parser error: a nil payload raises TypeError
        Log.warn("Got non JSON data from the broker: %s" % [received])
        next
      end

      if parsed.is_a?(Hash) && parsed["headers"].is_a?(Hash)
        msg = parsed
      else
        Log.warn("Got JSON data that is not a valid transport message from the broker: %s" % [received])
      end
    end

    if msg["headers"].include?("seen-by")
      msg["headers"]["seen-by"] << [connected_server.to_s, @config.identity]
    end

    Message.new(msg["data"], msg, :base64 => true, :headers => msg["headers"])
  end
  380.  
  # Retrieves the list of server and port combos to attempt to connect to
  #
  # The host and port pairs come from choria.middleware_servers, which is given
  # "puppet" and "4222" (presumably the fallback used when nothing else is
  # found). Each pair becomes a nats:// URI that is then decorated with
  # credentials when those are configured.
  #
  # @return [Array<String>] list of servers in form of a URI
  def server_list
    endpoints = choria.middleware_servers("puppet", "4222")

    uris = endpoints.map { |host, port| URI("nats://%s:%s" % [host, port]) }

    decorate_servers_with_users(uris).map { |uri| uri.to_s }
  end
  394.  
  # Adds the configured NATS user and password to a series of URIs
  #
  # Credentials come from the nats.user / nats.pass plugin options. The
  # MCOLLECTIVE_NATS_USERNAME / MCOLLECTIVE_NATS_PASSWORD environment values
  # are passed as their defaults. The URIs are modified in place, and are left
  # untouched unless both a user and a password are found.
  #
  # @param servers [Array<URI>] list of URI's to decorate
  # @return [Array<URI>] the same list that was passed in
  def decorate_servers_with_users(servers)
    user = get_option("nats.user", environment["MCOLLECTIVE_NATS_USERNAME"])
    pass = get_option("nats.pass", environment["MCOLLECTIVE_NATS_PASSWORD"])

    return servers unless user && pass

    servers.each do |server|
      server.user = user
      server.password = pass
    end

    servers
  end
  412.  
  # The process environment
  #
  # @note kept as a method mainly so tests can override it
  # @return [Object] the ENV object
  def environment
    ENV
  end
  417.  
  # Looks up a plugin configuration option
  #
  # A value present in the plugin configuration always wins, even when it is
  # nil or false. Otherwise the default is returned when one was given.
  #
  # @param opt [String] config option to look up
  # @param default [Object] default to return when not found
  # @return [Object] the found data or default
  # @raise [StandardError] when no default is given and option is not found
  def get_option(opt, default=:_unset)
    settings = @config.pluginconf

    return settings[opt] if settings.include?(opt)

    # :_unset marks "no default supplied" so that nil can be a real default
    raise("No plugin.%s configuration option given" % opt) if default == :_unset

    default
  end
  430.  
  # The Choria utility instance used by this connector
  #
  # Created on first use with Util::Choria.new(false) and then reused.
  #
  # @return [Util::Choria]
  def choria
    return @_choria if @_choria

    @_choria = Util::Choria.new(false)
  end
  434. end
  435. end
  436. end
Add Comment
Please, Sign In to add comment