blob: a5e036f7c74cef79b68627863533878f73cfc5f6 [file] [log] [blame]
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Fake front end for local agent and real AMS integration test."""
import argparse
import http
import immutabledict
import logging
import requests
import signal
import sys
import time
import threading
from typing import Any, Dict, List, Optional, Set, Tuple
import fake_test_suite
_LOCAL_TSB_HOST = '127.0.0.1'
_LOCAL_TSB_PORT = 8080
_TEST_PROJECT = 'test-project'
_POLLING_PERIOD = 1 # seconds
_STATUS_POLLING_PERIOD = 30 # seconds
_COOL_DOWN_SEC = 1 # seconds
_DETECTION_COOL_DOWN_SEC = 3 # To wait for GDM detection to complete.
logger = logging.getLogger(__name__)
# ======================== TSB endpoints ========================== #
TEST_SUITE_AUTH = '/tsb/api/test-suite/auth'
TEST_SUITE_PROJECTS = '/tsb/api/test-suite/projects'
LINKING_CODE = '/tsb/api/test-suite/local-agent/linking-code'
AGENT_STATUS = '/tsb/api/test-suite/local-agent/info'
AGENT_RPC = '/tsb/api/test-suite/local-agent/rpc'
UNLINK_AGENT = '/tsb/api/test-suite/local-agent/unlink'
RPC_METADATA = f'{AGENT_RPC}/metadata'
# ================================================================= #
# ======================== Constants ========================== #
ALL_SUITE_CLASSES = (
fake_test_suite.BrightnessSuite,
fake_test_suite.ColorSuite,
fake_test_suite.DeviceCommonSuite,
fake_test_suite.LightOnOffSuite,
fake_test_suite.LockUnlockSuite,
)
GDM_CAPABILITY_TO_HG_TRAIT = immutabledict.immutabledict({
'pw_rpc_common': 'Common',
'pw_rpc_light': 'OnOff',
'pw_rpc_lock': 'LockUnlock',
})
# ============================================================= #
# ======================== Module level functions ========================== #
def setup_logger() -> None:
"""Sets up the logger for logging."""
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter('[%(asctime)s %(levelname)s] %(message)s'))
logger.addHandler(handler)
def parse_args() -> Tuple[str, Optional[int]]:
"""Sets up the parser for argument parsing.
Returns:
Tuple: TSB host, TSB port
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-host', '--tsb_host', type=str, required=False,
default=_LOCAL_TSB_HOST, help='TSB host')
parser.add_argument(
'-port', '--tsb_port', type=int, required=False,
default=_LOCAL_TSB_PORT, help='TSB port')
args, leftover = parser.parse_known_args(sys.argv[1:])
sys.argv[1:] = leftover
tsb_host = args.tsb_host
tsb_port = args.tsb_port if tsb_host == _LOCAL_TSB_HOST else None
return tsb_host, tsb_port
def raise_exception(response: requests.models.Response, err_msg: str) -> None:
"""Raises exception for HTTP response status code != 200.
Args:
response: HTTP response.
err_msg: Error message.
"""
err_msg = f'{err_msg}. Status: {response.status_code}'
try:
ams_err_msg = response.json()['errorMessage']
except:
ams_err_msg = ''
if ams_err_msg:
err_msg += f', AMS error message: {ams_err_msg}'
raise RuntimeError(err_msg)
# ========================================================================== #
class TSBService:
"""TSB endpoint service."""
def __init__(self, tsb_host: str, tsb_port: Optional[int]):
if tsb_port is None:
self._base_url = f'http://{tsb_host}'
else:
self._base_url = f'http://{tsb_host}:{tsb_port}'
auth_token = self.get_auth_token()
self._auth = {'Authorization': auth_token}
def get_auth_token(self) -> str:
"""Service to obtain the test suite user's auth token.
Returns:
Test suite user auth token.
Raises:
RuntimeError: HTTP response status code is not 200.
"""
url = self._base_url + TEST_SUITE_AUTH
resp = requests.post(url, json={'idToken': 'debug-token'})
if resp.status_code != http.HTTPStatus.OK:
raise_exception(resp, 'Failed to get auth token')
auth_token = resp.json().get('authToken')
return auth_token
def get_or_create_test_project(self) -> None:
"""Service to create test project if needed.
Raises:
RuntimeError: HTTP response status code is not 200.
"""
url = self._base_url + TEST_SUITE_PROJECTS
resp = requests.get(url, headers=self._auth)
if resp.status_code != http.HTTPStatus.OK:
raise_exception(resp, 'Failed to get test project')
for project in resp.json().get('result'):
if project['id'] == _TEST_PROJECT:
return
resp = requests.post(
url, headers=self._auth, json={'projectIds': [_TEST_PROJECT]})
if resp.status_code != http.HTTPStatus.CREATED:
raise_exception(resp, 'Failed to create test project')
def get_linking_code(self) -> str:
"""Service to obtain the linking code.
Returns:
Linking code.
Raises:
RuntimeError: HTTP response status code is not 200.
"""
url = self._base_url + LINKING_CODE
resp = requests.post(
url, headers=self._auth, json={'projectId': _TEST_PROJECT})
if resp.status_code != http.HTTPStatus.OK:
raise_exception(resp, 'Failed to get linking code')
linking_code = resp.json()['result'].get('code')
return linking_code
def get_agent_status(self) -> Optional[Dict[str, Any]]:
"""Service to retrieve the local agent status.
Returns:
Local Agent status dict or None if agent is not linked.
Raises:
RuntimeError: HTTP response status code is not 200 nor 404.
"""
url = self._base_url + AGENT_STATUS + f'?projectId={_TEST_PROJECT}'
resp = requests.get(url, headers=self._auth)
if resp.status_code == http.HTTPStatus.NOT_FOUND:
return None
elif resp.status_code == http.HTTPStatus.OK:
return resp.json()['result']
else:
raise_exception(resp, 'Failed to get agent status')
def send_rpc_request(self, rpc_request: Dict[str, Any]) -> str:
"""Service to send RPC request to the AMS.
Args:
rpc_request: JSON RPC request.
Returns:
JSON-RPC id.
Raises:
RuntimeError: HTTP response status code is not 200.
"""
url = self._base_url + AGENT_RPC
resp = requests.post(url,
headers=self._auth, json=rpc_request)
if resp.status_code != http.HTTPStatus.OK:
raise_exception(resp, 'Failed to send RPC request')
rpc_id = resp.json()['result']['id']
return rpc_id
def get_rpc_metadata(self, rpc_id: str) -> Dict[str, Optional[int]]:
"""Service to get RPC metadata with given rpc_id.
Args:
rpc_id: JSON-RPC id.
Returns:
RPC metadata.
Raises:
RuntimeError: HTTP response status code is not 200.
"""
url = (self._base_url + RPC_METADATA +
f'?projectId={_TEST_PROJECT}&rpcId={rpc_id}')
resp = requests.get(url, headers=self._auth)
if resp.status_code != http.HTTPStatus.OK:
raise_exception(resp, 'Failed to get RPC metadata')
metadata = resp.json()['result']
return metadata
def get_rpc_response(self, rpc_id: str) -> Dict[str, Any]:
"""Service to get RPC response with given rpc_id.
Args:
rpc_id: JSON-RPC id.
Returns:
RPC response.
Raises:
RuntimeError: HTTP response status code is not 200.
"""
url = (self._base_url + AGENT_RPC +
f'?projectId={_TEST_PROJECT}&rpcId={rpc_id}')
resp = requests.get(url, headers=self._auth)
if resp.status_code != http.HTTPStatus.OK:
raise_exception(resp, 'Failed to get RPC response')
rpc_response = resp.json()['result']
return rpc_response
def unlink_agent(self) -> None:
"""API to unlink local agent. Raises RuntimeError if failed."""
url = self._base_url + UNLINK_AGENT
post_data = {'projectId': _TEST_PROJECT}
resp = requests.post(url, json=post_data, headers=self._auth)
if resp.status_code != http.HTTPStatus.OK:
raise_exception(resp, 'Failed to unlink local agent')
class FakeFrontEnd:
"""Fake front end module."""
def __init__(self, host: str, port: int):
# Registers termination signal handler
signal.signal(signal.SIGINT, self._terminate)
# Retrieves auth token and creates test project
self._tsb_service = TSBService(host, port)
self._tsb_service.get_or_create_test_project()
# Local Agent status
self._local_agent_status = None
# Worker threads: executing test plan and retrieving agent status.
self._termination_event = threading.Event()
self._test_plan_worker = threading.Thread(
target=self._create_and_execute_test_plan, daemon=True)
self._agent_status_worker = threading.Thread(
target=self._retrieve_agent_status, daemon=True)
def _terminate(self, sig_num: int, frame: 'frame') -> None:
"""Signal handler upon receiving a SIGINT.
Args:
sig_num: Signal number passed to the handler.
frame: Current stack frame passed to the handler.
"""
del sig_num, frame # Unused
logger.warning('Terminates fake front end process.')
self._termination_event.set()
def run(self) -> None:
"""Runs fake front end.
Simulate the front end behaviors:
1. Links the local agent.
2. Sends RPC requests to TSB, polls the metadata and gets response.
3. Retrieves the local agent status simultaneously.
4. Unlinks the agent after the test is completed.
"""
if self._link_agent():
time.sleep(_DETECTION_COOL_DOWN_SEC)
self._test_plan_worker.start()
self._agent_status_worker.start()
while not self._termination_event.is_set():
time.sleep(_POLLING_PERIOD)
self._unlink_agent()
def _checks_if_agent_is_linked(self) -> bool:
"""Returns if local agent is linked.
Returns:
True if agent is linked, false otherwise.
"""
status = self._tsb_service.get_agent_status()
return status is not None and status.get('status') != 'OFFLINE'
def _checks_if_response_is_stored(self, rpc_id: str) -> bool:
"""Returns if the rpc response of rpc_id is stored.
Returns:
True if the rpc response is stored, false otherwise.
"""
metadata = self._tsb_service.get_rpc_metadata(rpc_id=rpc_id)
resp_timestamp = metadata.get('responseStoredTimestamp')
return resp_timestamp is not None
def _link_agent(self) -> bool:
"""Links local agent.
Retrieves linking code and checks if agent is linked.
Returns:
True if agent is linked, false otherwise.
"""
linking_code = self._tsb_service.get_linking_code()
print(f'\033[1m******************** Linking Code: {linking_code} ****'
'****************\033[0m')
while (not self._termination_event.is_set() and
not self._checks_if_agent_is_linked()):
logger.info(f'No agent is linked, sleep {_POLLING_PERIOD} sec...')
time.sleep(_POLLING_PERIOD)
if not self._termination_event.is_set():
logger.info('The local agent is linked.')
return True
return False
def _run_rpc_requests(
self, rpc_request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Runs RPC request.
Simulates the FE behavior: sends the rpc request to BE, polls for the
rpc metadata, once it gets updated, retrieves the rpc response from BE.
Args:
rpc_request: JSON-RPC request.
Returns:
JSON-RPC response or None if fake front end is interrupted by SIGINT.
"""
rpc_id = self._tsb_service.send_rpc_request(rpc_request=rpc_request)
logger.info(f'Sent RPC request: {rpc_request}.')
while (not self._termination_event.is_set() and
not self._checks_if_response_is_stored(rpc_id)):
logger.info(
f'RPC response not available, sleep {_POLLING_PERIOD} sec...')
time.sleep(_POLLING_PERIOD)
# Interrupted by SIGINT
if self._termination_event.is_set():
return None
logger.info('Metadata is updated.')
rpc_response = self._tsb_service.get_rpc_response(rpc_id=rpc_id)
return rpc_response
def _unlink_agent(self) -> None:
"""Unlinks local agent."""
logger.info('Unlinking local agent.')
self._tsb_service.unlink_agent()
logger.info('Local agent unlinked.')
def _create_and_execute_test_plan(self) -> None:
"""Creates and executes test plan."""
while self._local_agent_status is None:
logger.info(f'Local Agent status not available yet.')
time.sleep(_POLLING_PERIOD)
connected_devices = []
for device_info in self._local_agent_status['devices']:
device_id = device_info['deviceId']
hg_traits = self._get_hg_traits(device_info['capabilities'])
connected_devices.append((device_id, hg_traits))
for device_id, hg_traits in connected_devices:
suites = self._generate_suites(device_id, hg_traits)
for suite in suites:
logger.info(
f'Executes suite {suite} for device {device_id} ...')
self._run_suite(suite)
def _get_hg_traits(self, capabilities: List[str]) -> List[str]:
"""Maps the GDM capability to the corresponding HG trait.
Args:
capabilities: List of GDM capability.
Returns:
List of HG traits.
"""
hg_traits = []
for capability in capabilities:
hg_trait = GDM_CAPABILITY_TO_HG_TRAIT.get(capability)
if hg_trait is not None:
hg_traits.append(hg_trait)
return hg_traits
def _retrieve_agent_status(self) -> None:
"""Retrieves local agent status."""
while not self._termination_event.is_set():
status = self._tsb_service.get_agent_status()
if status is not None:
logger.info(f'Retrieves local agent status: {status}')
self._local_agent_status = status
time.sleep(_STATUS_POLLING_PERIOD)
def _generate_suites(
self, device_id: str, device_traits: Set[str]) -> List[Any]:
"""Generates test suites based on device trait.
In reality, FE sends device info to BE for test plan/suite generation.
To reduce the maintenance effort, the fake FE here is to simply map the
corresponding fake-suite directly.
Args:
device_id: GDM device id.
device_traits: Set of device traits on HG.
Returns:
The list of suites which are applicable to the device traits.
"""
suites = []
for suite_class in ALL_SUITE_CLASSES:
if suite_class.is_applicable_to(device_traits):
suites.append(suite_class(device_id))
return suites
def _run_suite(self, suite: Any) -> None:
"""Runs suite procedures.
Args:
suite: Suite instance.
"""
device_ids = [suite.device_id]
start_suite_rpc, end_suite_rpc = (
fake_test_suite.generate_start_end_suite_rpc(device_ids))
all_procedures = [start_suite_rpc] + suite.procedures + [end_suite_rpc]
for rpc_request in all_procedures:
logger.info(f'Runs RPC request {rpc_request}')
rpc_response = self._run_rpc_requests(rpc_request=rpc_request)
logger.info(f'Retrieves RPC response: {rpc_response}')
time.sleep(_COOL_DOWN_SEC)
def main() -> None:
"""Main entry of fake front end."""
setup_logger()
tsb_host, tsb_port = parse_args()
fake_front_end = FakeFrontEnd(host=tsb_host, port=tsb_port)
fake_front_end.run()
if __name__ == '__main__':
main()