# -*- coding: utf-8 -*-
from __future__ import print_function

import gc
import mmap
import pprint
import re
import sys
import time
import zipfile
from datetime import date

import xlrd
from xlrd import *
from xlrd import timemachine
from xlrd.book import *
from xlrd.compdoc import *
from xlrd.xlsx import X12Book

from cpy.parser import Parser
from cpy.table import *

parser = Parser()
class MyCompDoc(CompDoc):
    """CompDoc variant that skips xlrd's duplicate-sector ("seen") corruption
    check in _locate_stream, so mildly corrupt OLE2 files can still be read."""

    def _locate_stream(self, mem, base, sat, sec_size, start_sid,
                       expected_stream_size, qname, seen_id):
        s = start_sid
        if s < 0:
            raise CompDocError("_locate_stream: start_sid (%d) is -ve" % start_sid)
        p = -99  # dummy previous SID
        start_pos = -9999
        end_pos = -8888
        slices = []
        tot_found = 0
        # Expected size rounded up to a whole number of sectors.
        found_limit = (expected_stream_size + sec_size - 1) // sec_size
        while s >= 0:
            # Unlike the stock implementation, a sector that has already been
            # seen does not raise CompDocError; just record it and carry on.
            self.seen[s] = seen_id
            tot_found += 1
            if tot_found > found_limit:
                raise CompDocError(
                    "%s: size exceeds expected %d bytes; corrupt?"
                    % (qname, found_limit * sec_size)
                )
            if s == p + 1:
                # contiguous sectors: extend the current slice
                end_pos += sec_size
            else:
                # start a new slice
                if p >= 0:
                    # not the first sector
                    slices.append((start_pos, end_pos))
                start_pos = base + s * sec_size
                end_pos = start_pos + sec_size
            p = s
            s = sat[s]
        assert s == EOCSID
        assert tot_found == found_limit
        if not slices:
            # The stream is contiguous ... just what we like!
            return (mem, start_pos, expected_stream_size)
        slices.append((start_pos, end_pos))
        # Fragmented stream: stitch the slices together into one bytes object.
        return (b''.join(mem[start_pos:end_pos] for start_pos, end_pos in slices),
                0, expected_stream_size)
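
# A minimal standalone sketch (not part of xlrd) of the run-coalescing idea
# used by _locate_stream above: walk a toy sector allocation table (SAT)
# chain and merge physically adjacent sectors into (start, end) slices.
# The SAT values and sector size below are invented for illustration.
def _demo_coalesce_runs():
    sec_size = 4
    # Chain: 0 -> 1 -> 2 -> 5 -> 6 -> EOC (-2); sectors 0-2 and 5-6 form two runs.
    sat = [1, 2, 5, -1, -1, 6, -2]
    s, p = 0, -99
    start_pos = end_pos = None
    slices = []
    while s >= 0:
        if s == p + 1:
            end_pos += sec_size           # contiguous: extend the current run
        else:
            if p >= 0:
                slices.append((start_pos, end_pos))
            start_pos = s * sec_size      # start a new run
            end_pos = start_pos + sec_size
        p, s = s, sat[s]
    slices.append((start_pos, end_pos))
    return slices                         # [(0, 12), (20, 28)]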
class MyBook(Book):
    def __init__(self):
        super(MyBook, self).__init__()

    def biff2_8_load(self, filename=None, file_contents=None,
                     logfile=sys.stdout, verbosity=0, use_mmap=USE_MMAP,
                     encoding_override=None,
                     formatting_info=False,
                     on_demand=False,
                     ragged_rows=False,
                     ):
        self.logfile = logfile
        self.verbosity = verbosity
        self.use_mmap = use_mmap and MMAP_AVAILABLE
        self.encoding_override = encoding_override
        self.formatting_info = formatting_info
        self.on_demand = on_demand
        self.ragged_rows = ragged_rows
        if not file_contents:
            with open(filename, "rb") as f:
                f.seek(0, 2)  # EOF
                size = f.tell()
                f.seek(0, 0)  # BOF
                if size == 0:
                    raise XLRDError("File size is 0 bytes")
                if self.use_mmap:
                    self.filestr = mmap.mmap(f.fileno(), size, access=mmap.ACCESS_READ)
                    self.stream_len = size
                else:
                    self.filestr = f.read()
                    self.stream_len = len(self.filestr)
        else:
            self.filestr = file_contents
            self.stream_len = len(file_contents)
        self.base = 0
        if self.filestr[:8] != SIGNATURE:
            # got this one at the antique store: a raw BIFF stream, not an
            # OLE2 compound document
            self.mem = self.filestr
        else:
            # Use the lenient CompDoc subclass defined above.
            cd = MyCompDoc(self.filestr, logfile=self.logfile)
            if USE_FANCY_CD:
                for qname in ['Workbook', 'Book']:
                    self.mem, self.base, self.stream_len = \
                        cd.locate_named_stream(UNICODE_LITERAL(qname))
                    if self.mem: break
                else:
                    raise XLRDError("Can't find workbook in OLE2 compound document")
            else:
                for qname in ['Workbook', 'Book']:
                    self.mem = cd.get_named_stream(UNICODE_LITERAL(qname))
                    if self.mem: break
                else:
                    raise XLRDError("Can't find workbook in OLE2 compound document")
                self.stream_len = len(self.mem)
            del cd
            if self.mem is not self.filestr:
                if hasattr(self.filestr, "close"):
                    self.filestr.close()
                self.filestr = b''
        self._position = self.base
        if DEBUG:
            print("mem: %s, base: %d, len: %d"
                  % (type(self.mem), self.base, self.stream_len), file=self.logfile)
def open_workbook_xls(filename=None,
                      logfile=sys.stdout, verbosity=0, use_mmap=USE_MMAP,
                      file_contents=None,
                      encoding_override=None,
                      formatting_info=False, on_demand=False, ragged_rows=False,
                      ):
    t0 = time.clock()
    if TOGGLE_GC:
        orig_gc_enabled = gc.isenabled()
        if orig_gc_enabled:
            gc.disable()
    bk = MyBook()
    try:
        bk.biff2_8_load(
            filename=filename, file_contents=file_contents,
            logfile=logfile, verbosity=verbosity, use_mmap=use_mmap,
            encoding_override=encoding_override,
            formatting_info=formatting_info,
            on_demand=on_demand,
            ragged_rows=ragged_rows,
        )
        t1 = time.clock()
        bk.load_time_stage_1 = t1 - t0
        biff_version = bk.getbof(XL_WORKBOOK_GLOBALS)
        if not biff_version:
            raise XLRDError("Can't determine file's BIFF version")
        if biff_version not in SUPPORTED_VERSIONS:
            raise XLRDError(
                "BIFF version %s is not supported"
                % biff_text_from_num[biff_version]
            )
        bk.biff_version = biff_version
        if biff_version <= 40:
            # no workbook globals, only 1 worksheet
            if on_demand:
                fprintf(bk.logfile,
                    "*** WARNING: on_demand is not supported for this Excel version.\n"
                    "*** Setting on_demand to False.\n")
                bk.on_demand = on_demand = False
            bk.fake_globals_get_sheet()
        elif biff_version == 45:
            # worksheet(s) embedded in global stream
            bk.parse_globals()
            if on_demand:
                fprintf(bk.logfile,
                    "*** WARNING: on_demand is not supported for this Excel version.\n"
                    "*** Setting on_demand to False.\n")
                bk.on_demand = on_demand = False
        else:
            bk.parse_globals()
            bk._sheet_list = [None for sh in bk._sheet_names]
            if not on_demand:
                bk.get_sheets()
        bk.nsheets = len(bk._sheet_list)
        if biff_version == 45 and bk.nsheets > 1:
            fprintf(bk.logfile,
                "*** WARNING: Excel 4.0 workbook (.XLW) file contains %d worksheets.\n"
                "*** Book-level data will be that of the last worksheet.\n",
                bk.nsheets
            )
        if TOGGLE_GC:
            if orig_gc_enabled:
                gc.enable()
        t2 = time.clock()
        bk.load_time_stage_2 = t2 - t1
    except:
        bk.release_resources()
        raise
    # normal exit
    if not on_demand:
        bk.release_resources()
    return bk
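
# Usage sketch for open_workbook_xls; the file path is a made-up example,
# so this only succeeds against a real .xls file.
def _demo_open_xls(path='laporan_harian.xls'):
    bk = open_workbook_xls(path, on_demand=True)
    return bk.biff_version, bk.nsheets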
def open_workbook(filename=None,
                  logfile=sys.stdout,
                  verbosity=0,
                  use_mmap=USE_MMAP,
                  file_contents=None,
                  encoding_override=None,
                  formatting_info=False,
                  on_demand=False,
                  ragged_rows=False,
                  ):
    peeksz = 4
    if file_contents:
        peek = file_contents[:peeksz]
    else:
        with open(filename, "rb") as f:
            peek = f.read(peeksz)
    if peek == b"PK\x03\x04":  # a ZIP file
        if file_contents:
            zf = zipfile.ZipFile(timemachine.BYTES_IO(file_contents))
        else:
            zf = zipfile.ZipFile(filename)
        # Workaround for some third party files that use forward slashes and
        # lower case names. We map the expected name in lowercase to the
        # actual filename in the zip container.
        component_names = dict([(X12Book.convert_filename(name), name)
                                for name in zf.namelist()])
        if verbosity:
            logfile.write('ZIP component_names:\n')
            pprint.pprint(component_names, logfile)
        if 'xl/workbook.xml' in component_names:
            from xlrd import xlsx
            bk = xlsx.open_workbook_2007_xml(
                zf,
                component_names,
                logfile=logfile,
                verbosity=verbosity,
                use_mmap=use_mmap,
                formatting_info=formatting_info,
                on_demand=on_demand,
                ragged_rows=ragged_rows,
            )
            return bk
        if 'xl/workbook.bin' in component_names:
            raise XLRDError('Excel 2007 xlsb file; not supported')
        if 'content.xml' in component_names:
            raise XLRDError('OpenOffice.org ODS file; not supported')
        raise XLRDError('ZIP file contents not a known type of workbook')
    # Not a ZIP: fall back to the BIFF (.xls) loader defined above, which
    # uses MyBook/MyCompDoc rather than xlrd's stock classes.
    bk = open_workbook_xls(
        filename=filename,
        logfile=logfile,
        verbosity=verbosity,
        use_mmap=use_mmap,
        file_contents=file_contents,
        encoding_override=encoding_override,
        formatting_info=formatting_info,
        on_demand=on_demand,
        ragged_rows=ragged_rows,
    )
    return bk
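
# A minimal sketch (names invented) of the same magic-byte dispatch
# open_workbook performs: ZIP containers ("PK\x03\x04") route to the xlsx
# loader, everything else to the lenient BIFF loader above.
def _demo_sniff(path):
    with open(path, "rb") as f:
        peek = f.read(4)
    return 'zip/xlsx' if peek == b"PK\x03\x04" else 'biff/xls'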
class MyExcelTableRows(TableField):
    """Extract every cell of a worksheet as rows of strings, blanking error
    cells, formatting dates, and optionally copying the top-left value of
    each merged range into all of its cells."""

    def __init__(self, formatting_info=True, populate_merged=True):
        self.formatting_info = formatting_info
        self.populate_merged = populate_merged

    def __call__(self, table):
        return self._extract_rows(table)

    def _extract_rows(self, table):
        filename = table.prdata['filename']
        book = open_workbook(
            filename,
            on_demand=True,
            formatting_info=self.formatting_info
        )
        table_identifier = table.prdata['table_identifier']
        sheet = book.sheet_by_index(table_identifier)
        table.prdata['sheet_name'] = sheet.name
        book_date_mode = book.datemode
        rows = []
        for row in range(sheet.nrows):
            row_list = []
            for col in range(sheet.ncols):
                try:
                    cell = sheet.cell(row, col)
                    value = cell.value
                    cell_type = cell.ctype
                except IndexError:
                    # Ragged rows may be shorter than sheet.ncols.
                    cell_type = XL_CELL_ERROR
                if cell_type == XL_CELL_ERROR:
                    value = ""
                elif cell_type == XL_CELL_DATE:
                    try:
                        date_value = xldate_as_tuple(value, book_date_mode)
                        try:
                            value = str(date(*date_value[:3]))
                        except ValueError:
                            pass
                    except xlrd.xldate.XLDateError:
                        pass
                if not isinstance(value, (str, unicode)):
                    # Render numeric values with up to 15 significant digits;
                    # anything beyond that is cut and rounded.
                    value = "%0.15g" % value
                row_list.append(value)
            rows.append(row_list)
        if self.populate_merged:
            # Populate merged cells from the top-left cell of each range.
            for crange in sheet.merged_cells:
                rlo, rhi, clo, chi = crange
                value = rows[rlo][clo]
                for rowx in xrange(rlo, rhi):
                    for colx in xrange(clo, chi):
                        rows[rowx][colx] = value
        return rows
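
# Usage sketch for MyExcelTableRows. The real table object comes from the
# cpy library; this stand-in with just a prdata dict is an assumption for
# illustration, as is the file name.
def _demo_extract_rows():
    class FakeTable(object):
        prdata = {'filename': 'laporan_harian.xls', 'table_identifier': 0}
    return MyExcelTableRows()(FakeTable())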
class MyTableIdentifierExcel(TableField):
    """Return the indices of all sheets whose names match the table's
    description regex."""

    def __init__(self, formatting_info=True, cleaner=lambda v: v):
        self.formatting_info = formatting_info
        self.cleaner = cleaner

    def __call__(self, table):
        filename = table.prdata['filename']
        table_descr = self.cleaner(table.prdata['table_descr'])
        table_regex = getre(table_descr, re.I | re.S | re.U)
        matched_sheets = []
        book = open_workbook(
            filename,
            on_demand=True,
            formatting_info=self.formatting_info
        )
        for index, name in enumerate(book.sheet_names()):
            if table_regex.search(name):
                matched_sheets.append(index)
        return matched_sheets
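
# Sheet-matching sketch. getre comes from the cpy library; plain re.compile
# stands in for it here, and the sheet names are invented.
def _demo_match_sheets(pattern=r'Laporan\s*Harian',
                       names=('Laporan Harian', 'Summary', 'LaporanHarian 2')):
    rx = re.compile(pattern, re.I | re.U)
    return [i for i, n in enumerate(names) if rx.search(n)]  # -> [0, 2]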
@parser.source('file', r"Laporan\s*Harian")
class TableOne(AutoDateFinder.params(data_frequency='daily',
                   priority_regexes=[r'(?P<day>\d{2})\W*(?P<month>\d{2})\W*(?P<year>(?:19|20)\d{2})']),
               Rows, ExcelTable):
    normalizer = CleanAllNonAlphaNumUnicode()
    duplicate_suffix = '_dummy'
    table_identifier = MyTableIdentifierExcel()

    @prdata
    def rows(self):
        rows = MyExcelTableRows()(self)
        # There are a lot of duplicated descriptions; disambiguate repeats
        # by appending a suffix ('_dummy', '_dummy2', '_dummy3', ...).
        found_descriptions = {}
        descr_col_index = None
        for row_idx, row in enumerate(rows):
            if descr_col_index is None:
                # Find the header row first: the description column is the
                # one whose header matches 'Provinsi'.
                match_descr_col = match_forward(regex=getre(r'Provinsi', re.I | re.U),
                                                iterable=row)
                if match_descr_col[0] is not None:
                    descr_col_index = match_descr_col[1]
                continue
            orig_descr = row[descr_col_index]
            descr = self.normalizer(orig_descr)
            if not descr:
                continue
            descr_count = found_descriptions.setdefault(descr, 0)
            descr_count += 1
            found_descriptions[descr] = descr_count
            if descr_count == 2:
                rows[row_idx][descr_col_index] = orig_descr + self.duplicate_suffix
            elif descr_count > 2:
                rows[row_idx][descr_col_index] = (orig_descr + self.duplicate_suffix
                                                  + str(descr_count - 1))
        return rows
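
# A self-contained sketch of the duplicate-suffix scheme used by
# TableOne.rows, on invented descriptions: the 2nd occurrence gets '_dummy',
# the 3rd '_dummy2', the 4th '_dummy3', and so on.
def _demo_dedup(descrs=('Aceh', 'Bali', 'Aceh', 'Aceh')):
    seen, out = {}, []
    for d in descrs:
        n = seen.setdefault(d, 0) + 1
        seen[d] = n
        if n == 1:
            out.append(d)
        elif n == 2:
            out.append(d + '_dummy')
        else:
            out.append(d + '_dummy' + str(n - 1))
    return out  # ['Aceh', 'Bali', 'Aceh_dummy', 'Aceh_dummy2']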