Create: api handlers
This commit is contained in:
parent
f9a213fe3e
commit
1d72b8d015
0
isp_maintenance/core/api/__init__.py
Normal file
0
isp_maintenance/core/api/__init__.py
Normal file
157
isp_maintenance/core/api/base.py
Normal file
157
isp_maintenance/core/api/base.py
Normal file
@ -0,0 +1,157 @@
|
||||
import sys
|
||||
import json
|
||||
import urllib
|
||||
import requests
|
||||
|
||||
from settings.api import INPUT_HOSTNAME, INPUT_PORT, HEADERS
|
||||
|
||||
|
||||
class BaseAPI(object):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
"""Announces required parameters"""
|
||||
internal_api_url = f'http://{INPUT_HOSTNAME}:{INPUT_PORT}'
|
||||
self.API_URL = api_url if api_url else internal_api_url
|
||||
self.API_VERSION = 'v3'
|
||||
self.API_DEFINITION = 'api'
|
||||
self.HEADERS = HEADERS
|
||||
self.VERIFY_SSL = verify_ssl
|
||||
|
||||
def _gen_request_url(self, url):
|
||||
return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
|
||||
|
||||
def call_api(self, url, method='GET', headers={}, data={}):
|
||||
# Open session
|
||||
with requests.Session() as session:
|
||||
try:
|
||||
url = self._gen_request_url(url)
|
||||
headers = self.HEADERS if not headers else headers
|
||||
params_str = urllib.parse.urlencode(data, safe="+'()")
|
||||
if method == 'POST':
|
||||
api_request = session.post(
|
||||
url=url,
|
||||
json=data,
|
||||
headers=headers,
|
||||
verify=self.VERIFY_SSL
|
||||
)
|
||||
if method == 'GET':
|
||||
url = f'{url}?{params_str}' if params_str else url
|
||||
api_request = session.get(
|
||||
url=url,
|
||||
headers=headers,
|
||||
verify=self.VERIFY_SSL
|
||||
)
|
||||
except Exception as error:
|
||||
api_request = {
|
||||
'result': False,
|
||||
'error': type(error).__name__
|
||||
}
|
||||
return api_request
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
# Get response
|
||||
try:
|
||||
response = json.loads(api_request.text)
|
||||
if 'error' in response and response['error']:
|
||||
print(response['error'])
|
||||
raise sys.exit()
|
||||
return response
|
||||
except json.decoder.JSONDecodeError:
|
||||
response = {'error': 'Can not parse response'}
|
||||
print(response)
|
||||
raise sys.exit()
|
||||
|
||||
|
||||
class BaseAuthAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
"""
|
||||
Init class for /auth/v4 requests
|
||||
|
||||
Args:
|
||||
api_url (str, optional): url api host. Defaults to None.
|
||||
If None will use from settings.api.INPUT_HOSTNAME:INPUT_PORT
|
||||
|
||||
verify_ssl (bool, optional): Isn't recommended. Defaults to True.
|
||||
Use only for testing if you don't have root sign SSL cert.
|
||||
"""
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_VERSION = 'v4'
|
||||
self.API_DEFINITION = 'auth'
|
||||
|
||||
def get_auth_token(self, email: str, password: str) -> dict:
|
||||
"""
|
||||
Get auth token for authentication
|
||||
|
||||
Arg:
|
||||
email (str): user email
|
||||
password (str): user password
|
||||
|
||||
Returns:
|
||||
response (dict): {
|
||||
"confirmed": true,
|
||||
"expires_at": "date time",
|
||||
"id": "int",
|
||||
"token": "str"
|
||||
}
|
||||
"""
|
||||
return self.call_api(
|
||||
url='/public/token',
|
||||
method='POST',
|
||||
data={'email': email, 'password': password}
|
||||
)
|
||||
|
||||
def get_auth_key(self, token: str, user, auth_type='Internal') -> dict:
|
||||
"""
|
||||
Key authentication
|
||||
|
||||
Args:
|
||||
token (str): auth token
|
||||
user (str or int): user id or email
|
||||
auth_type (str, optional): May be Public for public auth.
|
||||
Defaults to 'Internal'.
|
||||
|
||||
Returns:
|
||||
response (dict): {
|
||||
"id": "int",
|
||||
"key": "str"
|
||||
}
|
||||
"""
|
||||
headers = {}
|
||||
if auth_type == 'Public':
|
||||
headers = self.make_auth_header(token)
|
||||
return self.call_api(
|
||||
url=f'/user/{user}/key',
|
||||
method='POST',
|
||||
headers=headers
|
||||
)
|
||||
|
||||
def make_auth_header(self, token: str) -> dict:
|
||||
"""
|
||||
Generate dict for auth header
|
||||
|
||||
Args:
|
||||
token (str): auth token
|
||||
|
||||
Returns:
|
||||
dict: {'x-xsrf-token': token} use it for request headers
|
||||
"""
|
||||
return {'x-xsrf-token': token}
|
||||
|
||||
def whoami(self, token: str) -> dict:
|
||||
return self.call_api(
|
||||
url='/whoami',
|
||||
method='GET',
|
||||
headers=self.make_auth_header(token)
|
||||
)
|
||||
|
||||
|
||||
class BaseIpAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_DEFINITION = 'ip'
|
||||
|
||||
|
||||
class BaseDnsProxyAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_DEFINITION = 'dnsproxy'
|
51
isp_maintenance/core/api/dci6.py
Normal file
51
isp_maintenance/core/api/dci6.py
Normal file
@ -0,0 +1,51 @@
|
||||
from core.api.base import BaseAPI, BaseAuthAPI, BaseDnsProxyAPI, BaseIpAPI
|
||||
|
||||
|
||||
class AuthAPI(BaseAuthAPI):
|
||||
pass
|
||||
|
||||
|
||||
class BackupAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_VERSION = 'v4'
|
||||
self.API_DEFINITION = 'backup'
|
||||
|
||||
|
||||
class DnsProxyAPI(BaseDnsProxyAPI):
|
||||
pass
|
||||
|
||||
|
||||
class EserviceAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_DEFINITION = 'eservice'
|
||||
|
||||
|
||||
class IsoAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_DEFINITION = 'iso'
|
||||
|
||||
|
||||
class IpmiAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_DEFINITION = 'ipmiproxy'
|
||||
|
||||
|
||||
class IpAPI(BaseIpAPI):
|
||||
pass
|
||||
|
||||
|
||||
class ReportAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_VERSION = 'v4'
|
||||
self.API_DEFINITION = 'report'
|
||||
|
||||
|
||||
class UpdaterAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_DEFINITION = 'updater'
|
19
isp_maintenance/core/api/vm6.py
Normal file
19
isp_maintenance/core/api/vm6.py
Normal file
@ -0,0 +1,19 @@
|
||||
from core.api.base import BaseAPI, BaseAuthAPI, BaseDnsProxyAPI, BaseIpAPI
|
||||
|
||||
|
||||
class AuthAPI(BaseAuthAPI):
|
||||
pass
|
||||
|
||||
|
||||
class DnsProxyAPI(BaseDnsProxyAPI):
|
||||
pass
|
||||
|
||||
|
||||
class IpAPI(BaseIpAPI):
|
||||
pass
|
||||
|
||||
|
||||
class VmAPI(BaseAPI):
|
||||
def __init__(self, api_url=None, verify_ssl=True):
|
||||
super().__init__(api_url, verify_ssl)
|
||||
self.API_DEFINITION = 'vm'
|
10
isp_maintenance/settings/api.py
Normal file
10
isp_maintenance/settings/api.py
Normal file
@ -0,0 +1,10 @@
|
||||
from settings.platform import PLATFORM_TYPE
|
||||
|
||||
# Name of nginx container:
|
||||
INPUT_HOSTNAME = 'input' if PLATFORM_TYPE == 'vm' else 'dci_input_1'
|
||||
|
||||
# Port that nginx container is listening:
|
||||
INPUT_PORT = '1500'
|
||||
|
||||
# Headers for internal auth:
|
||||
HEADERS = {"Internal-Auth": "on", "Accept": "application/json"}
|
Loading…
Reference in New Issue
Block a user