Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env ruby
- require 'shellwords'
- require 'time'
# Script entry point. Runs only when this file is executed directly; END is
# used so the helper methods defined further down the file exist by the time
# the body runs.
#
# Once per heartbeat (60 seconds) it captures a lag snapshot, appends a
# synthetic 'TOTAL' row, and prints the current lag together with the rate of
# change against the snapshots taken 1, 5 and 15 beats earlier.
END {
  zookeeper = ARGV.shift || fail("Usage #{$0} <zookeeper>")
  $stderr.puts("Getting stats via #{zookeeper}")

  # Newest snapshot first: index 0 is "now", index n is the snapshot taken
  # n beats ago. Only indexes 0..15 are ever read.
  history = []

  heartbeat(60) do
    $stdout.puts(" #{Time.now.utc.iso8601} ".center(80, ?=))

    zero = with_progress('capturing snapshot') { capture_snapshot(zookeeper) }

    # Build the TOTAL row from the real rows only (it is added afterwards, so
    # it never counts itself). An empty snapshot has no row to borrow a
    # timestamp from, so fall back to the current time instead of crashing.
    rows = zero.values
    lag_sum = rows.reduce(0) { |memo, item| memo + item.fetch(:Lag, 0) }
    sample = rows.first
    zero['TOTAL'] = { Lag: lag_sum, Time: (sample ? sample[:Time] : Time.now) }

    history.unshift(zero)
    history.pop if history.size > 16

    one = history[1]
    five = history[5]
    fifteen = history[15]

    # A window stays nil until enough snapshots have been collected for it.
    one_rates = one && get_rates(one, zero)
    five_rates = five && get_rates(five, zero)
    fifteen_rates = fifteen && get_rates(fifteen, zero)

    output(zero, one_rates, five_rates, fifteen_rates)
  end
} if $0 == __FILE__
# Prints one tab-separated line per key of the current snapshot:
#
#   key <TAB> lag <TAB> (rate / rate / rate) <TAB> (eta / eta / eta)
#
# zero          - current snapshot: Hash of key => Hash with a :Lag entry.
# one_rates,
# five_rates,
# fifteen_rates - Hash of key => [rate, eta] for each window (as built by
#                 get_rates), or nil when that window has no data yet.
# io            - stream to print to (defaults to $stdout).
#
# The rate column is omitted until the first window has data. The ETA column
# is omitted when no window has an ETA; otherwise it has one slot per window
# that has a rate, with '--:--' standing in for a window without an ETA so
# the slots stay aligned with the rate column.
def output(zero, one_rates, five_rates, fifteen_rates, io = $stdout)
  zero.each do |key, entry|
    one_rate, one_eta = one_rates && one_rates[key]
    five_rate, five_eta = five_rates && five_rates[key]
    fifteen_rate, fifteen_eta = fifteen_rates && fifteen_rates[key]

    rates = [one_rate, five_rate, fifteen_rate]
    etas = [one_eta, five_eta, fifteen_eta]

    rate_column = nil
    eta_column = nil
    if one_rate
      rate_column = "(#{rates.compact.join(' / ')})"
      if etas.any?
        # Keep one ETA slot for every window that reported a rate; dropping
        # the missing ones would make the remaining values ambiguous.
        slots = rates.zip(etas).select { |rate, _eta| rate }.map { |_rate, eta| eta || '--:--' }
        eta_column = "(#{slots.join(' / ')})"
      end
    end

    io.puts([key.to_s, entry[:Lag].to_s, rate_column, eta_column].compact.join("\t"))
  end
end
# Builds a Hash of key => get_rate(older, newer) for every key that appears
# in either snapshot. A key missing from one side is passed to get_rate as
# nil for that side.
#
# snap_a - the older snapshot (Hash).
# snap_b - the newer snapshot (Hash).
def get_rates(snap_a, snap_b)
  all_keys = snap_a.keys | snap_b.keys
  rates = {}
  all_keys.each do |key|
    rates[key] = get_rate(snap_a[key], snap_b[key])
  end
  rates
