Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Encoding: UTF-8
- """
- Module for working with tabular data. Tabular data in this context means data in
- a column and row format.
- """
- import sys
- import os
- import math
- import texttable
- import broadpy.clitools as clitools
- from broadpy.clitools import console_width
# Registries of supported file formats. Keys are file extensions without the
# leading dot; values are the names of the module-level functions that load
# or save that format (they are looked up by name at call time).
LOAD_EXT_MAPPING = dict(xlsx='load_xlsx_file')
SAVE_EXT_MAPPING = dict(xlsx='save_xlsx_file')

# Error codes passed as the second argument of `TabularDataError`.
ERROR_UNSUPPORTED_FILE_EXTENSION = 1
ERROR_DUPLICATE_COLNAME = 2
# Python 2/3 compatibility aliases used by `Table`. The `unicode`, `long` and
# `xrange` builtins only exist on Python 2.
try:
    _TEXT_TYPE = unicode
    _STRINGIFIABLE_TYPES = (str, unicode, int, float, long)
    _RANGE = xrange
except NameError:
    _TEXT_TYPE = str
    _STRINGIFIABLE_TYPES = (str, int, float)
    _RANGE = range


class Table(object):
    """A class for representing a table of named columns and rows."""

    def __init__(self):
        # This variable is used to remember which row of the file is the first
        # row imported. This is to maintain the correct row indexes when
        # printing the table.
        self.from_row = 0
        # Column headers, in display order.
        self.columns = []
        # One list of cell values per row; cells are positionally aligned
        # with `self.columns`.
        self.rows = []

    def _stringify_value(self, value):
        """Return the text representation of a cell value.

        `None` and booleans become an empty string; strings and numbers are
        converted to text.

        Raises:
            TypeError: If `value` is of any other type.
        """
        # Booleans must be tested before numbers since bool subclasses int.
        if value is None or isinstance(value, bool):
            return u''
        if isinstance(value, _STRINGIFIABLE_TYPES):
            return _TEXT_TYPE(value)
        raise TypeError('Unknown type: {}'.format(type(value)))

    def get(self, index):
        """Get the row at `index` as a dict with each column header as a key."""
        r = {}
        # Iterate through each cell of the row at `index` and assign it to the
        # key given by the header at the same position.
        for i, cell in enumerate(self.rows[index]):
            r[self.columns[i]] = cell
        return r

    def get_cell_value(self, row, col):
        """Return the value of the cell at row index `row`, column index `col`."""
        return self.rows[row][col]

    def set_cell_value(self, row, col, value):
        """Set the cell at row index `row`, column index `col` to `value`."""
        self.rows[row][col] = value

    def get_row_count(self):
        """Return the number of rows in the table."""
        return len(self.rows)

    def get_column_count(self):
        """Return the number of columns in the table."""
        return len(self.columns)

    def get_column_header(self, index):
        """Return the header of the column at `index`."""
        return self.columns[index]

    def get_column_index(self, label):
        """Get the index of a column by the header.

        Raises:
            ValueError: If no column has the header `label`.
        """
        return self.columns.index(label)

    def get_column_headers(self):
        """Returns a list containing all the column headers in order.

        The returned list is the table's own list, not a copy.
        """
        return self.columns

    def find(self, value, case_sensitive=False, offset=(0, 0), wrap=False):
        """Find a table cell by the input `value`.

        Args:
            value (str): The value to search for. For simplicity, `value` must
                always be a string. A numerical string will match integers in
                the table and empty strings will match `None`.
            case_sensitive (bool): (optional) When false (the default) both
                the search term and the cell text are lower-cased before
                comparing.
            offset: (optional) A sequence of two integers deciding where the
                search will start. The first number is the row, the second is
                the column. The search will start at the offset, so the cell at
                the offset could be matched.
            wrap (bool): (optional) Whether or not to continue from the start of
                the table when the end is reached without results.

        Returns:
            A `TableCell` instance or `None` if no results could be found.
        """
        needle = value if case_sensitive else value.lower()
        start_row, start_col = offset[0], offset[1]

        def matches(row_index, column_index):
            # Compare against the text form of the cell so that numbers and
            # empty cells can be searched too.
            text = self._stringify_value(
                self.get_cell_value(row_index, column_index))
            if not case_sensitive:
                text = text.lower()
            return needle in text

        # Search from the offset to the end of the table.
        for row_index in _RANGE(start_row, self.row_count):
            for column_index in _RANGE(self.column_count):
                # Skip the cells that are before the offset.
                if row_index == start_row and column_index < start_col:
                    continue
                if matches(row_index, column_index):
                    return TableCell(self, row_index, column_index)

        # Wrap around: search the cells that precede the offset. The row range
        # is clamped so an offset past the last row cannot index out of range.
        if wrap and (start_row > 0 or start_col > 0):
            for row_index in _RANGE(min(start_row + 1, self.row_count)):
                for column_index in _RANGE(self.column_count):
                    # Skip the cells that are at or after the offset.
                    if row_index == start_row and column_index >= start_col:
                        continue
                    if matches(row_index, column_index):
                        return TableCell(self, row_index, column_index)

        return None

    def each(self, fn, progress=None):
        """
        Run `fn` for each of the rows in the table.

        Args:
            fn (function): A function that will be executed for each row in the
                table with the respective item index and item as function
                arguments.
            progress (str): (optional) template for a status that will be
                displayed in the console and updated for every iteration.

        Example:
            >>> def myfunction(index, item):
            ...     print index, item['name']
            >>>
            >>> tbl.each(myfunction)
        """
        # If a progress indicator is to be displayed, create the Progress class.
        if progress is not None:
            p = clitools.Progress(limit=len(self.rows), template=progress)
        else:
            p = None

        for i in _RANGE(len(self.rows)):
            # Run the input function.
            fn(i, self.get(i))
            # Update the progress indicator.
            if p is not None:
                p.update()

        # We're done using the progress indicator.
        if p is not None:
            p.done()

    def remove_row(self, index):
        """Remove the row at `index`."""
        del self.rows[index]

    def add_column(self, index, colname, fn=None, progress=None):
        """
        Add a column to the table. Equivalent of using `add_columns` with one
        column name.
        """
        if fn:
            # `add_columns` expects one value per new column, so wrap the
            # single return value in a list.
            def fn_wrapper(i, row):
                return [fn(i, row)]
        else:
            fn_wrapper = fn
        self.add_columns(index, [colname], fn_wrapper, progress)

    def add_columns(self, index, colnames, fn=None, progress=None):
        """
        Add columns to the table.

        The values for every row are computed before the table is modified, so
        `fn` always sees rows keyed by the existing column headers and the
        table is left untouched if `fn` raises.

        Args:
            index (int): The index the columns will be inserted at.
            colnames (iterable): An iterable of strings containing the column
                names. This argument also specifies the amount of columns to
                insert.
            fn (callable): (optional) A callable that will be called for each
                existing row with the arguments (index, item). The callable's
                return value should be an iterable at least the length of
                `colnames` which will be used to fill the new columns for the
                current row. If this argument is not present the value of the
                new columns will be set to `None` for all rows.
            progress (str): (optional) template for a status that will be
                displayed in the console and updated for every row.

        Raises:
            TabularDataError: If a name in `colnames` already exists in the
                table or occurs more than once in `colnames`.
        """
        # Materialize so that generators can be passed and iterated repeatedly.
        colnames = list(colnames)

        # Duplicate column names not allowed.
        taken = list(self.columns)
        for colname in colnames:
            if colname in taken:
                raise TabularDataError('Column "%s" already exists.' % colname,
                                       ERROR_DUPLICATE_COLNAME)
            taken.append(colname)

        # If the user wants the progress displayed, initiate a Progress class.
        tracker = None
        if progress:
            tracker = clitools.Progress(limit=len(self.rows),
                                        template=progress)

        # Compute the new cells for every row while the table is unmodified.
        new_cells = []
        for i in _RANGE(len(self.rows)):
            if tracker is not None:
                tracker.update()
            val = fn(i, self.get(i)) if fn else None
            if not val:
                # No (or an empty) result: leave the new cells empty.
                new_cells.append([None] * len(colnames))
            else:
                # Index explicitly so falsy values such as 0 are kept.
                new_cells.append([val[num] for num in range(len(colnames))])

        # Insert the new column headers and the computed cells.
        for i, colname in enumerate(colnames):
            self.columns.insert(index + i, colname)
        for row, cells in zip(self.rows, new_cells):
            for num, cell in enumerate(cells):
                row.insert(index + num, cell)

        # If a progress template was used, print a newline now.
        if tracker is not None:
            sys.stdout.write(u'\n')

    def append_column(self, colname, fn=None, progress=None):
        """
        Adds a column to the far right of the table. Equivalent of calling
        `add_column` with the index being `len(columns)`.
        """
        self.add_column(len(self.columns), colname, fn, progress)

    def append_columns(self, colnames, fn=None, progress=None):
        """
        Adds columns to the far right of the table. Equivalent of calling
        `add_columns` with the index being `len(columns)`.
        """
        self.add_columns(len(self.columns), colnames, fn, progress)

    def remove_column(self, index):
        """Removes a column. See `Table.remove_columns`."""
        self.remove_columns([index])

    def remove_columns(self, indexes):
        """
        Remove the table columns at the input indexes.

        Args:
            indexes: An iterable yielding integers. Repeated indexes are
                removed only once.
        """
        # Delete from the right so earlier deletions don't shift the indexes
        # that are still to be processed.
        for index in sorted(set(indexes), reverse=True):
            del self.columns[index]
            for row in self.rows:
                del row[index]

    def insert_row(self, index, values=None):
        """Insert a row to the table.

        Args:
            index (int): The index in which to insert the row.
            values: (optional) The values of the row. This must either be `None`
                or a sequence type with the exact length as the number of
                columns.

        Raises:
            AssertionError: If `values` does not hold one item per column.
        """
        if values is None:
            values = [None] * self.column_count
        else:
            values = list(values)
        # Raised explicitly (rather than with `assert`) so the check is not
        # stripped when Python runs with -O.
        if len(values) != self.column_count:
            raise AssertionError('Invalid item count in row')
        self.rows.insert(index, values)

    def append_row(self, values=None):
        """Adds a row to the bottom of the table.

        Args:
            values: (optional) See `Table.insert_row`.
        """
        self.insert_row(self.row_count, values)

    def to_text(self, cols=None, max_width='auto', from_row=None, to_row=None):
        """Return a text representation of the table.

        Args:
            cols: (optional) The column headers to include. All columns are
                drawn when this is empty or `None`.
            max_width: (optional) Maximum width of the drawn table. The
                default, 'auto', uses the console width minus 2.
            from_row (int): (optional) Index of the first row to draw.
            to_row (int): (optional) Index of the last row to draw (inclusive).
        """
        if not cols:
            cols = self.columns

        # Default the max width of the table to the width of the console.
        if max_width == 'auto':
            max_width = console_width() - 2

        # Prepare the column headers to draw. If cols is set, draw only the
        # columns in cols. The first column holds the row number.
        columns = ['#'] + [x for x in self.columns if x in cols]

        # Prepare the rows to draw. If cols is set, filter which cells are
        # displayed by those columns.
        rows = []
        for rowindex, row in enumerate(self.rows):
            # Honor the from_row and to_row arguments.
            if from_row is not None and rowindex < from_row:
                continue
            if to_row is not None and rowindex > to_row:
                continue
            r = [self.from_row + rowindex]
            for i, cell in enumerate(row):
                # Filter which columns are displayed for every row.
                if self.columns[i] in cols:
                    # Empty cells are drawn blank; numeric zero is kept.
                    if cell is None or cell is False:
                        r.append(u'')
                    else:
                        r.append(cell)
            rows.append(r)

        table = texttable.Texttable(max_width=max_width)
        table.set_cols_dtype(['t'] * len(columns))
        table.add_rows([columns] + rows)
        return table.draw()

    def save(self, path):
        """
        Save this `Table` instance to a file. The file format depends on the
        file extension of `path`.

        Args:
            path (str): The `Table` instance will be saved to this path. The
                file format will be chosen automatically based on the file
                extension of `path` (matched case-insensitively).

        Raises:
            TabularDataError: If the file extension is not supported.
        """
        # Extract the file extension without the leading dot.
        ext = os.path.splitext(path)[1][1:].lower()
        if ext not in SAVE_EXT_MAPPING:
            raise TabularDataError(
                'Unknown file extension: {}'.format(repr(ext)),
                ERROR_UNSUPPORTED_FILE_EXTENSION
            )
        # Fetch the save function for this file extension.
        fn = get_from_self(SAVE_EXT_MAPPING[ext])
        fn(self, path)

    # Properties.
    row_count = property(get_row_count)
    column_count = property(get_column_count)
    column_headers = property(get_column_headers)
