Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
module Fluent
  # Fluentd input plugin that periodically runs the MongoDB +serverStatus+
  # command against every configured server and emits each result as an event
  # tagged "<tag_prefix>.<host>.<port>".
  class ServerStatusInput < Input
    Plugin.register_input('serverstatus', self)

    # List of MongoDB URIs to poll. When nil, the single +uri+ is used instead.
    config_param :uris, :array, :default => nil
    # Single MongoDB URI. It needs an explicit nil default: a config_param
    # without a default is mandatory, which would reject configurations that
    # only set +uris+ before #configure can validate them.
    config_param :uri, :string, :default => nil
    config_param :stats_interval, :time, :default => 60 # every minute
    config_param :tag_prefix, :string, :default => "serverstatus"

    # Scheme prepended to URIs that were configured without one.
    URI_SCHEME = "mongodb://"

    def initialize
      super
      # Loaded lazily so the driver is only required when the plugin is used.
      require 'mongo'
    end

    # Validates the configuration and builds one [client, uri] pair per
    # configured MongoDB URI, stored in @conns.
    #
    # Raises ConfigError when neither +uris+ nor +uri+ is given.
    def configure(conf)
      super

      if @uris.nil? && @uri.nil?
        raise ConfigError, 'uris or uri must be specified'
      end

      @uris = [@uri] if @uris.nil?

      @conns = @uris.map do |uri_str|
        uri_str = "#{URI_SCHEME}#{uri_str}" unless uri_str.start_with?(URI_SCHEME)
        options = { :connect => :direct }.merge(credentials_from(uri_str))
        uri = Mongo::URI.new(uri_str, options)
        client = Mongo::Client.new(uri.servers, uri.options)
        [client, uri]
      end
    end

    # Starts the event loop in a background thread with a repeating timer that
    # fires #collect_serverstatus every +stats_interval+ seconds.
    def start
      @loop = Coolio::Loop.new
      timer = TimerWatcher.new(@stats_interval, true, log, &method(:collect_serverstatus))
      timer.attach(@loop)
      @thread = Thread.new(&method(:run))
    end

    # Thread body: runs the event loop and logs any error that escapes it.
    def run
      @loop.run
    rescue
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
    end

    # Stops the event loop and waits for the background thread to finish.
    def shutdown
      @loop.stop
      @thread.join
    end

    # Polls every configured server once and emits its serverStatus document.
    # Failures are rescued per connection so that one unreachable server does
    # not prevent the remaining servers from being polled on this tick.
    def collect_serverstatus
      @conns.each do |conn, conn_uri|
        begin
          stats = conn.database.command(:serverStatus => 1).first
          make_data_msgpack_compatible(stats)
          host, port = conn_uri.servers[0].split(':')
          tag = [@tag_prefix, host.gsub(/[\.-]/, "_"), port].join(".")
          Engine.emit(tag, Engine.now, stats)
        rescue => e
          log.error "failed to collect MongoDB stats", :error_class => e.class, :error => e
        end
      end
    end

    # MessagePack doesn't like it when the field is of Time class.
    # This is a convenience method that traverses the serverStatus response
    # in place and replaces every Time with its integer epoch value and every
    # BSON::ObjectId with its string form, in hashes and arrays alike.
    #
    # Returns the (mutated) +data+ object; other input types are left as is.
    def make_data_msgpack_compatible(data)
      case data
      when Hash, BSON::Document
        data.each { |key, value| convert_entry(data, key, value) }
        # serverStatus's "locks" field has "." as a key, which can't be
        # inserted back to MongoDB without wreaking havoc. Replace it with
        # "global". Done after iteration so no key is added while iterating.
        data["global"] = data.delete(".") if data["."]
      when Array
        data.each_with_index { |value, index| convert_entry(data, index, value) }
      end
      data
    end

    # Repeating Cool.io timer that invokes a callback on every tick and logs
    # (instead of propagating) any error the callback raises.
    class TimerWatcher < Coolio::TimerWatcher
      def initialize(interval, repeat, log, &callback)
        @callback = callback
        @log = log
        super(interval, repeat)
      end

      def on_timer
        @callback.call
      rescue
        @log.error $!.to_s
        @log.error_backtrace
      end
    end

    private

    # Extracts :user/:password from the "user:password@" part of a MongoDB URI.
    # Returns an empty hash when the URI carries no credentials, so host and
    # port are never mistaken for a user name and password.
    def credentials_from(uri_str)
      userinfo, at_sign, _rest = uri_str.sub(URI_SCHEME, '').partition('@')
      return {} if at_sign.empty?

      # Split only once so a password containing ':' stays intact.
      user, password = userinfo.split(':', 2)
      { :user => user, :password => password }
    end

    # Converts a single hash/array slot: recurses into nested collections and
    # rewrites Time and BSON::ObjectId values in +container+ at +key+.
    def convert_entry(container, key, value)
      if value.respond_to?(:each)
        make_data_msgpack_compatible(value)
      elsif value.class == Time
        container[key] = value.to_i
      elsif value.class == BSON::ObjectId
        container[key] = value.to_s
      end
    end
  end
end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement