Guest User

Untitled

a guest
Jan 23rd, 2018
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.17 KB | None | 0 0
  #---------------------------------------------------------------------------------
  # Scanner-related methods
  3.  
  # Opens a single scanner on +table+ through the table's client and returns
  # whatever handle the client call returns.
  #
  # The client call is chosen from +params+ in this order:
  #   1. :stop_id present -> scannerOpenWithStop(table, :start_id, :stop_id, :columns)
  #   2. :prefix present  -> scannerOpenWithPrefix(table, :prefix, :columns)
  #   3. otherwise        -> scannerOpen(table, :start_id, :columns)
  #
  # :stop_id takes precedence over :prefix when both are present. Only one
  # scanner is ever opened, so no handle is left behind unclosed.
  def self.open_scanner(table, params)
    hbase = client(table)
    columns = params[:columns]

    if params[:stop_id]
      hbase.scannerOpenWithStop(table, params[:start_id], params[:stop_id], columns)
    elsif params[:prefix]
      hbase.scannerOpenWithPrefix(table, params[:prefix], columns)
    else
      hbase.scannerOpen(table, params[:start_id], columns)
    end
  end
  11.  
  # Scans a set of rows of +table+, emitting one row at a time to the given
  # block as (row key, hash of column => cell value).
  #
  # Options in +params+ (all optional; a key passed as nil is treated as if it
  # had been omitted, so the default below applies):
  #   :start_id       - default '' (start from the beginning)
  #   :columns        - default [] (retrieve all the columns)
  #   :rows_per_batch - default 1, passed to scannerGetList as the batch size
  #   :limit          - maximum number of rows to yield; default 0, and any
  #                     value that is not positive means "no limit"
  # Other keys (e.g. :prefix, :stop_id) are handed to open_scanner untouched.
  #
  # Returns {} without scanning when enable_reads?(table) is false.
  # Raises ArgumentError when no block is given.
  # The scanner is closed on the way out, also when the block or a client
  # call raises.
  def self.scan_rows(table, params = {})
    raise ArgumentError, "No block given" unless block_given?
    return {} unless enable_reads?(table)
    hbase = client(table)

    defaults = {
      :start_id       => '', # start from the beginning
      :columns        => [], # retrieve all the columns
      :rows_per_batch => 1,  # retrieve rows one by one
      :limit          => 0   # retrieve all rows
    }
    # Hash#merge would let an explicit nil override a default (and e.g. a nil
    # limit would blow up on arithmetic), so nil-valued options are dropped.
    params = defaults.merge(params.reject { |_key, value| value.nil? })

    # Open a scanner
    scanner = retryable(:times => 5, :on => Thrift::Exception) do
      open_scanner(table, params)
    end

    limit = params[:limit]
    limited = limit > 0 # zero or negative means "no limit"
    yielded = 0

    loop do
      # Get rows from the scanner ...
      rows = retryable(:times => 5, :on => Thrift::Exception) do
        hbase.scannerGetList(scanner, params[:rows_per_batch])
      end
      # ... until it is empty
      break if rows.empty?

      # ... and feed them to the caller one at a time
      rows.each do |row|
        yield(row.row, columns_to_hash(row.columns))
        yielded += 1
        break if limited && yielded >= limit
      end

      # Stop retrieving rows if limit reached.
      break if limited && yielded >= limit
    end
  ensure
    # Closing is best-effort: a failure here must not replace the outcome of
    # the scan itself, so standard errors from scannerClose are discarded.
    if hbase && scanner
      begin
        hbase.scannerClose(scanner)
      rescue StandardError
        nil
      end
    end
  end
  56.  
  # Builds a plain hash from +columns+, mapping each key to the +value+ of
  # its cell.
  #
  # columns - an enumerable yielding (column, cell) pairs, or nil/false.
  #
  # Returns a new hash; {} when +columns+ is nil/false or empty.
  def self.columns_to_hash(columns)
    return {} unless columns

    columns.each_with_object({}) do |(column, cell), result|
      result[column] = cell.value
    end
  end
Add Comment
Please, Sign In to add comment