Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
require 'forwardable'
require 'logger'
require 'socket'
require 'thread'

require 'influxdb'
# Mutex-guarded table of metrics, keyed by measurement name.
#
# Each entry is a two-element array: the block registered for the measurement
# and the tags hash that was passed along with it.
class Registry
  class << self
    # Lazily builds and memoizes one shared registry on the class.
    #
    # @return [Registry]
    def default
      @default ||= new
    end
  end

  # @param options [Hash] only :logger is read; when it is nil nothing is
  #   logged.
  def initialize(options = {})
    @logger = options[:logger]
    @metrics = {}
    @mutex = Mutex.new
  end

  # Snapshot of the registered metrics.
  #
  # @return [Hash] a shallow copy, so adding or removing keys on the result
  #   does not touch the registry itself.
  def metrics
    mutex.synchronize { {}.merge(@metrics) }
  end

  # Registers (or replaces) the metric stored under +measurement+.
  #
  # @param measurement [Object] key the metric is stored under
  # @param tags [Hash] tags stored next to the block
  # @yield block stored as the metric
  # @return [Array] the stored [block, tags] pair
  def add(measurement, tags = {}, &metric_block)
    entry = [metric_block, tags]
    mutex.synchronize do
      log("Registering #{measurement}")
      @metrics.store(measurement, entry)
    end
  end

  private

  attr_reader :mutex, :logger

  # Sends +message+ to the logger at +severity+, prefixed with the class name
  # and the current thread's object id in hexadecimal.
  def log(message, severity = :debug)
    unless logger.nil?
      thread_id = Thread.current.object_id.to_s(16)
      logger.public_send(severity, "[#{self.class}][#{thread_id}] #{message}")
    end
  end
end
# Background worker that periodically evaluates every registered metric and
# writes the resulting point through the given client.
class Agent
  # Hostname of the current machine, resolved once when the class is loaded.
  # It is merged into the tags of every point as :host.
  HOST = Socket.gethostname.freeze

  # @param client [#write_point] receives write_point(metric_name, data)
  # @param registry [#metrics] returns a Hash of name => [callable, tags]
  # @param interval [Numeric] seconds slept before each reporting cycle
  # @param logger [Object, nil] logger-like object; nil disables logging
  def initialize(client, registry, interval, logger)
    @client = client
    @registry = registry
    @interval = interval
    @logger = logger
    @thread = nil
  end

  # Starts the reporting loop in a new thread unless one is already alive.
  #
  # @return [Thread, nil] the new thread, or nil when already running
  def start
    return if alive?

    log('Starting...')
    @thread = Thread.new { run_loop }
  end

  # @return [Boolean, nil] truthy while the reporting thread is alive
  def alive?
    @thread && @thread.alive?
  end

  # Kills the reporting thread (if any) and forgets it.
  def stop
    log('Stopping...')
    @thread.kill if @thread
    @thread = nil
  end

  # Starts the agent again only if it is not running; a live agent is left
  # untouched.
  def restart
    start unless alive?
  end

  private

  attr_reader :client, :logger, :interval, :registry

  def metrics
    registry.metrics
  end

  # Endless cycle: sleep for the interval, then report every metric. Errors
  # that escape a cycle are logged so the loop keeps running.
  def run_loop
    loop do
      begin
        log("sleeping for #{interval}s...")
        sleep(interval)
        # The cycle runs in its own joined thread: killing the loop thread
        # (see #stop) does not kill a cycle that has already started.
        Thread.new { report_all }.join
      rescue StandardError => e
        log("ERROR: #{e.class}: #{e.message}", :error)
      end
    end
  end

  # Reports every metric currently held by the registry.
  def report_all
    metrics.each do |metric_name, (metric, tags)|
      report(metric_name, metric, tags)
    end
  end

  # Evaluates one metric and writes it. A failure is logged and does not
  # prevent the remaining metrics from being reported.
  #
  # Only StandardError and ScriptError are rescued, so failures of the metric
  # block itself (including NotImplementedError / LoadError) are contained,
  # while SystemExit, Interrupt, NoMemoryError and similar process-level
  # exceptions are no longer swallowed.
  def report(metric_name, metric, tags)
    log("[#{metric_name}] Writing data...")
    data = {
      values: { value: metric.call }, tags: tags.merge(host: HOST)
    }
    client.write_point(metric_name, data)
  rescue StandardError, ScriptError => e
    log("[#{metric_name}] ERROR: #{e.class}: #{e.message}", :error)
  end

  # Sends +message+ to the logger at +severity+, prefixed with the class name
  # and the current thread's object id in hexadecimal. No-op without a logger.
  def log(message, severity = :debug)
    return if logger.nil?

    logger.public_send(
      severity,
      "[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
    )
  end
end
# Watchdog that periodically asks an agent to restart itself.
class Supervisor
  # Default number of seconds between two checks.
  INTERVAL_CHECK = 10

  # @param agent [#restart] object that is asked to restart on every check
  # @param options [Hash]
  #   :logger         - logger-like object; nil (the default) disables logging
  #   :check_interval - seconds between checks (defaults to INTERVAL_CHECK)
  def initialize(agent, options = {})
    @agent = agent
    @logger = options[:logger]
    @check_interval = options.fetch(:check_interval, INTERVAL_CHECK)
    @thread = nil
  end

  # Starts the supervision thread unless one is already alive.
  #
  # @return [Thread, nil] the new thread, or nil when already running
  def start
    return if @thread && @thread.alive?

    log('Starting...')
    @thread = Thread.new { supervise }
  end

  # Kills the supervision thread (if any) and forgets it.
  def stop
    log('Stopping...')
    @thread.kill if @thread
    @thread = nil
  end

  private

  attr_reader :agent, :logger, :check_interval

  # Endless cycle: sleep, then call agent.restart. A failing restart is
  # logged instead of terminating the thread, so later checks still happen.
  def supervise
    loop do
      begin
        sleep(check_interval)
        agent.restart
      rescue StandardError => e
        log("ERROR: #{e.class}: #{e.message}", :error)
      end
    end
  end

  # Sends +message+ to the logger at +severity+, prefixed with the class name
  # and the current thread's object id in hexadecimal. No-op without a logger.
  def log(message, severity = :debug)
    return if logger.nil?

    logger.public_send(
      severity,
      "[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
    )
  end
end
# Facade that wires an InfluxDB client, a Registry, an Agent and a Supervisor
# together and exposes start/stop plus metric registration.
class Reporter
  extend Forwardable

  # Reporter#add forwards to the registry's #add.
  def_delegator :registry, :add

  # @param options [Hash] every key is optional:
  #   :logger   - logger-like object (default: Logger writing to STDOUT);
  #               pass nil explicitly to disable logging
  #   :host     - InfluxDB host (default '192.168.99.100')
  #   :port     - InfluxDB port (default 8086)
  #   :database - database name (default 'wadus')
  #   :user     - passed to the client as :username (default 'root')
  #   :password - default 'root'
  #   :async    - passed to the client as :async (default true)
  #   :registry - registry to use (default: a new Registry sharing the logger)
  #   :interval - seconds between reporting cycles (default 5)
  def initialize(options = {})
    @options = options
    # Block form of fetch: the default Logger / Registry is only built when
    # the caller did not supply one.
    @logger = options.fetch(:logger) { Logger.new(STDOUT) }
    @client = ::InfluxDB::Client.new(
      host: options.fetch(:host, '192.168.99.100'),
      port: options.fetch(:port, 8086),
      database: options.fetch(:database, 'wadus'),
      username: options.fetch(:user, 'root'),
      password: options.fetch(:password, 'root'),
      async: options.fetch(:async, true)
    )
    @registry = options.fetch(:registry) { Registry.new(logger: logger) }
    @interval = options.fetch(:interval, 5)
    @agent = Agent.new(client, registry, interval, logger)
    @supervisor = Supervisor.new(agent, logger: logger)
  end

  # Starts the agent first, then the supervisor that watches it.
  def start
    agent.start
    supervisor.start
  end

  # Stops the supervisor first so it cannot restart the agent, then the agent.
  def stop
    supervisor.stop
    agent.stop
  end

  private

  attr_reader(
    :options, :client, :logger, :interval, :agent, :registry, :supervisor
  )
end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement