Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env ruby
# HDFS append throughput benchmark.
#
# Creates <base-path>/<type>.txt on HDFS, then appends a fixed-size chunk to
# it repeatedly for --seconds seconds using one of three client strategies:
#   httpfs        - raw Net::HTTP against HttpFS (default port 14000)
#   webhdfs       - the webhdfs gem against WebHDFS (default port 50070)
#   cachedwebhdfs - raw Net::HTTP, reusing the datanode location returned
#                   by the namenode redirect until an append fails
# Results are written to stderr.
require 'getoptlong'

optparser = GetoptLong.new(
  ['--type', '-t', GetoptLong::REQUIRED_ARGUMENT],
  ['--base-path', '-b', GetoptLong::REQUIRED_ARGUMENT],
  ['--chunk-size', '-c', GetoptLong::REQUIRED_ARGUMENT],
  ['--host', '-h', GetoptLong::REQUIRED_ARGUMENT],
  ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
  ['--user', '-u', GetoptLong::REQUIRED_ARGUMENT],
  ['--seconds', '-s', GetoptLong::REQUIRED_ARGUMENT],
  ['--label', '-l', GetoptLong::REQUIRED_ARGUMENT]
)

type = nil
base_path = '/tmp'
seconds = 3600
host = nil
port = nil
username = nil
chunk_size = 10 * 1024 * 1024 # bytes sent per request
label = nil

optparser.each do |optname, arg|
  case optname
  when '--type' then type = arg
  when '--base-path' then base_path = arg
  # Integer() rejects junk such as "10MB" instead of silently yielding 10 or 0.
  when '--chunk-size' then chunk_size = Integer(arg, 10)
  when '--host' then host = arg
  when '--port' then port = Integer(arg, 10)
  when '--user' then username = arg
  when '--seconds' then seconds = Integer(arg, 10)
  when '--label' then label = arg
  end
end
# Default the label only after parsing; it used to be copied from +type+
# before --type had been read, so it was always nil.
label ||= type

unless %w[httpfs webhdfs cachedwebhdfs].include?(type)
  raise RuntimeError, "invalid type: '#{type}'"
end
raise RuntimeError, "host must be specified" unless host
port ||= type == 'httpfs' ? 14000 : 50070
raise RuntimeError, "user must be specified" unless username
# +seconds+ is the divisor of the final rate; zero would only blow up with a
# ZeroDivisionError after the file had already been created.
raise RuntimeError, "seconds must be positive" unless seconds > 0
raise RuntimeError, "chunk size must be positive" unless chunk_size > 0

# A single binary payload, reused for every request.
chunk = ('a' * chunk_size).force_encoding('ASCII-8BIT')
# Benchmarks HttpFS: creates +path+ with one chunk, then appends +chunk+
# repeatedly until +lasting+ seconds have elapsed.
#
# @param path [String] HDFS path of the file to create and append to
# @param host [String] HttpFS server host
# @param port [Integer] HttpFS server port
# @param username [String] value sent as the +user.name+ query parameter
# @param chunk [String] payload sent with every request
# @param lasting [Numeric] run duration in seconds, measured from before the
#   CREATE request
# @return [Hash] +:type+, +:count+ (requests issued, including the CREATE)
#   and +:failed+ (APPEND requests answered with HTTP status >= 400)
# @raise [RuntimeError] if the initial CREATE is answered with status >= 400
def try_httpfs(path, host, port, username, chunk, lasting)
  # Loaded lazily so the other benchmark types do not need it.
  require 'net/http'
  require 'uri'

  # Monotonic clock: wall-clock adjustments must not stretch or cut the run.
  end_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + lasting
  client = Net::HTTP.new(host, port)
  header = { 'Content-Type' => 'application/octet-stream' }
  base = "/webhdfs/v1#{path}?"
  # Query values are form-encoded so unusual user names cannot break the URL.
  create_uri = base + URI.encode_www_form('op' => 'CREATE', 'data' => 'true', 'user.name' => username)
  append_uri = base + URI.encode_www_form('op' => 'append', 'data' => 'true', 'user.name' => username)

  response = client.request_put(create_uri, chunk, header)
  if response.code.to_i >= 400
    raise RuntimeError, "failed to put new file with code #{response.code}: #{host}:#{port} hdfs://#{path}\n#{response.message}"
  end

  counter = 1
  failed = 0
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < end_at
    response = client.request_post(append_uri, chunk, header)
    counter += 1
    failed += 1 if response.code.to_i >= 400
  end
  { type: 'httpfs', count: counter, failed: failed }
end
# Benchmarks WebHDFS through the webhdfs gem: creates +path+ with one chunk,
# then appends +chunk+ repeatedly until +lasting+ seconds have elapsed.
#
# @param path [String] HDFS path of the file to create and append to
# @param host [String] namenode host
# @param port [Integer] namenode WebHDFS port
# @param username [String] user passed to WebHDFS::Client
# @param chunk [String] payload sent with every request
# @param lasting [Numeric] run duration in seconds, measured from before the
#   create call
# @return [Hash] +:type+, +:count+ (calls issued, including the create) and
#   +:failed+ (appends that returned false or raised WebHDFS::Error)
# @raise [RuntimeError] if +create+ returns a falsy value
def try_webhdfs(path, host, port, username, chunk, lasting)
  # Third-party gem, loaded lazily so other benchmark types do not need it.
  require 'webhdfs'

  # Monotonic clock: wall-clock adjustments must not stretch or cut the run.
  end_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + lasting
  client = WebHDFS::Client.new(host, port, username)
  unless client.create(path, chunk)
    raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}"
  end

  counter = 1
  failed = 0
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < end_at
    begin
      failed += 1 unless client.append(path, chunk)
    rescue WebHDFS::Error
      # NOTE(review): the webhdfs gem reports HTTP error statuses by raising
      # WebHDFS::Error subclasses rather than returning false; count them as
      # failures instead of aborting the whole run (verify against the
      # installed gem version).
      failed += 1
    end
    counter += 1
  end
  { type: 'webhdfs', count: counter, failed: failed }
end
# Benchmarks WebHDFS appends while caching the datanode location: the
# namenode is asked for an APPEND redirect only when no datanode location is
# cached, and the cached location is reused until an append to it fails.
#
# @param path [String] HDFS path of the file to create and append to
# @param host [String] namenode host
# @param port [Integer] namenode WebHDFS port
# @param username [String] value sent as the +user.name+ query parameter
# @param chunk [String] payload sent with every datanode append
# @param lasting [Numeric] run duration in seconds, measured from before the
#   create call
# @return [Hash] +:type+, +:count+ (requests issued, including the create),
#   +:failed+ and +:detail+ (failures split into +:namenode+ — no redirect
#   location — and +:datanode+ — append answered with status >= 300)
# @raise [RuntimeError] if the gem's +create+ returns a falsy value
def try_cachedwebhdfs(path, host, port, username, chunk, lasting)
  require 'net/http'
  require 'uri'
  require 'webhdfs'

  actual_path = '/webhdfs/v1' + path
  # Monotonic clock: wall-clock adjustments must not stretch or cut the run.
  end_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + lasting
  # The gem is only used for the initial create; appends go over raw HTTP.
  hdfs = WebHDFS::Client.new(host, port, username)
  unless hdfs.create(path, chunk)
    raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}"
  end

  # Query values are form-encoded so unusual user names cannot break the URL.
  append_request = actual_path + '?' + URI.encode_www_form('op' => 'APPEND', 'user.name' => username)
  counter = 1
  failed = 0
  fail_detail = { namenode: 0, datanode: 0 }
  cached = nil # datanode host/port/path taken from the last redirect
  client = nil # Net::HTTP handle pointing at +cached+

  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < end_at
    if cached.nil?
      # Ask the namenode where to append; a usable answer is a redirect.
      response = Net::HTTP.start(host, port) do |http|
        http.request_post(append_request, '')
      end
      unless response.is_a?(Net::HTTPRedirection) && response['location']
        counter += 1
        failed += 1
        fail_detail[:namenode] += 1
        next
      end
      uri = URI.parse(response['location'])
      cached = { host: uri.host, port: uri.port, path: uri.query ? "#{uri.path}?#{uri.query}" : uri.path }
      client = Net::HTTP.new(cached[:host], cached[:port])
    end

    response = client.request_post(cached[:path], chunk)
    counter += 1
    next if response.code.to_i < 300

    failed += 1
    fail_detail[:datanode] += 1
    # +client+ is never explicitly started, so each request opens and closes
    # its own connection; an unconditional #finish raised IOError
    # ("HTTP session not yet started") on the first datanode failure.
    client.finish if client.started?
    # Forget the location so the next iteration asks the namenode again.
    cached = nil
    client = nil
  end
  { type: 'cachedwebhdfs', count: counter, failed: failed, detail: fail_detail }
end
# Run the selected benchmark against <base_path>/<type>.txt and report the
# outcome on stderr.
path = File.join(base_path, "#{type}.txt")
result =
  case type
  when 'httpfs' then try_httpfs(path, host, port, username, chunk, seconds)
  when 'webhdfs' then try_webhdfs(path, host, port, username, chunk, seconds)
  when 'cachedwebhdfs' then try_cachedwebhdfs(path, host, port, username, chunk, seconds)
  else raise RuntimeError, "invalid type: '#{type}'"
  end

warn "output type:#{result[:type]}, written:#{result[:count]}, failed:#{result[:failed]}"
if result[:detail] # only cachedwebhdfs reports per-node failures
  warn "\t namenode fail:#{result[:detail][:namenode]}, datanode fail:#{result[:detail][:datanode]}"
end

# Throughput of successful requests over the configured duration (not the
# measured one). Floor to whole kbps, then report Mbps with kbps resolution;
# the previous all-integer arithmetic made `.floor` a no-op and truncated the
# result to whole Mbps.
succeeded = result[:count] - result[:failed]
kbps = (chunk.bytesize * 8 * succeeded / (1000.0 * seconds)).floor
rate = kbps / 1000.0
warn "rate: #{rate} Mbps"
exit(0)
Add Comment
Please, Sign In to add comment