Guest User

Untitled

a guest
Oct 16th, 2018
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.81 KB | None | 0 0
  1. #!/usr/bin/env ruby
  2.  
  3. require 'getoptlong'
  4.  
  5. optparser = GetoptLong.new(['--type', '-t', GetoptLong::REQUIRED_ARGUMENT],
  6. ['--base-path', '-b', GetoptLong::REQUIRED_ARGUMENT],
  7. ['--chunk-size', '-c', GetoptLong::REQUIRED_ARGUMENT],
  8. ['--host', '-h', GetoptLong::REQUIRED_ARGUMENT],
  9. ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
  10. ['--user', '-u', GetoptLong::REQUIRED_ARGUMENT],
  11. ['--seconds', '-s', GetoptLong::REQUIRED_ARGUMENT],
  12. ['--label', '-l', GetoptLong::REQUIRED_ARGUMENT])
  13. type,base_path,seconds,host,port,username,chunk_size = nil,'/tmp',3600,nil,nil,nil,(10*1024*1024)
  14. label = type
  15.  
  16. optparser.each do |optname,arg|
  17. case optname
  18. when '--type' then type = arg
  19. when '--base-path' then base_path = arg
  20. when '--chunk-size' then chunk_size = arg.to_i
  21. when '--host' then host = arg
  22. when '--port' then port = arg.to_i
  23. when '--user' then username = arg
  24. when '--seconds' then seconds = arg.to_i
  25. when '--label' then label = arg
  26. end
  27. end
  28.  
  29. unless type and ['httpfs', 'webhdfs', 'cachedwebhdfs'].include?(type)
  30. raise RuntimeError, "invalid type: '#{type}'"
  31. end
  32. unless host
  33. raise RuntimeError, "host must be specified"
  34. end
  35. unless port
  36. port = if type == 'httpfs' then 14000 else 50070 end
  37. end
  38. unless username
  39. raise RuntimeError, "user must be specified"
  40. end
  41.  
  42. chunk = "a" * chunk_size
  43. chunk.force_encoding("ASCII-8BIT")
  44.  
  45. def try_httpfs(path, host, port, username, chunk, lasting)
  46. require 'net/http'
  47. client = Net::HTTP.new(host, port)
  48. end_at = Time.now + lasting
  49. header = {'Content-Type' => 'application/octet-stream'}
  50.  
  51. response = client.request_put('/webhdfs/v1' + path + '?op=CREATE&data=true&user.name=' + username, chunk, header)
  52. if response.code.to_i >= 400
  53. raise RuntimeError, "failed to put new file with code #{response.code}: #{host}:#{port} hdfs://#{path}\n#{response.message}"
  54. end
  55. counter,failed = 1,0
  56. while Time.now < end_at
  57. response = client.request_post('/webhdfs/v1' + path + '?op=append&data=true&user.name=' + username, chunk, header)
  58. counter += 1
  59. failed += 1 if response.code.to_i >= 400
  60. end
  61. return {:type => 'httpfs', :count => counter, :failed => failed}
  62. end
  63.  
  64. def try_webhdfs(path, host, port, username, chunk, lasting)
  65. require 'webhdfs'
  66. end_at = Time.now + lasting
  67. client = WebHDFS::Client.new(host, port, username)
  68. unless client.create(path, chunk)
  69. raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}"
  70. end
  71. counter,failed = 1,0
  72. while Time.now < end_at
  73. unless client.append(path, chunk)
  74. failed += 1
  75. end
  76. counter += 1
  77. end
  78. return {:type => 'webhdfs', :count => counter, :failed => failed}
  79. end
  80.  
  81. def try_cachedwebhdfs(path, host, port, username, chunk, lasting)
  82. require 'net/http'
  83. require 'uri'
  84. require 'webhdfs'
  85. actual_path = '/webhdfs/v1' + path
  86.  
  87. end_at = Time.now + lasting
  88.  
  89. client = WebHDFS::Client.new(host, port, username)
  90. unless client.create(path, chunk)
  91. raise RuntimeError, "failed to create new file, #{host}:#{port}, hdfs://#{path}"
  92. end
  93. counter,failed = 1,0
  94. fail_detail = {:namenode => 0, :datanode => 0}
  95. cached = nil
  96. client = nil
  97. while Time.now < end_at
  98. if cached.nil?
  99. response = Net::HTTP.start(host, port) do |http|
  100. http.request_post(actual_path + '?op=APPEND&user.name=' + username, '')
  101. end
  102. unless response.is_a?(Net::HTTPRedirection) and response['location']
  103. counter += 1
  104. failed += 1
  105. fail_detail[:namenode] += 1
  106. next
  107. end
  108. uri = URI.parse(response['location'])
  109. cached = {:host => uri.host, :port => uri.port, :path => (if uri.query then uri.path + '?' + uri.query else uri.path end)}
  110. client = Net::HTTP.new(cached[:host], cached[:port])
  111. end
  112. response = client.request_post(cached[:path], chunk)
  113. counter += 1
  114. if response.code.to_i >= 300
  115. failed += 1
  116. fail_detail[:datanode] += 1
  117. cached = nil
  118. client.finish
  119. client = nil
  120. end
  121. end
  122. return {:type => 'cachedwebhdfs', :count => counter, :failed => failed, :detail => fail_detail}
  123. end
  124.  
  125. path = base_path + '/' + type + '.txt'
  126. result = case type
  127. when 'httpfs'
  128. try_httpfs(path, host, port, username, chunk, seconds)
  129. when 'webhdfs'
  130. try_webhdfs(path, host, port, username, chunk, seconds)
  131. when 'cachedwebhdfs'
  132. try_cachedwebhdfs(path, host, port, username, chunk, seconds)
  133. end
  134.  
  135. warn "output type:#{result[:type]}, written:#{result[:count]}, failed:#{result[:failed]}"
  136. if result[:detail] # for cachedwebhdfs
  137. warn "\t namenode fail:#{result[:detail][:namenode]}, datanode fail:#{result[:detail][:datanode]}"
  138. end
  139. rate = (chunk.length * 8 * (result[:count] - result[:failed]) / (1000 * seconds)).floor / 1000
  140. warn "rate: #{rate} Mbps"
  141. exit(0)
Add Comment
Please, Sign In to add comment