Guest User

Untitled

a guest
Jun 19th, 2018
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.53 KB | None | 0 0
  1. #!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby
  2. #
  3. #
  4. __doc__ = %q(
  5.  
  6. disttailf.rb - distributed "tail -f"
  7.  
  8. Aggregates "tail -f" output from multiple machines and multiple files
  9. into a single RabbitMQ pubsub queue (kind of splunk's log consolidation
  10. function)
  11.  
  12. Usage:
  13. Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ...
  14. Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c
  15.  
  16. )
  17.  
  18. require 'qpid'
  19. require 'socket'
  20.  
  21. def consumer(client, ch)
  22. myqueue = ch.queue_declare()
  23. ch.queue_bind(:queue=>myqueue.queue, :exchange=>'amq.topic',
  24. :routing_key=>'disttailf.#')
  25. cons = ch.basic_consume(:queue=>myqueue.queue, :no_ack => true)
  26. ruby_queue = client.queue(cons.consumer_tag)
  27.  
  28. while true
  29. raise "Rabbitmq broker disconnected" if client.closed?
  30. begin
  31. msg = ruby_queue.pop(non_block=true)
  32. puts "== #{msg.content.headers[:headers]} " \
  33. "#{msg.routing_key.split('.')[-1]}"
  34. puts msg.content.body
  35. rescue
  36. sleep(0.5)
  37. end
  38. end
  39. end
  40.  
  41. def producer(client, ch, filenames)
  42. rkey = "disttailf." + Socket.gethostname.split('.')[-1]
  43. tail_f(filenames) do |filename, line|
  44. h = {'sent' => Time.now.to_i, 'filename' => filename }
  45. c = Qpid::Content.new({:headers=>h}, line)
  46. ch.basic_publish(:routing_key=>rkey, :content=>c,
  47. :exchange=>'amq.topic')
  48. puts "#{filename}: #{line}"
  49. end
  50. end
  51.  
  52. def tail_f(filenames, &block)
  53. filedict = Hash.new
  54. filenames.each { |f| filedict[f] = open_or_nil(f) }
  55. reopen_counter = 0
  56. while true:
  57. if reopen_counter > 120
  58. reopen_counter = 0
  59. filenames.reject { |f| filedict[f] }.each {
  60. |f| filedict[f] = open_or_nil(f) }
  61. end
  62.  
  63. filedict.values.reject { |f| not f }.each do |f|
  64. begin
  65. raise "trunc" unless File.stat(f.path).size >= f.tell
  66. rescue
  67. $stderr << "#{f.path}: removed or truncated\n"
  68. f.close
  69. filedict[f.path] = nil
  70. next
  71. end
  72.  
  73. begin
  74. block.call(f.path,f.readline) while true
  75. rescue EOFError
  76. true
  77. end
  78. end
  79.  
  80. reopen_counter += 1
  81. sleep(0.5)
  82. end # while true
  83. end
  84.  
  85. def open_or_nil(filename)
  86. begin
  87. File.open(filename)
  88. rescue
  89. nil
  90. end
  91. end
  92.  
  93. if __FILE__ == $0
  94. require 'getoptlong'
  95.  
  96. server = '127.0.0.1'
  97. port = 5672
  98. specxml = '/etc/amqp0-8.xml'
  99. acts_as_consumer = false
  100.  
  101. opts = GetoptLong.new(
  102. ['--server', '-s', GetoptLong::REQUIRED_ARGUMENT],
  103. ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
  104. ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT],
  105. ['--consume', '-c', GetoptLong::NO_ARGUMENT])
  106. opts.each do |opt,arg|
  107. case opt
  108. when '--server'
  109. server = arg
  110. when '--port'
  111. port = arg.to_i
  112. when '--specxml'
  113. specxml = arg
  114. when '--consume'
  115. acts_as_consumer = true
  116. end
  117. end
  118.  
  119. # set up connection to rabbitmq broker
  120. client = Qpid::Client.new(server, port, spec=Spec.load(specxml))
  121. client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" })
  122. ch = client.channel(1)
  123. ch.channel_open()
  124.  
  125. if acts_as_consumer
  126. consumer(client, ch)
  127. else
  128. if ARGV.length == 0
  129. puts __doc__
  130. raise "List of file names is empty - nothing to do"
  131. end
  132. producer(client, ch, ARGV)
  133. end
  134.  
  135. end
Add Comment
Please, Sign In to add comment