| # Copyright 2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Module for SuiteSessionManager class.""" |
| import datetime |
| import os |
| import time |
| import threading |
| from typing import Any, Callable, Dict, List, Optional |
| |
| from local_agent import errors as agent_errors |
| from local_agent import logger as logger_module |
| |
| |
# Logger obtained from the local agent's logger module; used by this module.
logger = logger_module.get_logger()

# ============= Constants ============= #
# Age, in days, at which a local artifact file is considered outdated.
_ARTIFACT_EXPIRE_DAYS = 30
# Maximum duration of a suite session, in seconds (90 minutes).
_SUITE_SESSION_TIME_OUT = 90 * 60
# Text form of the session timeout, used in log messages.
_SUITE_SESSION_TIME_OUT_HUMAN_READABLE = '90 mins'
# Seconds the timeout checker waits between two consecutive checks.
_SUITE_SESSION_TIME_OUT_INTERVAL_SECONDS = 30
# ===================================== #
| |
| |
class SuiteSessionManager:
    """Suite Session Manager for test suite session management.

    Tracks at most one ongoing test suite session. A background daemon
    thread (enabled by start()) periodically checks whether the ongoing
    session has exceeded the session timeout and, if so, forces a clean up.
    """

    def __init__(self,
                 artifacts_fn: Callable[[str, str], None],
                 artifact_root_dir: str,
                 create_devices_fn: Callable[[List[str], str], None],
                 close_devices_fn: Callable[[], None]):
        """Initializes the manager; the background thread is not started.

        Args:
            artifacts_fn: Callable invoked on clean up with the keyword
              arguments test_suite_id and test_result_id.
            artifact_root_dir: Root directory of the local artifacts.
            create_devices_fn: Callable invoked with the DUT device IDs and
              the artifact directory of the test suite being started.
            close_devices_fn: Callable invoked on clean up to close devices.
        """
        # Artifacts related
        self._compress_artifacts_and_upload = artifacts_fn
        self._artifact_root_dir = artifact_root_dir

        # Tracks test suite related
        self._termination_event: Optional[threading.Event] = None
        self._suite_timeout_checker = threading.Thread(
            target=self._check_suite_timeout, daemon=True)
        self._ongoing_test_suite_id: Optional[str] = None
        self._ongoing_test_suite_start_time: Optional[float] = None

        # Device control related
        self._create_devices = create_devices_fn
        self._close_devices = close_devices_fn

    def start(self, termination_event: threading.Event) -> None:
        """Starts the suite session manager by enabling the background threads.

        Args:
            termination_event: The termination threading event for the thread.
        """
        self._termination_event = termination_event
        self._suite_timeout_checker.start()

    def start_test_suite(self, rpc_request: Dict[str, Any]) -> Dict[str, Any]:
        """Subroutine for handling START_TEST_SUITE command.

        Args:
            rpc_request: JSON-RPC request.

        Raises:
            InvalidRPCError: Invalid RPC (no dutDeviceIds or no usable id).
            InvalidTestSuiteSessionError: Previous test suite has not ended.

        Returns:
            RPC response.
        """
        params = rpc_request['params']
        dut_ids = params.get('dutDeviceIds')
        if dut_ids is None:
            raise agent_errors.InvalidRPCError(
                'Invalid rpc command, no dutDeviceIds')
        suite_id = params.get('id')
        # The suite ID names the artifact directory, so it has to be a
        # non-empty string. Validate it before touching any session state so
        # an invalid request can neither tear down the running suite nor
        # leave a half-initialized session behind.
        if not suite_id or not isinstance(suite_id, str):
            raise agent_errors.InvalidRPCError(
                'Invalid rpc command, no id')
        force_start = params.get('forceStart')

        # Support single suite execution only at a time.
        if self._ongoing_test_suite_id is not None:
            if not force_start:
                raise agent_errors.InvalidTestSuiteSessionError(
                    f'The previous test suite {self._ongoing_test_suite_id}'
                    ' has not ended yet.')
            self.clean_up()

        # Record the start time before the ID: the timeout checker treats a
        # non-None ID as "session ongoing" and then reads the start time.
        self._ongoing_test_suite_start_time = time.time()
        self._ongoing_test_suite_id = suite_id
        test_suite_dir = self._initialize_artifact_directory(suite_id)

        self._create_devices(dut_ids, test_suite_dir)

        logger.info(f'Start a new test suite: {self._ongoing_test_suite_id}')
        return {'id': rpc_request['id'], 'jsonrpc': '2.0', 'result': {}}

    def end_test_suite(self, rpc_request: Dict[str, Any]) -> Dict[str, Any]:
        """Subroutine for handling END_TEST_SUITE request.

        Args:
            rpc_request: JSON-RPC request.

        Raises:
            InvalidTestSuiteSessionError: The requested suite ID does not
              match the ongoing test suite.

        Returns:
            RPC response.
        """
        params = rpc_request['params']
        suite_id = params.get('id')
        test_result_id = params.get('testResultId')

        if suite_id != self._ongoing_test_suite_id:
            # Invalid testsuite session
            raise agent_errors.InvalidTestSuiteSessionError(
                f'Session {suite_id} has never started before.')

        self.clean_up(test_result_id)
        return {'id': rpc_request['id'], 'jsonrpc': '2.0', 'result': {}}

    def clean_up(self, test_result_id: Optional[str] = None) -> None:
        """Cleans up the ongoing test suite and removes outdated artifacts.

        When a suite is ongoing, the devices are closed, the artifacts
        callable is invoked and the session state is reset. Outdated
        artifacts are removed whether or not a suite was ongoing.

        Args:
            test_result_id: Test result ID.
        """
        suite_id = self._ongoing_test_suite_id
        if suite_id is not None:
            logger.info(f'Cleaning up the test suite: {suite_id}')
            self._close_devices()
            self._compress_artifacts_and_upload(
                test_suite_id=suite_id,
                test_result_id=test_result_id)

            elapsed_time = time.time() - self._ongoing_test_suite_start_time
            logger.info(
                f'Cleaned up test suite {suite_id}. '
                f'Suite session elapsed time = {elapsed_time} seconds.')
            self._ongoing_test_suite_id = None
            self._ongoing_test_suite_start_time = None

        self._remove_outdated_artifacts()

    def _initialize_artifact_directory(self, ongoing_test_suite_id: str) -> str:
        """Creates a directory for artifacts of a test suite session.

        Also registers local_agent.log inside that directory as a log file
        through the logger module.

        Args:
            ongoing_test_suite_id: ID of the ongoing test suite.

        Returns:
            Artifact directory path.
        """
        test_suite_dir = os.path.join(
            self._artifact_root_dir, ongoing_test_suite_id)
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(test_suite_dir, exist_ok=True)

        local_agent_log = os.path.join(test_suite_dir, 'local_agent.log')
        logger_module.add_file_handler(local_agent_log)

        logger.info(f'Start collecting log for {ongoing_test_suite_id}')

        return test_suite_dir

    def _remove_outdated_artifacts(self) -> None:
        """Deletes the outdated local artifact files.

        Only regular files directly under the artifact root directory are
        considered; a file is outdated once its st_ctime is at least
        _ARTIFACT_EXPIRE_DAYS days old. Failures on individual files are
        logged and skipped so that housekeeping never aborts a clean up.
        """
        root_dir = self._artifact_root_dir
        if not os.path.isdir(root_dir):
            return
        today = datetime.datetime.today()
        for name in os.listdir(root_dir):
            artifact = os.path.join(root_dir, name)
            if not os.path.isfile(artifact):
                continue
            try:
                # NOTE: st_ctime is the creation time on Windows but the
                # last metadata change time on Unix.
                created_time = os.stat(artifact).st_ctime
            except OSError as e:
                # The file may have disappeared since listdir().
                logger.warning(
                    f'Failed to stat the artifact {artifact}: {str(e)}')
                continue
            delta = today - datetime.datetime.fromtimestamp(created_time)
            if delta.days < _ARTIFACT_EXPIRE_DAYS:
                continue
            try:
                os.remove(artifact)
            except OSError as e:
                logger.warning(
                    'Failed to remove the outdated artifact'
                    f' {artifact}: {str(e)}')
            else:
                logger.info(f'Deleted outdated artifact: {artifact}')

    def _check_suite_timeout(self) -> None:
        """Checks if the current test suite has timed out.

        Practically no suite in Rainier will be longer than 1 hour,
        if the current suite has been running over 90 mins, we'll
        force clean up the current suite session.

        Runs as the target of the background thread and loops until the
        termination event is set. Returns immediately when no termination
        event has been provided.
        """
        while (self._termination_event is not None and
               not self._termination_event.wait(
                   _SUITE_SESSION_TIME_OUT_INTERVAL_SECONDS)):

            # Snapshot the session state once: it is modified by the RPC
            # handlers, so reading the attributes twice could observe an ID
            # without a start time.
            suite_id = self._ongoing_test_suite_id
            start_time = self._ongoing_test_suite_start_time
            if suite_id is None or start_time is None:
                logger.info('No ongoing test suite.')
                continue

            logger.info('Checking if suite session has timed out.')
            suite_elapsed_time = time.time() - start_time
            if suite_elapsed_time < _SUITE_SESSION_TIME_OUT:
                continue

            logger.warning(
                f'Suite {suite_id} has timed out '
                f'(over {_SUITE_SESSION_TIME_OUT_HUMAN_READABLE}), '
                'forcing suite session clean up.')
            try:
                self.clean_up()
            except Exception as e:  # pylint: disable=broad-except
                # An uncaught exception would end this thread and silently
                # disable every future timeout check; log and keep polling.
                logger.warning(
                    f'Failed to clean up the timed out suite {suite_id}: '
                    f'{str(e)}')