154 lines
4.7 KiB
Python
154 lines
4.7 KiB
Python
import sys
|
|
import json
|
|
import click
|
|
import urllib
|
|
import requests
|
|
|
|
from time import sleep
|
|
|
|
from mgrctl.settings.api import (
|
|
API_INPUT_PORT,
|
|
API_URL,
|
|
API_HEADERS,
|
|
API_EMAIL,
|
|
API_PASSWORD,
|
|
API_VERIFY_SSL,
|
|
API_COUNT_TRY_CONNECTIONS
|
|
)
|
|
from mgrctl.settings.platform import PLATFORM_TYPE
|
|
|
|
|
|
class BaseAPI(object):
    """Small HTTP client for a JSON API, with retry and error reporting.

    Request URLs are assembled as
    ``{API_URL}/{API_DEFINITION}/{API_VERSION}{url}``. Subclasses point the
    client at another service by overriding ``API_DEFINITION`` and/or
    ``API_VERSION`` after calling ``super().__init__()``.

    Failures are reported with ``click.echo`` and terminate the process via
    ``sys.exit(1)`` (i.e. ``SystemExit`` is raised).
    """

    def __init__(self):
        """Announces required parameters"""
        self.API_URL = API_URL
        self.API_HEADERS = API_HEADERS
        self.API_EMAIL = API_EMAIL
        self.API_PASSWORD = API_PASSWORD
        self.API_VERIFY_SSL = API_VERIFY_SSL
        self.API_VERSION = 'v3'
        self.API_DEFINITION = 'api'

    def _gen_request_url(self, url):
        """Return the absolute request URL for the endpoint path *url*.

        *url* is appended verbatim, so it is expected to start with ``/``.
        """
        return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'

    def call_api(self, url, method='GET', headers=None, data=None):
        """Send a request to the API and return the decoded JSON response.

        Args:
            url: Endpoint path appended to the base URL (e.g. ``'/whoami'``).
            method: ``'GET'`` or ``'POST'`` (case-insensitive).
            headers: Request headers; when empty or ``None`` the instance's
                ``API_HEADERS`` are used instead.
            data: For ``GET`` it is URL-encoded into the query string, for
                ``POST`` it is sent as the JSON body. ``None`` means ``{}``.

        Returns:
            The parsed JSON body of the response.

        Raises:
            ValueError: If *method* is neither ``GET`` nor ``POST``.
            SystemExit: On a request failure, an invalid or error response,
                or when all ``API_COUNT_TRY_CONNECTIONS`` attempts are used.
        """
        # A bare ``import urllib`` does not guarantee that the ``parse``
        # submodule is loaded, so import what is needed explicitly.
        from urllib.parse import urlencode

        method = method.upper()
        if method not in ('GET', 'POST'):
            # Previously an unsupported method left ``api_request`` unbound.
            raise ValueError(f'Unsupported HTTP method: {method}')

        headers = headers if headers else self.API_HEADERS
        data = {} if data is None else data

        # ``attempts_left`` counts the tries remaining AFTER the current one.
        for attempts_left in range(API_COUNT_TRY_CONNECTIONS - 1, -1, -1):
            try:
                # Rebuilt on every attempt: API_URL may have been switched
                # to the fallback host by a previous failed attempt.
                uri = self._gen_request_url(url)
                if method == 'POST':
                    api_request = requests.post(
                        url=uri,
                        json=data,
                        headers=headers,
                        verify=self.API_VERIFY_SSL
                    )
                else:
                    params_str = urlencode(data, safe="+'()")
                    if params_str:
                        uri = f'{uri}?{params_str}'
                    api_request = requests.get(
                        url=uri,
                        headers=headers,
                        verify=self.API_VERIFY_SSL
                    )
            except Exception as error:
                # Exact type match on purpose: subclasses of ConnectionError
                # (SSLError, ProxyError, ...) must not trigger the fallback.
                plain_connection_error = (
                    type(error) is requests.exceptions.ConnectionError
                )
                if plain_connection_error and PLATFORM_TYPE == 'dci':
                    # ? workaround if new docker version use dashes
                    # TODO: ISPsystem developers must set container_name !!!
                    self.API_URL = f'http://dci-input-1:{API_INPUT_PORT}'
                    if attempts_left:
                        continue
                click.echo(f'Error: {type(error).__name__}')
                sys.exit(1)

            # Get response:
            response = self._parse_response(api_request)

            # Validate response:
            if self._error_handler(response):
                continue  # new attempt connection

            return response

        # Every attempt ended in a retryable error (or no attempt was
        # allowed at all): fail loudly instead of returning None.
        click.echo('Error: API connection attempts exhausted')
        sys.exit(1)

    def _parse_response(self, api_request):
        """Decode the JSON body of *api_request* (an object with ``.text``).

        Raises:
            SystemExit: If the body is not valid JSON.
        """
        try:
            return json.loads(api_request.text)
        except json.decoder.JSONDecodeError:
            click.echo('Error: Invalid API response')
            sys.exit(1)

    def _is_error(self, response):
        """Return True when *response* is a dict with a truthy ``error``."""
        # Non-dict payloads (e.g. a JSON array) cannot carry an error field.
        return isinstance(response, dict) and bool(response.get('error'))

    def _is_error_3004(self, error):
        """Return True when *error* has code 3004, after a 2 second pause.

        The pause gives the API time before the caller retries.
        """
        if isinstance(error, dict) and error.get('code') == 3004:
            sleep(2)  # wait 2 second timeout
            return True
        return False

    def _error_handler(self, response):
        """Decide what to do with a parsed API response.

        Returns:
            ``False`` when the response is not an error, ``True`` when it is
            a retryable error (code 3004) and the request should be repeated.

        Raises:
            SystemExit: For any other API error, after printing it.
        """
        if not self._is_error(response):
            return False
        if self._is_error_3004(response.get('error')):
            return True
        click.echo(f'Error: API return {response}')
        sys.exit(1)
|
|
|
|
|
|
class BaseAuthAPI(BaseAPI):
    """Client for the ``auth`` API definition, version ``v4``."""

    def __init__(self):
        """Initialise the base client, then retarget it to ``auth``/``v4``."""
        super().__init__()
        self.API_VERSION = 'v4'
        self.API_DEFINITION = 'auth'

    def get_auth_token(self, email=None, password=None) -> dict:
        """POST credentials to ``/public/token`` and return the response.

        Falsy *email* / *password* fall back to the instance's
        ``API_EMAIL`` / ``API_PASSWORD``.
        """
        credentials = {
            'email': email or self.API_EMAIL,
            'password': password or self.API_PASSWORD,
        }
        return self.call_api(
            url='/public/token',
            method='POST',
            data=credentials
        )

    def get_auth_key(self, token=None, user=None) -> dict:
        """POST to ``/user/<user>/key`` and return the response.

        A falsy *user* falls back to ``API_EMAIL``. When *token* is given it
        is sent via the header built by ``make_auth_header``; otherwise empty
        headers are passed on to ``call_api``.
        """
        target_user = user or self.API_EMAIL
        request_headers = self.make_auth_header(token) if token else {}
        return self.call_api(
            url=f'/user/{target_user}/key',
            method='POST',
            headers=request_headers
        )

    def make_auth_header(self, token: str) -> dict:
        """Return the header dict that carries *token* as ``x-xsrf-token``."""
        auth_header = {'x-xsrf-token': token}
        return auth_header

    def whoami(self) -> dict:
        """GET ``/whoami`` and return the response."""
        return self.call_api(url='/whoami', method='GET')
|
|
|
|
|
|
class BaseIpAPI(BaseAPI):
    """Client for the ``ip`` API definition (API version left at default)."""

    def __init__(self):
        """Initialise the base client, then retarget it to ``ip``."""
        super().__init__()
        # Only the definition segment of the URL differs from the base class.
        self.API_DEFINITION = 'ip'
|
|
|
|
|
|
class BaseDnsProxyAPI(BaseAPI):
    """Client for the ``dnsproxy`` API definition (default API version)."""

    def __init__(self):
        """Initialise the base client, then retarget it to ``dnsproxy``."""
        super().__init__()
        # Only the definition segment of the URL differs from the base class.
        self.API_DEFINITION = 'dnsproxy'
|