Advertisement
Guest User

Untitled

a guest
Apr 15th, 2016
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.80 KB | None | 0 0
  1. #!/usr/bin/env ruby
  2.  
  3. require 'time'
  4. require 'thread'
  5. require 'optparse'
  6. require 'rubygems'
  7.  
  8. require 'aerospike'
  9.  
  10. include Aerospike
  11.  
  12. @total_increment_write_count = 0
  13. @total_read_count = 0
  14. @total_transations_count = 0
  15. @total_write_transations_error_count = 0
  16. @total_read_transactions_error_count = 0
  17.  
  18. @log_frequency = 0.5 # Seconds
  19.  
  20. @options = {
  21. host: '127.0.0.1',
  22. port: 3000,
  23. namespace: 'test',
  24. set: 'benchmark',
  25. key_count: 100000,
  26. concurrency: 4,
  27. workload_def: 'RU,50',
  28. throughput: 0,
  29. timeout: 0,
  30. max_retries: 2,
  31. conn_queue_size: 64,
  32. debug_mode: false,
  33. user: '',
  34. password: ''
  35. }
  36.  
  37. @mutex = Mutex.new
  38.  
  39. @opt_parser = OptionParser.new do |opts|
  40. opts.banner = "Usage: benchmark [@options]"
  41.  
  42. opts.on("-h", "--host HOST", "Aerospike server seed hostnames or IP addresses") do |v|
  43. @options[:host] = v
  44. end
  45.  
  46. opts.on("-p", "--port PORT", "Aerospike server seed hostname or IP address port number.") do |v|
  47. @options[:port] = v.to_i
  48. end
  49.  
  50. opts.on("-U", "--user USER", "Aerospike user name") do |v|
  51. @options[:user] = v
  52. end
  53.  
  54. opts.on("-P", "--password PASSWORD", "Aerospike user password") do |v|
  55. @options[:password] = v
  56. end
  57.  
  58. opts.on("-n", "--namespace NAMESPACE", "Aerospike namespace.") do |v|
  59. @options[:namespace] = v
  60. end
  61.  
  62. opts.on("-s", "--set SET", "Aerospike set name.") do |v|
  63. @options[:set] = v
  64. end
  65.  
  66. opts.on("-k", "--keys KEYS", "Key/record count or key/record range.") do |v|
  67. @options[:key_count] = v.to_i
  68. end
  69.  
  70. opts.on("-c", "--concurrency COUNT", "Number of threads to generate load.") do |v|
  71. @options[:concurrency] = v.to_i
  72. end
  73.  
  74. opts.on("-w", "--workload TYPE", "Desired workload.\n\t\t\t\t\tI:60\t: Linear 'insert' workload initializing 60% of the keys.\n\t\t\t\t\tRU:80\t: Random read/update workload with 80% reads and 20% writes.") do |v|
  75. @options[:workload_def] = v
  76. end
  77.  
  78. opts.on("-g", "--throttle VALUE", "Throttle transactions per second to a maximum value.\n\t\t\t\t\tIf tps is zero, do not throttle throughput.\n\t\t\t\t\tUsed in read/write mode only.") do |v|
  79. @options[:throughput] = v.to_i
  80. end
  81.  
  82. opts.on("-t", "--timeout MILISECONDS", "Read/Write timeout in milliseconds.") do |v|
  83. @options[:timeout] = v.to_i / 1000.to_f
  84. end
  85.  
  86. opts.on("-m", "--max-retries COUNT", "Maximum number of retries before aborting the current transaction.") do |v|
  87. @options[:max_retries] = v.to_i
  88. end
  89.  
  90. opts.on("-q", "--queue-size SIZE", "Maximum number of connections to pool.") do |v|
  91. @options[:conn_queue_size] = v.to_i
  92. end
  93.  
  94. opts.on("-d", "--debug", "Run benchmarks in debug mode.") do |v|
  95. @options[:debug_mode] = v
  96. end
  97.  
  98. opts.on("-u", "--usage", "Show usage information.") do |v|
  99. puts opts
  100. exit
  101. end
  102. end
  103.  
  104. def workload_to_string
  105. "Read #{@workload_percent}%, Write #{100 - @workload_percent}%"
  106. end
  107.  
  108. def throughput_to_string
  109. if @options[:throughput] <= 0
  110. "Unlimited"
  111. else
  112. "#{@options[:throughput]}"
  113. end
  114. end
  115.  
  116. def print_benchmark_params
  117. puts("Host(s):\t\t#{@options[:host]}")
  118. puts("Port:\t\t\t#{@options[:port]}")
  119. puts("Namespace:\t\t#{@options[:namespace]}")
  120. puts("Set:\t\t\t#{@options[:set]}")
  121. puts("Keys:\t\t\t#{@options[:key_count]}")
  122. puts("Workload:\t\t#{workload_to_string}")
  123. puts("Concurrency:\t\t#{@options[:concurrency]}")
  124. puts("Maximum throughput:\t#{throughput_to_string}")
  125. puts("Timeout:\t\t#{@options[:timeout] > 0 ? (@options[:timeout] * 1000).to_i + 'ms' : 'Not set'}")
  126. puts("Maximum retries:\t#{@options[:max_retries]}")
  127. puts("Debug:\t\t\t#{@options[:debug_mode]}")
  128.  
  129. puts
  130.  
  131. puts "NOTE: Press Ctrl + C to exit!"
  132.  
  133. puts
  134. end
  135.  
  136. # Parses a string of (key:value) type.
  137. # Example:
  138. # "RU:10" returns ["RU", 10]
  139. def parse_valued_param(param)
  140. re = /(?<type>\w+)([:,](?<value>\d+))?/
  141. values = re.match(param)
  142. if values
  143. [values[:type], values[:value].to_i]
  144. else
  145. [nil, nil]
  146. end
  147. end
  148.  
  149. # reads input flags and interprets the complex ones
  150. def read_flags
  151. @opt_parser.parse!
  152.  
  153. Aerospike.logger.level = Logger::ERROR
  154.  
  155. if @options[:debug_mode]
  156. Aerospike.logger.level = Logger::INFO
  157. end
  158.  
  159. _, workload_percent = parse_valued_param(@options[:workload_def])
  160.  
  161. @workload_percent = workload_percent ? workload_percent.to_i : 50
  162. end
  163.  
  164. # Returns a random bin (column)
  165. def load_random_bin
  166. Bin.new('counter', rand(2 ** 63))
  167. end
  168.  
  169. def run_benchmark(client)
  170. write_policy = WritePolicy.new
  171.  
  172. client.default_policy = write_policy
  173. client.default_write_policy.timeout = @options[:timeout]
  174. client.default_write_policy.max_retries = @options[:max_retries]
  175.  
  176. t = Time.now
  177.  
  178. increment_write_count = 0
  179. read_count = 0
  180. write_increment_err = 0
  181. read_transaction_err = 0
  182.  
  183. namespace = @options[:namespace]
  184. set = @options[:set]
  185.  
  186. loop do
  187. bin = load_random_bin
  188.  
  189. key = Key.new(namespace, set, rand(@options[:key_count]))
  190.  
  191. # If the random value is greater than @workload_percent
  192. # then write (or increment) else read
  193. if rand(100) >= @workload_percent
  194. begin # Write (increment)
  195. client.add(key, [bin], write_policy)
  196. increment_write_count += 1
  197. rescue Exception => err
  198. if err.is_a?(Aerospike::Exceptions::Timeout)
  199. write_increment_err += 1
  200. end
  201. end
  202. else # Read
  203. begin
  204. client.get(key, [bin.name])
  205. read_count += 1
  206. rescue Exception => err
  207. if err.is_a?(Aerospike::Exceptions::Timeout)
  208. read_transaction_err += 1
  209. end
  210. end
  211. end
  212.  
  213. # Log statistics every X seconds
  214. if Time.now - t >= @log_frequency
  215. @mutex.synchronize do
  216. @total_increment_write_count += increment_write_count
  217. @total_read_count += read_count
  218. @total_write_transations_error_count += write_increment_err
  219. @total_read_transactions_error_count += read_transaction_err
  220. end
  221.  
  222. # Reset thread counts
  223. increment_write_count = 0
  224. read_count = 0
  225. write_increment_err = 0
  226. read_transaction_err = 0
  227.  
  228. t = Time.now
  229. end
  230. end
  231. end
  232.  
  233. def reporter
  234. last_total_increment_write_count = 0
  235. last_total_read_count = 0
  236. last_total_write_transations_error_count = 0
  237. last_total_read_transactions_error_count = 0
  238.  
  239. t = Time.now
  240. loop do
  241. if Time.now - t >= 1
  242. @mutex.synchronize do
  243. writes = @total_increment_write_count - last_total_increment_write_count
  244. reads = @total_read_count - last_total_read_count
  245.  
  246. write_timeouts = @total_write_transations_error_count - last_total_write_transations_error_count
  247. read_timeouts = @total_read_transactions_error_count - last_total_read_transactions_error_count
  248.  
  249. total = @total_increment_write_count + @total_read_count
  250.  
  251. str = "Increments: tps = #{writes} timeouts = #{write_timeouts}"
  252. str << " - Reads: tps = #{reads} timeouts = #{read_timeouts}"
  253. str << " - Total (write + read) (tps = #{writes + reads} timeouts = #{write_timeouts + read_timeouts}, total transactions (since bechmark start) = #{total})"
  254.  
  255. @logger.info str
  256.  
  257. last_total_increment_write_count = @total_increment_write_count
  258. last_total_read_count = @total_read_count
  259. last_total_write_transations_error_count = @total_write_transations_error_count
  260. last_total_read_transactions_error_count = @total_read_transactions_error_count
  261. end
  262.  
  263. t = Time.now
  264. end
  265.  
  266. sleep 1
  267. end
  268. end
  269.  
  270. def start_aerospike_benchmarking
  271. @logger = Logger.new(STDOUT)
  272. @logger.level = Logger::INFO
  273.  
  274. read_flags
  275. print_benchmark_params
  276.  
  277. client = Client.new(@options[:host], @options[:port], user: @options[:user], password: @options[:password])
  278.  
  279. read_write_threads = []
  280.  
  281. # One thread for reporter
  282. reporter_thread = Thread.new { reporter }
  283.  
  284. for i in (1..@options[:concurrency]) do
  285. read_write_threads << Thread.new { run_benchmark(client) }
  286. end
  287.  
  288. read_write_threads.each {|t| t.join}
  289.  
  290. reporter_thread.kill
  291. rescue SystemExit, Interrupt, StandardError
  292. client.close if client
  293. end
  294.  
  295. start_aerospike_benchmarking
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement