"""HTTP client wrapper used to read and update building documents."""

import logging
from base64 import b64encode
# URLs always use forward slashes. ``os.path.join`` would insert backslashes
# on Windows; ``posixpath.join`` is the very same function on POSIX systems.
from posixpath import join
from typing import Optional

import requests
from requests.exceptions import HTTPError
from retrying import retry

from .settings import Settings

MAX_RETRIES = 5  # max retries number
WAIT_BETWEEN_RETRIES_IN_MS = 200  # time between each retry


class VersionConflictError(Exception):
    """Raised when there is a version conflict when updating ES doc"""


def _filter_mapping(mapping: dict) -> dict:
    """Return a copy of ``mapping`` without its empty values.

    Each value is cleaned once with ``_filter_value``. An entry is kept when
    the cleaned value is truthy or equal to ``0`` (so ``0`` and ``False``
    survive, while ``None``, ``""``, ``[]`` and ``{}`` are dropped).
    """
    kept = {}
    for key, raw in mapping.items():
        cleaned = _filter_value(raw)
        if cleaned or cleaned == 0:
            kept[key] = cleaned
    return kept


def _filter_value(value):
    """Recursively strip ``None`` and empty values out of nested dicts.

    * a dict is cleaned with ``_filter_mapping``;
    * a list made only of dicts (eg: list of visible facades) has each of
      its dicts cleaned, the list length is unchanged;
    * anything else (including ``None`` and mixed lists) is returned as is.
    """
    if isinstance(value, dict):
        return _filter_mapping(value)
    if isinstance(value, list) and all(isinstance(elem, dict) for elem in value):
        return [_filter_mapping(elem) for elem in value]
    return value


