blob: 950f20e3bff02ba73c3ee13f99df9bf57a646bd9 [file] [log] [blame]
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for GDM manager."""
import threading
from typing import Any, Collection, List, Mapping, Optional, Set, Tuple, Type,Callable,Dict
import gazoo_device
from gazoo_device import errors as gdm_errors
from gazoo_device.auxiliary_devices import chip_tool
from gazoo_device.capabilities.matter_endpoints.interfaces import endpoint_base
from gazoo_device.capabilities.matter_clusters.interfaces import cluster_base
from local_agent import errors as agent_errors
from local_agent import logger as logger_module
from local_agent.translation_layer.command_handlers import common
logger = logger_module.get_logger()
# ============= Constants ============= #
_DEVICE_DETECTION_LOG_DIR = '/tmp'
# ===================================== #
class GdmManager:
"""GDM manager for device management."""
def __init__(
self, update_handlers_cls_map_fn: Callable[[str, Any], None]):
self._mgr_lock = threading.RLock()
self._mgr = gazoo_device.Manager()
self._first_detection = True
self._update_handlers_cls_map = update_handlers_cls_map_fn
# Caching device information to avoid race condition when doing device
# communication multiple times.
self._connected_devices = {} # device id -> device information dict
def detect_devices(self) -> List[Dict[str, Any]]:
"""Detects connected devices.
Returns:
The list of device dict, each dict includes the following fields:
deviceId, serialNumber, capabilities and deviceType in GDM.
"""
devices = [] # List[Dict[str, Any]]
with self._mgr_lock:
self._mgr.detect(force_overwrite=self._first_detection,
log_directory=_DEVICE_DETECTION_LOG_DIR)
chip_tool_id = 'chip_tool-0000'
# device_type = 'chip_tool'
# serial_number = '00000000'
connected_devices = self._mgr.get_devices('all')
if not chip_tool_id in connected_devices:
return []
if not self._mgr.is_device_connected(chip_tool_id):
return []
with self._mgr.create_and_close_device(chip_tool_id) as device:
if not isinstance(device, chip_tool.ChipTool):
return []
device.set_property("matter_node_id", 6666)
# devices has to be commissioned to chip tool first
endpoint_instances_and_cluster_mapping = device.matter_endpoints.get_supported_endpoint_instances_and_cluster_flavors()
for endpoint_instances_and_cluster_pair in endpoint_instances_and_cluster_mapping.items():
endpoint_instance = endpoint_instances_and_cluster_pair[0]
device_type = endpoint_instance.name
# Skip root node
if(device_type == "root_node"):
continue
device_id = endpoint_instance.id
cluster_capabilities = sorted(cluster.get_capability_name() for cluster in endpoint_instances_and_cluster_pair[1])
# connected_devices = self._mgr.get_devices('all')
# for device_id, info in connected_devices.items():
# # The device has been removed.
# if not self._mgr.is_device_connected(device_id):
# continue
# # The first time this device is detected.
# if device_id not in self._connected_devices:
# serial_number = info['persistent']['serial_number']
# device_type = info['persistent']['device_type']
# # Retrieve the Matter endpoints and clusters in the first detection.
# with self._mgr.create_and_close_device(device_id) as device:
# # If the device is not Matter device
# if not device.has_capabilities(["matter_endpoints"]):
# matter_endpoints = []
# cluster_capabilities = []
# else:
# matter_endpoints = device.matter_endpoints.get_supported_endpoints()
# endpoint_clusters_mapping = device.matter_endpoints.get_supported_endpoints_and_clusters()
# cluster_capabilities = list(set(capabilities).union(*endpoint_clusters_mapping.values()))
# # (deprecate) Additional non-Matter capabilities.
# if device.has_capabilities([common.PWRPC_COMMON_CAPABILITY]):
# matter_endpoints.append(common.PWRPC_COMMON_CAPABILITY)
# cluster_capabilities.append(common.PWRPC_COMMON_CAPABILITY)
# Therefore, we only need to update the command handlers once.
self._update_handlers_cls_map(device_type, device_type)
# We could not get the serial number
# Store the device information in cache.
device_info = {
'deviceId': device_id,
'serialNumber': 'unknown',
'deviceType': device_type,
'capabilities': cluster_capabilities,
}
self._connected_devices[device_id] = device_info
devices.append(self._connected_devices[device_id])
self._first_detection = False
return devices
def create_devices(
self,
identifiers: List[str],
log_directory: Optional[str] = None) -> None:
"""Creates GDM device instances.
Args:
identifiers: List of GDM device id.
log_directory: GDM log directory.
"""
with self._mgr_lock:
for device_id in identifiers:
self._mgr.create_device(
identifier=device_id, log_directory=log_directory)
def check_device_connected(self, identifier: str) -> None:
"""Checks if the device is connected.
Args:
identifier: GDM device id.
Raises:
DeviceNotConnectedError: When device is not connected.
DeviceError: When unexpected error occurs.
"""
with self._mgr_lock:
try:
if not self._mgr.is_device_connected(identifier):
raise agent_errors.DeviceNotConnectedError(
f'Device {identifier} is not connected.')
except gdm_errors.DeviceError as e:
logger.warning(f'check_device_connected failed: {e}')
raise e
def get_device_instance(self, identifier: str) -> Any:
"""Gets the device instance in GDM.
Args:
identifier: GDM device id.
Returns:
Device instance in GDM.
"""
with self._mgr_lock:
if identifier not in self._mgr.get_open_device_names():
raise agent_errors.DeviceNotOpenError(f'{identifier} is not open.')
return self._mgr.get_open_device(identifier)
def get_device_type(self, identifier: str) -> str:
"""Gets the device type in GDM.
Args:
identifier: GDM device id.
Returns:
Device type in GDM.
"""
with self._mgr_lock:
dut = self.get_device_instance(identifier)
return dut.device_type
def close_open_devices(self) -> None:
"""Closes all open devices in GDM."""
with self._mgr_lock:
self._mgr.close_open_devices()
def update_handlers_cls_map(
device_type: str, capabilities: List[str]) -> None:
return
mgr = GdmManager(update_handlers_cls_map)
mgr.detect_devices()