Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env ruby
- require 'time'
- require 'thread'
- require 'optparse'
- require 'rubygems'
- require 'aerospike'
- include Aerospike
- @total_increment_write_count = 0
- @total_read_count = 0
- @total_transations_count = 0
- @total_write_transations_error_count = 0
- @total_read_transactions_error_count = 0
- @log_frequency = 0.5 # Seconds
- @options = {
- host: '127.0.0.1',
- port: 3000,
- namespace: 'test',
- set: 'benchmark',
- key_count: 100000,
- concurrency: 4,
- workload_def: 'RU,50',
- throughput: 0,
- timeout: 0,
- max_retries: 2,
- conn_queue_size: 64,
- debug_mode: false,
- user: '',
- password: ''
- }
- @mutex = Mutex.new
- @opt_parser = OptionParser.new do |opts|
- opts.banner = "Usage: benchmark [@options]"
- opts.on("-h", "--host HOST", "Aerospike server seed hostnames or IP addresses") do |v|
- @options[:host] = v
- end
- opts.on("-p", "--port PORT", "Aerospike server seed hostname or IP address port number.") do |v|
- @options[:port] = v.to_i
- end
- opts.on("-U", "--user USER", "Aerospike user name") do |v|
- @options[:user] = v
- end
- opts.on("-P", "--password PASSWORD", "Aerospike user password") do |v|
- @options[:password] = v
- end
- opts.on("-n", "--namespace NAMESPACE", "Aerospike namespace.") do |v|
- @options[:namespace] = v
- end
- opts.on("-s", "--set SET", "Aerospike set name.") do |v|
- @options[:set] = v
- end
- opts.on("-k", "--keys KEYS", "Key/record count or key/record range.") do |v|
- @options[:key_count] = v.to_i
- end
- opts.on("-c", "--concurrency COUNT", "Number of threads to generate load.") do |v|
- @options[:concurrency] = v.to_i
- end
- opts.on("-w", "--workload TYPE", "Desired workload.\n\t\t\t\t\tI:60\t: Linear 'insert' workload initializing 60% of the keys.\n\t\t\t\t\tRU:80\t: Random read/update workload with 80% reads and 20% writes.") do |v|
- @options[:workload_def] = v
- end
- opts.on("-g", "--throttle VALUE", "Throttle transactions per second to a maximum value.\n\t\t\t\t\tIf tps is zero, do not throttle throughput.\n\t\t\t\t\tUsed in read/write mode only.") do |v|
- @options[:throughput] = v.to_i
- end
- opts.on("-t", "--timeout MILISECONDS", "Read/Write timeout in milliseconds.") do |v|
- @options[:timeout] = v.to_i / 1000.to_f
- end
- opts.on("-m", "--max-retries COUNT", "Maximum number of retries before aborting the current transaction.") do |v|
- @options[:max_retries] = v.to_i
- end
- opts.on("-q", "--queue-size SIZE", "Maximum number of connections to pool.") do |v|
- @options[:conn_queue_size] = v.to_i
- end
- opts.on("-d", "--debug", "Run benchmarks in debug mode.") do |v|
- @options[:debug_mode] = v
- end
- opts.on("-u", "--usage", "Show usage information.") do |v|
- puts opts
- exit
- end
- end
- def workload_to_string
- "Read #{@workload_percent}%, Write #{100 - @workload_percent}%"
- end
- def throughput_to_string
- if @options[:throughput] <= 0
- "Unlimited"
- else
- "#{@options[:throughput]}"
- end
- end
- def print_benchmark_params
- puts("Host(s):\t\t#{@options[:host]}")
- puts("Port:\t\t\t#{@options[:port]}")
- puts("Namespace:\t\t#{@options[:namespace]}")
- puts("Set:\t\t\t#{@options[:set]}")
- puts("Keys:\t\t\t#{@options[:key_count]}")
- puts("Workload:\t\t#{workload_to_string}")
- puts("Concurrency:\t\t#{@options[:concurrency]}")
- puts("Maximum throughput:\t#{throughput_to_string}")
- puts("Timeout:\t\t#{@options[:timeout] > 0 ? (@options[:timeout] * 1000).to_i + 'ms' : 'Not set'}")
- puts("Maximum retries:\t#{@options[:max_retries]}")
- puts("Debug:\t\t\t#{@options[:debug_mode]}")
- puts
- puts "NOTE: Press Ctrl + C to exit!"
- puts
- end
- # Parses a string of (key:value) type.
- # Example:
- # "RU:10" returns ["RU", 10]
- def parse_valued_param(param)
- re = /(?<type>\w+)([:,](?<value>\d+))?/
- values = re.match(param)
- if values
- [values[:type], values[:value].to_i]
- else
- [nil, nil]
- end
- end
- # reads input flags and interprets the complex ones
- def read_flags
- @opt_parser.parse!
- Aerospike.logger.level = Logger::ERROR
- if @options[:debug_mode]
- Aerospike.logger.level = Logger::INFO
- end
- _, workload_percent = parse_valued_param(@options[:workload_def])
- @workload_percent = workload_percent ? workload_percent.to_i : 50
- end
- # Returns a random bin (column)
- def load_random_bin
- Bin.new('counter', rand(2 ** 63))
- end
- def run_benchmark(client)
- write_policy = WritePolicy.new
- client.default_policy = write_policy
- client.default_write_policy.timeout = @options[:timeout]
- client.default_write_policy.max_retries = @options[:max_retries]
- t = Time.now
- increment_write_count = 0
- read_count = 0
- write_increment_err = 0
- read_transaction_err = 0
- namespace = @options[:namespace]
- set = @options[:set]
- loop do
- bin = load_random_bin
- key = Key.new(namespace, set, rand(@options[:key_count]))
- # If the random value is greater than @workload_percent
- # then write (or increment) else read
- if rand(100) >= @workload_percent
- begin # Write (increment)
- client.add(key, [bin], write_policy)
- increment_write_count += 1
- rescue Exception => err
- if err.is_a?(Aerospike::Exceptions::Timeout)
- write_increment_err += 1
- end
- end
- else # Read
- begin
- client.get(key, [bin.name])
- read_count += 1
- rescue Exception => err
- if err.is_a?(Aerospike::Exceptions::Timeout)
- read_transaction_err += 1
- end
- end
- end
- # Log statistics every X seconds
- if Time.now - t >= @log_frequency
- @mutex.synchronize do
- @total_increment_write_count += increment_write_count
- @total_read_count += read_count
- @total_write_transations_error_count += write_increment_err
- @total_read_transactions_error_count += read_transaction_err
- end
- # Reset thread counts
- increment_write_count = 0
- read_count = 0
- write_increment_err = 0
- read_transaction_err = 0
- t = Time.now
- end
- end
- end
- def reporter
- last_total_increment_write_count = 0
- last_total_read_count = 0
- last_total_write_transations_error_count = 0
- last_total_read_transactions_error_count = 0
- t = Time.now
- loop do
- if Time.now - t >= 1
- @mutex.synchronize do
- writes = @total_increment_write_count - last_total_increment_write_count
- reads = @total_read_count - last_total_read_count
- write_timeouts = @total_write_transations_error_count - last_total_write_transations_error_count
- read_timeouts = @total_read_transactions_error_count - last_total_read_transactions_error_count
- total = @total_increment_write_count + @total_read_count
- str = "Increments: tps = #{writes} timeouts = #{write_timeouts}"
- str << " - Reads: tps = #{reads} timeouts = #{read_timeouts}"
- str << " - Total (write + read) (tps = #{writes + reads} timeouts = #{write_timeouts + read_timeouts}, total transactions (since bechmark start) = #{total})"
- @logger.info str
- last_total_increment_write_count = @total_increment_write_count
- last_total_read_count = @total_read_count
- last_total_write_transations_error_count = @total_write_transations_error_count
- last_total_read_transactions_error_count = @total_read_transactions_error_count
- end
- t = Time.now
- end
- sleep 1
- end
- end
- def start_aerospike_benchmarking
- @logger = Logger.new(STDOUT)
- @logger.level = Logger::INFO
- read_flags
- print_benchmark_params
- client = Client.new(@options[:host], @options[:port], user: @options[:user], password: @options[:password])
- read_write_threads = []
- # One thread for reporter
- reporter_thread = Thread.new { reporter }
- for i in (1..@options[:concurrency]) do
- read_write_threads << Thread.new { run_benchmark(client) }
- end
- read_write_threads.each {|t| t.join}
- reporter_thread.kill
- rescue SystemExit, Interrupt, StandardError
- client.close if client
- end
- start_aerospike_benchmarking
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement