Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #---------------------------------------------------------------------------------
- # Scanner-related methods
# Opens exactly one scanner on +table+ and returns whatever the client's
# scannerOpen* call returns.
#
# The kind of scanner is chosen from +params+:
#   :stop_id - scan from params[:start_id] up to params[:stop_id]
#   :prefix  - scan the rows matching params[:prefix]
#   neither  - scan from params[:start_id] onwards
# params[:columns] is forwarded to the client in every case.
#
# When both :stop_id and :prefix are given, :stop_id takes precedence and
# no prefix scanner is opened (opening both would leave the unused one
# without anybody to close it).
def self.open_scanner(table, params)
  hbase = client(table)

  if params[:stop_id]
    hbase.scannerOpenWithStop(table, params[:start_id], params[:stop_id], params[:columns])
  elsif params[:prefix]
    hbase.scannerOpenWithPrefix(table, params[:prefix], params[:columns])
  else
    hbase.scannerOpen(table, params[:start_id], params[:columns])
  end
end
# Scans a set of rows, emitting one row at a time to the given block.
#
# +params+ is merged over these defaults and then handed to open_scanner:
#   :start_id       ('')  start from the beginning
#   :columns        ([])  retrieve all the columns
#   :rows_per_batch (1)   rows requested from the scanner per round trip
#   :limit          (0)   maximum number of rows to yield; 0, nil or a
#                         negative value means "no limit"
#
# Yields the row key and a column-name => cell-value hash for every row.
# Returns {} without scanning when enable_reads?(table) is false.
# Raises ArgumentError when no block is given.
# The scanner is always closed on the way out, even if the block raises.
def self.scan_rows(table, params = {})
  raise ArgumentError, "No block given" unless block_given?
  return {} unless enable_reads?(table)

  hbase = client(table)

  # Default params
  params = {
    :start_id => '',       # start from the beginning
    :columns => [],        # retrieve all the columns
    :rows_per_batch => 1,  # retrieve rows one by one
    :limit => 0            # retrieve all rows
  }.merge(params)

  # Open a scanner (retryable is defined elsewhere; presumably it re-runs
  # the block on Thrift::Exception up to :times attempts -- confirm)
  scanner = retryable(:times => 5, :on => Thrift::Exception) do
    open_scanner(table, params)
  end

  # Rows still to be emitted, or nil when the scan is unlimited
  limit = params[:limit].to_i
  remaining = limit > 0 ? limit : nil

  loop do
    # Never ask the scanner for more rows than the caller will consume
    batch_size = remaining ? [params[:rows_per_batch], remaining].min : params[:rows_per_batch]

    rows = retryable(:times => 5, :on => Thrift::Exception) do
      hbase.scannerGetList(scanner, batch_size)
    end

    # An empty batch means the scanner is exhausted
    break if rows.empty?

    # Feed the rows to the caller one at a time
    rows.each do |row|
      yield(row.row, columns_to_hash(row.columns))
      next unless remaining

      remaining -= 1
      # Guard against a server returning more rows than requested
      break if remaining == 0
    end

    # Stop retrieving rows if limit reached
    break if remaining == 0
  end
ensure
  # Close the scanner; failures here are deliberately ignored (best effort)
  if hbase && scanner
    begin
      hbase.scannerClose(scanner)
    rescue StandardError
      nil
    end
  end
end
# Converts a collection of (column name, cell) pairs into a plain hash
# mapping each column name to its cell's +value+.
#
# Returns an empty hash when +columns+ is nil or false.
def self.columns_to_hash(columns)
  return {} unless columns

  columns.each_with_object({}) do |(column, cell), result|
    result[column] = cell.value
  end
end
Add Comment
Please, Sign In to add comment