| # Copyright 2021 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Module for lock command handler.""" |
| from typing import Any, Dict |
| |
| from gazoo_device import errors |
| |
| from local_agent import logger as logger_module |
| from local_agent.translation_layer.command_handlers import base |
| |
| |
# Module-level logger obtained from the local agent's logger module.
logger = logger_module.get_logger()

# Lock capability name. The value matches the `pw_rpc_lock` attribute that
# LockCommandHandler accesses on the device (presumably the capability name
# as exposed by the device object -- confirm against callers).
PWRPC_LOCK_CAPABILITY = 'pw_rpc_lock'
| |
| |
class LockCommandHandler(base.BaseCommandHandler):
    """Lock device command handler.

    Each handler method delegates to the ``pw_rpc_lock`` attribute of
    ``self.dut``, logs any ``DeviceError`` and re-raises it to the caller.

    Smart Home LockUnlock Trait Schema:
    https://developers.google.com/assistant/smarthome/traits/lockunlock
    """

    # Method names handled by this class, collected in SUPPORTED_METHODS.
    _GET_IS_LOCKED = 'getIsLocked'
    _SET_LOCK = 'setLock'
    _SET_UNLOCK = 'setUnlock'
    SUPPORTED_METHODS = {_GET_IS_LOCKED, _SET_LOCK, _SET_UNLOCK}

    def _get_is_locked(self, params: Dict[str, Any]) -> bool:
        """Returns whether the device is locked.

        Args:
            params: Command parameters; not used by this method.

        Returns:
            True if the device is locked, False otherwise.

        Raises:
            DeviceError: getting locked state fails.
        """
        del params  # not used
        try:
            return self.dut.pw_rpc_lock.state
        except errors.DeviceError:
            logger.exception(f'Getting locked state of {self.dut.name} failed.')
            # Bare raise re-raises the active exception unchanged.
            raise

    def _set_lock(self, params: Dict[str, Any]) -> None:
        """Locks the device.

        Args:
            params: Command parameters; not used by this method.

        Raises:
            DeviceError: locking device fails.
        """
        del params  # not used
        try:
            self.dut.pw_rpc_lock.lock()
        except errors.DeviceError:
            logger.exception(f'Locking {self.dut.name} failed.')
            # Bare raise re-raises the active exception unchanged.
            raise

    def _set_unlock(self, params: Dict[str, Any]) -> None:
        """Unlocks the device.

        Args:
            params: Command parameters; not used by this method.

        Raises:
            DeviceError: unlocking device fails.
        """
        del params  # not used
        try:
            self.dut.pw_rpc_lock.unlock()
        except errors.DeviceError:
            logger.exception(f'Unlocking {self.dut.name} failed.')
            # Bare raise re-raises the active exception unchanged.
            raise