import json
import sys
import urllib
import urllib.parse  # explicit: `import urllib` alone does not guarantee the `parse` submodule is loaded

import requests

from settings.api import INPUT_HOSTNAME, INPUT_PORT, HEADERS


class BaseAPI(object):
    """Minimal JSON-over-HTTP client.

    Request URLs are built as
    ``{API_URL}/{API_DEFINITION}/{API_VERSION}{url}``; subclasses override
    ``API_DEFINITION`` / ``API_VERSION`` to target another endpoint family.
    """

    # HTTP verbs that call_api knows how to send.
    _SUPPORTED_METHODS = ('GET', 'POST')

    def __init__(self, api_url=None, verify_ssl=True):
        """Set up the connection parameters.

        Args:
            api_url (str, optional): base URL of the API host. When empty,
                ``http://{INPUT_HOSTNAME}:{INPUT_PORT}`` from
                ``settings.api`` is used.
            verify_ssl (bool, optional): passed as ``verify`` to every
                request. Defaults to True.
        """
        internal_api_url = f'http://{INPUT_HOSTNAME}:{INPUT_PORT}'
        self.API_URL = api_url if api_url else internal_api_url
        self.API_VERSION = 'v3'
        self.API_DEFINITION = 'api'
        self.HEADERS = HEADERS
        self.VERIFY_SSL = verify_ssl

    def _gen_request_url(self, url):
        """Return the absolute URL for the endpoint path ``url``."""
        return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'

    def call_api(self, url, method='GET', headers=None, data=None):
        """Send a request and return the decoded JSON response.

        Args:
            url (str): endpoint path appended to the API prefix,
                e.g. ``'/whoami'``.
            method (str, optional): ``'GET'`` or ``'POST'``
                (case-insensitive). Defaults to ``'GET'``.
            headers (dict, optional): request headers. When empty or None,
                ``self.HEADERS`` is sent instead (custom headers replace
                the defaults, they are not merged).
            data (optional): for POST, the JSON body; for GET, the query
                parameters. None is treated as an empty dict.

        Returns:
            The decoded JSON document on success, or
            ``{'result': False, 'error': <exception class name>}`` when
            sending the request failed.

        Raises:
            ValueError: if ``method`` is not supported.
            SystemExit: if the response is not valid JSON, or if it is a
                JSON object with a truthy ``'error'`` member.
        """
        method = method.upper()
        if method not in self._SUPPORTED_METHODS:
            # Previously this fell through and crashed later with an
            # UnboundLocalError; fail early with a clear message instead.
            raise ValueError(f'Unsupported HTTP method: {method}')

        headers = headers if headers else self.HEADERS
        data = {} if data is None else data

        # The context manager closes the session on every exit path.
        with requests.Session() as session:
            try:
                request_url = self._gen_request_url(url)
                if method == 'POST':
                    api_request = session.post(
                        url=request_url,
                        json=data,
                        headers=headers,
                        verify=self.VERIFY_SSL
                    )
                else:
                    # Only GET needs a query string; encoding it here keeps
                    # POST bodies that are not mappings from failing.
                    params_str = urllib.parse.urlencode(data, safe="+'()")
                    if params_str:
                        request_url = f'{request_url}?{params_str}'
                    api_request = session.get(
                        url=request_url,
                        headers=headers,
                        verify=self.VERIFY_SSL
                    )
            except Exception as error:
                # Boundary handler: callers receive an error dict rather
                # than a transport exception.
                return {
                    'result': False,
                    'error': type(error).__name__
                }

        return self._parse_response(api_request)

    @staticmethod
    def _parse_response(api_request):
        """Decode the response body as JSON; print and exit on API errors."""
        try:
            response = json.loads(api_request.text)
        except json.JSONDecodeError:
            response = {'error': 'Can not parse response'}
            print(response)
            # NOTE(review): sys.exit() without an argument exits with
            # status 0 even though this is a failure path - confirm intent.
            sys.exit()

        # Only JSON objects can carry an 'error' member; other JSON values
        # (lists, strings, numbers) are returned unchanged.
        if isinstance(response, dict) and response.get('error'):
            print(response['error'])
            sys.exit()
        return response


class BaseAuthAPI(BaseAPI):
    """Client for the ``/auth/v4`` endpoints."""

    def __init__(self, api_url=None, verify_ssl=True):
        """
        Init class for /auth/v4 requests

        Args:
            api_url (str, optional): url api host. Defaults to None.
                If None will use from settings.api.INPUT_HOSTNAME:INPUT_PORT
            verify_ssl (bool, optional): Verify the server SSL certificate.
                Defaults to True. Disabling it isn't recommended; do so only
                for testing when you don't have a root-signed SSL cert.
        """
        super().__init__(api_url, verify_ssl)
        self.API_VERSION = 'v4'
        self.API_DEFINITION = 'auth'

    def get_auth_token(self, email: str, password: str) -> dict:
        """
        Get auth token for authentication

        Sends POST /public/token with the credentials as JSON body.

        Args:
            email (str): user email
            password (str): user password

        Returns:
            response (dict): {
                "confirmed": true,
                "expires_at": "date time",
                "id": "int",
                "token": "str"
            }
        """
        return self.call_api(
            url='/public/token',
            method='POST',
            data={'email': email, 'password': password}
        )

    def get_auth_key(self, token: str, user, auth_type='Internal') -> dict:
        """
        Key authentication

        Sends POST /user/{user}/key.

        Args:
            token (str): auth token
            user (str or int): user id or email
            auth_type (str, optional): May be Public for public auth.
                Defaults to 'Internal'. Only 'Public' sends the token
                header; any other value sends the default headers.

        Returns:
            response (dict): {
                "id": "int",
                "key": "str"
            }
        """
        # Empty headers make call_api fall back to self.HEADERS.
        headers = {}
        if auth_type == 'Public':
            headers = self.make_auth_header(token)
        return self.call_api(
            url=f'/user/{user}/key',
            method='POST',
            headers=headers
        )

    def make_auth_header(self, token: str) -> dict:
        """
        Generate dict for auth header

        Args:
            token (str): auth token

        Returns:
            dict: {'x-xsrf-token': token} use it for request headers
        """
        return {'x-xsrf-token': token}

    def whoami(self, token: str) -> dict:
        """
        Send GET /whoami authenticated with the given token

        Args:
            token (str): auth token

        Returns:
            response (dict): decoded JSON response of the endpoint
        """
        return self.call_api(
            url='/whoami',
            method='GET',
            headers=self.make_auth_header(token)
        )


class BaseIpAPI(BaseAPI):
    """Client for the ``/ip/v3`` endpoints."""

    def __init__(self, api_url=None, verify_ssl=True):
        """Same arguments as BaseAPI; switches the URL prefix to 'ip'."""
        super().__init__(api_url, verify_ssl)
        self.API_DEFINITION = 'ip'


class BaseDnsProxyAPI(BaseAPI):
    """Client for the ``/dnsproxy/v3`` endpoints."""

    def __init__(self, api_url=None, verify_ssl=True):
        """Same arguments as BaseAPI; switches the URL prefix to 'dnsproxy'."""
        super().__init__(api_url, verify_ssl)
        self.API_DEFINITION = 'dnsproxy'