class TableCell(object):
    """A table cell for use with the `Table`.

    The cell does not store a value of its own; it holds a reference to the
    table plus a row/column position and reads or writes through the table.
    """

    def __init__(self, table, row_index, column_index):
        self.table = table
        self.row_index = row_index
        self.column_index = column_index

    def _as_text(self):
        """Return the cell value as text; `None` becomes an empty string."""
        value = self.value
        if value is None:
            return u''
        # type(u'') is `unicode` on Python 2 and `str` on Python 3.
        return type(u'')(value)

    def __unicode__(self):
        return self._as_text()

    def __str__(self):
        # __str__ must return a `str`; the raw cell value may be a number or
        # `None`, so convert it first.
        text = self._as_text()
        if str is bytes:
            # Python 2: `str` is a byte string.
            return text.encode('utf-8')
        return text

    def __repr__(self):
        return '<TableCell(row={}, col={}, value={})>'.format(
            self.row_index, self.column_index, repr(self.value)
        )

    def get_value(self):
        """Return the current value of the cell from the table."""
        return self.table.get_cell_value(self.row_index, self.column_index)

    def set_value(self, value):
        """Write `value` to the cell's position in the table."""
        self.table.set_cell_value(self.row_index, self.column_index, value)

    # Properties
    value = property(get_value, set_value)
    position = property(lambda self: (self.row_index, self.column_index))
class TabularDataError(Exception):
    """Error raised by this module.

    Within this module it is raised with two positional arguments: a
    human-readable message and one of the `ERROR_*` constants.
    """
def from_file(path, from_row=None, to_row=None, progress=None):
    """
    Method for loading a tabular data file.

    The loader is chosen from `LOAD_EXT_MAPPING` based on the file extension
    of `path`, which is matched case-insensitively.

    Args:
        path (str): The path to the file.
        from_row: (optional) Forwarded unchanged to the format's loader.
        to_row: (optional) Forwarded unchanged to the format's loader.
        progress: (optional) Forwarded unchanged to the format's loader.

    Returns:
        A `Table` instance representing the file.

    Raises:
        TabularDataError: If the file extension is not supported.
    """
    # Extract the file extension without the leading dot. Lower-cased so that
    # e.g. "DATA.XLSX" is recognized too.
    ext = os.path.splitext(path)[1][1:].lower()

    # Do we know this file extension?
    if ext not in LOAD_EXT_MAPPING:
        raise TabularDataError(
            'Unknown file extension: {}'.format(repr(ext)),
            ERROR_UNSUPPORTED_FILE_EXTENSION
        )

    # Fetch a reference to the function responsible for loading the provided
    # file format.
    fn = get_from_self(LOAD_EXT_MAPPING[ext])
    return fn(path, from_row, to_row, progress)
