forked from ISPsystem/isp-maintenance
Compare commits
No commits in common. "d4de07c34056ba759b6f9b6925260ca1dd65b50a" and "8ad5e2a230e49fdc238ae0a25427eecfb49839f8" have entirely different histories.
d4de07c340
...
8ad5e2a230
@ -1,18 +1,14 @@
|
|||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
import click
|
|
||||||
import urllib
|
import urllib
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
from mgrctl.settings.api import (
|
from mgrctl.settings.api import (
|
||||||
API_URL,
|
API_URL,
|
||||||
API_HEADERS,
|
API_HEADERS,
|
||||||
API_EMAIL,
|
API_EMAIL,
|
||||||
API_PASSWORD,
|
API_PASSWORD,
|
||||||
API_VERIFY_SSL,
|
API_VERIFY_SSL
|
||||||
API_COUNT_TRY_CONNECTIONS
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -31,66 +27,47 @@ class BaseAPI(object):
|
|||||||
return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
|
return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
|
||||||
|
|
||||||
def call_api(self, url, method='GET', headers={}, data={}):
|
def call_api(self, url, method='GET', headers={}, data={}):
|
||||||
attempt = API_COUNT_TRY_CONNECTIONS
|
# Open session
|
||||||
while attempt:
|
with requests.Session() as session:
|
||||||
try:
|
try:
|
||||||
uri = self._gen_request_url(url)
|
url = self._gen_request_url(url)
|
||||||
headers = self.API_HEADERS if not headers else headers
|
headers = self.API_HEADERS if not headers else headers
|
||||||
params_str = urllib.parse.urlencode(data, safe="+'()")
|
params_str = urllib.parse.urlencode(data, safe="+'()")
|
||||||
if method == 'POST':
|
if method == 'POST':
|
||||||
api_request = requests.post(
|
api_request = session.post(
|
||||||
url=uri,
|
url=url,
|
||||||
json=data,
|
json=data,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
verify=self.API_VERIFY_SSL
|
verify=self.API_VERIFY_SSL
|
||||||
)
|
)
|
||||||
if method == 'GET':
|
if method == 'GET':
|
||||||
uri = f'{uri}?{params_str}' if params_str else uri
|
url = f'{url}?{params_str}' if params_str else url
|
||||||
api_request = requests.get(
|
api_request = session.get(
|
||||||
url=uri,
|
url=url,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
verify=self.API_VERIFY_SSL
|
verify=self.API_VERIFY_SSL
|
||||||
)
|
)
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
click.echo(f'Error: {type(error).__name__}')
|
api_request = {
|
||||||
sys.exit()
|
'result': False,
|
||||||
|
'error': type(error).__name__
|
||||||
|
}
|
||||||
|
return api_request
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
# Get response:
|
# Get response
|
||||||
response = self._parse_response(api_request)
|
|
||||||
|
|
||||||
# Validate response:
|
|
||||||
if self._error_handler(response):
|
|
||||||
attempt -= 1
|
|
||||||
sleep(2) # wait 2 second timeout
|
|
||||||
continue # new attempt connection
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
def _parse_response(self, api_request):
|
|
||||||
try:
|
try:
|
||||||
response = json.loads(api_request.text)
|
response = json.loads(api_request.text)
|
||||||
|
if 'error' in response and response['error']:
|
||||||
|
print(response['error'])
|
||||||
|
raise sys.exit()
|
||||||
return response
|
return response
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
click.echo('Error: Invalid API response')
|
response = {'error': 'Can not parse response'}
|
||||||
|
print(response)
|
||||||
raise sys.exit()
|
raise sys.exit()
|
||||||
|
|
||||||
def _is_error(self, response):
|
|
||||||
if response.get('error'):
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _is_error_3004(self, error):
|
|
||||||
if error.get('code') == 3004:
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _error_handler(self, response):
|
|
||||||
if self._is_error(response):
|
|
||||||
error = response.get('error')
|
|
||||||
if self._is_error_3004(error):
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
click.echo(f'Error: API return {response}')
|
|
||||||
raise sys.exit()
|
|
||||||
|
|
||||||
|
|
||||||
class BaseAuthAPI(BaseAPI):
|
class BaseAuthAPI(BaseAPI):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -1,10 +0,0 @@
|
|||||||
# █▀▄▀█ █▀▀ ▀█▀ ▄▀█ ▀
|
|
||||||
# █░▀░█ ██▄ ░█░ █▀█ ▄
|
|
||||||
# -- -- -- -- -- -- -
|
|
||||||
__author__ = "MOIS3Y"
|
|
||||||
__credits__ = ["Stepan Zhukovsky"]
|
|
||||||
__license__ = "MIT"
|
|
||||||
__version__ = "0.1.0"
|
|
||||||
__maintainer__ = "Stepan Zhukovsky"
|
|
||||||
__email__ = "stepan@zhukovsky.me"
|
|
||||||
__status__ = "Development"
|
|
@ -1,99 +1,12 @@
|
|||||||
import click
|
import click
|
||||||
|
|
||||||
from mgrctl.apps.dci6.auth import __version__
|
|
||||||
from mgrctl.api.dci6 import AuthAPI
|
@click.group(help='access command for lazy example')
|
||||||
from mgrctl.utils.api_users import UserAPI
|
@click.option('--debug/--no-debug', default=False)
|
||||||
|
def cli(debug):
|
||||||
|
click.echo(f"Debug mode is {'on' if debug else 'off'}")
|
||||||
|
|
||||||
|
|
||||||
user_cursor = UserAPI(callback_class=AuthAPI)
|
@cli.command()
|
||||||
|
def enable():
|
||||||
|
click.echo('Access granted')
|
||||||
@click.group(help='auth cmd for auth in DCImanager 6')
|
|
||||||
@click.version_option(
|
|
||||||
version=__version__,
|
|
||||||
package_name='mgrctl.apps.dci6.auth',
|
|
||||||
message=__version__
|
|
||||||
)
|
|
||||||
def cli():
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@cli.group(help='Manage users')
|
|
||||||
def user():
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@user.command(help='List users')
|
|
||||||
@click.option(
|
|
||||||
'--all',
|
|
||||||
is_flag=True,
|
|
||||||
required=False,
|
|
||||||
help='Show all users'
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
'--admins',
|
|
||||||
is_flag=True,
|
|
||||||
required=False,
|
|
||||||
help='Show all active admins',
|
|
||||||
)
|
|
||||||
def ls(all, admins):
|
|
||||||
if all:
|
|
||||||
users = user_cursor.get_users(role='all')
|
|
||||||
elif admins:
|
|
||||||
users = user_cursor.get_users(role='admin')
|
|
||||||
else:
|
|
||||||
users = user_cursor.get_users(role='all')
|
|
||||||
# print users:
|
|
||||||
user_cursor.echo_users(users)
|
|
||||||
|
|
||||||
|
|
||||||
@user.command(
|
|
||||||
help='Generate an access key and return auth link(s)',
|
|
||||||
no_args_is_help=True
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
'--id',
|
|
||||||
'_id',
|
|
||||||
required=False,
|
|
||||||
type=int,
|
|
||||||
help='User id'
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
'--count',
|
|
||||||
required=False,
|
|
||||||
type=int,
|
|
||||||
help='Number of access keys generated',
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
'--random',
|
|
||||||
is_flag=True,
|
|
||||||
required=False,
|
|
||||||
help='Generate access key for the first available admin'
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
'--interactive',
|
|
||||||
is_flag=True,
|
|
||||||
required=False,
|
|
||||||
help='Interactive mode, ignores other keys'
|
|
||||||
)
|
|
||||||
def access(_id, count, interactive, random):
|
|
||||||
if _id and not count:
|
|
||||||
keys = user_cursor.get_access_keys(user=_id, count=1)
|
|
||||||
elif _id and count:
|
|
||||||
keys = user_cursor.get_access_keys(user=_id, count=count)
|
|
||||||
elif random:
|
|
||||||
admin = user_cursor.get_first_random_admin()
|
|
||||||
keys = user_cursor.get_access_keys(user=admin.get('id', 3))
|
|
||||||
elif interactive:
|
|
||||||
user_cursor.gen_access_links_interactive()
|
|
||||||
return # exit from func
|
|
||||||
else:
|
|
||||||
pass
|
|
||||||
links = user_cursor.gen_access_links(keys)
|
|
||||||
user_cursor.echo_access_links(links)
|
|
||||||
|
|
||||||
|
|
||||||
@user.command(help='Generate API token for mgrctl user')
|
|
||||||
def token():
|
|
||||||
token = user_cursor.gen_api_token()
|
|
||||||
user_cursor.echo_api_token(token)
|
|
||||||
|
@ -7,7 +7,7 @@ from mgrctl.settings.general import INSTALLED_APPS
|
|||||||
@click.group(
|
@click.group(
|
||||||
cls=LazyGroup,
|
cls=LazyGroup,
|
||||||
lazy_subcommands=INSTALLED_APPS['dci6'],
|
lazy_subcommands=INSTALLED_APPS['dci6'],
|
||||||
help='dci6 command for DCI6manager management',
|
help='dci6 command for lazy example',
|
||||||
)
|
)
|
||||||
def cli():
|
def cli():
|
||||||
pass
|
pass
|
||||||
|
@ -38,22 +38,16 @@ def user():
|
|||||||
)
|
)
|
||||||
def ls(all, admins):
|
def ls(all, admins):
|
||||||
if all:
|
if all:
|
||||||
users = user_cursor.get_users(role='all')
|
user_cursor.echo_users(role='all')
|
||||||
elif admins:
|
elif admins:
|
||||||
users = user_cursor.get_users(role='admin')
|
user_cursor.echo_users(role='admin')
|
||||||
else:
|
else:
|
||||||
users = user_cursor.get_users(role='all')
|
user_cursor.echo_users(role='all')
|
||||||
# print users:
|
|
||||||
user_cursor.echo_users(users)
|
|
||||||
|
|
||||||
|
|
||||||
@user.command(
|
@user.command(help='Generate access key and return auth link(s)')
|
||||||
help='Generate an access key and return auth link(s)',
|
|
||||||
no_args_is_help=True
|
|
||||||
)
|
|
||||||
@click.option(
|
@click.option(
|
||||||
'--id',
|
'--id',
|
||||||
'_id',
|
|
||||||
required=False,
|
required=False,
|
||||||
type=int,
|
type=int,
|
||||||
help='User id'
|
help='User id'
|
||||||
@ -68,7 +62,7 @@ def ls(all, admins):
|
|||||||
'--random',
|
'--random',
|
||||||
is_flag=True,
|
is_flag=True,
|
||||||
required=False,
|
required=False,
|
||||||
help='Generate access key for the first available admin'
|
help='Interactive mode, ignores other keys'
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
'--interactive',
|
'--interactive',
|
||||||
@ -76,21 +70,24 @@ def ls(all, admins):
|
|||||||
required=False,
|
required=False,
|
||||||
help='Interactive mode, ignores other keys'
|
help='Interactive mode, ignores other keys'
|
||||||
)
|
)
|
||||||
def access(_id, count, interactive, random):
|
def access(id, count, interactive, random):
|
||||||
if _id and not count:
|
if id and not count:
|
||||||
keys = user_cursor.get_access_keys(user=_id, count=1)
|
keys = user_cursor.get_access_keys(user=id, count=1)
|
||||||
elif _id and count:
|
links = user_cursor.gen_access_links(keys)
|
||||||
keys = user_cursor.get_access_keys(user=_id, count=count)
|
user_cursor.echo_access_links(links)
|
||||||
|
elif id and count:
|
||||||
|
keys = user_cursor.get_access_keys(user=id, count=count)
|
||||||
|
links = user_cursor.gen_access_links(keys)
|
||||||
|
user_cursor.echo_access_links(links)
|
||||||
|
elif interactive:
|
||||||
|
pass
|
||||||
elif random:
|
elif random:
|
||||||
admin = user_cursor.get_first_random_admin()
|
admin = user_cursor.get_first_random_admin()
|
||||||
keys = user_cursor.get_access_keys(user=admin.get('id', 3))
|
keys = user_cursor.get_access_keys(user=admin.get('id', 3), count=1)
|
||||||
elif interactive:
|
links = user_cursor.gen_access_links(keys)
|
||||||
user_cursor.gen_access_links_interactive()
|
user_cursor.echo_access_links(links)
|
||||||
return # exit from func
|
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
links = user_cursor.gen_access_links(keys)
|
|
||||||
user_cursor.echo_access_links(links)
|
|
||||||
|
|
||||||
|
|
||||||
@user.command(help='Generate API token for mgrctl user')
|
@user.command(help='Generate API token for mgrctl user')
|
||||||
|
@ -8,7 +8,7 @@ from mgrctl.apps.vm6 import __version__
|
|||||||
@click.group(
|
@click.group(
|
||||||
cls=LazyGroup,
|
cls=LazyGroup,
|
||||||
lazy_subcommands=INSTALLED_APPS['vm6'],
|
lazy_subcommands=INSTALLED_APPS['vm6'],
|
||||||
help='vm6 command for VM6manager management',
|
help='vm6 command for lazy example',
|
||||||
)
|
)
|
||||||
@click.version_option(
|
@click.version_option(
|
||||||
version=__version__,
|
version=__version__,
|
||||||
|
@ -17,7 +17,7 @@ from mgrctl.cli.lazy_group import LazyGroup
|
|||||||
'vm6': 'mgrctl.apps.vm6.commands.cli',
|
'vm6': 'mgrctl.apps.vm6.commands.cli',
|
||||||
'dci6': 'mgrctl.apps.dci6.commands.cli',
|
'dci6': 'mgrctl.apps.dci6.commands.cli',
|
||||||
},
|
},
|
||||||
help='main CLI command for mgrctl app',
|
help='main CLI command for lazy example',
|
||||||
)
|
)
|
||||||
@click.version_option(
|
@click.version_option(
|
||||||
version=__version__,
|
version=__version__,
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
from requests.packages import urllib3
|
from requests.packages import urllib3
|
||||||
|
|
||||||
from mgrctl.settings.environment import env
|
|
||||||
from mgrctl.settings.platform import (
|
from mgrctl.settings.platform import (
|
||||||
PLATFORM_TYPE,
|
PLATFORM_TYPE,
|
||||||
PLATFORM_VERIFY_SSL,
|
PLATFORM_VERIFY_SSL,
|
||||||
@ -27,9 +26,6 @@ API_HEADERS = {"Internal-Auth": "on", "Accept": "application/json"}
|
|||||||
# Alias for import:
|
# Alias for import:
|
||||||
API_VERIFY_SSL = PLATFORM_VERIFY_SSL
|
API_VERIFY_SSL = PLATFORM_VERIFY_SSL
|
||||||
|
|
||||||
# API 3004 Unavailable error handler:
|
|
||||||
API_COUNT_TRY_CONNECTIONS = env.int('API_COUNT_TRY_CONNECTIONS', 3)
|
|
||||||
|
|
||||||
# Suppress warning from urllib3:
|
# Suppress warning from urllib3:
|
||||||
if not PLATFORM_VERIFY_SSL_WARNING:
|
if not PLATFORM_VERIFY_SSL_WARNING:
|
||||||
# ! This is not recommended,
|
# ! This is not recommended,
|
||||||
|
@ -10,43 +10,38 @@ class UserAPI(object):
|
|||||||
self.callback_class = callback_class
|
self.callback_class = callback_class
|
||||||
self.callback = callback_class()
|
self.callback = callback_class()
|
||||||
|
|
||||||
def get_users(self, role: str) -> list:
|
def get_users(self, role: str) -> dict:
|
||||||
data = {}
|
data = {}
|
||||||
if role == 'admin':
|
if role == 'admin':
|
||||||
data = {"where": "((roles+CP+'%@admin%')+AND+(state+EQ+'active'))"}
|
data = {"where": "((roles+CP+'%@admin%')+AND+(state+EQ+'active'))"}
|
||||||
response = self.callback.call_api(
|
return self.callback.call_api(
|
||||||
url='/user',
|
url='/user',
|
||||||
method='GET',
|
method='GET',
|
||||||
data=data
|
data=data
|
||||||
)
|
)
|
||||||
users = self._extract_users(users=response)
|
|
||||||
return users
|
|
||||||
|
|
||||||
def _extract_users(self, users: dict) -> list:
|
def _format_users(self, users: dict) -> list:
|
||||||
return users.get('list', [])
|
|
||||||
|
|
||||||
def _format_users(self, users: list) -> list:
|
|
||||||
output = []
|
output = []
|
||||||
for user in users:
|
for user in users.get('list', []):
|
||||||
output.append({
|
output.append({
|
||||||
'id': user.get('id', ''),
|
'id': user.get('id', ''),
|
||||||
'email': user.get('email', ''),
|
'email': user.get('email', ''),
|
||||||
'roles': user.get('roles', []),
|
'roles': user.get('roles', []),
|
||||||
'state': user.get('state', ''),
|
'state': user.get('state', '')
|
||||||
# add more fields here...
|
|
||||||
})
|
})
|
||||||
return output
|
return output
|
||||||
|
|
||||||
def get_first_random_admin(self):
|
def get_first_random_admin(self):
|
||||||
users = self.get_users(role='admin')
|
users = self.get_users(role='admin')
|
||||||
admin = {}
|
admin = {}
|
||||||
for user in users:
|
for user in users.get('list', []):
|
||||||
if '@admin' in user.get('roles', []):
|
if '@admin' in admin.get('roles', []):
|
||||||
admin = user
|
admin = user
|
||||||
break
|
break
|
||||||
return admin
|
return admin
|
||||||
|
|
||||||
def echo_users(self, users: list) -> None:
|
def echo_users(self, role: str) -> None:
|
||||||
|
users = self.get_users(role)
|
||||||
output = self._format_users(users)
|
output = self._format_users(users)
|
||||||
click.echo(tabulate(output, headers='keys'))
|
click.echo(tabulate(output, headers='keys'))
|
||||||
|
|
||||||
@ -71,19 +66,6 @@ class UserAPI(object):
|
|||||||
def echo_access_links(self, links: list) -> None:
|
def echo_access_links(self, links: list) -> None:
|
||||||
click.echo(tabulate(links, headers='keys'))
|
click.echo(tabulate(links, headers='keys'))
|
||||||
|
|
||||||
def gen_access_links_interactive(self) -> None:
|
|
||||||
users = self.get_users(role='admin')
|
|
||||||
self.echo_users(users)
|
|
||||||
try:
|
|
||||||
click.echo('Choose user id and count of keys')
|
|
||||||
_id = int(input('User ID: '))
|
|
||||||
count = int(input('Count of keys: '))
|
|
||||||
keys = self.get_access_keys(user=_id, count=count)
|
|
||||||
links = self.gen_access_links(keys)
|
|
||||||
self.echo_access_links(links)
|
|
||||||
except ValueError:
|
|
||||||
click.echo('Error: Invalid, value is not a valid integer')
|
|
||||||
|
|
||||||
def gen_api_token(self, email=None, password=None):
|
def gen_api_token(self, email=None, password=None):
|
||||||
token = self.callback.get_auth_token(email, password)
|
token = self.callback.get_auth_token(email, password)
|
||||||
return token
|
return token
|
||||||
|
Loading…
Reference in New Issue
Block a user