Add regression test reporter for analysis
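
Regression cycles are now expressed as unittest test cases
(commission_reg_test.CommissionRegTest) and executed through a new
test_reporter.TestRunner, which keeps writing the XML report even when a run
is interrupted and additionally emits a TXT summary of all cycles.

Illustrative sketch of the new flow (a minimal example; the device name,
pairing code and room are the placeholder values used in the unit tests):

  from ui_automator import ui_automator

  automator = ui_automator.UIAutomator()
  # Runs 3 commission/decommission cycles and writes summary_<timestamp>.xml
  # and summary_<timestamp>.txt through test_reporter.TestRunner.
  automator.run_regression_tests(
      3,
      ui_automator.RegTestSuiteType.COMMISSION,
      device_name='m5stack',
      pairing_code='34970112332',
      gha_room='Office',
  )
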
PiperOrigin-RevId: 600987475
diff --git a/ui_automator/commission_reg_test.py b/ui_automator/commission_reg_test.py
new file mode 100644
index 0000000..c210401
--- /dev/null
+++ b/ui_automator/commission_reg_test.py
@@ -0,0 +1,43 @@
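+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Regression test cases for commissioning and removing a device on GHA."""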
+from __future__ import annotations
+from typing import TYPE_CHECKING
+import unittest
+
+# pylint:disable=g-import-not-at-top
+if TYPE_CHECKING:
+ from ui_automator import ui_automator
+
+# TODO(b/318771536): Check if a device is commissioned on GHA.
+_commissioned: bool = False
+
+
+class CommissionRegTest(unittest.TestCase):
+ """Test class for running commission regression test."""
+
+ def __init__(
+ self,
+ ui_automator: ui_automator.UIAutomator,
+ test_name: str,
+ device_name: str | None,
+ pairing_code: str | None = None,
+ gha_room: str | None = None,
+ ) -> None:
+ super().__init__(methodName=test_name)
+ self.ui_automator = ui_automator
+ self.device_name = device_name
+ self.pairing_code = pairing_code
+ self.gha_room = gha_room
+
+ def test_commission(self) -> None:
+ global _commissioned
+ _commissioned = False
+ self.ui_automator.commission_device(
+ self.device_name, self.pairing_code, self.gha_room
+ )
+ # TODO(b/318771536): Check if a device is commissioned on GHA.
+ _commissioned = True
+
+ def test_decommission(self) -> None:
+ if not _commissioned:
+ self.skipTest('Device was not commissioned.')
+
+ self.ui_automator.decommission_device(self.device_name)
diff --git a/ui_automator/test_reporter.py b/ui_automator/test_reporter.py
new file mode 100644
index 0000000..ab906c8
--- /dev/null
+++ b/ui_automator/test_reporter.py
@@ -0,0 +1,360 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A helper to create reports in TXT and XML from running regression tests.
+
+The helper inherits from absl/testing/xml_reporter.py and adds two
+capabilities: it produces an XML file even when the test run is interrupted,
+and it produces a TXT file with a summary of the test results.
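+
+Example usage (a minimal sketch; the suite contents and output path are
+hypothetical):
+
+  import unittest
+
+  from ui_automator import test_reporter
+
+  suite = unittest.TestSuite()  # Populate with regression test cases.
+  runner = test_reporter.TestRunner(xml_file_path='/tmp/results.xml')
+  runner.run(suite)
+  # Produces /tmp/results.xml plus a summary_<timestamp>.txt file in the
+  # current working directory, even if the run is interrupted.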
+"""
+
+import collections
+import io
+import logging
+import os
+import time
+import unittest
+
+from absl.testing import xml_reporter
+import immutabledict
+
+
+_REPORT_TEST_CASE_TITLE_TO_TEST_NAME = immutabledict.immutabledict({
+ 'Commission to GHA': 'test_commission',
+ 'Removing from GHA': 'test_decommission',
+})
+
+_SUMMARY_COL_INDENTS = 25
+_TEST_CASE_TITLE_INDENTS = 25
+_TEST_RESULT_INDENTS = 8
+
+
+def duration_formatter(duration_in_seconds: float) -> str:
+ """Formats a duration in seconds as '<H> hrs, <M> mins, <S> secs'."""
+ hour_count, remainder = divmod(duration_in_seconds, 3600)
+ minute_count, second_count = divmod(remainder, 60)
+ return (
+ f'{int(hour_count)} hrs, {int(minute_count)} mins,'
+ f' {int(second_count)} secs'
+ )
+
+
+# pylint: disable=protected-access
+class _TestCaseResult(xml_reporter._TestCaseResult):
+ """Private helper for TestResult."""
+
+ def __init__(self, test):
+ super().__init__(test)
+ self.status = None
+ self.result = None
+
+ def print_xml_summary(self, stream: io.TextIOWrapper):
+ """Prints an XML summary of a TestCase.
+
+ Status and result are populated as per JUnit XML test result reporter.
+ A test that has been skipped will always have a skip reason,
+ as every skip method in Python's unittest requires the reason arg to be
+ passed.
+
+ Args:
+ stream: output stream to write test report XML to.
+ """
+ if self.skip_reason is None:
+ self.status = 'run'
+ self.result = 'PASS' if not self.errors else 'FAIL'
+ else:
+ self.status = 'notrun'
+ self.result = 'N/A'
+
+ test_case_attributes = [
+ ('name', self.name),
+ ('status', self.status),
+ ('result', self.result),
+ ('time', f'{self.run_time:.3f}'),
+ ('classname', self.full_class_name),
+ ('timestamp', xml_reporter._iso8601_timestamp(self.start_time)),
+ ]
+ xml_reporter._print_xml_element_header(
+ 'testcase', test_case_attributes, stream, indentation=' '
+ )
+ self._print_testcase_details(stream)
+ if self.skip_reason:
+ stream.write(' <properties>\n')
+ stream.write(
+ ' <property name="%s" value="%s"></property>\n'
+ % (
+ xml_reporter._escape_xml_attr('skip_reason'),
+ xml_reporter._escape_xml_attr(self.skip_reason),
+ )
+ )
+ stream.write(' </properties>\n')
+ stream.write(' </testcase>\n')
+
+
+class _TestSuiteResult(xml_reporter._TestSuiteResult):
+ """Private helper for TestResult.
+
+ The `print_xml_summary` function has been overridden to present test cases in
+ the XML file in ascending order by their start time. The only modification is
+ in the `sorted()` call below, where the sorting key has been changed from
+ `t.name` to `t.start_time`. All other lines remain identical to those in the
+ original `xml_reporter`.
+ """
+
+ def print_xml_summary(self, stream: io.TextIOWrapper):
+ """Prints an XML Summary of a TestSuite.
+
+ Args:
+ stream: output stream to write test report XML to.
+ """
+ overall_test_count = sum(len(x) for x in self.suites.values())
+ overall_failures = sum(self.failure_counts.values())
+ overall_errors = sum(self.error_counts.values())
+ overall_attributes = [
+ ('name', ''),
+ ('tests', f'{overall_test_count}'),
+ ('failures', f'{overall_failures}'),
+ ('errors', f'{overall_errors}'),
+ ('time', f'{(self.overall_end_time - self.overall_start_time):.3f}'),
+ ('timestamp', xml_reporter._iso8601_timestamp(self.overall_start_time)),
+ ]
+ xml_reporter._print_xml_element_header(
+ 'testsuites', overall_attributes, stream
+ )
+ if self._testsuites_properties:
+ stream.write(' <properties>\n')
+ for name, value in sorted(self._testsuites_properties.items()):
+ stream.write(
+ ' <property name="%s" value="%s"></property>\n'
+ % (
+ xml_reporter._escape_xml_attr(name),
+ xml_reporter._escape_xml_attr(str(value)),
+ )
+ )
+ stream.write(' </properties>\n')
+
+ for suite_name in self.suites:
+ suite = self.suites[suite_name]
+ suite_end_time = max(x.start_time + x.run_time for x in suite)
+ suite_start_time = min(x.start_time for x in suite)
+ failures = self.failure_counts[suite_name]
+ errors = self.error_counts[suite_name]
+ suite_attributes = [
+ ('name', f'{suite_name}'),
+ ('tests', f'{len(suite)}'),
+ ('failures', f'{failures}'),
+ ('errors', f'{errors}'),
+ ('time', f'{(suite_end_time - suite_start_time):.3f}'),
+ ('timestamp', xml_reporter._iso8601_timestamp(suite_start_time)),
+ ]
+ xml_reporter._print_xml_element_header(
+ 'testsuite', suite_attributes, stream
+ )
+
+ # test_case_result entries are not guaranteed to be in any user-friendly
+ # order, especially when using subtests. So sort them by start time.
+ for test_case_result in sorted(suite, key=lambda t: t.start_time):
+ test_case_result.print_xml_summary(stream)
+ stream.write('</testsuite>\n')
+ stream.write('</testsuites>\n')
+
+
+# pylint: disable=protected-access
+class TestResult(xml_reporter._TextAndXMLTestResult):
+ """Test Result class to write test results even when test is terminated."""
+
+ _TEST_SUITE_RESULT_CLASS = _TestSuiteResult
+ _TEST_CASE_RESULT_CLASS = _TestCaseResult
+
+ # Instance of TestResult class. Allows writing test results when suite is
+ # terminated - use `writeAllResultsToXml` classmethod.
+ # Technically it's possible to have more than one instance of this class, but
+ # it doesn't happen when running tests that write to an XML file (since they'd
+ # overwrite that file's results), so we can safely use only the last created
+ # instance.
+ _instance = None
+
+ def __init__(self, logger=logging.getLogger(), *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ TestResult._instance = self
+ self._logger = logger
+
+ @classmethod
+ def write_summary_in_txt(cls) -> None:
+ if not cls._instance:
+ return
+
+ summary_file_path = (
+ f"summary_{time.strftime('%Y%m%d%H%M%S', time.localtime())}.txt"
+ )
+
+ test_results_by_test_name: dict[str, list[_TestCaseResult]] = (
+ collections.defaultdict(list)
+ )
+ # Group test cases by name for later sorting. This avoids incorrect
+ # ordering, as different suites may run the same test cases but not
+ # necessarily in order.
+ for suite_name in cls._instance.suite.suites:
+ suite = cls._instance.suite.suites[suite_name]
+ for test_case_result in suite:
+ test_results_by_test_name[test_case_result.name].append(
+ test_case_result
+ )
+
+ if not test_results_by_test_name:
+ cls._instance._logger.info('Summary can not be saved without test runs.')
+ return
+
+ # Tests can be stopped unexpectedly, so take the maximum count as the total
+ # number of runs.
+ total_runs = max(map(len, test_results_by_test_name.values()))
+
+ is_regression_test_pass = [True for _ in range(total_runs)]
+
+ # Save each test case result to a temp stream.
+ test_case_result_stream = io.StringIO()
+ test_case_result_stream.write(
+ f"{'Test Case/Test Run':<{_TEST_CASE_TITLE_INDENTS}}"
+ )
+ for i in range(total_runs):
+ test_case_result_stream.write(f"{f'#{i + 1}':<{_TEST_RESULT_INDENTS}}")
+ for test_case_title in _REPORT_TEST_CASE_TITLE_TO_TEST_NAME:
+ if test_results_by_test_name[
+ _REPORT_TEST_CASE_TITLE_TO_TEST_NAME[test_case_title]
+ ]:
+ test_case_result_stream.write('\n')
+ test_case_result_stream.write(
+ f'{test_case_title:<{_TEST_CASE_TITLE_INDENTS}}'
+ )
+ for i, test_result in enumerate(
+ sorted(
+ test_results_by_test_name[
+ _REPORT_TEST_CASE_TITLE_TO_TEST_NAME[test_case_title]
+ ],
+ key=lambda t: t.start_time,
+ )
+ ):
+ if is_regression_test_pass[i] and test_result.result != 'PASS':
+ is_regression_test_pass[i] = False
+ test_case_result_stream.write(
+ f'{test_result.result:<{_TEST_RESULT_INDENTS}}'
+ )
+
+ # Start writing summary.
+ test_date = time.strftime(
+ '%Y/%m/%d', time.localtime(cls._instance.suite.overall_start_time)
+ )
+ duration = duration_formatter(
+ cls._instance.suite.overall_end_time
+ - cls._instance.suite.overall_start_time
+ )
+ total_successful_runs = is_regression_test_pass.count(True)
+ success_rate = round(
+ 100.0 * float(total_successful_runs) / float(total_runs)
+ )
+ # TODO(b/317837867): Replace all placeholders with real values.
+ rows: list[list[str]] = []
+ rows.append(['Summary', '', 'Version Info', ''])
+ rows.append(['DUT:', 'placeholder', 'GHA', 'placeholder'])
+ rows.append(['Test Time:', test_date, 'GMSCore', 'placeholder'])
+ rows.append(['Duration:', duration, 'Hub', 'placeholder'])
+ rows.append(['Number of runs:', str(total_runs), 'Device', 'placeholder'])
+ rows.append([
+ 'Success Rate:',
+ f'{success_rate}%({total_successful_runs}/{total_runs})',
+ ])
+
+ f = open(summary_file_path, 'w', encoding='utf-8')
+ for row in rows:
+ f.writelines(element.ljust(_SUMMARY_COL_INDENTS) for element in row)
+ f.write('\n')
+
+ f.writelines(['\n', '\n'])
+
+ # Writes test case result saved in temp stream.
+ f.write(test_case_result_stream.getvalue())
+
+ f.close()
+ cls._instance._logger.info(
+ 'Summary of regression tests has been written to %s.', summary_file_path
+ )
+
+ @classmethod
+ def writeAllResultsToXml(cls):
+ """Writes current results to XML output. Used when test is interrupted."""
+ if cls._instance:
+ cls._instance.writeXmlResultsForTerminatedSuite()
+
+ def writeXmlResultsForTerminatedSuite(self):
+ """Writes test results to XML output."""
+ self._writeSuiteResultsToXml()
+ self._logger.info('TestResult class stream written to results XML.')
+
+ def _writeSuiteResultsToXml(self):
+ self.suite.print_xml_summary(self.xml_stream)
+
+
+class TestRunner(xml_reporter.TextAndXMLTestRunner):
+ """A test runner that can produce both formatted text results and XML.
+
+ It writes an XML summary and a TXT summary of the results at the end of the
+ test run, even if the run is interrupted.
+ """
+
+ _TEST_RESULT_CLASS = TestResult
+
+ def __init__(
+ self, xml_file_path=None, logger=logging.getLogger(), *args, **kwargs
+ ):
+ """Initializes a TestRunner.
+
+ Args:
+ xml_file_path: XML-formatted test results are output to this file path. If
+ None (the default), the XML file is written to a default path in this
+ module's directory.
+ logger: Logger instance.
+ *args: passed unmodified to xml_reporter.TextAndXMLTestRunner.__init__.
+ **kwargs: passed unmodified to xml_reporter.TextAndXMLTestRunner.__init__.
+ """
+ self._logger = logger
+ cur_path = os.path.abspath(os.path.dirname(__file__))
+ default_xml_file_path = os.path.join(
+ cur_path,
+ f"summary_{time.strftime('%Y%m%d%H%M%S', time.localtime())}.xml",
+ )
+ self._xml_file_path = xml_file_path or default_xml_file_path
+ super().__init__(xml_stream=open(self._xml_file_path, 'w'), *args, **kwargs)
+
+ def run(self, suite: unittest.TestSuite) -> None:
+ """Runs tests and generates reports in XML and TXT.
+
+ Args:
+ suite: TestSuite to be run for regression testing.
+ """
+ try:
+ super().run(suite)
+ except KeyboardInterrupt:
+ # In case tests are interrupted, the XML file won't be written by TestRunner.
+ # Hence, call `writeAllResultsToXml` to write data to the given XML path.
+ TestResult.writeAllResultsToXml()
+ finally:
+ self._logger.info(f'Xml file saved to {self._xml_file_path}.')
+ TestResult.write_summary_in_txt()
+
+ def _makeResult(self):
+ return self._TEST_RESULT_CLASS(
+ logger=self._logger,
+ xml_stream=self._xml_stream,
+ stream=self.stream,
+ descriptions=self.descriptions,
+ verbosity=self.verbosity,
+ time_getter=time.time,
+ testsuites_properties=self._testsuites_properties,
+ )
diff --git a/ui_automator/test_reporter_test.py b/ui_automator/test_reporter_test.py
new file mode 100644
index 0000000..0470fde
--- /dev/null
+++ b/ui_automator/test_reporter_test.py
@@ -0,0 +1,351 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unittest for test reporter."""
+import io
+import logging
+import os
+import re
+import time
+import traceback
+from typing import Any, Literal
+import unittest
+from unittest import mock
+
+from absl.testing import xml_reporter
+
+from ui_automator import test_reporter
+from ui_automator import unit_test_utils
+
+
+class StringIOWriteLn(io.StringIO):
+
+ def writeln(self, line):
+ self.write(line + '\n')
+
+
+class MockTest(unittest.TestCase):
+ failureException = AssertionError
+
+ def __init__(self, name):
+ super().__init__()
+ self.name = name
+
+ def id(self):
+ return self.name
+
+ def shortDescription(self):
+ return "This is this test's description."
+
+
+def _run_test_from_result(
+ test: MockTest,
+ test_result: test_reporter.TestResult,
+ result: Literal['PASS', 'N/A', 'FAIL'],
+ err=None,
+ skip_reason: str | None = None,
+) -> None:
+ test_result.startTest(test)
+ if result == 'FAIL':
+ test_result.addError(test, err)
+ elif result == 'N/A':
+ test_result.addSkip(test, skip_reason)
+ else:
+ test_result.addSuccess(test)
+ test_result.stopTest(test)
+
+
+class TestReporterTest(unittest.TestCase):
+
+ def setUp(self):
+ super().setUp()
+ self.stream = StringIOWriteLn()
+ self.xml_stream = io.StringIO()
+
+ def _make_result(
+ self, start_time: int, end_time: int, test_count: int
+ ) -> test_reporter.TestResult:
+ # Called by result.startTestRun().
+ times = [start_time]
+ for i in range(test_count):
+ # Called by result.startTest().
+ times.append(start_time)
+ # Called by result.stopTest().
+ times.append(end_time + i)
+ # Called by result.stopTestRun().
+ times.append(end_time + test_count - 1)
+ with mock.patch.object(time, 'time', autospec=True, side_effect=times):
+ return test_reporter.TestResult(
+ xml_stream=self.xml_stream,
+ stream=self.stream,
+ descriptions=True,
+ verbosity=0,
+ time_getter=time.time,
+ )
+
+ def _assert_match(self, regex, output, flags=0) -> tuple[str | Any, ...]:
+ result = re.search(regex, output, flags)
+ if result is None:
+ self.fail(f'{output} does not match {regex}.')
+ return result.groups()
+
+ def test_with_skipped_test(self):
+ start_time = 100
+ end_time = 200
+ test_name = 'skipped_test_with_reason'
+ result = self._make_result(start_time, end_time, 1)
+ start_time_str = re.escape(unit_test_utils.iso_timestamp(start_time))
+ run_time = end_time - start_time
+ expected_re = unit_test_utils.OUTPUT_STRING % {
+ 'suite_name': 'MockTest',
+ 'tests': 1,
+ 'failures': 0,
+ 'errors': 0,
+ 'run_time': run_time,
+ 'start_time': start_time_str,
+ }
+ expected_testcase_re = unit_test_utils.TESTCASE_STRING_WITH_PROPERTIES % {
+ 'run_time': run_time,
+ 'start_time': start_time_str,
+ 'test_name': test_name,
+ 'status': 'notrun',
+ 'class_name': '',
+ 'result': 'N/A',
+ 'properties': (
+ ' <property name="skip_reason" value="skip"></property>'
+ ),
+ 'message': '',
+ }
+
+ test = MockTest('%s' % test_name)
+ result.startTestRun()
+ result.startTest(test)
+ result.addSkip(test, 'skip')
+ result.stopTest(test)
+ result.stopTestRun()
+ result.printErrors()
+
+ (testcase,) = self._assert_match(
+ expected_re, self.xml_stream.getvalue(), re.DOTALL
+ )
+ self.assertRegex(testcase, expected_testcase_re)
+
+ def test_with_errored_test(self):
+ start_time = 100
+ end_time = 200
+ test_name = 'test_with_errors'
+ result = self._make_result(start_time, end_time, 1)
+ fake_error = Exception('fake_error')
+ err = (Exception, fake_error, fake_error.__traceback__)
+ start_time_str = re.escape(unit_test_utils.iso_timestamp(start_time))
+ run_time = end_time - start_time
+ expected_re = unit_test_utils.OUTPUT_STRING % {
+ 'suite_name': 'MockTest',
+ 'tests': 1,
+ 'failures': 0,
+ 'errors': 1,
+ 'run_time': run_time,
+ 'start_time': start_time_str,
+ }
+ expected_testcase_re = unit_test_utils.TESTCASE_STRING_WITH_ERRORS % {
+ 'run_time': run_time,
+ 'start_time': start_time_str,
+ 'test_name': test_name,
+ 'status': 'run',
+ 'class_name': '',
+ 'result': 'FAIL',
+ 'message': xml_reporter._escape_xml_attr(str(err[1])),
+ 'error_type': xml_reporter._escape_xml_attr(str(err[0])),
+ 'error_msg': xml_reporter._escape_cdata(
+ ''.join(traceback.format_exception(*err))
+ ),
+ }
+
+ test = MockTest('%s' % test_name)
+ result.startTestRun()
+ result.startTest(test)
+ result.addError(test, err)
+ result.stopTest(test)
+ result.stopTestRun()
+ result.printErrors()
+
+ (testcase,) = self._assert_match(
+ expected_re, self.xml_stream.getvalue(), re.DOTALL
+ )
+ self.assertRegex(testcase, expected_testcase_re)
+
+ @mock.patch('builtins.open', autospec=True)
+ def test_write_summary_in_txt_saves_summary_to_a_file(self, mock_open):
+ fake_stream = io.StringIO()
+ mock_open.return_value = fake_stream
+ start_time = 100
+ end_time = 200
+ result = self._make_result(start_time, end_time, 6)
+ fake_error = Exception('fake_error')
+ err = (Exception, fake_error, fake_error.__traceback__)
+ # 5 is the number explicitly added in _make_result.
+ run_time = end_time - start_time + 5
+ now = time.localtime()
+ expected_summary = re.escape(
+ unit_test_utils.make_summary(
+ test_date=time.strftime('%Y/%m/%d', now),
+ duration=test_reporter.duration_formatter(run_time),
+ total_runs=3,
+ total_successful_runs=2,
+ )
+ )
+ res_of_test_commission = ['FAIL', 'PASS', 'PASS']
+ res_of_test_decommission = ['N/A', 'PASS', 'PASS']
+ expected_test_case_result = unit_test_utils.make_test_case_result(
+ 3,
+ res_of_test_commission=res_of_test_commission,
+ res_of_test_decommission=res_of_test_decommission,
+ )
+ test_commission_first_round = MockTest('test_commission')
+ test_decommission_first_round = MockTest('test_decommission')
+ test_commission_second_round = MockTest('test_commission')
+ test_decommission_second_round = MockTest('test_decommission')
+ test_commission_third_round = MockTest('test_commission')
+ test_decommission_third_round = MockTest('test_decommission')
+ result.startTestRun()
+ _run_test_from_result(
+ test_commission_first_round,
+ result,
+ res_of_test_commission[0],
+ err,
+ )
+ _run_test_from_result(
+ test_decommission_first_round,
+ result,
+ res_of_test_decommission[0],
+ skip_reason='skip',
+ )
+ _run_test_from_result(
+ test_commission_second_round, result, res_of_test_commission[1]
+ )
+ _run_test_from_result(
+ test_decommission_second_round,
+ result,
+ res_of_test_decommission[1],
+ )
+ _run_test_from_result(
+ test_commission_third_round, result, res_of_test_commission[2]
+ )
+ _run_test_from_result(
+ test_decommission_third_round, result, res_of_test_decommission[2]
+ )
+ result.stopTestRun()
+ result.printErrors()
+
+ with mock.patch.object(fake_stream, 'close'):
+ with mock.patch.object(time, 'localtime', return_value=now):
+ test_reporter.TestResult.write_summary_in_txt()
+
+ mock_open.assert_called_once_with(
+ f"summary_{time.strftime('%Y%m%d%H%M%S', now)}.txt",
+ 'w',
+ encoding='utf-8',
+ )
+ self.assertRegex(fake_stream.getvalue(), expected_summary)
+ self.assertRegex(fake_stream.getvalue(), expected_test_case_result)
+
+ @mock.patch.object(
+ xml_reporter.TextAndXMLTestRunner,
+ 'run',
+ autospec=True,
+ side_effect=KeyboardInterrupt,
+ )
+ @mock.patch.object(
+ test_reporter.TestResult, 'write_summary_in_txt', autospec=True
+ )
+ @mock.patch.object(
+ test_reporter.TestResult, 'writeAllResultsToXml', autospec=True
+ )
+ @mock.patch('builtins.open', autospec=True)
+ def test_run_writes_result_in_xml_and_txt_on_interruption(
+ self,
+ mock_open,
+ mock_write_all_results_to_xml,
+ mock_write_summary_in_txt,
+ mock_run,
+ ):
+ fake_suite = unittest.TestSuite()
+ now = time.localtime()
+ expected_xml_path = os.path.join(
+ os.path.abspath(os.path.dirname(__file__)),
+ f"summary_{time.strftime('%Y%m%d%H%M%S', now)}.xml",
+ )
+
+ with mock.patch.object(time, 'localtime', return_value=now):
+ test_reporter.TestRunner(xml_file_path=expected_xml_path).run(fake_suite)
+
+ mock_open.assert_called_once_with(expected_xml_path, 'w')
+ mock_write_all_results_to_xml.assert_called_once()
+ mock_write_summary_in_txt.assert_called_once()
+ mock_run.assert_called_once()
+
+ def test_duration_formatter_returns_correct_duration(self):
+ duration_in_seconds = 3601.0
+
+ duration = test_reporter.duration_formatter(duration_in_seconds)
+
+ self.assertEqual(duration, '1 hrs, 0 mins, 1 secs')
+
+ @mock.patch('builtins.open', autospec=True)
+ def test_write_summary_in_txt_should_not_write_any_without_test_result(
+ self, mock_open
+ ):
+ test_reporter.TestResult._instance = None
+
+ test_reporter.TestResult.write_summary_in_txt()
+
+ mock_open.assert_not_called()
+
+ def test_write_summary_in_txt_should_not_write_any_without_test_runs(self):
+ start_time = 100
+ end_time = 200
+ self._make_result(start_time, end_time, 0)
+
+ with self.assertLogs() as cm:
+ test_reporter.TestResult.write_summary_in_txt()
+
+ self.assertEqual(
+ cm.output[0], 'INFO:root:Summary can not be saved without test runs.'
+ )
+
+ @mock.patch('builtins.open', autospec=True)
+ def test_test_runner_init_injects_correct_logger(self, mock_open):
+ logger = logging.getLogger('test_reporter')
+ fake_suite = unittest.TestSuite()
+ now = time.localtime()
+ expected_xml_path = os.path.join(
+ os.path.abspath(os.path.dirname(__file__)),
+ f"summary_{time.strftime('%Y%m%d%H%M%S', now)}.xml",
+ )
+
+ with mock.patch.object(time, 'localtime', return_value=now):
+ with self.assertLogs(logger) as cm:
+ test_reporter.TestRunner(xml_file_path=None, logger=logger).run(
+ fake_suite
+ )
+
+ mock_open.assert_called_once_with(expected_xml_path, 'w')
+ self.assertEqual(
+ cm.output[0],
+ f'INFO:test_reporter:Xml file saved to {expected_xml_path}.',
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/ui_automator/ui_automator.py b/ui_automator/ui_automator.py
index 547c2c4..2fd4aee 100644
--- a/ui_automator/ui_automator.py
+++ b/ui_automator/ui_automator.py
@@ -1,4 +1,4 @@
-# Copyright 2023 Google LLC
+# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,11 +17,15 @@
A python controller that can trigger mobly UI automator snippet to achieve some
automated UI operations on Android phones.
"""
+from concurrent import futures
+import enum
import functools
import logging
import os
import re
+import time
from typing import Any, Callable
+import unittest
from absl import app
from absl import flags
@@ -31,7 +35,9 @@
from mobly.snippet import errors as snippet_errors
from ui_automator import android_device as ad
+from ui_automator import commission_reg_test
from ui_automator import errors
+from ui_automator import test_reporter
from ui_automator import version
@@ -44,7 +50,8 @@
}
_MOBLY_SNIPPET_APK: str = 'com.chip.interop.moblysnippet'
_MOBLY_SNIPPET_APK_NAME: str = 'mbs'
-_COMMISSIONING_FLAG_USAGE_GUIDE = (
+_REGRESSION_TESTS_TIMEOUT_IN_SECS = 1
+_COMMISSION_FLAG_USAGE_GUIDE = (
'Use --commission {DeviceName},{PairingCode},{GHARoom} to commission a'
' device to google fabric on GHA.'
)
@@ -56,7 +63,7 @@
_COMMISSION = flags.DEFINE_list(
name='commission',
default=None,
- help=_COMMISSIONING_FLAG_USAGE_GUIDE,
+ help=_COMMISSION_FLAG_USAGE_GUIDE,
)
_DECOMMISSION = flags.DEFINE_string(
name='decommission',
@@ -75,10 +82,14 @@
)
-def _validate_commissioning_arg(
+class RegTestSuiteType(enum.Enum):
+ COMMISSION = 'Commission'
+
+
+def _validate_commission_arg(
device_name: str, pairing_code: str, gha_room: str
) -> None:
- """Returns None if commissioning values are valid.
+ """Returns None if arguments passed in `commission` are valid.
Args:
device_name: Display name of commissioned device on GHA.
@@ -165,6 +176,7 @@
logger: Injected logger, if None, specify the default(root) one.
"""
self._logger = logger
+ self._is_reg_test_finished = False
self._connected_device: android_device.AndroidDevice | None = None
def load_device(self):
@@ -245,11 +257,8 @@
@get_android_device_ready
def commission_device(
- self,
- device_name: str,
- pairing_code: str,
- gha_room: str
- ):
+ self, device_name: str, pairing_code: str, gha_room: str
+ ) -> None:
"""Commissions a device through installed apk `mbs` on Google Home App.
Args:
@@ -265,7 +274,7 @@
"""
self._logger.info('Start commissioning the device.')
- _validate_commissioning_arg(device_name, pairing_code, gha_room)
+ _validate_commission_arg(device_name, pairing_code, gha_room)
device_name_snake_case = f'_{inflection.underscore(device_name)}'
matter_device = {
@@ -317,17 +326,21 @@
) from e
def run_regression_tests(
- self, repeat: int | None, *args: Any,
+ self,
+ repeat: int | None,
+ test_type: RegTestSuiteType,
+ **kwargs,
) -> None:
"""Executes automated regression tests.
- A single execution of both commissioning and decommissioning constitutes one
+ A single execution of both commission and decommission constitutes one
cycle.
Args:
+ repeat: The value of flag `repeat`. If the value is None, regression
+ tests will be run repeatedly until interrupted from the keyboard.
- *args: Any required value to run regression tests.
+ test_type: The type of test suite for running regression tests.
+ **kwargs: Required args to run regression test cases.
Raises:
ValueError: When the value of flag `repeat` is not positive.
@@ -335,30 +348,37 @@
if repeat and repeat <= 0:
raise ValueError('Number placed after `--repeat` must be positive.')
- failure_count = 0
- run_count = 0
self._logger.info(
'Start running regression tests'
f' {str(repeat) + " times" if repeat is not None else "continuously"}.'
)
- while repeat is None or run_count < repeat:
- try:
- device_name, _, _ = args
- self.commission_device(*args)
- self.decommission_device(device_name)
- except errors.MoblySnippetError:
- failure_count += 1
- except KeyboardInterrupt:
- self._logger.info('Tests interrupted by keyboard.')
- break
- run_count += 1
- self._logger.info(
- 'Ran %d times. Passed %d times. Failed %d times.',
- run_count,
- run_count - failure_count,
- failure_count,
- )
+ self._is_reg_test_finished = False
+ suite = unittest.TestSuite()
+ # Once all tests are run, `test_reporter.TestRunner` will produce an XML
+ # file and a text summary file. If the number of regression test cycles is
+ # not predefined (i.e., `repeat` is not specified), test cases are added to
+ # the running suite on the fly.
+ executor = futures.ThreadPoolExecutor(max_workers=1)
+ if test_type == RegTestSuiteType.COMMISSION:
+ device_name = kwargs.get('device_name', None)
+ pairing_code = kwargs.get('pairing_code', None)
+ gha_room = kwargs.get('gha_room', None)
+ executor.submit(
+ self._add_test_to_commission_test_suite,
+ suite=suite,
+ repeat=repeat,
+ device_name=device_name,
+ pairing_code=pairing_code,
+ gha_room=gha_room,
+ )
+
+ runner = test_reporter.TestRunner(logger=self._logger)
+ try:
+ runner.run(suite)
+ finally:
+ self._is_reg_test_finished = True
+ executor.shutdown(wait=False)
def _get_mbs_apk_path(self) -> str:
return os.path.join(
@@ -368,20 +388,69 @@
'snippet-0.2.2-rc.0.apk',
)
+ def _add_test_to_commission_test_suite(
+ self,
+ suite: unittest.TestSuite,
+ repeat: int | None,
+ device_name: str,
+ pairing_code: str,
+ gha_room: str,
+ ) -> None:
+ """Adds automated regression tests for RegTestSuiteType.COMMISSION.
-def _process_flags(ui_automator: UIAutomator) -> None:
- """Does specific action based on given flag values."""
- if _COMMISSION.value:
- if len(_COMMISSION.value) != 3:
- raise flags.IllegalFlagValueError(_COMMISSIONING_FLAG_USAGE_GUIDE)
- if _RUN_REGRESSION_TESTS.value:
- ui_automator.run_regression_tests(_REPEAT.value, *_COMMISSION.value)
- else:
- ui_automator.commission_device(*_COMMISSION.value)
- elif _DECOMMISSION.value:
- ui_automator.decommission_device(_DECOMMISSION.value)
- elif _RUN_REGRESSION_TESTS.value:
- raise flags.IllegalFlagValueError(_REGRESSION_TESTS_FLAG_USAGE_GUIDE)
+ Args:
+ suite: TestSuite instance that tests are added to.
+ repeat: An integer value specifying the number of times the regression
+ tests should be executed, or None if tests should be run indefinitely.
+ device_name: Display name of commissioned device on GHA.
+ pairing_code: An 11-digit or 21-digit numeric code which contains the
+ information needed to commission a matter device.
+ gha_room: Assigned room of commissioned device on GHA.
+ """
+ run_count = 0
+ while not self._is_reg_test_finished and (not repeat or run_count < repeat):
+ suite.addTest(
+ commission_reg_test.CommissionRegTest(
+ self,
+ 'test_commission',
+ device_name=device_name,
+ pairing_code=pairing_code,
+ gha_room=gha_room,
+ )
+ )
+ suite.addTest(
+ commission_reg_test.CommissionRegTest(
+ self, 'test_decommission', device_name=device_name
+ )
+ )
+ time.sleep(_REGRESSION_TESTS_TIMEOUT_IN_SECS)
+ run_count += 1
+
+ def process_flags(self) -> None:
+ """Does specific action based on given flag values."""
+ if _COMMISSION.value:
+ if len(_COMMISSION.value) != 3:
+ raise flags.IllegalFlagValueError(_COMMISSION_FLAG_USAGE_GUIDE)
+ device_name, pairing_code, gha_room = _COMMISSION.value
+ if _RUN_REGRESSION_TESTS.value:
+ self.run_regression_tests(
+ _REPEAT.value,
+ RegTestSuiteType.COMMISSION,
+ device_name=device_name,
+ pairing_code=pairing_code,
+ gha_room=gha_room,
+ )
+ else:
+ self.commission_device(
+ device_name=device_name,
+ pairing_code=pairing_code,
+ gha_room=gha_room,
+ )
+ elif _DECOMMISSION.value:
+ device_name = _DECOMMISSION.value
+ self.decommission_device(device_name)
+ elif _RUN_REGRESSION_TESTS.value:
+ raise flags.IllegalFlagValueError(_REGRESSION_TESTS_FLAG_USAGE_GUIDE)
# TODO(b/309745485): Type of argv should be Sequence[str].
@@ -389,8 +458,7 @@
if argv and len(argv) > 1:
raise app.UsageError(f'Too many command-line arguments: {argv!r}')
- ui_automator = UIAutomator()
- _process_flags(ui_automator)
+ UIAutomator().process_flags()
def run():
diff --git a/ui_automator/ui_automator_test.py b/ui_automator/ui_automator_test.py
index dd51af3..a44b947 100644
--- a/ui_automator/ui_automator_test.py
+++ b/ui_automator/ui_automator_test.py
@@ -1,4 +1,4 @@
-# Copyright 2023 Google LLC
+# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,20 +13,27 @@
# limitations under the License.
"""Unittest Lab exercise to test implementation of "Synonym Dictionary"."""
+import io
import os
+import re
import subprocess
import sys
+import time
+import traceback
import unittest
from unittest import mock
from absl import flags
from absl.testing import flagsaver
+from absl.testing import xml_reporter
from mobly.controllers import android_device
from mobly.controllers.android_device_lib import adb
from mobly.snippet import errors as snippet_errors
from ui_automator import errors
+from ui_automator import test_reporter
from ui_automator import ui_automator
+from ui_automator import unit_test_utils
from ui_automator import version
_FAKE_MATTER_DEVICE_NAME = 'fake-matter-device-name'
@@ -41,12 +48,12 @@
}
_PYTHON_PATH = subprocess.check_output(['which', 'python']).decode('utf-8')
_PYTHON_BIN_PATH = _PYTHON_PATH.removesuffix('python')
-_FAKE_VALID_SYS_ARGV_FOR_COMMISSIONING = [
+_FAKE_VALID_SYS_ARGV_FOR_COMMISSION = [
_PYTHON_BIN_PATH + 'ui-automator',
'--commission',
'm5stack,34970112332,Office',
]
-_FAKE_VALID_SYS_ARGV_FOR_DECOMMISSIONING = [
+_FAKE_VALID_SYS_ARGV_FOR_DECOMMISSION = [
_PYTHON_BIN_PATH + 'ui-automator',
'--decommission',
'm5stack',
@@ -57,7 +64,7 @@
'm5stack,34970112332,Office',
'--regtest',
'--repeat',
- '5',
+ '3',
]
_FAKE_INVALID_SYS_ARGV_FOR_REGRESSION_TESTS = [
_PYTHON_BIN_PATH + 'ui-automator',
@@ -65,12 +72,12 @@
'--repeat',
'5',
]
-_FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_INVALID_LENGTH = [
+_FAKE_SYS_ARGV_FOR_COMMISSION_WITH_INVALID_LENGTH = [
_PYTHON_BIN_PATH + 'ui-automator',
'--commission',
'm5',
]
-_FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_EMPTY_VALUE = [
+_FAKE_SYS_ARGV_FOR_COMMISSION_WITH_EMPTY_VALUE = [
_PYTHON_BIN_PATH + 'ui-automator',
'--commission',
]
@@ -479,7 +486,7 @@
def test_run_calls_commission_device_with_valid_arguments(
self, mock_commission_device, mock_exit
):
- with mock.patch.object(sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_COMMISSIONING):
+ with mock.patch.object(sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_COMMISSION):
ui_automator.run()
mock_commission_device.assert_called_once_with(
@@ -490,7 +497,7 @@
@flagsaver.flagsaver((ui_automator._COMMISSION, ['m5']))
def test_commission_with_cmd_invalid_arg_should_raise_an_error(self):
with mock.patch.object(
- sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_INVALID_LENGTH
+ sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSION_WITH_INVALID_LENGTH
):
with self.assertRaises(flags.IllegalFlagValueError):
ui_automator.run()
@@ -503,7 +510,7 @@
self, mock_exit, mock_stderr_write
):
with mock.patch.object(
- sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSIONING_WITH_EMPTY_VALUE
+ sys, 'argv', _FAKE_SYS_ARGV_FOR_COMMISSION_WITH_EMPTY_VALUE
):
ui_automator.run()
@@ -532,9 +539,7 @@
def test_run_calls_decommission_device_with_valid_arguments(
self, mock_decommission_device, mock_exit, mock_commission_device
):
- with mock.patch.object(
- sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_DECOMMISSIONING
- ):
+ with mock.patch.object(sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_DECOMMISSION):
ui_automator.run()
mock_commission_device.assert_not_called()
@@ -608,7 +613,12 @@
ui_automator.run()
mock_run_regression_tests.assert_called_once_with(
- mock.ANY, 5, *['m5stack', '34970112332', 'Office']
+ mock.ANY,
+ 3,
+ ui_automator.RegTestSuiteType.COMMISSION,
+ device_name='m5stack',
+ pairing_code='34970112332',
+ gha_room='Office',
)
mock_commission_device.assert_not_called()
mock_exit.assert_called_once()
@@ -643,6 +653,8 @@
@flagsaver.flagsaver(
(ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office'])
)
+ @mock.patch.object(time, 'sleep', autospec=True)
+ @mock.patch('builtins.open', autospec=True)
@mock.patch.object(android_device, 'get_all_instances', autospec=True)
@mock.patch.object(
ui_automator.UIAutomator, 'commission_device', autospec=True
@@ -655,12 +667,19 @@
mock_decommission_device,
mock_commission_device,
mock_get_all_instances,
+ mock_open,
+ mock_sleep,
):
+ mock_sleep.return_value = None
mock_get_all_instances.return_value = [self.mock_android_device]
with self.assertLogs() as cm:
self.ui_automator.run_regression_tests(
- 5, *['m5stack', '34970112332', 'Office']
+ 5,
+ ui_automator.RegTestSuiteType.COMMISSION,
+ device_name='m5stack',
+ pairing_code='34970112332',
+ gha_room='Office',
)
self.assertEqual(mock_commission_device.call_count, 5)
@@ -668,13 +687,13 @@
self.assertEqual(
cm.output[0], 'INFO:root:Start running regression tests 5 times.'
)
- self.assertEqual(
- cm.output[1], 'INFO:root:Ran 5 times. Passed 5 times. Failed 0 times.'
- )
+ self.assertEqual(mock_open.call_count, 2)
@flagsaver.flagsaver(
(ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office'])
)
+ @mock.patch.object(time, 'sleep', autospec=True)
+ @mock.patch('builtins.open', autospec=True)
@mock.patch.object(android_device, 'get_all_instances', autospec=True)
@mock.patch.object(
ui_automator.UIAutomator, 'commission_device', autospec=True
@@ -687,7 +706,10 @@
mock_decommission_device,
mock_commission_device,
mock_get_all_instances,
+ mock_open,
+ mock_sleep,
):
+ mock_sleep.return_value = None
mock_commission_device.side_effect = [
None,
None,
@@ -699,7 +721,11 @@
with self.assertLogs() as cm:
self.ui_automator.run_regression_tests(
- 5, *['m5stack', '34970112332', 'Office']
+ 5,
+ ui_automator.RegTestSuiteType.COMMISSION,
+ device_name='m5stack',
+ pairing_code='34970112332',
+ gha_room='Office',
)
self.assertEqual(mock_commission_device.call_count, 5)
@@ -707,19 +733,25 @@
self.assertEqual(
cm.output[0], 'INFO:root:Start running regression tests 5 times.'
)
- self.assertEqual(
- cm.output[1], 'INFO:root:Ran 5 times. Passed 4 times. Failed 1 times.'
- )
+ self.assertEqual(mock_open.call_count, 2)
def test_run_regression_tests_raises_an_error_with_invalid_input(self):
with self.assertRaisesRegex(
ValueError, 'Number placed after `--repeat` must be positive.'
):
- self.ui_automator.run_regression_tests(-5)
+ self.ui_automator.run_regression_tests(
+ -5,
+ ui_automator.RegTestSuiteType.COMMISSION,
+ device_name='m5stack',
+ pairing_code='34970112332',
+ gha_room='Office',
+ )
@flagsaver.flagsaver(
(ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office'])
)
+ @mock.patch.object(time, 'sleep', autospec=True)
+ @mock.patch('builtins.open', autospec=True)
@mock.patch.object(android_device, 'get_all_instances', autospec=True)
@mock.patch.object(
ui_automator.UIAutomator, 'commission_device', autospec=True
@@ -732,7 +764,10 @@
mock_decommission_device,
mock_commission_device,
mock_get_all_instances,
+ mock_open,
+ mock_sleep,
):
+ mock_sleep.return_value = None
mock_commission_device.side_effect = [
None,
errors.MoblySnippetError('fake_error'),
@@ -744,7 +779,11 @@
mock_get_all_instances.return_value = [self.mock_android_device]
with self.assertLogs() as cm:
self.ui_automator.run_regression_tests(
- None, *['m5stack', '34970112332', 'Office']
+ None,
+ ui_automator.RegTestSuiteType.COMMISSION,
+ device_name='m5stack',
+ pairing_code='34970112332',
+ gha_room='Office',
)
self.assertEqual(mock_commission_device.call_count, 6)
@@ -752,10 +791,222 @@
self.assertEqual(
cm.output[0], 'INFO:root:Start running regression tests continuously.'
)
- self.assertEqual(cm.output[1], 'INFO:root:Tests interrupted by keyboard.')
- self.assertEqual(
- cm.output[2], 'INFO:root:Ran 5 times. Passed 3 times. Failed 2 times.'
+ self.assertEqual(mock_open.call_count, 2)
+
+ @flagsaver.flagsaver(
+ (ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office'])
+ )
+ @mock.patch.object(sys, 'exit', autospec=True)
+ @mock.patch.object(time, 'time', autospec=True)
+ @mock.patch.object(time, 'sleep', autospec=True)
+ @mock.patch('builtins.open', autospec=True)
+ @mock.patch.object(android_device, 'get_all_instances', autospec=True)
+ @mock.patch.object(
+ ui_automator.UIAutomator, 'commission_device', autospec=True
+ )
+ @mock.patch.object(
+ ui_automator.UIAutomator, 'decommission_device', autospec=True
+ )
+ def test_run_calls_run_regression_tests_and_produces_summary_in_txt(
+ self,
+ mock_decommission_device,
+ mock_commission_device,
+ mock_get_all_instances,
+ mock_open,
+ mock_sleep,
+ mock_time,
+ mock_exit,
+ ):
+ txt_stream = io.StringIO()
+ mock_sleep.return_value = None
+ fake_error = errors.MoblySnippetError('error')
+ mock_commission_device.side_effect = [
+ fake_error,
+ None,
+ None,
+ ]
+ mock_decommission_device.side_effect = [
+ None,
+ fake_error,
+ ]
+ mock_get_all_instances.return_value = [self.mock_android_device]
+ mock_open.side_effect = [io.StringIO(), txt_stream]
+ # mock_time is called by startTestRun, startTest, stopTest, and stopTestRun.
+ # startTest and stopTest will be called when running test cases.
+ # There are 3 test_commission and 3 test_decommission cases in this suite,
+ # so the 6 test cases will call mock_time 12 times.
+ mock_time.side_effect = [0] + list(range(12)) + [11]
+ expected_summary = unit_test_utils.make_summary(
+ test_date=time.strftime('%Y/%m/%d', time.localtime(0)),
+ duration=test_reporter.duration_formatter(11),
+ total_runs=3,
+ total_successful_runs=1,
)
+ expected_test_case_result = unit_test_utils.make_test_case_result(
+ 3,
+ res_of_test_commission=['FAIL', 'PASS', 'PASS'],
+ res_of_test_decommission=['N/A', 'PASS', 'FAIL'],
+ )
+ err = (errors.MoblySnippetError, fake_error, fake_error.__traceback__)
+ fake_err_msg = ''.join(traceback.format_exception(*err))
+
+ with mock.patch.object(txt_stream, 'close'):
+ with mock.patch.object(
+ test_reporter.TestResult,
+ '_exc_info_to_string',
+ return_value=fake_err_msg,
+ ):
+ with mock.patch.object(
+ sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_REGRESSION_TESTS
+ ):
+ ui_automator.run()
+
+ self.assertEqual(mock_commission_device.call_count, 3)
+ self.assertEqual(mock_decommission_device.call_count, 2)
+ self.assertEqual(
+ expected_summary + '\n\n' + expected_test_case_result,
+ txt_stream.getvalue(),
+ )
+ mock_exit.assert_called_once()
+
+ @flagsaver.flagsaver(
+ (ui_automator._COMMISSION, ['m5stack', '34970112332', 'Office'])
+ )
+ @mock.patch.object(sys, 'exit', autospec=True)
+ @mock.patch.object(time, 'time', autospec=True)
+ @mock.patch.object(time, 'sleep', autospec=True)
+ @mock.patch('builtins.open', autospec=True)
+ @mock.patch.object(android_device, 'get_all_instances', autospec=True)
+ @mock.patch.object(
+ ui_automator.UIAutomator, 'commission_device', autospec=True
+ )
+ @mock.patch.object(
+ ui_automator.UIAutomator, 'decommission_device', autospec=True
+ )
+ def test_run_calls_run_regression_tests_and_produces_summary_in_xml(
+ self,
+ mock_decommission_device,
+ mock_commission_device,
+ mock_get_all_instances,
+ mock_open,
+ mock_sleep,
+ mock_time,
+ mock_exit,
+ ):
+ xml_stream = io.StringIO()
+ mock_sleep.return_value = None
+ fake_error = errors.MoblySnippetError('error')
+ mock_commission_device.side_effect = [
+ fake_error,
+ None,
+ None,
+ ]
+ mock_decommission_device.side_effect = [
+ None,
+ fake_error,
+ ]
+ mock_get_all_instances.return_value = [self.mock_android_device]
+ mock_open.side_effect = [xml_stream, io.StringIO()]
+ # mock_time is called by startTestRun, startTest, stopTest, and stopTestRun.
+ # startTest and stopTest will be called when running test cases.
+ # There are 3 test_commission and 3 test_decommission cases in this suite,
+ # so the 6 test cases will call mock_time 12 times.
+ mock_time.side_effect = [0] + list(range(12)) + [11]
+ expected_test_suite_re = unit_test_utils.OUTPUT_STRING % {
+ 'suite_name': 'CommissionRegTest',
+ 'tests': 6,
+ 'failures': 0,
+ 'errors': 2,
+ 'run_time': 11,
+ 'start_time': re.escape(unit_test_utils.iso_timestamp(0)),
+ }
+ err = (errors.MoblySnippetError, fake_error, fake_error.__traceback__)
+ fake_err_msg = ''.join(traceback.format_exception(*err))
+ expected_testcase1_re = unit_test_utils.TESTCASE_STRING_WITH_ERRORS % {
+ 'run_time': 1,
+ 'start_time': re.escape(unit_test_utils.iso_timestamp(0)),
+ 'test_name': 'test_commission',
+ 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest',
+ 'status': 'run',
+ 'result': 'FAIL',
+ 'message': xml_reporter._escape_xml_attr(str(err[1])),
+ 'error_type': xml_reporter._escape_xml_attr(str(err[0])),
+ 'error_msg': xml_reporter._escape_cdata(fake_err_msg),
+ }
+ expected_testcase2_re = unit_test_utils.TESTCASE_STRING_WITH_PROPERTIES % {
+ 'run_time': 1,
+ 'start_time': re.escape(unit_test_utils.iso_timestamp(2)),
+ 'test_name': 'test_decommission',
+ 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest',
+ 'status': 'notrun',
+ 'result': 'N/A',
+ 'properties': (
+ ' <property name="skip_reason" value="%s"></property>'
+ % (xml_reporter._escape_xml_attr('Device was not commissioned.'),)
+ ),
+ 'message': '',
+ }
+ expected_testcase3_re = unit_test_utils.TESTCASE_STRING % {
+ 'run_time': 1,
+ 'start_time': re.escape(unit_test_utils.iso_timestamp(4)),
+ 'test_name': 'test_commission',
+ 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest',
+ 'status': 'run',
+ 'result': 'PASS',
+ }
+ expected_testcase4_re = unit_test_utils.TESTCASE_STRING % {
+ 'run_time': 1,
+ 'start_time': re.escape(unit_test_utils.iso_timestamp(6)),
+ 'test_name': 'test_decommission',
+ 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest',
+ 'status': 'run',
+ 'result': 'PASS',
+ }
+ expected_testcase5_re = unit_test_utils.TESTCASE_STRING % {
+ 'run_time': 1,
+ 'start_time': re.escape(unit_test_utils.iso_timestamp(8)),
+ 'test_name': 'test_commission',
+ 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest',
+ 'status': 'run',
+ 'result': 'PASS',
+ }
+ expected_testcase6_re = unit_test_utils.TESTCASE_STRING_WITH_ERRORS % {
+ 'run_time': 1,
+ 'start_time': re.escape(unit_test_utils.iso_timestamp(10)),
+ 'test_name': 'test_decommission',
+ 'class_name': 'google3.java.com.google.assistant.verticals.homeautomation.partners.ui_automator.commission_reg_test.CommissionRegTest',
+ 'status': 'run',
+ 'result': 'FAIL',
+ 'message': xml_reporter._escape_xml_attr(str(err[1])),
+ 'error_type': xml_reporter._escape_xml_attr(str(err[0])),
+ 'error_msg': xml_reporter._escape_cdata(fake_err_msg),
+ }
+
+ with mock.patch.object(
+ test_reporter.TestResult,
+ '_exc_info_to_string',
+ return_value=fake_err_msg,
+ ):
+ with mock.patch.object(
+ sys, 'argv', _FAKE_VALID_SYS_ARGV_FOR_REGRESSION_TESTS
+ ):
+ ui_automator.run()
+
+ self.assertEqual(mock_commission_device.call_count, 3)
+ self.assertEqual(mock_decommission_device.call_count, 2)
+ (testcases,) = re.search(
+ expected_test_suite_re, xml_stream.getvalue()
+ ).groups()
+ [testcase1, testcase2, testcase3, testcase4, testcase5, testcase6] = (
+ testcases.split('\n </testcase>\n')
+ )
+ self.assertRegex(testcase1, expected_testcase1_re)
+ self.assertRegex(testcase2, expected_testcase2_re)
+ self.assertRegex(testcase3, expected_testcase3_re)
+ self.assertRegex(testcase4, expected_testcase4_re)
+ self.assertRegex(testcase5, expected_testcase5_re)
+ self.assertRegex(testcase6, expected_testcase6_re)
+ mock_exit.assert_called_once()
if __name__ == '__main__':
diff --git a/ui_automator/unit_test_utils.py b/ui_automator/unit_test_utils.py
new file mode 100644
index 0000000..d383152
--- /dev/null
+++ b/ui_automator/unit_test_utils.py
@@ -0,0 +1,129 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for unit tests."""
+import datetime
+
+# Matches the entire XML output. Captures all <testcase> tags except for the
+# last closing </testcase> in a single group.
+OUTPUT_STRING = r"""<\?xml version="1.0"\?>
+<testsuites name="" tests="%(tests)d" failures="%(failures)d" errors="%(errors)d" time="%(run_time).3f" timestamp="%(start_time)s">
+<testsuite name="%(suite_name)s" tests="%(tests)d" failures="%(failures)d" errors="%(errors)d" time="%(run_time).3f" timestamp="%(start_time)s">
+( <testcase [.\S\s]*)
+ </testcase>
+</testsuite>
+</testsuites>
+"""
+
+# Matches a single <testcase> tag and its contents, without the closing
+# </testcase>, which we use as a separator to split multiple <testcase> tags.
+TESTCASE_STRING_WITH_PROPERTIES = r""" <testcase name="%(test_name)s" status="%(status)s" result="%(result)s" time="%(run_time).3f" classname="%(class_name)s" timestamp="%(start_time)s">
+ <properties>
+%(properties)s
+ </properties>%(message)s"""
+
+# Matches a single <testcase> tag and its contents, without the closing
+# </testcase>, which we use as a separator to split multiple <testcase> tags.
+TESTCASE_STRING_WITH_ERRORS = r""" <testcase name="%(test_name)s" status="%(status)s" result="%(result)s" time="%(run_time).3f" classname="%(class_name)s" timestamp="%(start_time)s">
+ <error message="%(message)s" type="%(error_type)s"><\!\[CDATA\[%(error_msg)s\]\]></error>"""
+
+# Matches a single <testcase> tag and its contents, without the closing
+# </testcase>, which we use as a separator to split multiple <testcase> tags.
+TESTCASE_STRING = r""" <testcase name="%(test_name)s" status="%(status)s" result="%(result)s" time="%(run_time).3f" classname="%(class_name)s" timestamp="%(start_time)s">"""
+
+
+def iso_timestamp(timestamp: float) -> str:
+ """Makes timestamp in iso format for unit tests.
+
+ Args:
+ timestamp: Time value in float.
+
+ Returns:
+ Formatted time according to ISO.
+ """
+ return datetime.datetime.fromtimestamp(
+ timestamp, tz=datetime.timezone.utc
+ ).isoformat()
+
+
+def make_summary(**kwargs) -> str:
+ """Makes test summary produced by `test_reporter` for unit tests.
+
+ Args:
+ **kwargs: Fields written in test summary.
+
+ Returns:
+ Test summary.
+ """
+ dut = kwargs.get('dut', 'placeholder')
+ gha = kwargs.get('gha', 'placeholder')
+ test_date = kwargs.get('test_date', 'placeholder')
+ gms_core = kwargs.get('gms_core', 'placeholder')
+ duration = kwargs.get('duration', 0)
+ hub = kwargs.get('hub', 'placeholder')
+ total_runs = kwargs.get('total_runs', 0)
+ device = kwargs.get('device', 'placeholder')
+ total_successful_runs = kwargs.get('total_successful_runs', 0)
+ success_rate = round(100.0 * float(total_successful_runs) / float(total_runs))
+ rows: list[list[str]] = []
+ rows.append(['Summary', '', 'Version Info', ''])
+ rows.append(['DUT:', dut, 'GHA', gha])
+ rows.append(['Test Time:', test_date, 'GMSCore', gms_core])
+ rows.append(['Duration:', duration, 'Hub', hub])
+ rows.append(['Number of runs:', str(total_runs), 'Device', device])
+ rows.append([
+ 'Success Rate:',
+ f'{success_rate}%({total_successful_runs}/{total_runs})',
+ ])
+ summary = []
+ for row in rows:
+ summary.append(''.join(element.ljust(25) for element in row))
+ summary.append('\n')
+
+ return ''.join(summary)
+
+
+def make_test_case_result(
+ total_runs: int,
+ res_of_test_commission: list[str] | None,
+ res_of_test_decommission: list[str] | None,
+) -> str:
+ """Makes test case result produced by `test_reporter` for unit tests.
+
+ Args:
+ total_runs: Total cycles for regression tests.
+ res_of_test_commission: Elements in list should be `PASS`, `FAIL` or `N/A`,
+ which indicates the test result for each test case `test_commission`.
+ res_of_test_decommission: Elements in list should be `PASS`, `FAIL` or
+ `N/A`, which indicates the test result for each test case
+ `test_decommission`.
+
+ Returns:
+ Test case result.
+ """
+ test_case_result = ['Test Case/Test Run'.ljust(25)]
+ for i in range(total_runs):
+ test_case_result.append(f'#{i + 1}'.ljust(8))
+ if res_of_test_commission:
+ test_case_result.append('\n')
+ test_case_result.append('Commission to GHA'.ljust(25))
+ for res in res_of_test_commission:
+ test_case_result.append(res.ljust(8))
+ if res_of_test_decommission:
+ test_case_result.append('\n')
+ test_case_result.append('Removing from GHA'.ljust(25))
+ for res in res_of_test_decommission:
+ test_case_result.append(res.ljust(8))
+
+ return ''.join(test_case_result)