def load_xlsx_file(path, from_row=None, to_row=None, progress=None):
    """
    Function for loading an Excel file (xlsx) using the openpyxl library.

    Note:
        If the file contains multiple sheets only the first one will be used.
        The first row of the sheet is used as the column headers.

    Args:
        path (str): The path of the Excel file.
        from_row (int): (optional) Index of the first data row to load. Data
            rows are counted from 0, not including the header row.
        to_row (int): (optional) Index of the last data row to load
            (inclusive), counted the same way as `from_row`.
        progress (callable): (optional) Called as `progress(rowindex, rownum)`
            for the header row and for every loaded row, where `rowindex` is
            the sheet row index and `rownum` is the sheet's highest row minus
            one.

    Returns:
        A `Table` representation of the file.
    """
    from openpyxl import load_workbook

    # `unicode` and `long` only exist on Python 2.
    try:
        text_type = unicode
        number_types = (int, long, float)
    except NameError:
        text_type = str
        number_types = (int, float)
    string_types = (str, text_type)

    # Wrap progress so we don't have to check it for every usage.
    if progress is None:
        def progress(*args):
            return None

    # Ready a Table instance.
    table = Table()
    table.from_row = 0 if from_row is None else from_row

    # Load the Excel file.
    wb = load_workbook(path, use_iterators=True)
    sheet = wb.worksheets[0]
    rownum = sheet.get_highest_row() - 1

    def _process_cell(x):
        """
        This function decides which type to interpret cell value `x` as.

        * If `x` is a number, it's returned unchanged UNLESS it's a round
          float, in which case it is converted to an int.
        * If `x` is a string type it is stripped of enclosing whitespace and if
          it is 100% whitespace, `None` is returned. Otherwise a conversion to
          a number is attempted.
        * If `x` is a `str` it is converted to unicode.
        * Any other value is returned unchanged.
        """
        # Exact type checks on purpose: booleans must not be treated as ints.
        if type(x) in number_types:
            # If this is a float with a round number, convert it to an int.
            if x % 1 == 0:
                return int(x)
            return x

        if type(x) in string_types:
            x = x.strip()
            if len(x) < 1:
                return None
            try:
                return int(x)
            except ValueError:
                pass
            try:
                # Accept a decimal comma as well as a decimal point.
                return float(x.replace(',', '.'))
            except ValueError:
                pass

        if type(x) is str:
            # Convert the value itself (not the `str` type) to text.
            return text_type(x)
        return x

    # Iterate over each row. The first row is the table columns, so treat the
    # remaining rows as if their index was i-1.
    for rowindex, row in enumerate(sheet.iter_rows()):
        if rowindex == 0:
            for cell in row:
                table.columns.append(_process_cell(cell.internal_value))
        else:
            data_index = rowindex - 1
            # If from_row is set and this row is below that index, skip.
            if from_row is not None and data_index < from_row:
                continue
            # If to_row is set and this row is above that index, stop.
            if to_row is not None and data_index > to_row:
                break
            table.rows.append(
                [_process_cell(cell.internal_value) for cell in row])
        progress(rowindex, rownum)

    # Return the table.
    return table
