Advertisement
Guest User

Untitled

a guest
Jun 13th, 2016
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.75 KB | None | 0 0
  require 'forwardable'
  require 'logger'
  require 'socket'
  require 'thread'

  require 'influxdb'
  3.  
  # Thread-safe store of metric definitions, keyed by measurement name.
  #
  # Each entry is a two-element array `[callable, tags]`: the block that
  # produces the metric value and the tags registered alongside it.
  class Registry
    # Lazily created registry shared through the class itself.
    #
    # @return [Registry]
    def self.default
      @default ||= new
    end

    # @param options [Hash]
    # @option options [Object, nil] :logger receives messages through
    #   `public_send(severity, message)`; logging is skipped when nil
    def initialize(options = {})
      @mutex = Mutex.new
      @metrics = {}
      @logger = options[:logger]
    end

    # Snapshot of the registered metrics.
    #
    # The returned hash is a shallow copy, so adding or removing keys on it
    # does not affect the registry.
    #
    # @return [Hash{Object => Array}] measurement => [callable, tags]
    def metrics
      mutex.synchronize { @metrics.dup }
    end

    # Registers (or replaces) the metric stored under +measurement+.
    #
    # @param measurement [Object] key the metric is stored under
    # @param tags [Hash, nil] tags stored with the metric; nil is treated as
    #   an empty hash
    # @yieldreturn [Object] the current value of the metric
    # @return [Array] the stored `[callable, tags]` entry
    # @raise [ArgumentError] when no block is given
    def add(measurement, tags = {}, &metric_block)
      # Fail at registration time instead of storing a nil callable that
      # would only blow up later, when something tries to call it.
      if metric_block.nil?
        raise ArgumentError, "a block is required to register #{measurement}"
      end

      # Keep a private copy so later mutation of the caller's hash cannot
      # change what is stored here.
      stored_tags = (tags || {}).dup

      mutex.synchronize do
        log("Registering #{measurement}")
        @metrics[measurement] = [metric_block, stored_tags]
      end
    end

    private

    attr_reader :mutex, :logger

    # Sends +message+ to the logger at +severity+, prefixed with the class
    # name and the current thread's object id in hexadecimal.
    def log(message, severity = :debug)
      return if logger.nil?

      logger.public_send(
        severity,
        "[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
      )
    end
  end
  39.  
  # Background worker that, on every round, sleeps for the configured
  # interval, evaluates each metric returned by the registry and writes the
  # result to the client.
  class Agent
    # Host name added to every point under the `host` tag.
    HOST = Socket.gethostname.freeze

    # @param client [#write_point] called as `write_point(name, data)`
    # @param registry [#metrics] must return `{ name => [callable, tags] }`
    # @param interval [Numeric] seconds slept before each reporting round
    # @param logger [Object, nil] receives messages through
    #   `public_send(severity, message)`; logging is skipped when nil
    def initialize(client, registry, interval, logger)
      @client = client
      @registry = registry
      @interval = interval
      @logger = logger
      @thread = nil
    end

    # Spawns the reporting thread unless one is already running.
    #
    # @return [Thread, nil] the new thread, or nil when already alive
    def start
      return if alive?

      log('Starting...')
      @thread = Thread.new do
        loop do
          begin
            log("sleeping for #{interval}s...")
            sleep(interval)

            # Each round runs in its own thread and is joined right away, so
            # rounds never overlap and an exception that escapes the round is
            # re-raised here by #join.
            Thread.new { report_all }.join
          rescue => e
            log("ERROR: #{e.class}: #{e.message}", :error)
          end
        end
      end
    end

    # @return [Boolean, nil] truthy while the reporting thread is running
    def alive?
      @thread && @thread.alive?
    end

    # Kills the reporting thread, if any, and forgets it.
    def stop
      log('Stopping...')
      @thread.kill if @thread
      @thread = nil
    end

    # Starts the agent only when it is not running; a live agent is left
    # untouched (it is not stopped first).
    def restart
      start unless alive?
    end

    private

    attr_reader :client, :logger, :interval, :registry

    def metrics
      registry.metrics
    end

    # Reports every registered metric once, in the registry's iteration order.
    def report_all
      metrics.each do |metric_name, (metric, tags)|
        report(metric_name, metric, tags)
      end
    end

    # Evaluates one metric and writes it to the client.
    #
    # A failure is logged and swallowed so that one broken metric does not
    # prevent the remaining ones from being reported. Only StandardError and
    # ScriptError (e.g. NotImplementedError) are contained; SystemExit,
    # SignalException and NoMemoryError are deliberately left to propagate.
    def report(metric_name, metric, tags)
      log("[#{metric_name}] Writing data...")
      data = {
        values: { value: metric.call }, tags: tags.merge(host: HOST)
      }
      client.write_point(metric_name, data)
    rescue StandardError, ScriptError => e
      log("[#{metric_name}] ERROR: #{e.class}: #{e.message}", :error)
    end

    # Sends +message+ to the logger at +severity+, prefixed with the class
    # name and the current thread's object id in hexadecimal.
    def log(message, severity = :debug)
      return if logger.nil?

      logger.public_send(
        severity,
        "[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
      )
    end
  end
  114.  
  # Watchdog that periodically asks the agent to restart itself.
  class Supervisor
    # Default number of seconds between two checks.
    INTERVAL_CHECK = 10

    # @param agent [#restart] object to call on every check
    # @param options [Hash]
    # @option options [Object, nil] :logger receives messages through
    #   `public_send(severity, message)`; logging is skipped when nil
    # @option options [Numeric] :interval seconds between checks
    #   (defaults to INTERVAL_CHECK)
    def initialize(agent, options = {})
      @agent = agent
      @logger = options[:logger]
      @interval = options.fetch(:interval, INTERVAL_CHECK)
      @thread = nil
    end

    # Spawns the watchdog thread unless one is already running.
    #
    # @return [Thread, nil] the new thread, or nil when already running
    def start
      return if @thread && @thread.alive?

      log('Starting...')
      @thread = Thread.new do
        loop do
          sleep(interval)
          check_agent
        end
      end
    end

    # Kills the watchdog thread, if any, and forgets it.
    def stop
      log('Stopping...')
      @thread.kill if @thread
      @thread = nil
    end

    private

    attr_reader :agent, :logger, :interval

    # Runs one check. A failure is logged instead of propagating, because an
    # exception escaping here would end the watchdog thread and nothing
    # would restart it.
    def check_agent
      agent.restart
    rescue => e
      log("ERROR: #{e.class}: #{e.message}", :error)
    end

    # Sends +message+ to the logger at +severity+, prefixed with the class
    # name and the current thread's object id in hexadecimal.
    def log(message, severity = :debug)
      return if logger.nil?

      logger.public_send(
        severity,
        "[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
      )
    end
  end
  152.  
  # Entry point that wires an InfluxDB client, a Registry, an Agent and a
  # Supervisor together and starts/stops them as one unit.
  class Reporter
    extend Forwardable

    # Delegate through the instance variable rather than the private
    # `registry` reader, so Forwardable does not forward to a private method.
    def_delegator :@registry, :add

    # @param options [Hash]
    # @option options [Object, nil] :logger defaults to `Logger.new(STDOUT)`;
    #   an explicit nil is kept as nil
    # @option options [String] :host InfluxDB host (default '192.168.99.100')
    # @option options [Integer] :port InfluxDB port (default 8086)
    # @option options [String] :database database name (default 'wadus')
    # @option options [String] :user passed to the client as `username`
    #   (default 'root')
    # @option options [String] :password (default 'root')
    # @option options [Boolean] :async passed to the client (default true)
    # @option options [Object] :registry defaults to a new Registry that
    #   shares this reporter's logger
    # @option options [Numeric] :interval passed to the Agent (default 5)
    def initialize(options = {})
      @options = options
      # Block form of fetch: the default objects are only built when the
      # caller did not supply one.
      @logger = options.fetch(:logger) { Logger.new(STDOUT) }
      @client = ::InfluxDB::Client.new(
        host: options.fetch(:host, '192.168.99.100'),
        port: options.fetch(:port, 8086),
        database: options.fetch(:database, 'wadus'),
        username: options.fetch(:user, 'root'),
        password: options.fetch(:password, 'root'),
        async: options.fetch(:async, true)
      )
      @registry = options.fetch(:registry) { Registry.new(logger: logger) }
      @interval = options.fetch(:interval, 5)
      @agent = Agent.new(client, registry, interval, logger)
      @supervisor = Supervisor.new(agent, logger: logger)
    end

    # Starts the agent first, then the supervisor that watches it.
    def start
      agent.start
      supervisor.start
    end

    # Stops the supervisor first, then the agent.
    def stop
      supervisor.stop
      agent.stop
    end

    private

    attr_reader(
      :options, :client, :logger, :interval, :agent, :registry, :supervisor
    )
  end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement