# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| |
| """Matter RPC verification tool.""" |
| import enum |
| import datetime |
| import immutabledict |
| import logging |
| import os |
| import shutil |
| import simple_term_menu |
| import sys |
| import time |
| from typing import Optional |
| |
| from mobly import asserts |
| from mobly import base_test |
| from mobly import test_runner |
| |
| import pw_hdlc.rpc |
| import pw_rpc |
| |
| import serial.tools.list_ports |
| import serial |
| |
| from protos import attributes_service_pb2, descriptor_service_pb2, device_service_pb2 |
| |
# Label of the device-menu entry that quits without picking a device.
_EXIT_OPTION_TEXT = "exit"

# Identifiers taken from the Matter specification.
_ROOT_NODE_ID = 0
_ON_OFF_CLUSTER_ID = 6
_ON_OFF_ATTRIBUTE_ID = 0

# Proto modules handed to the HDLC RPC client.
_PROTOS = (
    attributes_service_pb2,
    descriptor_service_pb2,
    device_service_pb2,
)

# Where Mobly logs are read from and how result artifacts are named.
_MOBLY_LOG_DIRECTORY = "/tmp/logs/mobly/RpcTest/latest"
_TEST_RESULT_DIR_PREFIX = "rpc_test_result"
_PW_HDLC_LOG = "pw_hdlc.log"

# Serial connection settings: bytes per read, baud rate, read timeout (s).
_READ_BYTE_SIZE = 4096
_BAUDRATE = 115200
_SERIAL_TIMEOUT = 0.01

# Timing values, all in seconds: how long to wait for the device to come
# back online, how often to poll it, and the default RPC timeout.
_BOOTUP_TIMEOUT = 30
_POLLING_PERIOD = 1
_CALLBACK_TIMEOUT = 3

# The callback client logger is disabled to avoid excessive logs.
_LOGGER = logging.getLogger("pw_rpc.callback_client")
_LOGGER.disabled = True
| |
| |
class MatterAppType(enum.Enum):
    """Application category of a Matter device.

    Each member's value is the lowercase name of the category.
    """

    LIGHT = "light"
    CLOSURE = "closure"
    NONE = "none"
| |
| |
class MatterDeviceType(enum.Enum):
    """Supported Matter device types, valued by their device type ID.

    The IDs follow the Matter device type spec:
    https://github.com/CHIP-Specifications/connectedhomeip-spec/tree/master/src/device_types

    ``NoneOfTheAbove`` (-1) stands for a device that matches none of the
    listed types.
    """

    # --- Lighting ---
    OnOffLight = 0x0100
    DimmableLight = 0x0101
    ColorTemperatureLight = 0x010C
    ExtendedColorLight = 0x010D

    # --- Closure ---
    DoorLock = 0x000A
    DoorLockController = 0x000B
    WindowCovering = 0x0202
    WindowCoveringController = 0x0203

    # --- Fallback ---
    NoneOfTheAbove = -1
| |
| |
# Read-only lookup from each supported device type to its application type.
# TODO: Add more types mapping
DeviceTypeToAppType = immutabledict.immutabledict({
    # Lighting device types.
    **dict.fromkeys(
        (
            MatterDeviceType.OnOffLight,
            MatterDeviceType.DimmableLight,
            MatterDeviceType.ColorTemperatureLight,
            MatterDeviceType.ExtendedColorLight,
        ),
        MatterAppType.LIGHT,
    ),
    # Closure device types.
    **dict.fromkeys(
        (
            MatterDeviceType.DoorLock,
            MatterDeviceType.DoorLockController,
            MatterDeviceType.WindowCovering,
            MatterDeviceType.WindowCoveringController,
        ),
        MatterAppType.CLOSURE,
    ),
    # Anything else.
    MatterDeviceType.NoneOfTheAbove: MatterAppType.NONE,
})
| |
| |
class MatterDevice:
    """Matter device wrapper built on a serial HDLC RPC connection.

    Opens the serial port, creates the HDLC RPC client on top of it and
    exposes the attribute, descriptor and device RPC services together with
    a few light-specific helpers.
    """

    def __init__(self, address: str, device_type: MatterDeviceType):
        """Device class constructor.

        1. Open serial connection of the device.
        2. Create HDLC RPC client instance of the device.
        3. Look up the endpoint and app type matching ``device_type``.

        Args:
            address: Device serial address.
            device_type: Device type of the Matter device.

        Raises:
            RuntimeError: If no endpoint of the device reports
                ``device_type``.
        """
        self.serial_inst = serial.Serial(
            address, _BAUDRATE, timeout=_SERIAL_TIMEOUT)

        try:
            callback_client_impl = pw_rpc.callback_client.Impl(
                default_unary_timeout_s=_CALLBACK_TIMEOUT,
                default_stream_timeout_s=_CALLBACK_TIMEOUT)

            self.client = pw_hdlc.rpc.HdlcRpcClient(
                read=lambda: self.serial_inst.read(_READ_BYTE_SIZE),
                paths_or_modules=_PROTOS,
                channels=pw_hdlc.rpc.default_channels(self.serial_inst.write),
                output=self.write_hdlc_log,
                client_impl=callback_client_impl)

            self._endpoint_id = self._get_endpoint_id_by_device_type(
                device_type)
            self._app_type = DeviceTypeToAppType[device_type]
        except Exception:
            # The caller never receives the instance when construction
            # fails, so release the serial port here instead of leaking it.
            self.close()
            raise

    def _get_endpoint_id_by_device_type(
            self, device_type: MatterDeviceType) -> int:
        """Gets the endpoint ID corresponding to the device type.

        Args:
            device_type: Device type to look for.

        Returns:
            The ID of the first endpoint listed by the root node that reports
            ``device_type``, or -1 for ``MatterDeviceType.NoneOfTheAbove``.

        Raises:
            RuntimeError: If no endpoint reports ``device_type``.
        """
        if device_type == MatterDeviceType.NoneOfTheAbove:
            return -1
        _, endpoints = self.descriptor_service.PartsList(
            endpoint=_ROOT_NODE_ID)
        for endpoint in endpoints:
            endpoint_id = endpoint.endpoint
            _, device_types = self.descriptor_service.DeviceTypeList(
                endpoint=endpoint_id)
            # Check every reported device type: the list may be empty or
            # hold several entries, so indexing [0] is not reliable.
            if any(entry.device_type == device_type.value
                   for entry in device_types):
                return endpoint_id
        raise RuntimeError(f"No endpoint corresponding to {device_type.name}.")

    @property
    def app_type(self) -> MatterAppType:
        """App type of the device."""
        return self._app_type

    @property
    def attribute_service(self) -> pw_rpc.client.ServiceClient:
        """Returns the attribute RPC service."""
        return self.client.rpcs().chip.rpc.Attributes

    @property
    def descriptor_service(self) -> pw_rpc.client.ServiceClient:
        """Returns the descriptor RPC service."""
        return self.client.rpcs().chip.rpc.Descriptor

    @property
    def device_service(self) -> pw_rpc.client.ServiceClient:
        """Returns the device RPC service."""
        return self.client.rpcs().chip.rpc.Device

    def close(self) -> None:
        """Close the serial connection."""
        self.serial_inst.close()

    @staticmethod
    def write_hdlc_log(data: bytes) -> None:
        """Appends one chunk of HDLC RPC client output to the log file.

        Args:
            data: Raw bytes to log. Bytes that are not valid UTF-8 are
                written as replacement characters instead of raising.
        """
        # Fixed UTF-8 encoding so the replacement character can always be
        # written, whatever the locale default is.
        with open(_PW_HDLC_LOG, "a", encoding="utf-8") as output:
            output.write(data.decode(errors="replace") + "\n")

    def turn_light_on_or_off(self, on: bool) -> None:
        """Turns on or off the light if applicable.

        Args:
            on: True to turn the light on, False to turn it off.

        Raises:
            RuntimeError: If the device is not a light.
        """
        if self.app_type != MatterAppType.LIGHT:
            raise RuntimeError("Device is not a light.")

        self.attribute_service.Write(
            data=attributes_service_pb2.AttributeData(data_bool=on),
            metadata=attributes_service_pb2.AttributeMetadata(
                endpoint=self._endpoint_id,
                cluster=_ON_OFF_CLUSTER_ID,
                attribute_id=_ON_OFF_ATTRIBUTE_ID,
                type=attributes_service_pb2.AttributeType
                .ZCL_BOOLEAN_ATTRIBUTE_TYPE))

    def get_light_state(self) -> bool:
        """Returns True if the light is currently on. False if off.

        Raises:
            RuntimeError: If the device is not a light.
        """
        if self.app_type != MatterAppType.LIGHT:
            raise RuntimeError("Device is not a light.")

        _, data = self.attribute_service.Read(
            endpoint=self._endpoint_id,
            cluster=_ON_OFF_CLUSTER_ID,
            attribute_id=_ON_OFF_ATTRIBUTE_ID,
            type=attributes_service_pb2.AttributeType
            .ZCL_BOOLEAN_ATTRIBUTE_TYPE)

        return data.data_bool