class BuildingGeneric:
    """Stores the methods used to receive or update the building info from ES doc"""

    def __init__(self, uid: Optional[str] = None, **kwargs) -> None:
        """Create the client and fetch the building document when ``uid`` is given.

        Args:
            uid: identifier of an existing building; when falsy nothing is
                fetched and ``post_building`` can be used to create one.
            **kwargs: ``staging`` (default ``False``) and ``registry``
                (default ``None``) are forwarded to ``Settings``.
        """
        staging = kwargs.get("staging", False)
        registry = kwargs.get("registry")
        self.logger = logging.getLogger(self.__class__.__name__)
        self._uid = uid
        self._building = None
        self.settings = Settings(staging=staging, registry=registry)
        if self._uid:
            self.fetch()

    def __getitem__(self, item):
        """Return ``item`` from the fetched document, or ``None`` if the key is absent.

        Raises ``AttributeError`` when no document has been fetched yet.
        """
        return self._building.get(item)

    # ----------------- Building level ---------------------
    def post_building(self, payload) -> None:
        """Post a new building, remember its id and fetch the created document"""
        res = self._req_with_retries("post", self.settings.base_url, json=payload)
        self._uid = res["id"]
        self.fetch()

    def fetch(self) -> None:
        """Fetch the building document and store it on the instance"""
        self._building = self._req_with_retries(
            "get", join(self.settings.base_url, self._uid)
        )

    def update_building(self, payload) -> None:
        """
        Update ES doc with information at building level
        Note: the retries for conflict error are managed within BKC API
        """
        self._req("put", join(self.settings.base_url, self._uid), json=payload)

    def update_status_history(self, status: str) -> None:
        """
        Append ``status`` to the operation history of the building
        Note: the retries for conflict error are managed within BKC API
        """
        # A document without history yet (missing key or null) starts a new list
        # instead of failing on the unpacking of ``None``.
        history = self._building.get("operation_history") or []
        payload = {"operation_history": [*history, status]}
        self._req("put", join(self.settings.base_url, self._uid), json=payload)

    def register_error_for_building(self, error_code: str):
        """
        Add an error code at building level
        Note: the retries for conflict error are managed within BKC API

        Returns the decoded JSON response, or ``None`` if the body is not JSON.
        """
        url = join(self.settings.base_url, self._uid, "errors")
        payload = {"error_code": error_code}
        return self._req(method="put", url=url, json=payload)

    def upload_metadata_for_building(self, name: str, payload: dict):
        """Upload the building metadata ``name`` to Hbase"""
        body = {"metadata": payload}
        self._req_with_retries(
            "post",
            join(self.settings.base_url, self._uid, "metadata", name),
            json=body,
        )

    # ----------------- Roof level ---------------------
    def get_roof_image_by_name(self, image_name: str) -> str:
        """Get roof image (the ``image`` field of the response)"""
        image = self._req_with_retries(
            "get",
            join(self.settings.base_url, self._uid, "roof", "image", image_name),
        )
        return image["image"]

    def upload_roof_image_by_name(self, image_name: str, image: bytes) -> None:
        """Upload the roof image to Hbase (sent base64-encoded)"""
        body = {"image": b64encode(image).decode()}
        self._req_with_retries(
            "post",
            join(self.settings.base_url, self._uid, "roof", "image", image_name),
            json=body,
        )

    def upload_metadata_for_roof(self, name: str, payload: dict):
        """Upload the roof metadata ``name`` to Hbase"""
        body = {"metadata": payload}
        self._req_with_retries(
            "post",
            join(self.settings.base_url, self._uid, "roof", "metadata", name),
            json=body,
        )

    # ----------------- Facade level ---------------------
    def get_number_of_visible_facades(self):
        """Determine the number of visible facades in building"""
        return sum(
            1
            for facade in self._building["facade"]
            for _ in facade["visible_facades"]
        )

    def get_facade_image_by_name(self, facade_id: str, image_name: str) -> str:
        """Get facade image from Hbase (the ``image`` field of the response)"""
        image = self._req_with_retries(
            "get",
            join(
                self.settings.base_url,
                self._uid,
                "facade",
                facade_id,
                "image",
                image_name,
            ),
        )
        return image["image"]

    def get_segmentation_vectors(self, facade_id: str, item: str):
        """Get segmentation vectors for the specified item (eg: door) from Hbase"""
        return self._req_with_retries(
            "get",
            join(
                self.settings.base_url, self._uid, "facade", facade_id, item, "vectors"
            ),
        )

    def upload_facade_image_by_name(
        self, facade_id: str, image_name: str, image: bytes
    ):
        """Upload the facade image to Hbase (sent base64-encoded)"""
        body = {"image": b64encode(image).decode()}
        self._req_with_retries(
            "post",
            join(
                self.settings.base_url,
                self._uid,
                "facade",
                facade_id,
                "image",
                image_name,
            ),
            json=body,
        )

    def upload_metadata_for_facade(self, facade_id, name: str, payload: dict):
        """Upload the facade metadata ``name`` to Hbase"""
        body = {"metadata": payload}
        self._req_with_retries(
            "post",
            join(
                self.settings.base_url,
                self._uid,
                "facade",
                facade_id,
                "metadata",
                name,
            ),
            json=body,
        )

    def register_error_for_facade(self, error_code: str, facade_id: str):
        """Add an error at facade level to ES doc"""
        url = join(self.settings.base_url, self._uid, "facade", facade_id, "errors")
        payload = {"error_code": error_code}
        return self._force_update_with_version(payload=payload, url=url)

    @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS)
    def update_facade(self, visible_facade_id, **kwargs) -> None:
        """Update facade object (all the facades are fetched and uploaded)

        Every visible facade whose ``id`` equals ``visible_facade_id`` gets the
        ``kwargs`` written into it, then the whole ``facade`` list is sent back
        with ``None`` and empty values filtered out (values equal to 0 are kept).
        The whole method is retried when it raises.
        """
        # Start from the latest document so the version sent back is current.
        self.fetch()
        for facade in self._building["facade"]:
            for visible_facade in facade["visible_facades"]:
                if visible_facade["id"] == visible_facade_id:
                    visible_facade.update(kwargs)

        facades = [_filter_mapping(facade) for facade in self._building["facade"]]
        url = join(self.settings.base_url, self._uid)
        self._update_with_version({"facade": facades}, url=url)
        self.fetch()

    # ----------------- Req methods ---------------------
    @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS)
    def _req_with_retries(
        self,
        method: str,
        url: str,
        json: Optional[dict] = None,
        headers: Optional[dict] = None,
        **kwargs
    ):
        """Request method. Retry if it failed

        Extra keyword arguments (eg: ``timeout``) are forwarded to ``_req``.
        """
        return self._req(method=method, url=url, json=json, headers=headers, **kwargs)

    def _req(
        self,
        method: str,
        url: str,
        json: Optional[dict] = None,
        headers: Optional[dict] = None,
        timeout=5,
        **kwargs
    ):
        """Request method

        Adds the ``Api-Key`` header, sends the request and raises ``HTTPError``
        on an error status. Returns the decoded JSON body, or ``None`` when the
        body is not valid JSON.
        """
        if headers is None:
            headers = {}
        headers = {**headers, "Api-Key": self.settings.api_key}
        res = requests.request(
            method, url, json=json, headers=headers, timeout=timeout, **kwargs
        )
        res.raise_for_status()
        try:
            return res.json()
        except ValueError:
            # Every JSON decoding error raised by ``Response.json`` derives
            # from ValueError, whichever json backend requests is using.
            return None

    @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS)
    def _force_update_with_version(self, payload: dict, url: str) -> None:
        """
        Update doc using current doc version to prevent version conflict errors
        Retry if VersionConflictError
        """
        self.fetch()
        self._update_with_version(payload=payload, url=url)
        self.fetch()

    def _update_with_version(self, payload: dict, url: str):
        """
        Update ES doc using the version of the doc that was fetched
        Use this to prevent version conflict when uploading data

        Returns the decoded JSON response (``None`` if the body is not JSON).

        Raises:
            VersionConflictError: the API answered 500 with a
                "version conflict error" message.
            HTTPError: any other HTTP error, re-raised after being logged.
        """
        versioned = {**payload, "version": str(self._building["version"])}
        try:
            return self._req("put", url, json=versioned)
        except HTTPError as e:
            body = e.response.text
            if e.response.status_code == 500 and "version conflict error" in body.lower():
                raise VersionConflictError(body) from e
            self.logger.exception(body)
            # Propagate the real failure; there is no response to return here.
            raise