"""HTTP client used to read and update a single building document."""
import logging
from base64 import b64encode

# NOTE(review): URLs below are assembled with ``os.path.join``, which uses the
# host OS path separator. That is "/" on POSIX; confirm this module is never
# run on Windows, where the separator would be a backslash.
from os.path import join
from typing import Optional

import requests
from requests.exceptions import HTTPError
from retrying import retry
from simplejson import JSONDecodeError

from .settings import Settings

MAX_RETRIES = 5  # maximum number of attempts made by the @retry decorators
WAIT_BETWEEN_RETRIES_IN_MS = 200  # fixed wait between two attempts


class VersionConflictError(Exception):
    """Raised when there is a version conflict when updating ES doc"""


def _is_kept(value) -> bool:
    """Return True for truthy values and for anything equal to 0."""
    return bool(value) or value == 0


def _filter_dict(mapping: dict) -> dict:
    """Return a copy of *mapping* without None / empty values (0 is kept)."""
    filtered = {}
    for key, raw in mapping.items():
        # Filter each value exactly once; re-filtering it for every check
        # makes the cost explode with the nesting depth.
        cleaned = _filter_value(raw)
        if _is_kept(cleaned):
            filtered[key] = cleaned
    return filtered


def _filter_value(value):
    """
    Filter out None and empty values nested inside *value*

    Dicts are filtered recursively, lists made only of dicts (eg: the list of
    visible facades) have each dict filtered, anything else is returned as is.
    """
    if isinstance(value, dict):
        return _filter_dict(value)
    if isinstance(value, list) and all(isinstance(elem, dict) for elem in value):
        return [_filter_dict(elem) for elem in value]
    return value