def get_xlsx_column_widths(filepath=None, worksheet=None):
    """Fetches the column widths from the Excel 2007 (xlsx) file using the
    openpyxl library.

    Supply ONE of the arguments. If both are given, `filepath` takes
    precedence and `worksheet` is ignored.

    Args:
        filepath (str): The path to the Excel file. The first sheet of the
            workbook is used.
        worksheet: The openpyxl Worksheet.

    Returns:
        A list of the `width` values found in the sheet's
        `column_dimensions`, ordered by column (A, B, ..., Z, AA, ...).

    Raises:
        ValueError: If neither `filepath` nor `worksheet` is supplied.
    """
    if filepath is not None:
        from openpyxl import load_workbook
        wb = load_workbook(filepath)
        sheet = wb.worksheets[0]
    elif worksheet is not None:
        sheet = worksheet
    else:
        raise ValueError('Must supply either filepath or worksheet.')

    # Sort by length first, then alphabetically, so that multi-letter columns
    # such as "AA" come after "Z" instead of between "A" and "B".
    dimensions = sorted(sheet.column_dimensions.items(),
                        key=lambda pair: (len(pair[0]), pair[0]))
    return [dimension.width for _, dimension in dimensions]
def save_xlsx_file(table, path):
    """
    Function for saving a `Table` instance to an Excel 2007 (xlsx) file using
    the openpyxl library.

    The column headers are written as the first row of the workbook's first
    sheet, followed by one sheet row per table row. `None` and `False` cells
    are written as empty strings; every other value, including 0, is written
    unchanged.

    Args:
        table (tabulardata.Table): The table to save.
        path (str): The path the table will be saved to.
    """
    from openpyxl import Workbook

    # Create an Excel workbook for the output and use its first sheet.
    wb = Workbook()
    ws = wb.worksheets[0]

    # Write the columns to the sheet.
    ws.append(table.columns)

    # Iterate over the table rows and append them to the worksheet.
    for row in table.rows:
        # Only blank out empty cells; `item or ''` would also erase zeros.
        ws.append(['' if item is None or item is False else item
                   for item in row])

    wb.save(path)
def letter_to_index(letter):
    """Translate a spreadsheet column label into a zero-based column index.

    The label is read as a base-26 number whose digits are the letters
    A (1) through Z (26): "A" gives 0, "B" gives 1, "Z" gives 25, "AA" gives
    26 and so on. Lower-case labels are accepted.

    Args:
        letter (str): The column index letter.

    Returns:
        The column index as an integer.
    """
    one_based = 0
    # Read the label left to right, shifting the running total one base-26
    # position for every additional letter.
    for symbol in letter.upper():
        # "A" is ASCII 65, so subtracting 64 maps A..Z onto 1..26.
        one_based = one_based * 26 + (ord(symbol) - 64)
    # Convert from one-based to zero-based.
    return one_based - 1
def get_from_self(name):
    """Return the attribute called `name` from this module.

    Raises:
        AttributeError: If this module has no attribute `name`.
    """
    return getattr(sys.modules[__name__], name)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement