Advertisement
Guest User

Untitled

a guest
Feb 8th, 2016
51
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.71 KB | None | 0 0
  1. #!/usr/bin/env ruby
  2.  
  3. require 'shellwords'
  4. require 'time'
  5.  
  6. END {
  7. zookeeper = ARGV.shift || fail("Usage #{$0} <zookeeper>")
  8. $stderr.puts("Getting stats via #{zookeeper}")
  9.  
  10. one_five_fifteen = []
  11. heartbeat(60) do
  12. $stdout.puts(" #{Time.now.utc.iso8601} ".center(80,?=))
  13. zero = with_progress('capturing snapshot') { capture_snapshot(zookeeper) }
  14.  
  15. one_five_fifteen.unshift(zero)
  16. one_five_fifteen.pop if one_five_fifteen.size > 16
  17.  
  18. zero['TOTAL'] = begin
  19. lag_sum = zero.values.reduce(0) { |memo, item| memo + item.fetch(:Lag, 0) }
  20. time = zero.values.first[:Time]
  21. {Lag: lag_sum, Time: time}
  22. end
  23.  
  24. zero = one_five_fifteen[0]
  25. one = one_five_fifteen[1]
  26. five = one_five_fifteen[5]
  27. fifteen = one_five_fifteen[15]
  28.  
  29. one_rates = one && get_rates(one, zero)
  30. five_rates = five && get_rates(five, zero)
  31. fifteen_rates = fifteen && get_rates(fifteen, zero)
  32.  
  33. output(zero, one_rates, five_rates, fifteen_rates)
  34. end
  35. } if $0 == __FILE__
  36.  
  37. def output(zero, one_rates, five_rates, fifteen_rates)
  38. zero.keys.each do |key|
  39. one_rate, one_eta = one_rates && one_rates[key]
  40. five_rate, five_eta = five_rates && five_rates[key]
  41. fifteen_rate, fifteen_eta = fifteen_rates && fifteen_rates[key]
  42.  
  43. $stdout.puts([
  44. "#{key}",
  45. "#{zero[key][:Lag]}",
  46. (one_rate && "(#{[one_rate,five_rate,fifteen_rate].compact.join(' / ')})"),
  47. (one_eta && "(#{[one_eta,five_eta,fifteen_eta].compact.join(' / ')})"),
  48. ].compact.join("\t"))
  49. end
  50. end
  51.  
  52. def get_rates(snap_a, snap_b)
  53. (snap_a.keys|snap_b.keys).map do |key|
  54. [key, snap_a[key],snap_b[key]]
  55. end.each_with_object({}) do |(key, a, b), memo|
  56. memo[key] = get_rate(a, b)
  57. end
  58. end
  59.  
  60. def get_rate(a,b)
  61. if a.nil?
  62. return ['+∞', nil]
  63. elsif b.nil?
  64. return ['-∞', nil]
  65. else
  66. d_lag = b[:Lag]-a[:Lag];
  67. d_time = b[:Time]-a[:Time];
  68. g_rate_per_minute = ((d_lag.to_f/d_time.to_f)*60).to_i;
  69.  
  70. if d_lag == 0 || g_rate_per_minute == 0
  71. return ['±0', nil]
  72. elsif d_lag > 0
  73. return ["+#{g_rate_per_minute}", nil]
  74. else
  75. minutes_to_catchup = -1 * (b[:Lag]/g_rate_per_minute);
  76. h,m = minutes_to_catchup.divmod(60);
  77.  
  78. return ["-#{g_rate_per_minute.abs}", sprintf('%02d:%02d', h, m)]
  79. end
  80. end
  81. end
  82.  
  83. def heartbeat(pulse)
  84. loop do
  85. next_beat = Time.now + pulse
  86.  
  87. yield
  88.  
  89. Time.now.tap do |now|
  90. countdown("Next check in", next_beat - now) unless now > next_beat
  91. end
  92. end
  93. end
  94.  
  95. def compare(key, a, b, io=$stdout)
  96. io.write("#{key}: #{(b && b[:Lag])|| '---'} ")
  97. if a.nil?
  98. io.puts("(added)")
  99. elsif b.nil?
  100. io.puts("(removed)")
  101. elsif b[:Lag] < 100
  102. io.puts("(ok)")
  103. else
  104. d_lag = b[:Lag]-a[:Lag];
  105. d_time = b[:Time]-a[:Time];
  106. g_rate_per_minute = ((d_lag.to_f/d_time.to_f)*60).to_i;
  107.  
  108. if d_lag == 0 || g_rate_per_minute == 0
  109. io.puts("(constant)")
  110. elsif d_lag > 0
  111. io.puts("(growing at #{g_rate_per_minute}/min)")
  112. else
  113. minutes_to_catchup = -1 * (b[:Lag]/g_rate_per_minute);
  114. h,m = minutes_to_catchup.divmod(60);
  115.  
  116. io.puts("(catching up at #{g_rate_per_minute.abs}/min; eta: #{sprintf('%02d:%02d',h,m)})")
  117. end
  118. end
  119. end
  120.  
  121. def with_progress(intro, io = $stderr)
  122. io.write("#{intro}...")
  123. io.flush
  124. th = Thread.new do
  125. loop do
  126. io.write(?.)
  127. io.flush
  128. sleep 1
  129. end
  130. end
  131.  
  132. return yield
  133.  
  134. ensure
  135. th.kill
  136. io.write("\r\033[J")
  137. io.flush
  138. end
  139.  
  140. def countdown(intro, duration, io=$stderr)
  141. th = Thread.new do
  142. io.write("#{intro}: ")
  143. io.flush
  144. target_time = Time.now + duration
  145.  
  146. loop do
  147. begin
  148.  
  149. remaining = (target_time - Time.now).to_i
  150. rh,rm = remaining.divmod(60)
  151.  
  152. io.write(sprintf("%02d:%02d", rh, rm))
  153. io.flush
  154.  
  155. sleep 1
  156.  
  157. io.write("\033[5D")
  158. io.flush
  159. end
  160. end
  161. end
  162.  
  163. sleep duration
  164.  
  165. ensure
  166. th.kill
  167. io.write("\r\033[J")
  168. io.flush
  169. end
  170.  
  171. def capture_snapshot(zookeeper)
  172. offset_checker = File.expand_path('kafka-consumer-offset-checker.sh', (ENV['KAFKA_BIN']||File.dirname(__FILE__)))
  173. unless File.exists?(offset_checker) && File.executable?(offset_checker)
  174. fail("#{File.basename(offset_checker)} not found at #{File.dirname(offset_checker)}")
  175. end
  176.  
  177. time = Time.now
  178. raw_result = `#{offset_checker.shellescape} --group phoenix_consumer_2016_02_07 --topic dryad_persist_1 --zookeeper #{zookeeper.shellescape} 2>/dev/null`
  179. fail(raw_result) if $?.exitstatus > 0
  180.  
  181. result = raw_result.lines.map(&:chomp).map(&:split)
  182. headers = result.shift.map(&:to_sym)
  183.  
  184. result.each_with_object({}) do |item, m0|
  185. hash = headers.zip(item).each_with_object({Time: time}) do |(header, value), m1|
  186. value = Integer(value) if value.match(/\A[0-9]+\Z/)
  187. m1[header] = value
  188. end
  189. m0[hash[:Owner]] = hash
  190. end
  191. end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement