Advertisement
Guest User

Untitled

a guest
Feb 16th, 2016
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.28 KB | None | 0 0
  module Fluent
    # Fluentd input plugin that periodically runs MongoDB's +serverStatus+
    # command against one or more servers and emits each result as an event.
    #
    # Configuration parameters:
    #   uris           - array of MongoDB URIs (or bare "host:port" strings)
    #   uri            - a single URI, used only when +uris+ is not given
    #   stats_interval - polling period
    #   tag_prefix     - first component of the emitted tag
    #
    # Events are tagged "<tag_prefix>.<host>.<port>", where "." and "-" in the
    # host name are replaced by "_".
    class ServerStatusInput < Input
      Plugin.register_input('serverstatus', self)

      # Port used in the tag when a server is listed without an explicit port
      # (27017 is MongoDB's default). Keeps the tag depth constant.
      DEFAULT_PORT = "27017".freeze

      config_param :uris, :array, :default => nil
      # Needs a default: a config_param without one is mandatory, which would
      # make a "uris"-only configuration impossible.
      config_param :uri, :string, :default => nil
      config_param :stats_interval, :time, :default => 60 # every minute
      config_param :tag_prefix, :string, :default => "serverstatus"

      def initialize
        super
        # Loaded lazily so the gem is only required when the plugin is used.
        require 'mongo'
      end

      # Validates the configuration and builds one client per configured URI.
      #
      # Raises ConfigError when neither +uris+ nor +uri+ yields a server.
      def configure(conf)
        super

        @uris ||= [@uri].compact
        raise ConfigError, 'uris or uri must be specified' if @uris.empty?

        # Each entry is a [Mongo::Client, Mongo::URI] pair.
        @conns = @uris.map { |uri_str| build_connection(uri_str) }
      end

      # Starts the event loop thread that fires the periodic collection timer.
      def start
        super
        @loop = Coolio::Loop.new
        tw = TimerWatcher.new(@stats_interval, true, log, &method(:collect_serverstatus))
        tw.attach(@loop)
        @thread = Thread.new(&method(:run))
      end

      # Body of the polling thread; blocks until the loop is stopped.
      def run
        @loop.run
      rescue => e
        log.error "unexpected error", :error => e.to_s
        log.error_backtrace(e.backtrace)
      end

      # Stops the event loop, waits for the thread and releases the clients.
      def shutdown
        @loop.stop
        @thread.join
        (@conns || []).each do |client, _uri|
          client.close if client.respond_to?(:close)
        end
        super
      end

      # Queries every configured server and emits its serverStatus document.
      # Failures are logged per server so one unreachable host does not stop
      # the remaining servers from being collected.
      def collect_serverstatus
        @conns.each do |conn, conn_uri|
          begin
            stats = conn.database.command(:serverStatus => 1).first
            make_data_msgpack_compatible(stats)
            host, port = conn_uri.servers[0].split(':')
            tag = [@tag_prefix, host.tr('.-', '_'), port || DEFAULT_PORT].join(".")
            Engine.emit(tag, Engine.now, stats)
          rescue => e
            log.error "failed to collect MongoDB stats", :error_class => e.class, :error => e
          end
        end
      end

      # MessagePack doesn't like it when a field is of Time class.
      # Recursively walks the serverStatus response in place, replacing Time
      # values with integer epoch seconds and BSON::ObjectId values with their
      # string form, in both hashes and arrays.
      #
      # Returns +data+.
      def make_data_msgpack_compatible(data)
        case data
        when Hash # BSON::Document is a Hash subclass
          data.each do |key, value|
            if value.respond_to?(:each)
              make_data_msgpack_compatible(value)
            else
              converted = msgpack_scalar(value)
              # Only existing keys are reassigned, which is safe mid-iteration.
              data[key] = converted unless converted.equal?(value)
            end
          end
          # serverStatus's "locks" field has "." as a key, which can't be
          # inserted back to MongoDB without wreaking havoc. Replace it with
          # "global".
          data["global"] = data.delete(".") if data["."]
        when Array
          data.each_with_index do |value, index|
            if value.respond_to?(:each)
              make_data_msgpack_compatible(value)
            else
              data[index] = msgpack_scalar(value)
            end
          end
        end
        data
      end

      private

      # Builds the [client, uri] pair for one configured server string.
      # A bare "host:port" string is given the "mongodb://" scheme first.
      def build_connection(uri_str)
        uri_str = "mongodb://#{uri_str}" unless uri_str.start_with?("mongodb://")

        options = { :connect => :direct }
        user, password = extract_credentials(uri_str)
        options[:user] = user if user
        options[:password] = password if password

        uri = Mongo::URI.new(uri_str, options)
        [Mongo::Client.new(uri.servers, uri.options), uri]
      end

      # Returns [user, password] taken from the "user:password@" part of the
      # URI, or [nil, nil] when the URI carries no credentials, so that
      # "host:port" is never mistaken for "user:password".
      # NOTE(review): values are returned as written (no percent-decoding).
      def extract_credentials(uri_str)
        authority = uri_str.sub(%r{\Amongodb://}, '').split('/', 2).first.to_s
        return [nil, nil] unless authority.include?('@')

        userinfo = authority.rpartition('@').first
        user, password = userinfo.split(':', 2)
        [user, password]
      end

      # Converts a single non-container value to a MessagePack-friendly one;
      # anything else is returned unchanged.
      def msgpack_scalar(value)
        case value
        when Time then value.to_i
        when BSON::ObjectId then value.to_s
        else value
        end
      end

      # Timer that invokes the given callback on every tick and logs, rather
      # than propagates, any error the callback raises.
      class TimerWatcher < Coolio::TimerWatcher
        def initialize(interval, repeat, log, &callback)
          @callback = callback
          @log = log
          super(interval, repeat)
        end

        def on_timer
          @callback.call
        rescue => e
          @log.error e.to_s
          @log.error_backtrace(e.backtrace)
        end
      end
    end
  end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement