import logging import inspect from base64 import b64encode from os.path import join from typing import Optional from simplejson import JSONDecodeError import requests from requests.exceptions import HTTPError from retrying import retry from MaxIcsRegistry import MaxIcsRegistry from .settings import Settings MAX_RETRIES = 5 # max retries number WAIT_BETWEEN_RETRIES_IN_MS = 200 # time between each retry class VersionConflictError(Exception): """Raised when there is a version conflict when updating ES doc""" pass class BuildingGeneric: """ Stores the methods used to receive or update the building info from ES doc There are diferent levels for requesting: => req_with_retry : request again on error. To be used for GET essentially => update_with_version: pass the version of the ES doc to the payload. To be used: => for operations related to parts of doc that risk to be erased concurrently => whenever there is a risk that the doc gets updated by the time the request get caught by the BKC API (since a fetch operation is done within the BKC API) Eg: to be used for facade update Note: for all put methods, the version is included in the payload at the BKC API level, if not provided at the worker level (ie here) This way, a VersionConflictError pops whenever a conflicts of versions happens """ def __init__(self, uid: str = None, staging: bool = False, registry: MaxIcsRegistry = None, **kwargs) -> None: self.logger = logging.getLogger(self.__class__.__name__) self._uid = uid self._building = None self.settings = Settings(staging=staging, registry=registry, **kwargs) if self._uid: self.fetch(timeout=kwargs.get("timeout", None)) def __getitem__(self, item): return self._building.get(item, None) def __setitem__(self, key, item): self._building[key] = item # ----------------- Building level --------------------- def post_building(self, payload) -> None: """Post a new building""" res = self._req_with_retries("post", self.settings.base_url, json=payload) self._uid = res["id"] self.fetch() def get_all_buildings(self, **kwargs): """ Fetch all buildings that meet the criteria passed as kwargs Eg: get all buildings having given centroids """ buildings = self._req_with_retries("get", self.settings.base_url, **kwargs) return buildings def fetch(self, **kwargs) -> None: """Fetch the building document""" self._building = self._req_with_retries( "get", "/".join([self.settings.base_url, self._uid]), **kwargs ) def update_building(self, payload) -> None: """ Update ES doc with information at building level Note: the retries for conflict error are managed within BKC API """ self._req("put", "/".join([self.settings.base_url, self._uid]), json=payload) def update_status_history(self, status: str, **kwargs) -> None: """ Update the history. Note: the retries for conflict error are managed within BKC API """ payload = {"new_entry": status} url = "/".join([self.settings.base_url, self._uid, "history"]) return self._req(method="put", url=url, json=payload, **kwargs) def register_error_for_building(self, error_code: str): """ Add an error code at building level Note: the retries for conflict error are managed within BKC API """ url = "/".join([self.settings.base_url, self._uid, "errors"]) payload = {"error_code": error_code} return self._req(method="put", url=url, json=payload) def upload_metadata_for_building(self, name: str, payload: dict): """Upload the facade metadata to Hbase""" body = {"metadata": payload} self._req_with_retries( "post", "/".join([self.settings.base_url, self._uid, "metadata", name]), json=body, ) # ----------------- Roof level --------------------- def get_roof_image_by_name(self, image_name: str) -> str: """Get roof image""" image = self._req_with_retries( "get", "/".join([self.settings.base_url, self._uid, "roof", "image", image_name]) ) return image["image"] def upload_roof_image_by_name(self, image_name: str, image: bytes) -> None: """upload the roof image to Hbase""" body = {"image": b64encode(image).decode()} self._req_with_retries( "post", "/".join([self.settings.base_url, self._uid, "roof", "image", image_name]), json=body, ) def upload_metadata_for_roof(self, name: str, payload: dict): """Upload the facade metadata to Hbase""" body = {"metadata": payload} self._req_with_retries( "post", "/".join([self.settings.base_url, self._uid, "roof", "metadata", name]), json=body, ) # ----------------- Facade level --------------------- def get_number_of_visible_facades(self): """Determine the number of visible facades in building""" number_of_visible_facades = 0 for facade in self._building["facade"]: for _ in facade["visible_facades"]: number_of_visible_facades = number_of_visible_facades + 1 return number_of_visible_facades def get_facade_image_by_name(self, facade_id: str, image_name: str) -> str: """Get facade image from Hbase""" image = self._req_with_retries( "get", "/".join( [self.settings.base_url, self._uid, "facade", facade_id, "image", image_name] ), ) return image["image"] def get_segmentation_vectors(self, facade_id: str, item: str): """Get segmentation vectors for the specified item (eg: door) from Hbase""" vectors = self._req_with_retries( "get", "/".join([self.settings.base_url, self._uid, "facade", facade_id, item, "vectors"]), ) return vectors def upload_facade_image_by_name(self, facade_id: str, image_name: str, image: bytes): """Upload the facade image to Hbase""" body = {"image": b64encode(image).decode()} self._req_with_retries( "post", "/".join( [self.settings.base_url, self._uid, "facade", facade_id, "image", image_name] ), json=body, ) def upload_metadata_for_facade(self, facade_id, name: str, payload: dict): """Upload the facade metadata to Hbase""" body = {"metadata": payload} self._req_with_retries( "post", "/".join([self.settings.base_url, self._uid, "facade", facade_id, "metadata", name]), json=body, ) def register_error_for_facade(self, error_code: str, facade_id: str): """Add an error at facade level to ES doc""" url = "/".join([self.settings.base_url, self._uid, "facade", facade_id, "errors"]) payload = {"error_code": error_code} return self._force_update_with_version(payload=payload, url=url) @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS) def update_facade(self, visible_facade_id, **kwargs) -> None: """Update facade object (all the facades are fetched and uploaded)""" self.fetch() for f_index, facade in enumerate(self._building["facade"]): for v_index, visible_facade in enumerate(facade["visible_facades"]): if visible_facade["id"] == visible_facade_id: for k, v in kwargs.items(): self._building["facade"][f_index]["visible_facades"][v_index][k] = v def filter_value(value): """ Filter out None and empty dict values values = 0 are kept """ if value is not None: if isinstance(value, dict): return { k_: filter_value(v_) for k_, v_ in value.items() if filter_value(v_) or filter_value(v_) == 0 } elif isinstance(value, list): # eg: list of visible facades if all([isinstance(elem, dict) for elem in value]): return [ { k_: filter_value(v_) for k_, v_ in dict_.items() if filter_value(v_) or filter_value(v_) == 0 } for dict_ in value ] else: return value else: return value facade = [ {k: filter_value(v) for k, v in f.items() if filter_value(v) or filter_value(v) == 0} for f in self._building["facade"] ] url = "/".join([self.settings.base_url, self._uid]) self._update_with_version({"facade": facade}, url=url) self.fetch() # ----------------- Req methods --------------------- @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS) def _req_with_retries( self, method: str, url: str, json: Optional[dict] = None, headers: Optional[dict] = None, **kwargs ): """Request method. Retry if it failed""" res = self._req(method=method, url=url, json=json, headers=headers, **kwargs) return res def _req( self, method: str, url: str, json: Optional[dict] = None, headers: Optional[dict] = None, timeout=5, **kwargs ): """Request method""" if headers is None: headers = {} headers = {**headers, "Api-Key": self.settings.api_key} try: res = requests.request( method, url, json=json, headers=headers, timeout=timeout, **kwargs ) res.raise_for_status() except HTTPError as e: if ( e.response.status_code == 500 and "version conflict error" in e.response.text.lower() ): raise VersionConflictError else: raise try: return res.json() except JSONDecodeError: return None @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=WAIT_BETWEEN_RETRIES_IN_MS) def _force_update_with_version(self, payload: dict, url: str) -> None: """ Update doc using current doc version to prevent version conflict errors This will trigger a VersionConflictError on version conflict Retry if VersionConflictError """ self.fetch() self._update_with_version(payload=payload, url=url) self.fetch() def _update_with_version(self, payload: dict, url: str) -> int: """ Update ES doc using the version of the doc that was fetched Use this to prevent version conflict when uploading data """ res = self._req("put", url, json={**payload, "version": str(self._building["version"])})