blob: d053eb36f1027d1d23bad18664bbb51565c26a43 [file] [log] [blame]
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Matter RPC verification tool."""
import enum
import datetime
import immutabledict
import logging
import os
import shutil
import simple_term_menu
import sys
import time
from typing import Optional
from mobly import asserts
from mobly import base_test
from mobly import test_runner
import pw_hdlc.rpc
import pw_rpc
import serial.tools.list_ports
import serial
from protos import attributes_service_pb2, descriptor_service_pb2, device_service_pb2
# Menu selection option
_EXIT_OPTION_TEXT = "exit"
# Matter spec definition
_ROOT_NODE_ID = 0
_ON_OFF_CLUSTER_ID = 6
_ON_OFF_ATTRIBUTE_ID = 0
_PROTOS = (attributes_service_pb2, descriptor_service_pb2, device_service_pb2)
# Test results and logs.
_MOBLY_LOG_DIRECTORY = "/tmp/logs/mobly/RpcTest/latest"
_TEST_RESULT_DIR_PREFIX = "rpc_test_result"
_PW_HDLC_LOG = "pw_hdlc.log"
# Serial connection.
_READ_BYTE_SIZE = 4096
_BAUDRATE = 115200
_SERIAL_TIMEOUT = 0.01
# Device bootup wait time.
_BOOTUP_TIMEOUT = 30
_POLLING_PERIOD = 1
_CALLBACK_TIMEOUT = 3
# Disable logger to avoid excessive logs.
_LOGGER = logging.getLogger("pw_rpc.callback_client")
_LOGGER.disabled = True
class MatterAppType(enum.Enum):
"""Matter application type."""
LIGHT = "light"
NONE = "none"
class MatterDeviceType(enum.Enum):
"""The supported Matter device types and their IDs in spec."""
# Matter device type spec:
# https://github.com/CHIP-Specifications/connectedhomeip-spec/tree/master/src/device_types
ColorTemperatureLight = 0x010C
DimmableLight = 0x0101
OnOffLight = 0x0100
NoneOfTheAbove = -1
# Device type and app type mapping
# TODO: Add more types mapping
DeviceTypeToAppType = immutabledict.immutabledict({
MatterDeviceType.ColorTemperatureLight: MatterAppType.LIGHT,
MatterDeviceType.DimmableLight: MatterAppType.LIGHT,
MatterDeviceType.OnOffLight: MatterAppType.LIGHT,
MatterDeviceType.NoneOfTheAbove: MatterAppType.NONE
})
class MatterDevice:
"""Matter device class wrapper."""
def __init__(self, address: str, device_type: MatterDeviceType):
"""Device class constructor.
1. Open serial connection of the device.
2. Create HDLC RPC client instance of the device
Args:
address: Device serial address.
device_type: Device type of the Matter device.
"""
self.serial_inst = serial.Serial(
address, _BAUDRATE, timeout=_SERIAL_TIMEOUT)
callback_client_impl = pw_rpc.callback_client.Impl(
default_unary_timeout_s=_CALLBACK_TIMEOUT,
default_stream_timeout_s=_CALLBACK_TIMEOUT)
self.client = pw_hdlc.rpc.HdlcRpcClient(
read=lambda: self.serial_inst.read(_READ_BYTE_SIZE),
paths_or_modules=_PROTOS,
channels=pw_hdlc.rpc.default_channels(self.serial_inst.write),
output=self.write_hdlc_log,
client_impl=callback_client_impl)
self._endpoint_id = self._get_endpoint_id_by_device_type(device_type)
self._app_type = DeviceTypeToAppType[device_type]
def _get_endpoint_id_by_device_type(
self, device_type: MatterDeviceType) -> int:
"""Gets the endpoint ID corresponding to the device type."""
if device_type == MatterDeviceType.NoneOfTheAbove:
return -1
_, endpoints = self.descriptor_service.PartsList(endpoint=_ROOT_NODE_ID)
for endpoint in endpoints:
endpoint_id = endpoint.endpoint
_, type_inst = self.descriptor_service.DeviceTypeList(
endpoint=endpoint_id)
if type_inst[0].device_type == device_type.value:
return endpoint_id
raise RuntimeError(f"No endpoint corresponding to {device_type.name}.")
@property
def app_type(self) -> MatterAppType:
"""App type of the device."""
return self._app_type
@property
def attribute_service(self) -> pw_rpc.client.ServiceClient:
"""Returns the attribute RPC service."""
return self.client.rpcs().chip.rpc.Attributes
@property
def descriptor_service(self) -> pw_rpc.client.ServiceClient:
"""Returns the descriptor RPC service."""
return self.client.rpcs().chip.rpc.Descriptor
@property
def device_service(self) -> pw_rpc.client.ServiceClient:
"""Returns the device RPC service."""
return self.client.rpcs().chip.rpc.Device
def close(self) -> None:
"""Close the serial connection."""
self.serial_inst.close()
@staticmethod
def write_hdlc_log(data: bytes) -> None:
"""Stores HDLC RPC logs."""
with open(_PW_HDLC_LOG, "w") as output:
output.write(data.decode() + "\n")
output.flush()
def turn_light_on_or_off(self, on: bool) -> None:
"""Turns on or off the light if applicable."""
if self.app_type != MatterAppType.LIGHT:
raise RuntimeError("Device is not a light.")
self.attribute_service.Write(
data=attributes_service_pb2.AttributeData(data_bool=on),
metadata=attributes_service_pb2.AttributeMetadata(
endpoint=self._endpoint_id,
cluster=_ON_OFF_CLUSTER_ID,
attribute_id=_ON_OFF_ATTRIBUTE_ID,
type=attributes_service_pb2.AttributeType.ZCL_BOOLEAN_ATTRIBUTE_TYPE))
def get_light_state(self) -> bool:
"""Returns True if the light is currently on. False if off."""
if self.app_type != MatterAppType.LIGHT:
raise RuntimeError("Device is not a light.")
_, data = self.attribute_service.Read(
endpoint=self._endpoint_id,
cluster=_ON_OFF_CLUSTER_ID,
attribute_id=_ON_OFF_ATTRIBUTE_ID,
type=attributes_service_pb2.AttributeType.ZCL_BOOLEAN_ATTRIBUTE_TYPE)
return data.data_bool
class MatterRpcTest(base_test.BaseTestClass):
"""Matter RPC Mobly Test.
Tests include: reboot, factory reset, descriptor cluster service
and attribute service RPC tests.
"""
def setup_class(self):
"""Device selection and creation."""
self.dut = None
self.fw_version = None
selected_device = self._device_selection()
if selected_device is None:
sys.exit(0)
device_type = self._device_type_selection()
self.dut = MatterDevice(selected_device, device_type)
def teardown_class(self):
"""Closes device and generates test result."""
if self.dut is not None:
self.dut.close()
self._generate_test_result()
def teardown_test(self):
"""Polls device until it's online."""
# Some devices will crash without cooling down first
time.sleep(_POLLING_PERIOD)
bootup_time = time.time() + _BOOTUP_TIMEOUT
while time.time() < bootup_time:
try:
self.dut.device_service.GetDeviceInfo()
return
except pw_rpc.callback_client.errors.RpcTimeout:
time.sleep(_POLLING_PERIOD)
raise RuntimeError(f"Device fails to bootup in {_BOOTUP_TIMEOUT} secs.")
def test_rpc_firmware_version(self):
"""Tests firmware_version."""
ack, device_info = self.dut.device_service.GetDeviceInfo()
asserts.assert_true(ack, "RPC ack value is not True.")
asserts.assert_is_not_none(device_info.software_version)
self.fw_version = device_info.software_version
def test_rpc_reboot(self):
"""Tests reboot method."""
ack, _ = self.dut.device_service.Reboot()
asserts.assert_true(ack, "RPC ack value is not True.")
def test_rpc_factory_reset(self):
"""Tests factory reset method."""
ack, _ = self.dut.device_service.FactoryReset()
asserts.assert_true(ack, "RPC ack value is not True.")
def test_rpc_descriptor_cluster(self):
"""Tests descriptor cluster service"""
ack, endpoints = self.dut.descriptor_service.PartsList(endpoint=0)
asserts.assert_true(ack, "RPC ack value is not True.")
asserts.assert_true(bool(endpoints), "Endpoint list is empty.")
for endpoint in endpoints:
ack, device_type = self.dut.descriptor_service.DeviceTypeList(endpoint=endpoint.endpoint)
asserts.assert_true(ack, "RPC ack value is not True.")
asserts.assert_is_instance(device_type[0].device_type, int)
def test_rpc_attribute_service(self):
"""Tests attribute service."""
# Every endpoint works, so pick the first one
_, endpoints = self.dut.descriptor_service.PartsList(endpoint=0)
endpoint_id = endpoints[0].endpoint
# Every cluster works, so pick the first one
ack, clusters = self.dut.descriptor_service.ServerList(endpoint=endpoint_id)
asserts.assert_true(ack, "RPC ack value is not True.")
asserts.assert_true(bool(clusters), "Cluster list is empty.")
cluster_id = clusters[0].cluster_id
# Read the attribute from the endpoint
ack, data = self.dut.attribute_service.Read(
endpoint=endpoint_id,
cluster=cluster_id,
attribute_id=0,
type=attributes_service_pb2.AttributeType.ZCL_BITMAP32_ATTRIBUTE_TYPE)
asserts.assert_true(ack, "RPC ack value is not True.")
asserts.assert_is_instance(data.data_uint32, int)
def test_light_app_type(self):
"""Tests light on off if device is a light."""
if self.dut.app_type == MatterAppType.LIGHT:
# Turn on the light
self.dut.turn_light_on_or_off(on=True)
asserts.assert_true(self.dut.get_light_state(), "The light is not on")
# Turn off the light
self.dut.turn_light_on_or_off(on=False)
asserts.assert_false(self.dut.get_light_state(), "The light is not off")
else:
asserts.skip("Light test is skipped.")
@staticmethod
def _device_selection() -> Optional[str]:
"""Selects device via terminal menu.
Returns:
The selected device address. None if "exit" is selected.
"""
devices = serial.tools.list_ports.comports()
options = [device.description + f" (serial number {device.serial_number})"
for device in devices] + [_EXIT_OPTION_TEXT]
menu = simple_term_menu.TerminalMenu(options)
selected_index = menu.show()
return None if selected_index >= len(devices) else devices[selected_index].device
@staticmethod
def _device_type_selection() -> MatterDeviceType:
"""Selects the Matter device type of your device."""
all_types = list(MatterDeviceType)
options = [matter_type.name for matter_type in all_types]
menu = simple_term_menu.TerminalMenu(options, title="Select the device type:")
selected_index = menu.show()
return all_types[selected_index]
def _generate_test_result(self) -> None:
"""Generates RPC test result."""
fw_version = self.fw_version if self.fw_version is not None else "unknown"
test_result_dir = (
f"{_TEST_RESULT_DIR_PREFIX}-{fw_version}-"
f"{str(datetime.datetime.now())}")
shutil.copytree(_MOBLY_LOG_DIRECTORY, test_result_dir)
if os.path.exists(_PW_HDLC_LOG):
shutil.move(_PW_HDLC_LOG, f"{test_result_dir}/{_PW_HDLC_LOG}")
shutil.make_archive(test_result_dir, "zip", test_result_dir)
shutil.rmtree(test_result_dir)
if __name__ == "__main__":
test_runner.main()