Compare commits

...

2 Commits

Author SHA1 Message Date
f773898b71 Update: BaseAPI class now has error handler 2024-06-06 20:59:03 +09:00
cdf3a92527 Update: access funcs 2024-06-06 16:01:21 +09:00
4 changed files with 98 additions and 50 deletions

View File

@ -1,14 +1,18 @@
import sys import sys
import json import json
import click
import urllib import urllib
import requests import requests
from time import sleep
from mgrctl.settings.api import ( from mgrctl.settings.api import (
API_URL, API_URL,
API_HEADERS, API_HEADERS,
API_EMAIL, API_EMAIL,
API_PASSWORD, API_PASSWORD,
API_VERIFY_SSL API_VERIFY_SSL,
API_COUNT_TRY_CONNECTIONS
) )
@ -27,47 +31,66 @@ class BaseAPI(object):
return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}' return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
def call_api(self, url, method='GET', headers={}, data={}): def call_api(self, url, method='GET', headers={}, data={}):
# Open session attempt = API_COUNT_TRY_CONNECTIONS
with requests.Session() as session: while attempt:
try: try:
url = self._gen_request_url(url) uri = self._gen_request_url(url)
headers = self.API_HEADERS if not headers else headers headers = self.API_HEADERS if not headers else headers
params_str = urllib.parse.urlencode(data, safe="+'()") params_str = urllib.parse.urlencode(data, safe="+'()")
if method == 'POST': if method == 'POST':
api_request = session.post( api_request = requests.post(
url=url, url=uri,
json=data, json=data,
headers=headers, headers=headers,
verify=self.API_VERIFY_SSL verify=self.API_VERIFY_SSL
) )
if method == 'GET': if method == 'GET':
url = f'{url}?{params_str}' if params_str else url uri = f'{uri}?{params_str}' if params_str else uri
api_request = session.get( api_request = requests.get(
url=url, url=uri,
headers=headers, headers=headers,
verify=self.API_VERIFY_SSL verify=self.API_VERIFY_SSL
) )
except Exception as error: except Exception as error:
api_request = { click.echo(f'Error: {type(error).__name__}')
'result': False, sys.exit()
'error': type(error).__name__
}
return api_request
finally:
session.close()
# Get response # Get response:
response = self._parse_response(api_request)
# Validate response:
if self._error_handler(response):
attempt -= 1
sleep(2) # wait 2 second timeout
continue # new attempt connection
return response
def _parse_response(self, api_request):
try: try:
response = json.loads(api_request.text) response = json.loads(api_request.text)
if 'error' in response and response['error']:
print(response['error'])
raise sys.exit()
return response return response
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
response = {'error': 'Can not parse response'} click.echo('Error: Invalid API response')
print(response)
raise sys.exit() raise sys.exit()
def _is_error(self, response):
if response.get('error'):
return True
def _is_error_3004(self, error):
if error.get('code') == 3004:
return True
def _error_handler(self, response):
if self._is_error(response):
error = response.get('error')
if self._is_error_3004(error):
return True
else:
click.echo(f'Error: API return {response}')
raise sys.exit()
class BaseAuthAPI(BaseAPI): class BaseAuthAPI(BaseAPI):
def __init__(self): def __init__(self):

View File

@ -38,16 +38,22 @@ def user():
) )
def ls(all, admins): def ls(all, admins):
if all: if all:
user_cursor.echo_users(role='all') users = user_cursor.get_users(role='all')
elif admins: elif admins:
user_cursor.echo_users(role='admin') users = user_cursor.get_users(role='admin')
else: else:
user_cursor.echo_users(role='all') users = user_cursor.get_users(role='all')
# print users:
user_cursor.echo_users(users)
@user.command(help='Generate access key and return auth link(s)') @user.command(
help='Generate an access key and return auth link(s)',
no_args_is_help=True
)
@click.option( @click.option(
'--id', '--id',
'_id',
required=False, required=False,
type=int, type=int,
help='User id' help='User id'
@ -62,7 +68,7 @@ def ls(all, admins):
'--random', '--random',
is_flag=True, is_flag=True,
required=False, required=False,
help='Interactive mode, ignores other keys' help='Generate access key for the first available admin'
) )
@click.option( @click.option(
'--interactive', '--interactive',
@ -70,24 +76,21 @@ def ls(all, admins):
required=False, required=False,
help='Interactive mode, ignores other keys' help='Interactive mode, ignores other keys'
) )
def access(id, count, interactive, random): def access(_id, count, interactive, random):
if id and not count: if _id and not count:
keys = user_cursor.get_access_keys(user=id, count=1) keys = user_cursor.get_access_keys(user=_id, count=1)
links = user_cursor.gen_access_links(keys) elif _id and count:
user_cursor.echo_access_links(links) keys = user_cursor.get_access_keys(user=_id, count=count)
elif id and count:
keys = user_cursor.get_access_keys(user=id, count=count)
links = user_cursor.gen_access_links(keys)
user_cursor.echo_access_links(links)
elif interactive:
pass
elif random: elif random:
admin = user_cursor.get_first_random_admin() admin = user_cursor.get_first_random_admin()
keys = user_cursor.get_access_keys(user=admin.get('id', 3), count=1) keys = user_cursor.get_access_keys(user=admin.get('id', 3))
links = user_cursor.gen_access_links(keys) elif interactive:
user_cursor.echo_access_links(links) user_cursor.gen_access_links_interactive()
return # exit from func
else: else:
pass pass
links = user_cursor.gen_access_links(keys)
user_cursor.echo_access_links(links)
@user.command(help='Generate API token for mgrctl user') @user.command(help='Generate API token for mgrctl user')

View File

@ -1,5 +1,6 @@
from requests.packages import urllib3 from requests.packages import urllib3
from mgrctl.settings.environment import env
from mgrctl.settings.platform import ( from mgrctl.settings.platform import (
PLATFORM_TYPE, PLATFORM_TYPE,
PLATFORM_VERIFY_SSL, PLATFORM_VERIFY_SSL,
@ -26,6 +27,9 @@ API_HEADERS = {"Internal-Auth": "on", "Accept": "application/json"}
# Alias for import: # Alias for import:
API_VERIFY_SSL = PLATFORM_VERIFY_SSL API_VERIFY_SSL = PLATFORM_VERIFY_SSL
# API 3004 Unavailable error handler:
API_COUNT_TRY_CONNECTIONS = env.int('API_COUNT_TRY_CONNECTIONS', 3)
# Suppress warning from urllib3: # Suppress warning from urllib3:
if not PLATFORM_VERIFY_SSL_WARNING: if not PLATFORM_VERIFY_SSL_WARNING:
# ! This is not recommended, # ! This is not recommended,

View File

@ -10,38 +10,43 @@ class UserAPI(object):
self.callback_class = callback_class self.callback_class = callback_class
self.callback = callback_class() self.callback = callback_class()
def get_users(self, role: str) -> dict: def get_users(self, role: str) -> list:
data = {} data = {}
if role == 'admin': if role == 'admin':
data = {"where": "((roles+CP+'%@admin%')+AND+(state+EQ+'active'))"} data = {"where": "((roles+CP+'%@admin%')+AND+(state+EQ+'active'))"}
return self.callback.call_api( response = self.callback.call_api(
url='/user', url='/user',
method='GET', method='GET',
data=data data=data
) )
users = self._extract_users(users=response)
return users
def _format_users(self, users: dict) -> list: def _extract_users(self, users: dict) -> list:
return users.get('list', [])
def _format_users(self, users: list) -> list:
output = [] output = []
for user in users.get('list', []): for user in users:
output.append({ output.append({
'id': user.get('id', ''), 'id': user.get('id', ''),
'email': user.get('email', ''), 'email': user.get('email', ''),
'roles': user.get('roles', []), 'roles': user.get('roles', []),
'state': user.get('state', '') 'state': user.get('state', ''),
# add more fields here...
}) })
return output return output
def get_first_random_admin(self): def get_first_random_admin(self):
users = self.get_users(role='admin') users = self.get_users(role='admin')
admin = {} admin = {}
for user in users.get('list', []): for user in users:
if '@admin' in admin.get('roles', []): if '@admin' in user.get('roles', []):
admin = user admin = user
break break
return admin return admin
def echo_users(self, role: str) -> None: def echo_users(self, users: list) -> None:
users = self.get_users(role)
output = self._format_users(users) output = self._format_users(users)
click.echo(tabulate(output, headers='keys')) click.echo(tabulate(output, headers='keys'))
@ -66,6 +71,19 @@ class UserAPI(object):
def echo_access_links(self, links: list) -> None: def echo_access_links(self, links: list) -> None:
click.echo(tabulate(links, headers='keys')) click.echo(tabulate(links, headers='keys'))
def gen_access_links_interactive(self) -> None:
users = self.get_users(role='admin')
self.echo_users(users)
try:
click.echo('Choose user id and count of keys')
_id = int(input('User ID: '))
count = int(input('Count of keys: '))
keys = self.get_access_keys(user=_id, count=count)
links = self.gen_access_links(keys)
self.echo_access_links(links)
except ValueError:
click.echo('Error: Invalid, value is not a valid integer')
def gen_api_token(self, email=None, password=None): def gen_api_token(self, email=None, password=None):
token = self.callback.get_auth_token(email, password) token = self.callback.get_auth_token(email, password)
return token return token