Compare commits

...

2 Commits

Author SHA1 Message Date
f773898b71 Update: BaseAPI class now has error handler 2024-06-06 20:59:03 +09:00
cdf3a92527 Update: access funcs 2024-06-06 16:01:21 +09:00
4 changed files with 98 additions and 50 deletions

View File

@ -1,14 +1,18 @@
import sys
import json
import click
import urllib
import requests
from time import sleep
from mgrctl.settings.api import (
API_URL,
API_HEADERS,
API_EMAIL,
API_PASSWORD,
API_VERIFY_SSL
API_VERIFY_SSL,
API_COUNT_TRY_CONNECTIONS
)
@ -27,45 +31,64 @@ class BaseAPI(object):
return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
def call_api(self, url, method='GET', headers={}, data={}):
# Open session
with requests.Session() as session:
attempt = API_COUNT_TRY_CONNECTIONS
while attempt:
try:
url = self._gen_request_url(url)
uri = self._gen_request_url(url)
headers = self.API_HEADERS if not headers else headers
params_str = urllib.parse.urlencode(data, safe="+'()")
if method == 'POST':
api_request = session.post(
url=url,
api_request = requests.post(
url=uri,
json=data,
headers=headers,
verify=self.API_VERIFY_SSL
)
if method == 'GET':
url = f'{url}?{params_str}' if params_str else url
api_request = session.get(
url=url,
uri = f'{uri}?{params_str}' if params_str else uri
api_request = requests.get(
url=uri,
headers=headers,
verify=self.API_VERIFY_SSL
)
except Exception as error:
api_request = {
'result': False,
'error': type(error).__name__
}
return api_request
finally:
session.close()
click.echo(f'Error: {type(error).__name__}')
sys.exit()
# Get response
# Get response:
response = self._parse_response(api_request)
# Validate response:
if self._error_handler(response):
attempt -= 1
sleep(2) # wait 2 second timeout
continue # new attempt connection
return response
def _parse_response(self, api_request):
try:
response = json.loads(api_request.text)
if 'error' in response and response['error']:
print(response['error'])
raise sys.exit()
return response
except json.decoder.JSONDecodeError:
response = {'error': 'Can not parse response'}
print(response)
click.echo('Error: Invalid API response')
raise sys.exit()
def _is_error(self, response):
if response.get('error'):
return True
def _is_error_3004(self, error):
if error.get('code') == 3004:
return True
def _error_handler(self, response):
if self._is_error(response):
error = response.get('error')
if self._is_error_3004(error):
return True
else:
click.echo(f'Error: API return {response}')
raise sys.exit()

View File

@ -38,16 +38,22 @@ def user():
)
def ls(all, admins):
if all:
user_cursor.echo_users(role='all')
users = user_cursor.get_users(role='all')
elif admins:
user_cursor.echo_users(role='admin')
users = user_cursor.get_users(role='admin')
else:
user_cursor.echo_users(role='all')
users = user_cursor.get_users(role='all')
# print users:
user_cursor.echo_users(users)
@user.command(help='Generate access key and return auth link(s)')
@user.command(
help='Generate an access key and return auth link(s)',
no_args_is_help=True
)
@click.option(
'--id',
'_id',
required=False,
type=int,
help='User id'
@ -62,7 +68,7 @@ def ls(all, admins):
'--random',
is_flag=True,
required=False,
help='Interactive mode, ignores other keys'
help='Generate access key for the first available admin'
)
@click.option(
'--interactive',
@ -70,24 +76,21 @@ def ls(all, admins):
required=False,
help='Interactive mode, ignores other keys'
)
def access(id, count, interactive, random):
if id and not count:
keys = user_cursor.get_access_keys(user=id, count=1)
links = user_cursor.gen_access_links(keys)
user_cursor.echo_access_links(links)
elif id and count:
keys = user_cursor.get_access_keys(user=id, count=count)
links = user_cursor.gen_access_links(keys)
user_cursor.echo_access_links(links)
elif interactive:
pass
def access(_id, count, interactive, random):
if _id and not count:
keys = user_cursor.get_access_keys(user=_id, count=1)
elif _id and count:
keys = user_cursor.get_access_keys(user=_id, count=count)
elif random:
admin = user_cursor.get_first_random_admin()
keys = user_cursor.get_access_keys(user=admin.get('id', 3), count=1)
links = user_cursor.gen_access_links(keys)
user_cursor.echo_access_links(links)
keys = user_cursor.get_access_keys(user=admin.get('id', 3))
elif interactive:
user_cursor.gen_access_links_interactive()
return # exit from func
else:
pass
links = user_cursor.gen_access_links(keys)
user_cursor.echo_access_links(links)
@user.command(help='Generate API token for mgrctl user')

View File

@ -1,5 +1,6 @@
from requests.packages import urllib3
from mgrctl.settings.environment import env
from mgrctl.settings.platform import (
PLATFORM_TYPE,
PLATFORM_VERIFY_SSL,
@ -26,6 +27,9 @@ API_HEADERS = {"Internal-Auth": "on", "Accept": "application/json"}
# Alias for import:
API_VERIFY_SSL = PLATFORM_VERIFY_SSL
# API 3004 Unavailable error handler:
API_COUNT_TRY_CONNECTIONS = env.int('API_COUNT_TRY_CONNECTIONS', 3)
# Suppress warning from urllib3:
if not PLATFORM_VERIFY_SSL_WARNING:
# ! This is not recommended,

View File

@ -10,38 +10,43 @@ class UserAPI(object):
self.callback_class = callback_class
self.callback = callback_class()
def get_users(self, role: str) -> dict:
def get_users(self, role: str) -> list:
data = {}
if role == 'admin':
data = {"where": "((roles+CP+'%@admin%')+AND+(state+EQ+'active'))"}
return self.callback.call_api(
response = self.callback.call_api(
url='/user',
method='GET',
data=data
)
users = self._extract_users(users=response)
return users
def _format_users(self, users: dict) -> list:
def _extract_users(self, users: dict) -> list:
return users.get('list', [])
def _format_users(self, users: list) -> list:
output = []
for user in users.get('list', []):
for user in users:
output.append({
'id': user.get('id', ''),
'email': user.get('email', ''),
'roles': user.get('roles', []),
'state': user.get('state', '')
'state': user.get('state', ''),
# add more fields here...
})
return output
def get_first_random_admin(self):
users = self.get_users(role='admin')
admin = {}
for user in users.get('list', []):
if '@admin' in admin.get('roles', []):
for user in users:
if '@admin' in user.get('roles', []):
admin = user
break
return admin
def echo_users(self, role: str) -> None:
users = self.get_users(role)
def echo_users(self, users: list) -> None:
output = self._format_users(users)
click.echo(tabulate(output, headers='keys'))
@ -66,6 +71,19 @@ class UserAPI(object):
def echo_access_links(self, links: list) -> None:
click.echo(tabulate(links, headers='keys'))
def gen_access_links_interactive(self) -> None:
users = self.get_users(role='admin')
self.echo_users(users)
try:
click.echo('Choose user id and count of keys')
_id = int(input('User ID: '))
count = int(input('Count of keys: '))
keys = self.get_access_keys(user=_id, count=count)
links = self.gen_access_links(keys)
self.echo_access_links(links)
except ValueError:
click.echo('Error: Invalid, value is not a valid integer')
def gen_api_token(self, email=None, password=None):
token = self.callback.get_auth_token(email, password)
return token