Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby
- #
- #
- __doc__ = %q(
- disttailf.rb - distributed "tail -f"
- Aggregates "tail -f" output from multiple machines and multiple files
- into a single RabbitMQ pubsub queue (kind of splunk's log consolidation
- function)
- Usage:
- Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ...
- Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c
- )
- require 'qpid'
- require 'socket'
# Subscribes to every "disttailf.*" routing key on the amq.topic exchange and
# prints each delivered message to stdout.
#
# client - connected broker client; must respond to #queue(consumer_tag) and
#          #closed?
# ch     - open channel used to declare, bind and consume a private queue
#
# Never returns normally. Raises RuntimeError ("Rabbitmq broker disconnected")
# as soon as client.closed? reports true.
def consumer(client, ch)
  myqueue = ch.queue_declare
  ch.queue_bind(:queue => myqueue.queue, :exchange => 'amq.topic',
                :routing_key => 'disttailf.#')
  cons = ch.basic_consume(:queue => myqueue.queue, :no_ack => true)
  ruby_queue = client.queue(cons.consumer_tag)

  loop do
    raise "Rabbitmq broker disconnected" if client.closed?

    begin
      # Non-blocking pop so the closed? check above keeps running.
      msg = ruby_queue.pop(true)
      source = msg.routing_key.split('.').last
      puts "== #{msg.content.headers[:headers]} #{source}"
      puts msg.content.body
    rescue ThreadError
      # Presumably the delivery queue behaves like Ruby's Queue, which raises
      # ThreadError on a non-blocking pop of an empty queue: nothing to do yet.
      sleep(0.5)
    rescue StandardError => e
      # Anything else is unexpected (e.g. a malformed message). Report it
      # instead of hiding it, then back off exactly as before.
      $stderr << "disttailf: skipping message: #{e.class}: #{e.message}\n"
      sleep(0.5)
    end
  end
end
# Follows the given files and publishes every line read to the amq.topic
# exchange, echoing each line to stdout as well.
#
# client    - broker client; not used here, kept so the signature matches
#             consumer's
# ch        - open channel used for basic_publish
# filenames - Array of file paths handed to tail_f
#
# Messages are routed as "disttailf.<host>", where <host> is the first label
# of this machine's hostname. The first label is used because the last label
# of a fully qualified name is the top-level domain, which would make every
# host in a domain share one routing key.
#
# Does not return on its own; it runs for as long as tail_f does.
def producer(client, ch, filenames)
  short_host = Socket.gethostname.split('.').first
  rkey = "disttailf.#{short_host}"

  tail_f(filenames) do |filename, line|
    headers = { 'sent' => Time.now.to_i, 'filename' => filename }
    content = Qpid::Content.new({ :headers => headers }, line)
    ch.basic_publish(:routing_key => rkey, :content => content,
                     :exchange => 'amq.topic')
    puts "#{filename}: #{line}"
  end
end
# Follows several files at once, "tail -f" style, calling the block for each
# line read.
#
# filenames - Array of paths to follow. Paths that cannot be opened are kept
#             in the table and retried later.
# block     - invoked as block.call(path, line) for every line, starting from
#             the beginning of each file.
#
# Polls every 0.5 seconds and loops forever; control only leaves through an
# exception or throw, at which point every file still open is closed.
def tail_f(filenames, &block)
  handles = {}
  filenames.each { |name| handles[name] = open_or_nil(name) }
  polls_since_reopen = 0

  loop do
    # After more than 120 polls (roughly a minute at 0.5s per poll) retry the
    # files that are currently not open.
    if polls_since_reopen > 120
      polls_since_reopen = 0
      filenames.reject { |name| handles[name] }.each do |name|
        handles[name] = open_or_nil(name)
      end
    end

    handles.values.compact.each do |file|
      # A file shorter than our read position was truncated; a failing stat
      # means it is gone. Either way drop the handle until the next reopen.
      intact = begin
        File.stat(file.path).size >= file.tell
      rescue StandardError
        false
      end

      unless intact
        $stderr << "#{file.path}: removed or truncated\n"
        file.close
        handles[file.path] = nil
        next
      end

      # Drain everything available; gets returns nil at end of file.
      while (line = file.gets)
        block.call(file.path, line)
      end
    end

    polls_since_reopen += 1
    sleep(0.5)
  end
ensure
  # Do not leak descriptors when the loop is left via an exception or throw.
  handles.each_value { |file| file.close if file && !file.closed? } if handles
end
# Opens filename for reading.
#
# Returns the open File (the caller is responsible for closing it), or nil
# when the operating system refuses the open (missing file, no permission,
# ...). Only SystemCallError is treated as "not available"; argument mistakes
# such as a nil path still raise so they are not mistaken for a missing file.
def open_or_nil(filename)
  File.open(filename)
rescue SystemCallError
  nil
end
# Script entry point: parse the command line, connect to the broker and run
# either the consumer (-c) or the producer (file names given as arguments).
if __FILE__ == $0
  require 'getoptlong'

  # Defaults, overridable from the command line.
  server = '127.0.0.1'
  port = 5672
  specxml = '/etc/amqp0-8.xml'
  acts_as_consumer = false

  opts = GetoptLong.new(
    ['--server',  '-s', GetoptLong::REQUIRED_ARGUMENT],
    ['--port',    '-p', GetoptLong::REQUIRED_ARGUMENT],
    ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT],
    ['--consume', '-c', GetoptLong::NO_ARGUMENT]
  )
  opts.each do |opt, arg|
    case opt
    when '--server'  then server = arg
    # Integer() rejects non-numeric input instead of silently yielding 0.
    when '--port'    then port = Integer(arg, 10)
    when '--specxml' then specxml = arg
    when '--consume' then acts_as_consumer = true
    end
  end

  # GetoptLong has removed the options from ARGV, so what remains are the
  # files to follow. Validate before opening a broker connection.
  if !acts_as_consumer && ARGV.empty?
    puts __doc__
    raise "List of file names is empty - nothing to do"
  end

  # set up connection to rabbitmq broker
  client = Qpid::Client.new(server, port, Spec.load(specxml))
  # NOTE(review): credentials are hard-coded to guest/guest.
  client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" })
  ch = client.channel(1)
  ch.channel_open

  if acts_as_consumer
    consumer(client, ch)
  else
    producer(client, ch, ARGV)
  end
end
Add Comment
Please, Sign In to add comment