Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- from collections import defaultdict
- from datetime import datetime
- import json
- import fileinput
- import os
- import pprint
- from xml.dom.minidom import Document
- LINENO = defaultdict(int)
- (PASS, FAIL, SKIP) = ('PASS', 'FAIL', 'SKIP')
- RESULT_FILE = os.path.join('build', 'TEST-result.xml')
- class Testcase:
- def __init__(self):
- self.status = None
- self.timestamp = None
- self.elapsed = None
- self.lines = []
- def report(self, status, timestamp, elapsed):
- self.status = status
- self.timestamp = timestamp
- self.elapsed = elapsed
- def add_output(self, line):
- self.lines.append(line)
- class Testsuite:
- def __init__(self):
- self.package = None
- self.tests = defaultdict(Testcase)
- self.timestamp = None
- self.elapsed = None
- self.failures = 0
- self.skipped = 0
- self.total = 0
- self.lines = []
- def run(self, test):
- if test is None:
- self.elapsed = elapsed
- return
- _ = self.tests[test]
- self.total += 1
- def report(self, test, status, timestamp, elapsed):
- if test is None:
- self.elapsed = elapsed
- self.timestamp = timestamp
- return
- self.tests[test].report(status, timestamp, elapsed)
- if status == FAIL:
- self.failures += 1
- elif status == SKIP:
- self.skipped += 1
- def add_output(self, test, line):
- if test is None:
- self.lines.append(line)
- return
- self.tests[test].add_output(line)
- TESTSUITES = defaultdict(Testsuite)
- def debug_event(event):
- pprint.pprint(event)
- def report_test(status, event):
- package = event.get('Package')
- test = event.get('Test')
- elapsed = event.get('Elapsed')
- timestamp = event.get('Time')
- testsuite = TESTSUITES[package]
- testsuite.report(test, status, timestamp, elapsed)
- def action_fail(event):
- report_test(FAIL, event)
- def action_output(event):
- package = event.get('Package')
- test = event.get('Test')
- time = event.get('Time')
- output = event['Output']
- testsuite = TESTSUITES[package]
- testsuite.add_output(test, output)
- if 'Test' in event:
- test = event['Test']
- num = LINENO[test]
- LINENO[test] += 1
- slug = '{}+{}'.format(test, num)
- print('{:32s} {}'.format(slug, event['Output']), end='')
- else:
- print('{}'.format(event['Output']), end='')
- def action_pass(event):
- report_test(PASS, event)
- def action_run(event):
- package = event.get('Package')
- test = event.get('Test')
- time = event.get('Time')
- testsuite = TESTSUITES[package]
- testsuite.run(test)
- def action_skip(event):
- report_test(SKIP, event)
- ACTIONS = {
- 'fail': action_fail,
- 'output': action_output,
- 'pass': action_pass,
- 'run': action_run,
- 'skip': action_skip,
- }
- def process(line):
- try:
- event = json.loads(line)
- action = event['Action']
- ACTIONS.get(action, debug_event)(event)
- except (ValueError, KeyError):
- print('[ParseErr]', line, end='')
- def _createCDATAsections(doc, node, text):
- pos = text.find(']]>')
- while pos >= 0:
- tmp = text[0:pos+2]
- cdata = doc.createCDATASection(tmp)
- node.appendChild(cdata)
- text = text[pos+2:]
- pos = text.find(']]>')
- cdata = doc.createCDATASection(text)
- node.appendChild(cdata)
- def generate_report():
- print('#' * 80)
- print('Report:')
- doc = Document()
- testsuites = doc.createElement('testsuites')
- doc.appendChild(testsuites)
- total, failures, skipped = 0, 0, 0
- for package, suite in TESTSUITES.items():
- testsuite = doc.createElement('testsuite')
- testsuites.appendChild(testsuite)
- testsuite.setAttribute('name', package)
- testsuite.setAttribute('tests', str(suite.total))
- testsuite.setAttribute('time', '{:.3f}'.format(suite.elapsed))
- testsuite.setAttribute('timestamp', suite.timestamp)
- testsuite.setAttribute('failures', str(suite.failures))
- testsuite.setAttribute('skipped', str(suite.skipped))
- testsuite.setAttribute('name', package)
- if suite.lines:
- stdout = doc.createElement('system-out')
- testsuite.appendChild(stdout)
- _createCDATAsections(doc, stdout, ''.join(suite.lines))
- for name, case in suite.tests.items():
- print(case.status, name, '{:.3f}s'.format(case.elapsed))
- testcase = doc.createElement('testcase')
- testsuite.appendChild(testcase)
- testcase.setAttribute('name', name)
- testcase.setAttribute('time', '{:.3f}'.format(case.elapsed))
- testcase.setAttribute('timestamp', case.timestamp)
- if case.status == SKIP:
- skip = doc.createElement('skipped')
- testcase.appendChild(skip)
- elif case.status == FAIL:
- fail = doc.createElement('failure')
- testcase.appendChild(fail)
- if case.lines:
- stdout = doc.createElement('system-out')
- testcase.appendChild(stdout)
- _createCDATAsections(doc, stdout, ''.join(case.lines))
- print(package, 'total={} failed={} skipped={} duration={:.3f}s'.format(
- suite.total, suite.failures, suite.skipped, suite.elapsed))
- total += suite.total
- failures += suite.failures
- skipped += suite.skipped
- print('Ran {} tests, failed={} skipped={}'.format(total, failures, skipped))
- print('#' * 80)
- content = doc.toprettyxml()
- try:
- os.makedirs('build', exist_ok=True)
- with open(RESULT_FILE, 'wb') as junit:
- junit.write(content.encode('utf-8'))
- except Exception:
- print('Could not write {}'.format(RESULT_FILE))
- sys.exit(1)
- print('Successfully wrote {}'.format(RESULT_FILE))
- if __name__ == "__main__":
- for line in fileinput.input():
- process(line)
- generate_report()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement