isp-maintenance/mgrctl/api/base.py

141 lines
4.0 KiB
Python

import sys
import json
import click
import urllib
import requests
from time import sleep
from mgrctl.settings.api import (
API_URL,
API_HEADERS,
API_EMAIL,
API_PASSWORD,
API_VERIFY_SSL,
API_COUNT_TRY_CONNECTIONS
)
class BaseAPI(object):
    """Base HTTP client for the JSON API configured in ``mgrctl.settings.api``.

    Request URLs are built as ``{API_URL}/{API_DEFINITION}/{API_VERSION}{url}``.
    Subclasses target another service by overriding ``API_DEFINITION`` and/or
    ``API_VERSION`` after calling ``super().__init__()``.
    """

    # HTTP methods that call_api() knows how to send.
    _SUPPORTED_METHODS = ('GET', 'POST')
    # API error code after which the request is sent again.
    _RETRYABLE_ERROR_CODE = 3004
    # Pause between two attempts, in seconds.
    _RETRY_DELAY_SECONDS = 2

    def __init__(self):
        """Announces required parameters"""
        self.API_URL = API_URL
        self.API_HEADERS = API_HEADERS
        self.API_EMAIL = API_EMAIL
        self.API_PASSWORD = API_PASSWORD
        self.API_VERIFY_SSL = API_VERIFY_SSL
        self.API_VERSION = 'v3'
        self.API_DEFINITION = 'api'

    def _gen_request_url(self, url):
        """Return the absolute URL for the endpoint path ``url``.

        ``url`` is appended directly after the API version, so it has to
        start with ``/``.
        """
        return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'

    def call_api(self, url, method='GET', headers=None, data=None):
        """Send a request to the API and return the decoded JSON response.

        Args:
            url: Endpoint path, appended to the base URL (see
                ``_gen_request_url``).
            method: ``'GET'`` or ``'POST'`` (case-insensitive).
            headers: Request headers; ``self.API_HEADERS`` is used when
                empty or omitted.
            data: Payload. Sent as the JSON body for POST and as the
                URL-encoded query string for GET.

        Returns:
            The decoded JSON response, or ``None`` when every one of the
            ``API_COUNT_TRY_CONNECTIONS`` attempts ended with the retryable
            error code.

        Raises:
            ValueError: ``method`` is neither GET nor POST.
            SystemExit: the request failed, the response was not valid JSON,
                or the API reported a non-retryable error (exit status 1).
        """
        verb = method.upper() if isinstance(method, str) else method
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f'Unsupported HTTP method: {method!r}')

        # Imported explicitly: a bare ``import urllib`` does not guarantee
        # that the ``urllib.parse`` submodule is loaded.
        from urllib.parse import urlencode

        headers = headers or self.API_HEADERS
        data = {} if data is None else data

        # range() also terminates for a zero or negative attempt count.
        for attempts_left in range(API_COUNT_TRY_CONNECTIONS, 0, -1):
            try:
                uri = self._gen_request_url(url)
                # NOTE(review): no ``timeout`` is passed to requests, so a
                # stalled connection blocks indefinitely - confirm intended.
                if verb == 'POST':
                    api_request = requests.post(
                        url=uri,
                        json=data,
                        headers=headers,
                        verify=self.API_VERIFY_SSL
                    )
                else:
                    params_str = urlencode(data, safe="+'()")
                    if params_str:
                        uri = f'{uri}?{params_str}'
                    api_request = requests.get(
                        url=uri,
                        headers=headers,
                        verify=self.API_VERIFY_SSL
                    )
            except Exception as error:
                click.echo(f'Error: {type(error).__name__}')
                sys.exit(1)

            response = self._parse_response(api_request)

            # _error_handler() exits on fatal errors and is truthy only for
            # the retryable one.
            if not self._error_handler(response):
                return response

            if attempts_left > 1:
                # Wait before the next attempt; no point after the last one.
                sleep(self._RETRY_DELAY_SECONDS)

        return None

    def _parse_response(self, api_request):
        """Decode the JSON body of ``api_request``.

        Prints an error and exits with status 1 when the body is not JSON.
        """
        try:
            return json.loads(api_request.text)
        except json.JSONDecodeError:
            click.echo('Error: Invalid API response')
            sys.exit(1)

    def _is_error(self, response):
        """Return True when ``response`` is a dict with a truthy ``error``."""
        return isinstance(response, dict) and bool(response.get('error'))

    def _is_error_3004(self, error):
        """Return True when ``error`` is a dict carrying the retryable code."""
        return (
            isinstance(error, dict)
            and error.get('code') == self._RETRYABLE_ERROR_CODE
        )

    def _error_handler(self, response):
        """Classify ``response``.

        Returns:
            ``False`` when the response holds no error, ``True`` when it
            holds the retryable error code. Any other error is printed and
            the process exits with status 1.
        """
        if not self._is_error(response):
            return False
        if self._is_error_3004(response.get('error')):
            return True
        click.echo(f'Error: API return {response}')
        sys.exit(1)
class BaseAuthAPI(BaseAPI):
    """API client whose requests go to the ``auth`` definition, version ``v4``."""

    def __init__(self):
        """Initialise the base client, then point it at ``auth``/``v4``."""
        super().__init__()
        self.API_DEFINITION = 'auth'
        self.API_VERSION = 'v4'

    def get_auth_token(self, email=None, password=None) -> dict:
        """POST credentials to ``/public/token`` and return the API response.

        ``email`` and ``password`` fall back to ``self.API_EMAIL`` and
        ``self.API_PASSWORD`` when empty or omitted.
        """
        credentials = {
            'email': email or self.API_EMAIL,
            'password': password or self.API_PASSWORD,
        }
        return self.call_api(
            url='/public/token',
            method='POST',
            data=credentials,
        )

    def get_auth_key(self, token=None, user=None) -> dict:
        """POST to ``/user/{user}/key`` and return the API response.

        ``user`` falls back to ``self.API_EMAIL``. When ``token`` is given it
        is sent through ``make_auth_header``; otherwise empty headers are
        passed on to ``call_api``.
        """
        account = user or self.API_EMAIL
        auth_headers = self.make_auth_header(token) if token else {}
        return self.call_api(
            url=f'/user/{account}/key',
            method='POST',
            headers=auth_headers,
        )

    def make_auth_header(self, token: str) -> dict:
        """Wrap ``token`` into an ``x-xsrf-token`` header mapping."""
        header = {'x-xsrf-token': token}
        return header

    def whoami(self) -> dict:
        """GET ``/whoami`` and return the API response."""
        return self.call_api(url='/whoami', method='GET')
class BaseIpAPI(BaseAPI):
    """API client whose requests go to the ``ip`` definition."""

    def __init__(self):
        """Initialise the base client, then switch the definition to ``ip``."""
        super().__init__()
        # Only the definition segment of the request URL differs from BaseAPI.
        self.API_DEFINITION = 'ip'
class BaseDnsProxyAPI(BaseAPI):
    """API client whose requests go to the ``dnsproxy`` definition."""

    def __init__(self):
        """Initialise the base client, then switch the definition to ``dnsproxy``."""
        super().__init__()
        # Only the definition segment of the request URL differs from BaseAPI.
        self.API_DEFINITION = 'dnsproxy'