end
# Describes how lag changed between an older sample and a newer one.
#
# a - older sample (Hash with :Lag and :Time), or nil if the key did not
#     exist yet.
# b - newer sample (Hash with :Lag and :Time), or nil if the key is gone.
#
# Returns a two-element Array [rate, eta]:
#   ['+∞', nil]       - key is new (a is nil)
#   ['-∞', nil]       - key disappeared (b is nil)
#   ['±0', nil]       - lag unchanged, change rounds to 0 per minute, or no
#                       positive time elapsed between the samples
#   ['+N', nil]       - lag growing by N per minute
#   ['-N', 'HH:MM']   - lag shrinking by N per minute, with the estimated
#                       time until it reaches zero at that rate
def get_rate(a, b)
  return ['+∞', nil] if a.nil?
  return ['-∞', nil] if b.nil?

  d_lag = b[:Lag] - a[:Lag]
  d_time = (b[:Time] - a[:Time]).to_f

  # Without a positive elapsed time no rate can be computed: dividing by zero
  # yields Infinity/NaN (and Float#to_i then raises FloatDomainError), and a
  # negative interval would flip the sign of the rate.
  return ['±0', nil] if d_lag.zero? || d_time <= 0

  rate_per_minute = ((d_lag.to_f / d_time) * 60).to_i
  return ['±0', nil] if rate_per_minute.zero?
  return ["+#{rate_per_minute}", nil] if d_lag > 0

  # rate_per_minute is negative here; Integer division floors, so negating
  # the quotient rounds the remaining minutes up.
  minutes_to_catchup = -1 * (b[:Lag] / rate_per_minute)
  hours, minutes = minutes_to_catchup.divmod(60)
  ["-#{rate_per_minute.abs}", sprintf('%02d:%02d', hours, minutes)]
end
# Calls the given block over and over, aiming to start each run `pulse`
# seconds after the previous one started. When the block finishes before the
# deadline, the remaining time is spent in countdown; when it overruns, the
# next run starts immediately.
#
# pulse - interval between run starts, in seconds.
def heartbeat(pulse)
  loop do
    deadline = Time.now + pulse
    yield
    finished_at = Time.now
    countdown("Next check in", deadline - finished_at) if finished_at <= deadline
  end
end
# Writes a one-line, human-readable comparison of two lag samples for `key`.
# (Not called anywhere in the visible part of this file.)
#
# key - label printed at the start of the line.
# a   - older sample (Hash with :Lag and :Time), or nil if the key is new.
# b   - newer sample (Hash with :Lag and :Time), or nil if the key is gone.
# io  - stream to write to (defaults to $stdout).
#
# The line is "<key>: <lag or ---> " followed by one of: (added), (removed),
# (ok) when the current lag is below 100, (constant), (growing at N/min), or
# (catching up at N/min; eta: HH:MM).
def compare(key, a, b, io=$stdout)
  io.write("#{key}: #{(b && b[:Lag])|| '---'} ")

  if a.nil?
    io.puts("(added)")
    return
  end
  if b.nil?
    io.puts("(removed)")
    return
  end
  if b[:Lag] < 100
    io.puts("(ok)")
    return
  end

  d_lag = b[:Lag] - a[:Lag]
  d_time = (b[:Time] - a[:Time]).to_f

  # With no positive elapsed time a rate cannot be measured: dividing by zero
  # gives Infinity/NaN, on which Float#to_i raises FloatDomainError. Report
  # such pairs as constant instead of crashing.
  rate_per_minute = d_time > 0 ? ((d_lag.to_f / d_time) * 60).to_i : 0

  if d_lag == 0 || rate_per_minute == 0
    io.puts("(constant)")
  elsif d_lag > 0
    io.puts("(growing at #{rate_per_minute}/min)")
  else
    # rate_per_minute is negative; floored Integer division rounds the
    # remaining minutes up once negated.
    minutes_to_catchup = -1 * (b[:Lag] / rate_per_minute)
    hours, minutes = minutes_to_catchup.divmod(60)
    io.puts("(catching up at #{rate_per_minute.abs}/min; eta: #{sprintf('%02d:%02d', hours, minutes)})")
  end
end
# Runs the given block while showing a simple progress indicator on `io`:
# "<intro>..." followed by one extra dot per second. When the block finishes
# (normally or by raising) the indicator line is erased.
#
# intro - text shown in front of the dots.
# io    - stream to draw on (defaults to $stderr).
#
# Returns whatever the block returns; exceptions from the block propagate.
def with_progress(intro, io = $stderr)
  io.write("#{intro}...")
  io.flush

  th = Thread.new do
    loop do
      io.write(?.)
      io.flush
      sleep 1
    end
  end

  yield