| |
| |
class MatterRpcTest(base_test.BaseTestClass):
    """Matter RPC Mobly Test.

    Tests include: reboot, factory reset, descriptor cluster service
    and attribute service RPC tests.
    """

    def setup_class(self):
        """Device selection and creation.

        Exits the process with status 0 when no device is selected.
        """
        self.dut = None
        self.fw_version = None
        selected_device = self._device_selection()
        if selected_device is None:
            sys.exit(0)
        device_type = self._device_type_selection()
        self.dut = MatterDevice(selected_device, device_type)

    def teardown_class(self):
        """Closes device and generates test result."""
        try:
            if self.dut is not None:
                self.dut.close()
        finally:
            # Archive the results even when closing the device fails.
            self._generate_test_result()

    def teardown_test(self):
        """Polls device until it's online.

        Raises:
            RuntimeError: If the device does not answer within
                ``_BOOTUP_TIMEOUT`` seconds.
        """
        # Some devices will crash without cooling down first
        time.sleep(_POLLING_PERIOD)
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + _BOOTUP_TIMEOUT
        while time.monotonic() < deadline:
            try:
                self.dut.device_service.GetDeviceInfo()
                return
            except pw_rpc.callback_client.errors.RpcTimeout:
                time.sleep(_POLLING_PERIOD)
        raise RuntimeError(f"Device fails to bootup in {_BOOTUP_TIMEOUT} secs.")

    @staticmethod
    def _assert_rpc_ok(status) -> None:
        """Fails the test unless the RPC finished with an OK status.

        The status is asserted through ``status.ok()`` because the status
        object itself is not a reliable truth value to assert on.
        NOTE(review): assumes ``status`` is a ``pw_status.Status`` - confirm.
        """
        asserts.assert_true(status.ok(), f"RPC status is {status}, not OK.")

    def test_rpc_firmware_version(self):
        """Tests firmware_version and records it for the result name."""
        status, device_info = self.dut.device_service.GetDeviceInfo()
        self._assert_rpc_ok(status)
        asserts.assert_is_not_none(device_info.software_version)
        self.fw_version = device_info.software_version

    def test_rpc_reboot(self):
        """Tests reboot method."""
        status, _ = self.dut.device_service.Reboot()
        self._assert_rpc_ok(status)

    def test_rpc_factory_reset(self):
        """Tests factory reset method."""
        status, _ = self.dut.device_service.FactoryReset()
        self._assert_rpc_ok(status)

    def test_rpc_descriptor_cluster(self):
        """Tests descriptor cluster service."""
        status, endpoints = self.dut.descriptor_service.PartsList(
            endpoint=_ROOT_NODE_ID)
        self._assert_rpc_ok(status)
        asserts.assert_true(bool(endpoints), "Endpoint list is empty.")
        for endpoint in endpoints:
            status, device_types = self.dut.descriptor_service.DeviceTypeList(
                endpoint=endpoint.endpoint)
            self._assert_rpc_ok(status)
            # Report an empty list as a test failure, not an IndexError.
            asserts.assert_true(
                bool(device_types),
                f"Device type list of endpoint {endpoint.endpoint} is empty.")
            for entry in device_types:
                asserts.assert_is_instance(entry.device_type, int)

    def test_rpc_attribute_service(self):
        """Tests attribute service."""
        # Every endpoint works, so pick the first one
        status, endpoints = self.dut.descriptor_service.PartsList(
            endpoint=_ROOT_NODE_ID)
        self._assert_rpc_ok(status)
        asserts.assert_true(bool(endpoints), "Endpoint list is empty.")
        endpoint_id = endpoints[0].endpoint

        # Every cluster works, so pick the first one
        status, clusters = self.dut.descriptor_service.ServerList(
            endpoint=endpoint_id)
        self._assert_rpc_ok(status)
        asserts.assert_true(bool(clusters), "Cluster list is empty.")
        cluster_id = clusters[0].cluster_id

        # Read the attribute from the endpoint
        status, data = self.dut.attribute_service.Read(
            endpoint=endpoint_id,
            cluster=cluster_id,
            attribute_id=0,
            type=attributes_service_pb2.AttributeType
            .ZCL_BITMAP32_ATTRIBUTE_TYPE)
        self._assert_rpc_ok(status)
        asserts.assert_is_instance(data.data_uint32, int)

    def test_light_app_type(self):
        """Tests light on off if device is a light."""
        if self.dut.app_type != MatterAppType.LIGHT:
            asserts.skip("Light test is skipped.")

        # Turn on the light
        self.dut.turn_light_on_or_off(on=True)
        asserts.assert_true(self.dut.get_light_state(), "The light is not on")
        # Turn off the light
        self.dut.turn_light_on_or_off(on=False)
        asserts.assert_false(
            self.dut.get_light_state(), "The light is not off")

    @staticmethod
    def _device_selection() -> Optional[str]:
        """Selects device via terminal menu.

        Returns:
            The selected device address. None if "exit" is selected or the
            menu is cancelled.
        """
        devices = serial.tools.list_ports.comports()
        options = [
            f"{device.description} (serial number {device.serial_number})"
            for device in devices
        ]
        options.append(_EXIT_OPTION_TEXT)
        selected_index = simple_term_menu.TerminalMenu(options).show()
        # show() returns None when the menu is cancelled; treat that like
        # the explicit "exit" entry instead of comparing None with an int.
        if selected_index is None or selected_index >= len(devices):
            return None
        return devices[selected_index].device

    @staticmethod
    def _device_type_selection() -> MatterDeviceType:
        """Selects the Matter device type of your device.

        Returns:
            The selected device type. ``MatterDeviceType.NoneOfTheAbove`` if
            the menu is cancelled.
        """
        all_types = list(MatterDeviceType)
        options = [matter_type.name for matter_type in all_types]
        menu = simple_term_menu.TerminalMenu(
            options, title="Select the device type:")
        selected_index = menu.show()
        if selected_index is None:
            # A cancelled menu gives no index; fall back to the generic type
            # rather than failing on all_types[None].
            logging.warning(
                "No device type selected; using %s.",
                MatterDeviceType.NoneOfTheAbove.name)
            return MatterDeviceType.NoneOfTheAbove
        return all_types[selected_index]

    def _generate_test_result(self) -> None:
        """Generates RPC test result.

        Copies the Mobly log directory (when it exists) into a result
        directory named after the firmware version and the current time,
        moves the HDLC log into it, zips the directory and removes it.
        """
        fw_version = (
            self.fw_version if self.fw_version is not None else "unknown")
        test_result_dir = (
            f"{_TEST_RESULT_DIR_PREFIX}-{fw_version}-"
            f"{datetime.datetime.now()}")
        if os.path.isdir(_MOBLY_LOG_DIRECTORY):
            shutil.copytree(_MOBLY_LOG_DIRECTORY, test_result_dir)
        else:
            # Still produce an archive (e.g. with the HDLC log) when Mobly
            # wrote its logs somewhere else.
            logging.warning(
                "Mobly log directory %s not found; archiving without it.",
                _MOBLY_LOG_DIRECTORY)
            os.makedirs(test_result_dir)
        if os.path.exists(_PW_HDLC_LOG):
            shutil.move(
                _PW_HDLC_LOG, os.path.join(test_result_dir, _PW_HDLC_LOG))
        shutil.make_archive(test_result_dir, "zip", test_result_dir)
        shutil.rmtree(test_result_dir)
| |
| |
# Hand control to Mobly's test runner when this file is run as a script.
if __name__ == "__main__":
    test_runner.main()