class BuildingGeneric:
    """Stores the methods used to receive or update the building info from ES doc"""

    def __init__(self, uid: str, **kwargs) -> None:
        """
        Build the settings and fetch the building document right away

        :param uid: identifier of the building, appended to the base url
        :param kwargs: optional ``staging`` (default False) and ``registry``,
            both forwarded to ``Settings``
        """
        staging = kwargs.get("staging", False)
        registry = kwargs.get("registry")
        self.logger = logging.getLogger(self.__class__.__name__)
        self._uid = uid
        self.settings = Settings(staging=staging, registry=registry)
        self.fetch()

    def __getitem__(self, item):
        """Return the field ``item`` of the fetched document (None if absent)"""
        return self._building.get(item)

    # ----------------- Building level ---------------------
    def fetch(self) -> None:
        """Fetch the building document and cache it on the instance"""
        self._building = self._req_with_retries(
            "get", join(self.settings.base_url, self._uid)
        )

    def update_building(self, payload) -> None:
        """
        Update ES doc with information at building level
        Note: the retries for conflict error are managed within BKC API
        """
        self._req("put", join(self.settings.base_url, self._uid), json=payload)

    def update_status_history(self, status: str) -> None:
        """
        Append ``status`` to the operation history of the building
        Note: the retries for conflict error are managed within BKC API
        """
        # A document without any history yet must not break the unpacking.
        history = self._building.get("operation_history") or []
        payload = {"operation_history": [*history, status]}
        self._req("put", join(self.settings.base_url, self._uid), json=payload)

    def register_error_for_building(self, error_code: str):
        """
        Add an error code at building level
        Note: the retries for conflict error are managed within BKC API
        """
        url = join(self.settings.base_url, self._uid, "errors")
        payload = {"error_code": error_code}
        return self._req(method="put", url=url, json=payload)

    def upload_metadata_for_building(self, name: str, payload: dict):
        """Upload the building metadata stored under ``name``"""
        body = {"metadata": payload}
        self._req_with_retries(
            "post",
            join(self.settings.base_url, self._uid, "metadata", name),
            json=body,
        )

    # ----------------- Roof level ---------------------
    def get_roof_image_by_name(self, image_name: str) -> str:
        """Get the roof image stored under ``image_name``"""
        image = self._req_with_retries(
            "get",
            join(self.settings.base_url, self._uid, "roof", "image", image_name),
        )
        return image["image"]

    def upload_roof_image_by_name(self, image_name: str, image: bytes) -> None:
        """Upload the roof image, sent base64 encoded, under ``image_name``"""
        body = {"image": b64encode(image).decode()}
        self._req_with_retries(
            "post",
            join(self.settings.base_url, self._uid, "roof", "image", image_name),
            json=body,
        )

    def upload_metadata_for_roof(self, name: str, payload: dict):
        """Upload the roof metadata stored under ``name``"""
        body = {"metadata": payload}
        self._req_with_retries(
            "post",
            join(self.settings.base_url, self._uid, "roof", "metadata", name),
            json=body,
        )

    # ----------------- Facade level ---------------------
    def get_number_of_visible_facades(self):
        """Determine the number of visible facades in building"""
        # ``update_facade`` filters empty values out of what it uploads, so a
        # facade may come back without a "visible_facades" key at all.
        return sum(
            len(facade.get("visible_facades") or [])
            for facade in self._building["facade"]
        )

    def get_facade_image_by_name(self, facade_id: str, image_name: str) -> str:
        """Get the image ``image_name`` of the facade ``facade_id``"""
        image = self._req_with_retries(
            "get",
            join(
                self.settings.base_url,
                self._uid,
                "facade",
                facade_id,
                "image",
                image_name,
            ),
        )
        return image["image"]

    def get_segmentation_vectors(self, facade_id: str, item: str):
        """Get segmentation vectors for the specified item (eg: door)"""
        return self._req_with_retries(
            "get",
            join(
                self.settings.base_url, self._uid, "facade", facade_id, item, "vectors"
            ),
        )

    def upload_facade_image_by_name(
        self, facade_id: str, image_name: str, image: bytes
    ):
        """Upload the facade image, sent base64 encoded, under ``image_name``"""
        body = {"image": b64encode(image).decode()}
        self._req_with_retries(
            "post",
            join(
                self.settings.base_url,
                self._uid,
                "facade",
                facade_id,
                "image",
                image_name,
            ),
            json=body,
        )

    def upload_metadata_for_facade(self, facade_id, name: str, payload: dict):
        """Upload the metadata ``name`` of the facade ``facade_id``"""
        body = {"metadata": payload}
        self._req_with_retries(
            "post",
            join(
                self.settings.base_url,
                self._uid,
                "facade",
                facade_id,
                "metadata",
                name,
            ),
            json=body,
        )

    def register_error_for_facade(self, error_code: str, facade_id: str):
        """Add an error at facade level to ES doc"""
        url = join(self.settings.base_url, self._uid, "facade", facade_id, "errors")
        payload = {"error_code": error_code}
        return self._force_update_with_version(payload=payload, url=url)

    @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS)
    def update_facade(self, visible_facade_id, **kwargs) -> None:
        """
        Update facade object (all the facades are fetched and uploaded)

        :param visible_facade_id: ``id`` of the visible facade to modify
        :param kwargs: fields to set on every visible facade with that id
        :raises VersionConflictError: when the document changed in between and
            every attempt allowed by the retry decorator has been used
        """
        # Work on a fresh copy so that the version sent back is the latest one.
        self.fetch()
        for facade in self._building["facade"]:
            for visible_facade in facade.get("visible_facades") or []:
                if visible_facade["id"] == visible_facade_id:
                    visible_facade.update(kwargs)

        # None and empty values are stripped from the upload, 0 values are kept.
        facade = [_filter_dict(f) for f in self._building["facade"]]
        url = join(self.settings.base_url, self._uid)
        self._update_with_version({"facade": facade}, url=url)
        self.fetch()

    # ----------------- Req methods ---------------------
    @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS)
    def _req_with_retries(
        self,
        method: str,
        url: str,
        json: Optional[dict] = None,
        headers: Optional[dict] = None,
    ):
        """Request method. Retry if it failed"""
        return self._req(method=method, url=url, json=json, headers=headers)

    def _req(
        self,
        method: str,
        url: str,
        json: Optional[dict] = None,
        headers: Optional[dict] = None,
    ):
        """
        Request method

        Adds the ``Api-Key`` header, raises ``HTTPError`` on an error status
        and returns the decoded JSON body, or None when the body is not JSON.
        """
        # Merge into a new dict so the caller's headers are never mutated.
        headers = {**(headers or {}), "Api-Key": self.settings.api_key}
        res = requests.request(method, url, json=json, headers=headers)
        res.raise_for_status()
        try:
            return res.json()
        except JSONDecodeError:
            return None

    @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS)
    def _force_update_with_version(self, payload: dict, url: str) -> None:
        """
        Update doc using current doc version to prevent version conflict errors
        Retry if VersionConflictError
        """
        self.fetch()
        self._update_with_version(payload=payload, url=url)
        self.fetch()

    def _update_with_version(self, payload: dict, url: str) -> Optional[dict]:
        """
        Update ES doc using the version of the doc that was fetched
        Use this to prevent version conflict when uploading data

        :return: whatever ``_req`` returns for the PUT request
        :raises VersionConflictError: on a 500 response whose body mentions a
            version conflict
        :raises HTTPError: on any other HTTP error, after logging its body
        """
        versioned = {**payload, "version": self._building["version"]}
        try:
            return self._req("put", url, json=versioned)
        except HTTPError as e:
            response = e.response
            if response is None:
                raise
            if (
                response.status_code == 500
                and "version conflict error" in response.text.lower()
            ):
                raise VersionConflictError(response.text) from e
            # Log the body, then let the real error surface instead of failing
            # later on an unbound local variable.
            self.logger.exception(response.text)
            raise