ensure
  # th is still nil if writing the intro raised before the thread was
  # started; calling kill on it would hide the real error behind a
  # NoMethodError.
  if th
    th.kill
    # Wait for the ticker to actually stop so it cannot write another dot
    # after the line has been cleared.
    th.join
  end
  io.write("\r\033[J")
  io.flush
end
# Sleeps for `duration` seconds while a background thread shows a ticking
# "<intro>: MM:SS" countdown on `io`. The line is erased when the sleep ends
# or is interrupted.
#
# intro    - text shown in front of the remaining time.
# duration - seconds to wait; passed straight to Kernel#sleep.
# io       - stream to draw on (defaults to $stderr).
#
# Returns the result of Kernel#sleep.
def countdown(intro, duration, io=$stderr)
  th = Thread.new do
    io.write("#{intro}: ")
    io.flush
    target_time = Time.now + duration
    loop do
      remaining = (target_time - Time.now).to_i
      minutes, seconds = remaining.divmod(60)
      label = sprintf("%02d:%02d", minutes, seconds)
      io.write(label)
      io.flush
      sleep 1
      # Step the cursor back over exactly what was written so the next tick
      # overwrites it; the label is wider than 5 columns from 100 minutes up.
      io.write("\033[#{label.length}D")
      io.flush
    end
  end

  sleep duration
ensure
  # th is nil if the thread could not be created.
  if th
    th.kill
    # Make sure the ticker has stopped before erasing the line, so it cannot
    # draw over the cleared line afterwards.
    th.join
  end
  io.write("\r\033[J")
  io.flush
end
# Runs kafka-consumer-offset-checker.sh and parses its whitespace-separated
# table into a snapshot.
#
# zookeeper - value passed to the checker's --zookeeper option.
# group     - value passed to --group (defaults to the group this script was
#             written for).
# topic     - value passed to --topic (likewise).
#
# The checker is looked up in the directory named by ENV['KAFKA_BIN'], or in
# this file's directory when that variable is unset.
#
# Returns a Hash keyed by each row's Owner column. Every value is a Hash of
# column name (Symbol, taken from the header line) => cell, where cells made
# up only of digits are converted to Integer, plus a :Time entry holding the
# time the capture started.
# NOTE(review): rows that share the same Owner value overwrite each other;
# confirm Owner is unique per row in the checker's output.
#
# Raises RuntimeError if the checker is missing or not executable, exits
# unsuccessfully, or prints nothing.
def capture_snapshot(zookeeper, group: 'phoenix_consumer_2016_02_07', topic: 'dryad_persist_1')
  offset_checker = File.expand_path('kafka-consumer-offset-checker.sh', (ENV['KAFKA_BIN'] || File.dirname(__FILE__)))
  # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
  unless File.exist?(offset_checker) && File.executable?(offset_checker)
    fail("#{File.basename(offset_checker)} not found at #{File.dirname(offset_checker)}")
  end

  time = Time.now
  command = [offset_checker, '--group', group, '--topic', topic, '--zookeeper', zookeeper].shelljoin
  raw_result = `#{command} 2>/dev/null`

  # success? also covers a child killed by a signal, where exitstatus is nil.
  unless $?.success?
    fail(raw_result.empty? ? "#{File.basename(offset_checker)} failed (#{$?})" : raw_result)
  end

  # Blank lines carry no cells; skip them rather than building empty rows.
  rows = raw_result.lines.map(&:split).reject(&:empty?)
  fail("#{File.basename(offset_checker)} produced no output") if rows.empty?

  headers = rows.shift.map(&:to_sym)
  rows.each_with_object({}) do |row, snapshot|
    entry = { Time: time }
    # zip pads short rows with nil, so value may be nil here.
    headers.zip(row) do |header, value|
      # Base 10 is explicit so leading zeros are not read as octal.
      value = Integer(value, 10) if value && value =~ /\A[0-9]+\z/
      entry[header] = value
    end
    snapshot[entry[:Owner]] = entry
  end
end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement