blob: 2b834e294f5f669772b25fcbf0430df972042a51 [file] [log] [blame]
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for SuiteSessionManager class."""
import datetime
import os
import time
import threading
from typing import Any, Callable, Dict, List, Optional
from local_agent import errors as agent_errors
from local_agent import logger as logger_module
# Module-wide logger obtained from local_agent.logger; used by
# SuiteSessionManager below for session and artifact logging.
logger = logger_module.get_logger()
# ============= Constants ============= #
_ARTIFACT_EXPIRE_DAYS = 30 # days
_SUITE_SESSION_TIME_OUT = 5400 # 90 mins in seconds
_SUITE_SESSION_TIME_OUT_HUMAN_READABLE = '90 mins'
_SUITE_SESSION_TIME_OUT_INTERVAL_SECONDS = 30
# ===================================== #
class SuiteSessionManager:
    """Suite Session Manager for test suite session management.

    Tracks at most one ongoing test suite at a time. The ongoing-suite state is
    shared between the RPC handlers (start / end / clean-up) and a background
    daemon thread that force-cleans suites exceeding the session timeout, so
    every read-modify-write of that state is serialized by a re-entrant lock.
    """

    def __init__(self,
                 artifacts_fn: Callable[[str, str], None],
                 artifact_root_dir: str,
                 create_devices_fn: Callable[[List[str], str], None],
                 close_devices_fn: Callable[[], None]):
        """Initializes the manager.

        Args:
            artifacts_fn: Callback that compresses and uploads a suite's
                artifacts; invoked with keyword arguments ``test_suite_id``
                and ``test_result_id`` (the latter may be None).
            artifact_root_dir: Directory under which one artifact directory
                per suite is created.
            create_devices_fn: Callback invoked with the DUT device IDs and
                the suite's artifact directory when a suite starts.
            close_devices_fn: Callback invoked with no arguments when a suite
                is cleaned up.
        """
        # Artifacts related
        self._compress_artifacts_and_upload = artifacts_fn
        self._artifact_root_dir = artifact_root_dir
        # Tracks test suite related
        self._termination_event = None
        self._suite_timeout_checker = threading.Thread(
            target=self._check_suite_timeout, daemon=True)
        self._ongoing_test_suite_id = None
        self._ongoing_test_suite_start_time = None
        # Guards the ongoing-suite state above. Re-entrant because
        # start_test_suite, end_test_suite and the timeout checker all call
        # clean_up() while already holding it.
        self._lock = threading.RLock()
        # Device control related
        self._create_devices = create_devices_fn
        self._close_devices = close_devices_fn

    def start(self, termination_event: threading.Event) -> None:
        """Starts the suite session manager by enabling the background threads.

        Args:
            termination_event: The termination threading event for the thread;
                once it is set, the timeout checker loop exits.
        """
        self._termination_event = termination_event
        self._suite_timeout_checker.start()

    def start_test_suite(self, rpc_request: Dict[str, Any]) -> Dict[str, Any]:
        """Subroutine for handling START_TEST_SUITE command.

        Args:
            rpc_request: JSON-RPC request.

        Raises:
            InvalidRPCError: Invalid RPC (missing dutDeviceIds or id).
            InvalidTestSuiteSessionError: Previous test suite has not ended.

        Returns:
            RPC response.
        """
        params = rpc_request['params']
        dut_ids = params.get('dutDeviceIds')
        if dut_ids is None:
            raise agent_errors.InvalidRPCError(
                'Invalid rpc command, no dutDeviceIds')
        suite_id = params.get('id')
        # Validate before touching any state: a missing id would otherwise
        # fail later inside os.path.join after the session was marked ongoing.
        if suite_id is None:
            raise agent_errors.InvalidRPCError('Invalid rpc command, no id')
        force_start = params.get('forceStart')

        with self._lock:
            # Support single suite execution only at a time.
            if self._ongoing_test_suite_id is not None:
                if force_start:
                    self.clean_up()
                else:
                    raise agent_errors.InvalidTestSuiteSessionError(
                        f'The previous test suite {self._ongoing_test_suite_id}'
                        ' has not ended yet.')
            self._ongoing_test_suite_id = suite_id
            self._ongoing_test_suite_start_time = time.time()
            test_suite_dir = self._initialize_artifact_directory(suite_id)
            self._create_devices(dut_ids, test_suite_dir)

        logger.info(f'Start a new test suite: {suite_id}')
        return {'id': rpc_request['id'], 'jsonrpc': '2.0', 'result': {}}

    def end_test_suite(self, rpc_request: Dict[str, Any]) -> Dict[str, Any]:
        """Subroutine for handling END_TEST_SUITE request.

        Args:
            rpc_request: JSON-RPC request.

        Raises:
            InvalidTestSuiteSessionError: Missing id, or the id does not match
                the ongoing test suite.

        Returns:
            RPC response.
        """
        params = rpc_request['params']
        suite_id = params.get('id')
        test_result_id = params.get('testResultId')
        with self._lock:
            # A missing id must not "match" the idle state (None == None).
            if suite_id is None or suite_id != self._ongoing_test_suite_id:
                # Invalid testsuite session
                raise agent_errors.InvalidTestSuiteSessionError(
                    f'Session {suite_id} has never started before.')
            self.clean_up(test_result_id)
        return {'id': rpc_request['id'], 'jsonrpc': '2.0', 'result': {}}

    def clean_up(self, test_result_id: Optional[str] = None) -> None:
        """Cleans up the ongoing test suite and removes outdated artifacts.

        If a suite is ongoing, closes its devices and uploads its artifacts.
        The ongoing-suite state is reset even if either callback raises, so a
        failing callback cannot leave the manager stuck on a dead session. The
        exception still propagates to the caller. Expired local artifacts are
        deleted afterwards.

        Args:
            test_result_id: Test result ID forwarded to the artifact upload
                callback; may be None.
        """
        with self._lock:
            suite_id = self._ongoing_test_suite_id
            if suite_id is not None:
                logger.info(f'Cleaning up the test suite: {suite_id}')
                start_time = self._ongoing_test_suite_start_time
                try:
                    self._close_devices()
                    self._compress_artifacts_and_upload(
                        test_suite_id=suite_id,
                        test_result_id=test_result_id)
                finally:
                    self._ongoing_test_suite_id = None
                    self._ongoing_test_suite_start_time = None
                elapsed_time = time.time() - start_time
                logger.info(
                    f'Cleaned up test suite {suite_id}. '
                    f'Suite session elapsed time = {elapsed_time} seconds.')
        self._remove_outdated_artifacts()

    def _initialize_artifact_directory(self, ongoing_test_suite_id: str) -> str:
        """Creates a directory for artifacts of a test suite session.

        Also passes ``<dir>/local_agent.log`` to
        ``logger_module.add_file_handler``. Presumably this routes the agent
        log for the suite into that file; see local_agent.logger to confirm.

        Args:
            ongoing_test_suite_id: ID of the ongoing test suite.

        Returns:
            Artifact directory path.
        """
        test_suite_dir = os.path.join(
            self._artifact_root_dir, ongoing_test_suite_id)
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(test_suite_dir, exist_ok=True)
        local_agent_log = os.path.join(test_suite_dir, 'local_agent.log')
        logger_module.add_file_handler(local_agent_log)
        logger.info(f'Start collecting log for {ongoing_test_suite_id}')
        return test_suite_dir

    def _remove_outdated_artifacts(self) -> None:
        """Deletes local artifact files older than _ARTIFACT_EXPIRE_DAYS days.

        Only regular files directly under the artifact root are considered;
        subdirectories are left untouched. Age is measured from st_ctime. On
        POSIX that is the last metadata change, not the creation time.
        """
        if not os.path.exists(self._artifact_root_dir):
            return
        today = datetime.datetime.today()
        for name in os.listdir(self._artifact_root_dir):
            artifact = os.path.join(self._artifact_root_dir, name)
            if not os.path.isfile(artifact):
                continue
            try:
                created_time = os.stat(artifact).st_ctime
            except FileNotFoundError:
                # Removed between listdir() and stat(); nothing to clean.
                continue
            delta = today - datetime.datetime.fromtimestamp(created_time)
            if delta.days < _ARTIFACT_EXPIRE_DAYS:
                continue
            try:
                os.remove(artifact)
                logger.info(f'Deleted outdated artifact: {artifact}')
            except OSError as e:
                logger.warning(
                    'Failed to remove the outdated artifact'
                    f' {artifact}: {str(e)}')

    def _check_suite_timeout(self) -> None:
        """Checks if the current test suite has timed out.

        Runs on the background daemon thread once every
        _SUITE_SESSION_TIME_OUT_INTERVAL_SECONDS until the termination event
        is set. Practically no suite in Rainier will be longer than 1 hour,
        if the current suite has been running over 90 mins, we'll
        force clean up the current suite session.

        The check and the clean-up happen under the session lock, so an RPC
        ending or replacing the suite cannot race with them. Clean-up errors
        are logged, not raised, so one failure does not kill this thread.
        """
        while (self._termination_event is not None and
               not self._termination_event.wait(
                   _SUITE_SESSION_TIME_OUT_INTERVAL_SECONDS)):
            with self._lock:
                suite_id = self._ongoing_test_suite_id
                start_time = self._ongoing_test_suite_start_time
                if suite_id is None or start_time is None:
                    logger.info('No ongoing test suite.')
                    continue
                logger.info('Checking if suite session has timed out.')
                if time.time() - start_time < _SUITE_SESSION_TIME_OUT:
                    continue
                logger.warning(
                    f'Suite {suite_id} has timed out '
                    f'(over {_SUITE_SESSION_TIME_OUT_HUMAN_READABLE}), '
                    'forcing suite session clean up.')
                try:
                    self.clean_up()
                except Exception as e:  # Keep the checker thread alive.
                    logger.warning(
                        f'Failed to clean up timed-out suite {suite_id}: '
                        f'{str(e)}')