Add regression test reporter for analysis PiperOrigin-RevId: 600987475
diff --git a/ui_automator/commission_reg_test.py b/ui_automator/commission_reg_test.py new file mode 100644 index 0000000..c210401 --- /dev/null +++ b/ui_automator/commission_reg_test.py
@@ -0,0 +1,43 @@ +from __future__ import annotations +from typing import TYPE_CHECKING +import unittest + +# pylint:disable=g-import-not-at-top +if TYPE_CHECKING: + from ui_automator import ui_automator + +# TODO(b/318771536): Check if a device is commissioned on GHA. +_commissioned: bool = False + + +class CommissionRegTest(unittest.TestCase): + """Test class for running commission regression test.""" + + def __init__( + self, + ui_automator: ui_automator.UIAutomator, + test_name: str, + device_name: str | None, + pairing_code: str | None = None, + gha_room: str | None = None, + ) -> None: + super().__init__(methodName=test_name) + self.ui_automator = ui_automator + self.device_name = device_name + self.pairing_code = pairing_code + self.gha_room = gha_room + + def test_commission(self) -> None: + global _commissioned + _commissioned = False + self.ui_automator.commission_device( + self.device_name, self.pairing_code, self.gha_room + ) + # TODO(b/318771536): Check if a device is commissioned on GHA. + _commissioned = True + + def test_decommission(self) -> None: + if not _commissioned: + self.skipTest('Device was not commissioned.') + + self.ui_automator.decommission_device(self.device_name)
diff --git a/ui_automator/test_reporter.py b/ui_automator/test_reporter.py new file mode 100644 index 0000000..ab906c8 --- /dev/null +++ b/ui_automator/test_reporter.py
@@ -0,0 +1,360 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A helper to create reports in TXT and XML from running regression tests. + +The helper inherits from absl/testing/xml_reporter.py and provides two +additional properties, including producing a xml file even when test is +interrupted and producing a txt file of test summary. +""" + +import collections +import io +import logging +import os +import time +import unittest + +from absl.testing import xml_reporter +import immutabledict + + +_REPORT_TEST_CASE_TITLE_TO_TEST_NAME = immutabledict.immutabledict({ + 'Commission to GHA': 'test_commission', + 'Removing from GHA': 'test_decommission', +}) + +_SUMMARY_COL_INDENTS = 25 +_TEST_CASE_TITLE_INDENTS = 25 +_TEST_RESULT_INDENTS = 8 + + +def duration_formatter(duration_in_seconds: float) -> str: + hour_count, remainder = divmod(duration_in_seconds, 3600) + minute_count, second_count = divmod(remainder, 60) + return ( + f'{int(hour_count)} hrs, {int(minute_count)} mins,' + f' {int(second_count)} secs' + ) + + +# pylint: disable=protected-access +class _TestCaseResult(xml_reporter._TestCaseResult): + """Private helper for TestResult.""" + + def __init__(self, test): + super().__init__(test) + self.status = None + self.result = None + + def print_xml_summary(self, stream: io.TextIOWrapper): + """Prints an XML summary of a TestCase. + + Status and result are populated as per JUnit XML test result reporter. + A test that has been skipped will always have a skip reason, + as every skip method in Python's unittest requires the reason arg to be + passed. + + Args: + stream: output stream to write test report XML to. + """ + if self.skip_reason is None: + self.status = 'run' + self.result = 'PASS' if not self.errors else 'FAIL' + else: + self.status = 'notrun' + self.result = 'N/A' + + test_case_attributes = [ + ('name', self.name), + ('status', self.status), + ('result', self.result), + ('time', f'{self.run_time:.3f}'), + ('classname', self.full_class_name), + ('timestamp', xml_reporter._iso8601_timestamp(self.start_time)), + ] + xml_reporter._print_xml_element_header( + 'testcase', test_case_attributes, stream, indentation=' ' + ) + self._print_testcase_details(stream) + if self.skip_reason: + stream.write(' <properties>\n') + stream.write( + ' <property name="%s" value="%s"></property>\n' + % ( + xml_reporter._escape_xml_attr('skip_reason'), + xml_reporter._escape_xml_attr(self.skip_reason), + ) + ) + stream.write(' </properties>\n') + stream.write(' </testcase>\n') + + +class _TestSuiteResult(xml_reporter._TestSuiteResult): + """Private helper for TestResult. + + The `print_xml_summary` function has been overridden to present test cases in + the XML file in ascending order by their start time. This modification is + implemented solely at line 168, where the sorting criterion has been changed + from `t.name` to `t.start_time`. All other lines remain identical to those in + the original `xml_reporter`. + """ + + def print_xml_summary(self, stream: io.TextIOWrapper): + """Prints an XML Summary of a TestSuite. + + Args: + stream: output stream to write test report XML to. + """ + overall_test_count = sum(len(x) for x in self.suites.values()) + overall_failures = sum(self.failure_counts.values()) + overall_errors = sum(self.error_counts.values()) + overall_attributes = [ + ('name', ''), + ('tests', f'{overall_test_count}'), + ('failures', f'{overall_failures}'), + ('errors', f'{overall_errors}'), + ('time', f'{(self.overall_end_time - self.overall_start_time):.3f}'), + ('timestamp', xml_reporter._iso8601_timestamp(self.overall_start_time)), + ] + xml_reporter._print_xml_element_header( + 'testsuites', overall_attributes, stream + ) + if self._testsuites_properties: + stream.write(' <properties>\n') + for name, value in sorted(self._testsuites_properties.items()): + stream.write( + ' <property name="%s" value="%s"></property>\n' + % ( + xml_reporter._escape_xml_attr(name), + xml_reporter._escape_xml_attr(str(value)), + ) + ) + stream.write(' </properties>\n') + + for suite_name in self.suites: + suite = self.suites[suite_name] + suite_end_time = max(x.start_time + x.run_time for x in suite) + suite_start_time = min(x.start_time for x in suite) + failures = self.failure_counts[suite_name] + errors = self.error_counts[suite_name] + suite_attributes = [ + ('name', f'{suite_name}'), + ('tests', f'{len(suite)}'), + ('failures', f'{failures}'), + ('errors', f'{errors}'), + ('time', f'{(suite_end_time - suite_start_time):.3f}'), + ('timestamp', xml_reporter._iso8601_timestamp(suite_start_time)), + ] + xml_reporter._print_xml_element_header( + 'testsuite', suite_attributes, stream + ) + + # test_case_result entries are not guaranteed to be in any user-friendly + # order, especially when using subtests. So sort them. + for test_case_result in sorted(suite, key=lambda t: t.start_time): + test_case_result.print_xml_summary(stream) + stream.write('</testsuite>\n') + stream.write('</testsuites>\n') + + +# pylint: disable=protected-access +class TestResult(xml_reporter._TextAndXMLTestResult): + """Test Result class to write test results even when test is terminated.""" + + _TEST_SUITE_RESULT_CLASS = _TestSuiteResult + _TEST_CASE_RESULT_CLASS = _TestCaseResult + + # Instance of TestResult class. Allows writing test results when suite is + # terminated - use `writeAllResultsToXml` classmethod. + # Technicaly it's possible to have more than one instance of this class but it + # doesn't happen when running tests that write to XML file (since they'd + # override this file results) so we can safely use only last created instance. + _instance = None + + def __init__(self, logger=logging.getLogger(), *args, **kwargs): + super().__init__(*args, **kwargs) + TestResult._instance = self + self._logger = logger + + @classmethod + def write_summary_in_txt(cls) -> None: + if not cls._instance: + return + + summary_file_path = ( + f"summary_{time.strftime('%Y%m%d%H%M%S', time.localtime())}.txt" + ) + + test_results_by_test_name: dict[str, list[_TestCaseResult]] = ( + collections.defaultdict(list) + ) + # Groups test cases by its name for later sorting. + # It is to avoid incorrect sorting as different suites may run same test + # cases, but those test cases do not run in order. + for suite_name in cls._instance.suite.suites: + suite = cls._instance.suite.suites[suite_name] + for test_case_result in suite: + test_results_by_test_name[test_case_result.name].append( + test_case_result + ) + + if not test_results_by_test_name: + cls._instance._logger.info('Summary can not be saved without test runs.') + return + + # Tests can be stopped unexpectedly, so find the maximum as total runs. + total_runs = max(map(len, test_results_by_test_name.values())) + + is_regression_test_pass = [True for _ in range(total_runs)] + + # Save each test case result to a temp stream. + test_case_result_stream = io.StringIO() + test_case_result_stream.write( + f"{'Test Case/Test Run':<{_TEST_CASE_TITLE_INDENTS}}" + ) + for i in range(total_runs): + test_case_result_stream.write(f"{f'#{i + 1}':<{_TEST_RESULT_INDENTS}}") + for test_case_title in _REPORT_TEST_CASE_TITLE_TO_TEST_NAME: + if test_results_by_test_name[ + _REPORT_TEST_CASE_TITLE_TO_TEST_NAME[test_case_title] + ]: + test_case_result_stream.write('\n') + test_case_result_stream.write( + f'{test_case_title:<{_TEST_CASE_TITLE_INDENTS}}' + ) + for i, test_result in enumerate( + sorted( + test_results_by_test_name[ + _REPORT_TEST_CASE_TITLE_TO_TEST_NAME[test_case_title] + ], + key=lambda t: t.start_time, + ) + ): + if is_regression_test_pass[i] and test_result.result != 'PASS': + is_regression_test_pass[i] = False + test_case_result_stream.write( + f'{test_result.result:<{_TEST_RESULT_INDENTS}}' + ) + + # Start writing summary. + test_date = time.strftime( + '%Y/%m/%d', time.localtime(cls._instance.suite.overall_start_time) + ) + duration = duration_formatter( + cls._instance.suite.overall_end_time + - cls._instance.suite.overall_start_time + ) + total_successful_runs = is_regression_test_pass.count(True) + success_rate = round( + 100.0 * float(total_successful_runs) / float(total_runs) + ) + # TODO(b/317837867): Replace all placeholders with real values. + rows: list[list[str]] = [] + rows.append(['Summary', '', 'Version Info', '']) + rows.append(['DUT:', 'placeholder', 'GHA', 'placeholder']) + rows.append(['Test Time:', test_date, 'GMSCore', 'placeholder']) + rows.append(['Duration:', duration, 'Hub', 'placeholder']) + rows.append(['Number of runs:', str(total_runs), 'Device', 'placeholder']) + rows.append([ + 'Success Rate:', + f'{success_rate}%({total_successful_runs}/{total_runs})', + ]) + + f = open(summary_file_path, 'w', encoding='utf-8') + for row in rows: + f.writelines(element.ljust(_SUMMARY_COL_INDENTS) for element in row) + f.write('\n') + + f.writelines(['\n', '\n']) + + # Writes test case result saved in temp stream. + f.write(test_case_result_stream.getvalue()) + + f.close() + cls._instance._logger.info( + 'Summary of regression tests has been written to %s.', summary_file_path + ) + + @classmethod + def writeAllResultsToXml(cls): + """Writes current results to XML output. Used when test is interrupted.""" + if cls._instance: + cls._instance.writeXmlResultsForTerminatedSuite() + + def writeXmlResultsForTerminatedSuite(self): + """Writes test results to XML output.""" + self._writeSuiteResultsToXml() + self._logger.info('TestResult class stream written to results XML.') + + def _writeSuiteResultsToXml(self): + self.suite.print_xml_summary(self.xml_stream) + + +class TestRunner(xml_reporter.TextAndXMLTestRunner): + """A test runner that can produce both formatted text results and XML. + + It prints out a summary of the results in XML and a summary of results in TXT + at the end of the test run even if the test is interrupted. + """ + + _TEST_RESULT_CLASS = TestResult + + def __init__( + self, xml_file_path=None, logger=logging.getLogger(), *args, **kwargs + ): + """Initializes a TestRunner. + + Args: + xml_file_path: XML-formatted test results are output to this file path. If + None (the default), xml file will be written to default path. + logger: Logger instance. + *args: passed unmodified to xml_reporter.TextAndXMLTestRunner.__init__. + **kwargs: passed unmodified to xml_reporter.TextAndXMLTestRunner.__init__. + """ + self._logger = logger + cur_path = os.path.abspath(os.path.dirname(__file__)) + default_xml_file_path = os.path.join( + cur_path, + f"summary_{time.strftime('%Y%m%d%H%M%S', time.localtime())}.xml", + ) + self._xml_file_path = xml_file_path or default_xml_file_path + super().__init__(xml_stream=open(self._xml_file_path, 'w'), *args, **kwargs) + + def run(self, suite: unittest.TestSuite) -> None: + """Runs tests and generates reports in XML and TXT. + + Args: + suite: TestSuite should be run for regression testing. + """ + try: + super().run(suite) + except KeyboardInterrupt: + # In case tests are interrupted, xml file won't be written by TestRunner. + # Hence, call `writeAllResultsToXml` to write data to given xml path. + TestResult.writeAllResultsToXml() + finally: + self._logger.info(f'Xml file saved to {self._xml_file_path}.') + TestResult.write_summary_in_txt() + + def _makeResult(self): + return self._TEST_RESULT_CLASS( + logger=self._logger, + xml_stream=self._xml_stream, + stream=self.stream, + descriptions=self.descriptions, + verbosity=self.verbosity, + time_getter=time.time, + testsuites_properties=self._testsuites_properties, + )
diff --git a/ui_automator/test_reporter_test.py b/ui_automator/test_reporter_test.py new file mode 100644 index 0000000..0470fde --- /dev/null +++ b/ui_automator/test_reporter_test.py
@@ -0,0 +1,351 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unittest for test reporter.""" +import io +import logging +import os +import re +import time +import traceback +from typing import Any, Literal +import unittest +from unittest import mock + +from absl.testing import xml_reporter + +from ui_automator import test_reporter +from ui_automator import unit_test_utils + + +class StringIOWriteLn(io.StringIO): + + def writeln(self, line): + self.write(line + '\n') + + +class MockTest(unittest.TestCase): + failureException = AssertionError + + def __init__(self, name): + super().__init__() + self.name = name + + def id(self): + return self.name + + def shortDescription(self): + return "This is this test's description." + + +def _run_test_from_result( + test: MockTest, + test_result: test_reporter.TestResult, + result: Literal['PASS', 'N/A', 'FAIL'], + err=None, + skip_reason: str | None = None, +) -> None: + test_result.startTest(test) + if result == 'FAIL': + test_result.addError(test, err) + elif result == 'N/A': + test_result.addSkip(test, skip_reason) + else: + test_result.addSuccess(test) + test_result.stopTest(test) + + +class TestReporterTest(unittest.TestCase): + + def setUp(self): + super().setUp() + self.stream = StringIOWriteLn() + self.xml_stream = io.StringIO() + + def _make_result( + self, start_time: int, end_time: int, test_count: int + ) -> test_reporter.TestResult: + # Called by result.startTestRun(). + times = [start_time] + for i in range(test_count): + # Called by result.startTest(). + times.append(start_time) + # Called by result.stopTest(). + times.append(end_time + i) + # Called by result.stopTestRun(). + times.append(end_time + test_count - 1) + with mock.patch.object(time, 'time', autospec=True, side_effect=times): + return test_reporter.TestResult( + xml_stream=self.xml_stream, + stream=self.stream, + descriptions=True, + verbosity=0, + time_getter=time.time, + ) + + def _assert_match(self, regex, output, flags=0) -> tuple[str | Any, ...]: + result = re.search(regex, output, flags) + if result is None: + self.fail(f'{output} does not match {regex}.') + return result.groups() + + def test_with_skipped_test(self): + start_time = 100 + end_time = 200 + test_name = 'skipped_test_with_reason' + result = self._make_result(start_time, end_time, 1) + start_time_str = re.escape(unit_test_utils.iso_timestamp(start_time)) + run_time = end_time - start_time + expected_re = unit_test_utils.OUTPUT_STRING % { + 'suite_name': 'MockTest', + 'tests': 1, + 'failures': 0, + 'errors': 0, + 'run_time': run_time, + 'start_time': start_time_str, + } + expected_testcase_re = unit_test_utils.TESTCASE_STRING_WITH_PROPERTIES % { + 'run_time': run_time, + 'start_time': start_time_str, + 'test_name': test_name, + 'status': 'notrun', + 'class_name': '', + 'result': 'N/A', + 'properties': ( + ' <property name="skip_reason" value="skip"></property>' + ), + 'message': '', + } + + test = MockTest('%s' % test_name) + result.startTestRun() + result.startTest(test) + result.addSkip(test, 'skip') + result.stopTest(test) + result.stopTestRun() + result.printErrors() + + (testcase,) = self._assert_match( + expected_re, self.xml_stream.getvalue(), re.DOTALL + ) + self.assertRegex(testcase, expected_testcase_re) + + def test_with_errored_test(self): + start_time = 100 + end_time = 200 + test_name = 'test_with_errors' + result = self._make_result(start_time, end_time, 1) + fake_error = Exception('fake_error') + err = (Exception, fake_error, fake_error.__traceback__) + start_time_str = re.escape(unit_test_utils.iso_timestamp(start_time)) + run_time = end_time - start_time + expected_re = unit_test_utils.OUTPUT_STRING % { + 'suite_name': 'MockTest', + 'tests': 1, + 'failures': 0, + 'errors': 1, + 'run_time': run_time, + 'start_time': start_time_str, + } + expected_testcase_re = unit_test_utils.TESTCASE_STRING_WITH_ERRORS % { + 'run_time': run_time, + 'start_time': start_time_str, + 'test_name': test_name, + 'status': 'run', + 'class_name': '', + 'result': 'FAIL', + 'message': xml_reporter._escape_xml_attr(str(err[1])), + 'error_type': xml_reporter._escape_xml_attr(str(err[0])), + 'error_msg': xml_reporter._escape_cdata( + ''.join(traceback.format_exception(*err)) + ), + } + + test = MockTest('%s' % test_name) + result.startTestRun() + result.startTest(test) + result.addError(test, err) + result.stopTest(test) + result.stopTestRun() + result.printErrors() + + (testcase,) = self._assert_match( + expected_re, self.xml_stream.getvalue(), re.DOTALL + ) + self.assertRegex(testcase, expected_testcase_re) + + @mock.patch('builtins.open', autospec=True) + def test_write_summary_in_txt_saves_summary_to_a_file(self, mock_open): + fake_stream = io.StringIO() + mock_open.return_value = fake_stream + start_time = 100 + end_time = 200 + result = self._make_result(start_time, end_time, 6) + fake_error = Exception('fake_error') + err = (Exception, fake_error, fake_error.__traceback__) + # 5 is the number explicitly add in _make_result. + run_time = end_time - start_time + 5 + now = time.localtime() + expected_summary = re.escape( + unit_test_utils.make_summary( + test_date=time.strftime('%Y/%m/%d', now), + duration=test_reporter.duration_formatter(run_time), + total_runs=3, + total_successful_runs=2, + ) + ) + res_of_test_commission = ['FAIL', 'PASS', 'PASS'] + res_of_test_decommission = ['N/A', 'PASS', 'PASS'] + expected_test_case_result = unit_test_utils.make_test_case_result( + 3, + res_of_test_commission=res_of_test_commission, + res_of_test_decommission=res_of_test_decommission, + ) + test_commission_first_round = MockTest('test_commission') + test_decommission_first_round = MockTest('test_decommission') + test_commission_second_round = MockTest('test_commission') + test_decommission_second_round = MockTest('test_decommission') + test_commission_third_round = MockTest('test_commission') + test_decommission_third_round = MockTest('test_decommission') + result.startTestRun() + _run_test_from_result( + test_commission_first_round, + result, + res_of_test_commission[0], + err, + ) + _run_test_from_result( + test_decommission_first_round, + result, + res_of_test_decommission[0], + skip_reason='skip', + ) + _run_test_from_result( + test_commission_second_round, result, res_of_test_commission[1] + ) + _run_test_from_result( + test_decommission_second_round, + result, + res_of_test_decommission[1], + ) + _run_test_from_result( + test_commission_third_round, result, res_of_test_commission[2] + ) + _run_test_from_result( + test_decommission_third_round, result, res_of_test_decommission[2] + ) + result.stopTestRun() + result.printErrors() + + with mock.patch.object(fake_stream, 'close'): + with mock.patch.object(time, 'localtime', return_value=now): + test_reporter.TestResult.write_summary_in_txt() + + mock_open.assert_called_once_with( + f"summary_{time.strftime('%Y%m%d%H%M%S', now)}.txt", + 'w', + encoding='utf-8', + ) + self.assertRegex(fake_stream.getvalue(), expected_summary) + self.assertRegex(fake_stream.getvalue(), expected_test_case_result) + + @mock.patch.object( + xml_reporter.TextAndXMLTestRunner, + 'run', + autospec=True, + side_effect=KeyboardInterrupt, + ) + @mock.patch.object( + test_reporter.TestResult, 'write_summary_in_txt', autospec=True + ) + @mock.patch.object( + test_reporter.TestResult, 'writeAllResultsToXml', autospec=True + ) + @mock.patch('builtins.open', autospec=True) + def test_run_writes_result_in_xml_and_txt_on_interruption( + self, + mock_open, + mock_write_all_results_to_xml, + mock_write_summary_in_txt, + mock_run, + ): + fake_suite = unittest.TestSuite() + now = time.localtime() + expected_xml_path = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + f"summary_{time.strftime('%Y%m%d%H%M%S', now)}.xml", + ) + + with mock.patch.object(time, 'localtime', return_value=now): + test_reporter.TestRunner(xml_file_path=expected_xml_path).run(fake_suite) + + mock_open.assert_called_once_with(expected_xml_path, 'w') + mock_write_all_results_to_xml.assert_called_once() + mock_write_summary_in_txt.assert_called_once() + mock_run.assert_called_once() + + def test_duration_formatter_returns_correct_duration(self): + duration_in_seconds = 3601.0 + + duration = test_reporter.duration_formatter(duration_in_seconds) + + self.assertEqual(duration, '1 hrs, 0 mins, 1 secs') + + @mock.patch('builtins.open', autospec=True) + def test_write_summary_in_txt_should_not_write_any_without_test_result( + self, mock_open + ): + test_reporter.TestResult._instance = None + + test_reporter.TestResult.write_summary_in_txt() + + mock_open.assert_not_called() + + def test_write_summary_in_txt_should_not_write_any_without_test_runs(self): + start_time = 100 + end_time = 200 + self._make_result(start_time, end_time, 0) + + with self.assertLogs() as cm: + test_reporter.TestResult.write_summary_in_txt() + + self.assertEqual( + cm.output[0], 'INFO:root:Summary can not be saved without test runs.' + ) + + @mock.patch('builtins.open', autospec=True) + def test_test_runner_init_injects_correct_logger(self, mock_open): + logger = logging.getLogger('test_reporter') + fake_suite = unittest.TestSuite() + now = time.localtime() + expected_xml_path = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + f"summary_{time.strftime('%Y%m%d%H%M%S', now)}.xml", + ) + + with mock.patch.object(time, 'localtime', return_value=now): + with self.assertLogs(logger) as cm: + test_reporter.TestRunner(xml_file_path=None, logger=logger).run( + fake_suite + ) + + mock_open.assert_called_once_with(expected_xml_path, 'w') + self.assertEqual( + cm.output[0], + f'INFO:test_reporter:Xml file saved to {expected_xml_path}.', + ) + + +if __name__ == '__main__': + unittest.main()
diff --git a/ui_automator/ui_automator.py b/ui_automator/ui_automator.py index 547c2c4..2fd4aee 100644 --- a/ui_automator/ui_automator.py +++ b/ui_automator/ui_automator.py
@@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,11 +17,15 @@ A python controller that can trigger mobly UI automator snippet to achieve some automated UI operations on Android phones. """ +from concurrent import futures +import enum import functools import logging import os import re +import time from typing import Any, Callable +import unittest from absl import app from absl import flags @@ -31,7 +35,9 @@ from mobly.snippet import errors as snippet_errors from ui_automator import android_device as ad +from ui_automator import commission_reg_test from ui_automator import errors +from ui_automator import test_reporter from ui_automator import version @@ -44,7 +50,8 @@ } _MOBLY_SNIPPET_APK: str = 'com.chip.interop.moblysnippet' _MOBLY_SNIPPET_APK_NAME: str = 'mbs' -_COMMISSIONING_FLAG_USAGE_GUIDE = ( +_REGRESSION_TESTS_TIMEOUT_IN_SECS = 1 +_COMMISSION_FLAG_USAGE_GUIDE = ( 'Use --commission {DeviceName},{PairingCode},{GHARoom} to commission a' ' device to google fabric on GHA.' ) @@ -56,7 +63,7 @@ _COMMISSION = flags.DEFINE_list( name='commission', default=None, - help=_COMMISSIONING_FLAG_USAGE_GUIDE, + help=_COMMISSION_FLAG_USAGE_GUIDE, ) _DECOMMISSION = flags.DEFINE_string( name='decommission', @@ -75,10 +82,14 @@ ) -def _validate_commissioning_arg( +class RegTestSuiteType(enum.Enum): + COMMISSION = 'Commission' + + +def _validate_commission_arg( device_name: str, pairing_code: str, gha_room: str ) -> None: - """Returns None if commissioning values are valid. + """Returns None if arguments passed in `commission` are valid. Args: device_name: Display name of commissioned device on GHA. @@ -165,6 +176,7 @@ logger: Injected logger, if None, specify the default(root) one. """ self._logger = logger + self._is_reg_test_finished = False self._connected_device: android_device.AndroidDevice | None = None def load_device(self): @@ -245,11 +257,8 @@ @get_android_device_ready def commission_device( - self, - device_name: str, - pairing_code: str, - gha_room: str - ): + self, device_name: str, pairing_code: str, gha_room: str + ) -> None: """Commissions a device through installed apk `mbs` on Google Home App. Args: @@ -265,7 +274,7 @@ """ self._logger.info('Start commissioning the device.') - _validate_commissioning_arg(device_name, pairing_code, gha_room) + _validate_commission_arg(device_name, pairing_code, gha_room) device_name_snake_case = f'_{inflection.underscore(device_name)}' matter_device = { @@ -317,17 +326,21 @@ ) from e def run_regression_tests( - self, repeat: int | None, *args: Any, + self, + repeat: int | None, + test_type: RegTestSuiteType, + **kwargs, ) -> None: """Executes automated regression tests. - A single execution of both commissioning and decommissioning constitutes one + A single execution of both commission and decommission constitutes one cycle. Args: repeat: The value of flag `repeat`. If the value is None, regression tests will be run repeatedly until keyboard interrupts. - *args: Any required value to run regression tests. + test_type: The type of test suite for running regression tests. + **kwargs: Required args to run regression test cases. Raises: ValueError: When the value of flag `repeat` is not positive. @@ -335,30 +348,37 @@ if repeat and repeat <= 0: raise ValueError('Number placed after `--repeat` must be positive.') - failure_count = 0 - run_count = 0 self._logger.info( 'Start running regression tests' f' {str(repeat) + " times" if repeat is not None else "continuously"}.' ) - while repeat is None or run_count < repeat: - try: - device_name, _, _ = args - self.commission_device(*args) - self.decommission_device(device_name) - except errors.MoblySnippetError: - failure_count += 1 - except KeyboardInterrupt: - self._logger.info('Tests interrupted by keyboard.') - break - run_count += 1 - self._logger.info( - 'Ran %d times. Passed %d times. Failed %d times.', - run_count, - run_count - failure_count, - failure_count, - ) + self._is_reg_test_finished = False + suite = unittest.TestSuite() + # Once all tests are run, `test_reporter.TestRunner` will produce an XML + # file and a text summary file. If regression test cycles are not predefined + # (i.e., `repeat` is not specified), test cases should be dynamically added + # to the running suite on the fly. + executor = futures.ThreadPoolExecutor(max_workers=1) + if test_type == RegTestSuiteType.COMMISSION: + device_name = kwargs.get('device_name', None) + pairing_code = kwargs.get('pairing_code', None) + gha_room = kwargs.get('gha_room', None) + executor.submit( + self._add_test_to_commission_test_suite, + suite=suite, + repeat=repeat, + device_name=device_name, + pairing_code=pairing_code, + gha_room=gha_room, + ) + + runner = test_reporter.TestRunner(logger=self._logger) + try: + runner.run(suite) + finally: + self._is_reg_test_finished = True + executor.shutdown(wait=False) def _get_mbs_apk_path(self) -> str: return os.path.join( @@ -368,20 +388,69 @@ 'snippet-0.2.2-rc.0.apk', ) + def _add_test_to_commission_test_suite( + self, + suite: unittest.TestSuite, + repeat: int | None, + device_name: str, + pairing_code: str, + gha_room: str, + ) -> None: + """Adds automated regression tests for RegTestSuiteType.COMMISSION. -def _process_flags(ui_automator: UIAutomator) -> None: - """Does specific action based on given flag values.""" - if _COMMISSION.value: - if len(_COMMISSION.value) != 3: - raise flags.IllegalFlagValueError(_COMMISSIONING_FLAG_USAGE_GUIDE) - if _RUN_REGRESSION_TESTS.value: - ui_automator.run_regression_tests(_REPEAT.value, *_COMMISSION.value) - else: - ui_automator.commission_device(*_COMMISSION.value) - elif _DECOMMISSION.value: - ui_automator.decommission_device(_DECOMMISSION.value) - elif _RUN_REGRESSION_TESTS.value: - raise flags.IllegalFlagValueError(_REGRESSION_TESTS_FLAG_USAGE_GUIDE) + Args: + suite: TestSuite instance that tests added to. + repeat: An integer value specifies the number of times the regression + tests should be executed. Or None if tests should be run infinitely. + device_name: Display name of commissioned device on GHA. + pairing_code: An 11-digit or 21-digit numeric code which contains the + information needed to commission a matter device. + gha_room: Assigned room of commissioned device on GHA. + """ + run_count = 0 + while not self._is_reg_test_finished and (not repeat or run_count < repeat): + suite.addTest( + commission_reg_test.CommissionRegTest( + self, + 'test_commission', + device_name=device_name, + pairing_code=pairing_code, + gha_room=gha_room, + ) + ) + suite.addTest( + commission_reg_test.CommissionRegTest( + self, 'test_decommission', device_name=device_name + ) + ) + time.sleep(_REGRESSION_TESTS_TIMEOUT_IN_SECS) + run_count += 1 + + def process_flags(self) -> None: + """Does specific action based on given flag values.""" + if _COMMISSION.value: + if len(_COMMISSION.value) != 3: + raise flags.IllegalFlagValueError(_COMMISSION_FLAG_USAGE_GUIDE) + device_name, pairing_code, gha_room = _COMMISSION.value + if _RUN_REGRESSION_TESTS.value: + self.run_regression_tests( + _REPEAT.value, + RegTestSuiteType.COMMISSION, + device_name=device_name, + pairing_code=pairing_code, + gha_room=gha_room, + ) + else: + self.commission_device( + device_name=device_name, + pairing_code=pairing_code, + gha_room=gha_room, + ) + elif _DECOMMISSION.value: + device_name = _DECOMMISSION.value + self.decommission_device(device_name) + elif _RUN_REGRESSION_TESTS.value: + raise flags.IllegalFlagValueError(_REGRESSION_TESTS_FLAG_USAGE_GUIDE) # TODO(b/309745485): Type of argv should be Sequence[str]. @@ -389,8 +458,7 @@ if argv and len(argv) > 1: raise app.UsageError(f'Too many command-line arguments: {argv!r}') - ui_automator = UIAutomator() - _process_flags(ui_automator) + UIAutomator().process_flags() def run():
diff --git a/ui_automator/ui_automator_test.py b/ui_automator/ui_automator_test.py index dd51af3..a44b947 100644 --- a/ui_automator/ui_automator_test.py +++ b/ui_automator/ui_automator_test.py
@@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,20 +13,27 @@ # limitations under the License. """Unittest Lab exercise to test implementation of "Synonym Dictionary".""" +import io import os +import re import subprocess import sys +import time +import traceback import unittest from unittest import mock from absl import flags from absl.testing import flagsaver +from absl.testing import xml_reporter from mobly.controllers import android_device from mobly.controllers.android_device_lib import adb from mobly.snippet import errors as snippet_errors from ui_automator import errors +from ui_automator import test_reporter from ui_automator import ui_automator +from ui_automator import unit_test_utils from ui_automator import version _FAKE_MATTER_DEVICE_NAME = 'fake-matter-device-name' @@ -41,12 +48,12 @@ } _PYTHON_PATH = subprocess.check_output(['which', 'python']).decode('utf-8') _PYTHON_BIN_PATH = _PYTHON_PATH.removesuffix('python') -_FAKE_VALID_SYS_ARGV_FOR_COMMISSIONING = [ +_FAKE_VALID_SYS_ARGV_FOR_COMMISSION = [ _PYTHON_BIN_PATH + 'ui-automator', '--commission', 'm5stack,34970112332,Office', ] -_FAKE_VALID_SYS_ARGV_FOR_DECOMMISSIONING = [ +_FAKE_VALID_SYS_ARGV_FOR_DECOMMISSION = [ _PYTHON_BIN_PATH + 'ui-automator', '--decommission', 'm5stack', @@ -57,7 +64,7 @@ 'm5stack,34970112332,Office', '--regtest', '--repeat', - '5', + '3', ] _FAKE_INVALID_SYS_ARGV_FOR_REGRESSION_TESTS = [ _PYTHON_BIN_PATH + 'ui-automator', @@ -65,12 +72,12 @@ '--repeat', '5', ] -_FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_INVALID_LENGTH = [ +_FAKE_SYS_ARGV_FOR_COMMISSION_WITH_INVALID_LENGTH = [ _PYTHON_BIN_PATH + 'ui-automator', '--commission', 'm5', ] -_FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_EMPTY_VALUE = [ +_FAKE_SYS_ARGV_FOR_COMMISSION_WITH_EMPTY_VALUE = [ _PYTHON_BIN_PATH + 'ui-automator', '--commission', ] @@ -479,7 +486,7 @@ def test_run_calls_commission_device_with_valid_arguments( self, mock_commission_device, mock_exit ): - with mock.patch.object(sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_COMMISSIONING): + with mock.patch.object(sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_COMMISSION): ui_automator.run() mock_commission_device.assert_called_once_with( @@ -490,7 +497,7 @@ @flagsaver.flagsaver((ui_automator._COMMISSION, ['m5'])) def test_commission_with_cmd_invalid_arg_should_raise_an_error(self): with mock.patch.object( - sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_INVALID_LENGTH + sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSION_WITH_INVALID_LENGTH ): with self.assertRaises(flags.IllegalFlagValueError): ui_automator.run() @@ -503,7 +510,7 @@ self, mock_exit, mock_stderr_write ): with mock.patch.object( - sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_EMPTY_VALUE + sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSION_WITH_EMPTY_VALUE ): ui_automator.run() @@ -532,9 +539,7 @@ def test_run_calls_decommission_device_with_valid_arguments( self, mock_decommission_device, mock_exit, mock_commission_device ): - with mock.patch.object( - sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_DECOMMISSIONING - ): + with mock.patch.object(sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_DECOMMISSION): ui_automator.run() mock_commission_device.assert_not_called() @@ -608,7 +613,12 @@ ui_automator.run() mock_run_regression_tests.assert_called_once_with( - mock.ANY, 5, *['m5stack', '34970112332', 'Office'] + mock.ANY, + 3, + ui_automator.RegTestSuiteType.COMMISSION, + device_name='m5stack', + pairing_code='34970112332', + gha_room='Office', ) mock_commission_device.assert_not_called() mock_exit.assert_called_once() @@ -643,6 +653,8 @@ @flagsaver.flagsaver( (ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office']) ) + @mock.patch.object(time, 'sleep', autospec=True) + @mock.patch('builtins.open', autospec=True) @mock.patch.object(android_device, 'get_all_instances', autospec=True) @mock.patch.object( ui_automator.UIAutomator, 'commission_device', autospec=True @@ -655,12 +667,19 @@ mock_decommission_device, mock_commission_device, mock_get_all_instances, + mock_open, + mock_sleep, ): + mock_sleep.return_value = None mock_get_all_instances.return_value = [self.mock_android_device] with self.assertLogs() as cm: self.ui_automator.run_regression_tests( - 5, *['m5stack', '34970112332', 'Office'] + 5, + ui_automator.RegTestSuiteType.COMMISSION, + device_name='m5stack', + pairing_code='34970112332', + gha_room='Office', ) self.assertEqual(mock_commission_device.call_count, 5) @@ -668,13 +687,13 @@ self.assertEqual( cm.output[0], 'INFO:root:Start running regression tests 5 times.' ) - self.assertEqual( - cm.output[1], 'INFO:root:Ran 5 times. Passed 5 times. Failed 0 times.' - ) + self.assertEqual(mock_open.call_count, 2) @flagsaver.flagsaver( (ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office']) ) + @mock.patch.object(time, 'sleep', autospec=True) + @mock.patch('builtins.open', autospec=True) @mock.patch.object(android_device, 'get_all_instances', autospec=True) @mock.patch.object( ui_automator.UIAutomator, 'commission_device', autospec=True @@ -687,7 +706,10 @@ mock_decommission_device, mock_commission_device, mock_get_all_instances, + mock_open, + mock_sleep, ): + mock_sleep.return_value = None mock_commission_device.side_effect = [ None, None, @@ -699,7 +721,11 @@ with self.assertLogs() as cm: self.ui_automator.run_regression_tests( - 5, *['m5stack', '34970112332', 'Office'] + 5, + ui_automator.RegTestSuiteType.COMMISSION, + device_name='m5stack', + pairing_code='34970112332', + gha_room='Office', ) self.assertEqual(mock_commission_device.call_count, 5) @@ -707,19 +733,25 @@ self.assertEqual( cm.output[0], 'INFO:root:Start running regression tests 5 times.' ) - self.assertEqual( - cm.output[1], 'INFO:root:Ran 5 times. Passed 4 times. Failed 1 times.' - ) + self.assertEqual(mock_open.call_count, 2) def test_run_regression_tests_raises_an_error_with_invalid_input(self): with self.assertRaisesRegex( ValueError, 'Number placed after `--repeat` must be positive.' ): - self.ui_automator.run_regression_tests(-5) + self.ui_automator.run_regression_tests( + -5, + ui_automator.RegTestSuiteType.COMMISSION, + device_name='m5stack', + pairing_code='34970112332', + gha_room='Office', + ) @flagsaver.flagsaver( (ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office']) ) + @mock.patch.object(time, 'sleep', autospec=True) + @mock.patch('builtins.open', autospec=True) @mock.patch.object(android_device, 'get_all_instances', autospec=True) @mock.patch.object( ui_automator.UIAutomator, 'commission_device', autospec=True @@ -732,7 +764,10 @@ mock_decommission_device, mock_commission_device, mock_get_all_instances, + mock_open, + mock_sleep, ): + mock_sleep.return_value = None mock_commission_device.side_effect = [ None, errors.MoblySnippetError('fake_error'), @@ -744,7 +779,11 @@ mock_get_all_instances.return_value = [self.mock_android_device] with self.assertLogs() as cm: self.ui_automator.run_regression_tests( - None, *['m5stack', '34970112332', 'Office'] + None, + ui_automator.RegTestSuiteType.COMMISSION, + device_name='m5stack', + pairing_code='34970112332', + gha_room='Office', ) self.assertEqual(mock_commission_device.call_count, 6) @@ -752,10 +791,222 @@ self.assertEqual( cm.output[0], 'INFO:root:Start running regression tests continuously.' ) - self.assertEqual(cm.output[1], 'INFO:root:Tests interrupted by keyboard.') - self.assertEqual( - cm.output[2], 'INFO:root:Ran 5 times. Passed 3 times. Failed 2 times.' + self.assertEqual(mock_open.call_count, 2) + + @flagsaver.flagsaver( + (ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office']) + ) + @mock.patch.object(sys, 'exit', autospec=True) + @mock.patch.object(time, 'time', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + @mock.patch('builtins.open', autospec=True) + @mock.patch.object(android_device, 'get_all_instances', autospec=True) + @mock.patch.object( + ui_automator.UIAutomator, 'commission_device', autospec=True + ) + @mock.patch.object( + ui_automator.UIAutomator, 'decommission_device', autospec=True + ) + def test_run_calls_run_regression_tests_and_produces_summary_in_txt( + self, + mock_decommission_device, + mock_commission_device, + mock_get_all_instances, + mock_open, + mock_sleep, + mock_time, + mock_exit, + ): + txt_stream = io.StringIO() + mock_sleep.return_value = None + fake_error = errors.MoblySnippetError('error') + mock_commission_device.side_effect = [ + fake_error, + None, + None, + ] + mock_decommission_device.side_effect = [ + None, + fake_error, + ] + mock_get_all_instances.return_value = [self.mock_android_device] + mock_open.side_effect = [io.StringIO(), txt_stream] + # mock_time called by startTestRun, startTest, stopTest, and stopTestRun. + # startTest and stopTest will be called when running test cases. + # There are 3 test_commission and 3 test_decommission in this test suite. + # So 6 test cases will call mock_time for 12 times. + mock_time.side_effect = [0] + list(range(12)) + [11] + expected_summary = unit_test_utils.make_summary( + test_date=time.strftime('%Y/%m/%d', time.localtime(0)), + duration=test_reporter.duration_formatter(11), + total_runs=3, + total_successful_runs=1, ) + expected_test_case_result = unit_test_utils.make_test_case_result( + 3, + res_of_test_commission=['FAIL', 'PASS', 'PASS'], + res_of_test_decommission=['N/A', 'PASS', 'FAIL'], + ) + err = (errors.MoblySnippetError, fake_error, fake_error.__traceback__) + fake_err_msg = ''.join(traceback.format_exception(*err)) + + with mock.patch.object(txt_stream, 'close'): + with mock.patch.object( + test_reporter.TestResult, + '_exc_info_to_string', + return_value=fake_err_msg, + ): + with mock.patch.object( + sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_REGRESSION_TESTS + ): + ui_automator.run() + + self.assertEqual(mock_commission_device.call_count, 3) + self.assertEqual(mock_decommission_device.call_count, 2) + self.assertEqual( + expected_summary + '\n\n' + expected_test_case_result, + txt_stream.getvalue(), + ) + mock_exit.assert_called_once() + + @flagsaver.flagsaver( + (ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office']) + ) + @mock.patch.object(sys, 'exit', autospec=True) + @mock.patch.object(time, 'time', autospec=True) + @mock.patch.object(time, 'sleep', autospec=True) + @mock.patch('builtins.open', autospec=True) + @mock.patch.object(android_device, 'get_all_instances', autospec=True) + @mock.patch.object( + ui_automator.UIAutomator, 'commission_device', autospec=True + ) + @mock.patch.object( + ui_automator.UIAutomator, 'decommission_device', autospec=True + ) + def test_run_calls_run_regression_tests_and_produces_summary_in_xml( + self, + mock_decommission_device, + mock_commission_device, + mock_get_all_instances, + mock_open, + mock_sleep, + mock_time, + mock_exit, + ): + xml_stream = io.StringIO() + mock_sleep.return_value = None + fake_error = errors.MoblySnippetError('error') + mock_commission_device.side_effect = [ + fake_error, + None, + None, + ] + mock_decommission_device.side_effect = [ + None, + fake_error, + ] + mock_get_all_instances.return_value = [self.mock_android_device] + mock_open.side_effect = [xml_stream, io.StringIO()] + # mock_time called by startTestRun, startTest, stopTest, and stopTestRun. + # startTest and stopTest will be called when running test cases. + # There are 3 test_commission and 3 test_decommission in this test suite. + # So 6 test cases will call mock_time for 12 times. + mock_time.side_effect = [0] + list(range(12)) + [11] + expected_test_suite_re = unit_test_utils.OUTPUT_STRING % { + 'suite_name': 'CommissionRegTest', + 'tests': 6, + 'failures': 0, + 'errors': 2, + 'run_time': 11, + 'start_time': re.escape(unit_test_utils.iso_timestamp(0)), + } + err = (errors.MoblySnippetError, fake_error, fake_error.__traceback__) + fake_err_msg = ''.join(traceback.format_exception(*err)) + expected_testcase1_re = unit_test_utils.TESTCASE_STRING_WITH_ERRORS % { + 'run_time': 1, + 'start_time': re.escape(unit_test_utils.iso_timestamp(0)), + 'test_name': 'test_commission', + 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest', + 'status': 'run', + 'result': 'FAIL', + 'message': xml_reporter._escape_xml_attr(str(err[1])), + 'error_type': xml_reporter._escape_xml_attr(str(err[0])), + 'error_msg': xml_reporter._escape_cdata(fake_err_msg), + } + expected_testcase2_re = unit_test_utils.TESTCASE_STRING_WITH_PROPERTIES % { + 'run_time': 1, + 'start_time': re.escape(unit_test_utils.iso_timestamp(2)), + 'test_name': 'test_decommission', + 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest', + 'status': 'notrun', + 'result': 'N/A', + 'properties': ( + ' <property name="skip_reason" value="%s"></property>' + % (xml_reporter._escape_xml_attr('Device was not commissioned.'),) + ), + 'message': '', + } + expected_testcase3_re = unit_test_utils.TESTCASE_STRING % { + 'run_time': 1, + 'start_time': re.escape(unit_test_utils.iso_timestamp(4)), + 'test_name': 'test_commission', + 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest', + 'status': 'run', + 'result': 'PASS', + } + expected_testcase4_re = unit_test_utils.TESTCASE_STRING % { + 'run_time': 1, + 'start_time': re.escape(unit_test_utils.iso_timestamp(6)), + 'test_name': 'test_decommission', + 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest', + 'status': 'run', + 'result': 'PASS', + } + expected_testcase5_re = unit_test_utils.TESTCASE_STRING % { + 'run_time': 1, + 'start_time': re.escape(unit_test_utils.iso_timestamp(8)), + 'test_name': 'test_commission', + 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest', + 'status': 'run', + 'result': 'PASS', + } + expected_testcase6_re = unit_test_utils.TESTCASE_STRING_WITH_ERRORS % { + 'run_time': 1, + 'start_time': re.escape(unit_test_utils.iso_timestamp(10)), + 'test_name': 'test_decommission', + 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest', + 'status': 'run', + 'result': 'FAIL', + 'message': xml_reporter._escape_xml_attr(str(err[1])), + 'error_type': xml_reporter._escape_xml_attr(str(err[0])), + 'error_msg': xml_reporter._escape_cdata(fake_err_msg), + } + + with mock.patch.object( + test_reporter.TestResult, + '_exc_info_to_string', + return_value=fake_err_msg, + ): + with mock.patch.object( + sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_REGRESSION_TESTS + ): + ui_automator.run() + + self.assertEqual(mock_commission_device.call_count, 3) + self.assertEqual(mock_decommission_device.call_count, 2) + (testcases,) = re.search( + expected_test_suite_re, xml_stream.getvalue() + ).groups() + [testcase1, testcase2, testcase3, testcase4, testcase5, testcase6] = ( + testcases.split('\n </testcase>\n') + ) + self.assertRegex(testcase1, expected_testcase1_re) + self.assertRegex(testcase2, expected_testcase2_re) + self.assertRegex(testcase3, expected_testcase3_re) + self.assertRegex(testcase4, expected_testcase4_re) + self.assertRegex(testcase5, expected_testcase5_re) + self.assertRegex(testcase6, expected_testcase6_re) + mock_exit.assert_called_once() if __name__ == '__main__':
diff --git a/ui_automator/unit_test_utils.py b/ui_automator/unit_test_utils.py new file mode 100644 index 0000000..d383152 --- /dev/null +++ b/ui_automator/unit_test_utils.py
@@ -0,0 +1,129 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utilities for unit tests.""" +import datetime + +# Matches the entire XML output. Captures all <testcase> tags except for the +# last closing </testcase> in a single group. +OUTPUT_STRING = r"""<\?xml version="1.0"\?> +<testsuites name="" tests="%(tests)d" failures="%(failures)d" errors="%(errors)d" time="%(run_time).3f" timestamp="%(start_time)s"> +<testsuite name="%(suite_name)s" tests="%(tests)d" failures="%(failures)d" errors="%(errors)d" time="%(run_time).3f" timestamp="%(start_time)s"> +( <testcase [.\S\s]*) + </testcase> +</testsuite> +</testsuites> +""" + +# Matches a single <testcase> tag and its contents, without the closing +# </testcase>, which we use as a separator to split multiple <testcase> tags. +TESTCASE_STRING_WITH_PROPERTIES = r""" <testcase name="%(test_name)s" status="%(status)s" result="%(result)s" time="%(run_time).3f" classname="%(class_name)s" timestamp="%(start_time)s"> + <properties> +%(properties)s + </properties>%(message)s""" + +# Matches a single <testcase> tag and its contents, without the closing +# </testcase>, which we use as a separator to split multiple <testcase> tags. +TESTCASE_STRING_WITH_ERRORS = r""" <testcase name="%(test_name)s" status="%(status)s" result="%(result)s" time="%(run_time).3f" classname="%(class_name)s" timestamp="%(start_time)s"> + <error message="%(message)s" type="%(error_type)s"><\!\[CDATA\[%(error_msg)s\]\]></error>""" + +# Matches a single <testcase> tag and its contents, without the closing +# </testcase>, which we use as a separator to split multiple <testcase> tags. +TESTCASE_STRING = r""" <testcase name="%(test_name)s" status="%(status)s" result="%(result)s" time="%(run_time).3f" classname="%(class_name)s" timestamp="%(start_time)s">""" + + +def iso_timestamp(timestamp: float) -> str: + """Makes timestamp in iso format for unit tests. + + Args: + timestamp: Time value in float. + + Returns: + Formatted time according to ISO. + """ + return datetime.datetime.fromtimestamp( + timestamp, tz=datetime.timezone.utc + ).isoformat() + + +def make_summary(**kwargs) -> str: + """Makes test summary produced by `test_reporter` for unit tests. + + Args: + **kwargs: Fields written in test summary. + + Returns: + Test summary. + """ + dut = kwargs.get('dut', 'placeholder') + gha = kwargs.get('gha', 'placeholder') + test_date = kwargs.get('test_date', 'placeholder') + gms_core = kwargs.get('gms_core', 'placeholder') + duration = kwargs.get('duration', 0) + hub = kwargs.get('hub', 'placeholder') + total_runs = kwargs.get('total_runs', 0) + device = kwargs.get('device', 'placeholder') + total_successful_runs = kwargs.get('total_successful_runs', 0) + success_rate = round(100.0 * float(total_successful_runs) / float(total_runs)) + rows: list[list[str]] = [] + rows.append(['Summary', '', 'Version Info', '']) + rows.append(['DUT:', dut, 'GHA', gha]) + rows.append(['Test Time:', test_date, 'GMSCore', gms_core]) + rows.append(['Duration:', duration, 'Hub', hub]) + rows.append(['Number of runs:', str(total_runs), 'Device', device]) + rows.append([ + 'Success Rate:', + f'{success_rate}%({total_successful_runs}/{total_runs})', + ]) + summary = [] + for row in rows: + summary.append(''.join(element.ljust(25) for element in row)) + summary.append('\n') + + return ''.join(summary) + + +def make_test_case_result( + total_runs: int, + res_of_test_commission: list[str] | None, + res_of_test_decommission: list[str] | None, +) -> str: + """Makes test case result produced by `test_reporter` for unit tests. + + Args: + total_runs: Total cycles for regression tests. + res_of_test_commission: Elements in list should be `PASS`, `FAIL` or `N/A`, + which indicates the test result for each test case `test_commission`. + res_of_test_decommission: Elements in list should be `PASS`, `FAIL` or + `N/A`, which indicates the test result for each test case + `test_decommission`. + + Returns: + Test case result. + """ + test_case_result = ['Test Case/Test Run'.ljust(25)] + for i in range(total_runs): + test_case_result.append(f'#{i + 1}'.ljust(8)) + if res_of_test_commission: + test_case_result.append('\n') + test_case_result.append('Commission to GHA'.ljust(25)) + for res in res_of_test_commission: + test_case_result.append(res.ljust(8)) + if res_of_test_decommission: + test_case_result.append('\n') + test_case_result.append('Removing from GHA'.ljust(25)) + for res in res_of_test_decommission: + test_case_result.append(res.ljust(8)) + + return ''.join(test_case_result)