forked from ISPsystem/isp-maintenance
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			8ad5e2a230
			...
			f773898b71
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| f773898b71 | |||
| cdf3a92527 | 
@ -1,14 +1,18 @@
 | 
				
			|||||||
import sys
 | 
					import sys
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
 | 
					import click
 | 
				
			||||||
import urllib
 | 
					import urllib
 | 
				
			||||||
import requests
 | 
					import requests
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from time import sleep
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from mgrctl.settings.api import (
 | 
					from mgrctl.settings.api import (
 | 
				
			||||||
    API_URL,
 | 
					    API_URL,
 | 
				
			||||||
    API_HEADERS,
 | 
					    API_HEADERS,
 | 
				
			||||||
    API_EMAIL,
 | 
					    API_EMAIL,
 | 
				
			||||||
    API_PASSWORD,
 | 
					    API_PASSWORD,
 | 
				
			||||||
    API_VERIFY_SSL
 | 
					    API_VERIFY_SSL,
 | 
				
			||||||
 | 
					    API_COUNT_TRY_CONNECTIONS
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -27,45 +31,64 @@ class BaseAPI(object):
 | 
				
			|||||||
        return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
 | 
					        return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def call_api(self, url, method='GET', headers={}, data={}):
 | 
					    def call_api(self, url, method='GET', headers={}, data={}):
 | 
				
			||||||
        # Open session
 | 
					        attempt = API_COUNT_TRY_CONNECTIONS
 | 
				
			||||||
        with requests.Session() as session:
 | 
					        while attempt:
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                url = self._gen_request_url(url)
 | 
					                uri = self._gen_request_url(url)
 | 
				
			||||||
                headers = self.API_HEADERS if not headers else headers
 | 
					                headers = self.API_HEADERS if not headers else headers
 | 
				
			||||||
                params_str = urllib.parse.urlencode(data, safe="+'()")
 | 
					                params_str = urllib.parse.urlencode(data, safe="+'()")
 | 
				
			||||||
                if method == 'POST':
 | 
					                if method == 'POST':
 | 
				
			||||||
                    api_request = session.post(
 | 
					                    api_request = requests.post(
 | 
				
			||||||
                        url=url,
 | 
					                        url=uri,
 | 
				
			||||||
                        json=data,
 | 
					                        json=data,
 | 
				
			||||||
                        headers=headers,
 | 
					                        headers=headers,
 | 
				
			||||||
                        verify=self.API_VERIFY_SSL
 | 
					                        verify=self.API_VERIFY_SSL
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                if method == 'GET':
 | 
					                if method == 'GET':
 | 
				
			||||||
                    url = f'{url}?{params_str}' if params_str else url
 | 
					                    uri = f'{uri}?{params_str}' if params_str else uri
 | 
				
			||||||
                    api_request = session.get(
 | 
					                    api_request = requests.get(
 | 
				
			||||||
                        url=url,
 | 
					                        url=uri,
 | 
				
			||||||
                        headers=headers,
 | 
					                        headers=headers,
 | 
				
			||||||
                        verify=self.API_VERIFY_SSL
 | 
					                        verify=self.API_VERIFY_SSL
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
            except Exception as error:
 | 
					            except Exception as error:
 | 
				
			||||||
                api_request = {
 | 
					                click.echo(f'Error: {type(error).__name__}')
 | 
				
			||||||
                    'result': False,
 | 
					                sys.exit()
 | 
				
			||||||
                    'error': type(error).__name__
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
                return api_request
 | 
					 | 
				
			||||||
            finally:
 | 
					 | 
				
			||||||
                session.close()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get response
 | 
					            # Get response:
 | 
				
			||||||
 | 
					            response = self._parse_response(api_request)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Validate response:
 | 
				
			||||||
 | 
					            if self._error_handler(response):
 | 
				
			||||||
 | 
					                attempt -= 1
 | 
				
			||||||
 | 
					                sleep(2)  # wait 2 second timeout
 | 
				
			||||||
 | 
					                continue  # new attempt connection
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            return response
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _parse_response(self, api_request):
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            response = json.loads(api_request.text)
 | 
					            response = json.loads(api_request.text)
 | 
				
			||||||
            if 'error' in response and response['error']:
 | 
					 | 
				
			||||||
                print(response['error'])
 | 
					 | 
				
			||||||
                raise sys.exit()
 | 
					 | 
				
			||||||
            return response
 | 
					            return response
 | 
				
			||||||
        except json.decoder.JSONDecodeError:
 | 
					        except json.decoder.JSONDecodeError:
 | 
				
			||||||
            response = {'error': 'Can not parse response'}
 | 
					            click.echo('Error: Invalid API response')
 | 
				
			||||||
            print(response)
 | 
					            raise sys.exit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _is_error(self, response):
 | 
				
			||||||
 | 
					        if response.get('error'):
 | 
				
			||||||
 | 
					            return True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _is_error_3004(self, error):
 | 
				
			||||||
 | 
					        if error.get('code') == 3004:
 | 
				
			||||||
 | 
					            return True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _error_handler(self, response):
 | 
				
			||||||
 | 
					        if self._is_error(response):
 | 
				
			||||||
 | 
					            error = response.get('error')
 | 
				
			||||||
 | 
					            if self._is_error_3004(error):
 | 
				
			||||||
 | 
					                return True
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                click.echo(f'Error: API return {response}')
 | 
				
			||||||
                raise sys.exit()
 | 
					                raise sys.exit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -38,16 +38,22 @@ def user():
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
def ls(all, admins):
 | 
					def ls(all, admins):
 | 
				
			||||||
    if all:
 | 
					    if all:
 | 
				
			||||||
        user_cursor.echo_users(role='all')
 | 
					        users = user_cursor.get_users(role='all')
 | 
				
			||||||
    elif admins:
 | 
					    elif admins:
 | 
				
			||||||
        user_cursor.echo_users(role='admin')
 | 
					        users = user_cursor.get_users(role='admin')
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        user_cursor.echo_users(role='all')
 | 
					        users = user_cursor.get_users(role='all')
 | 
				
			||||||
 | 
					    # print users:
 | 
				
			||||||
 | 
					    user_cursor.echo_users(users)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@user.command(help='Generate access key and return auth link(s)')
 | 
					@user.command(
 | 
				
			||||||
 | 
					    help='Generate an access key and return auth link(s)',
 | 
				
			||||||
 | 
					    no_args_is_help=True
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
@click.option(
 | 
					@click.option(
 | 
				
			||||||
    '--id',
 | 
					    '--id',
 | 
				
			||||||
 | 
					    '_id',
 | 
				
			||||||
    required=False,
 | 
					    required=False,
 | 
				
			||||||
    type=int,
 | 
					    type=int,
 | 
				
			||||||
    help='User id'
 | 
					    help='User id'
 | 
				
			||||||
@ -62,7 +68,7 @@ def ls(all, admins):
 | 
				
			|||||||
    '--random',
 | 
					    '--random',
 | 
				
			||||||
    is_flag=True,
 | 
					    is_flag=True,
 | 
				
			||||||
    required=False,
 | 
					    required=False,
 | 
				
			||||||
    help='Interactive mode, ignores other keys'
 | 
					    help='Generate access key for the first available admin'
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@click.option(
 | 
					@click.option(
 | 
				
			||||||
    '--interactive',
 | 
					    '--interactive',
 | 
				
			||||||
@ -70,24 +76,21 @@ def ls(all, admins):
 | 
				
			|||||||
    required=False,
 | 
					    required=False,
 | 
				
			||||||
    help='Interactive mode, ignores other keys'
 | 
					    help='Interactive mode, ignores other keys'
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
def access(id, count, interactive, random):
 | 
					def access(_id, count, interactive, random):
 | 
				
			||||||
    if id and not count:
 | 
					    if _id and not count:
 | 
				
			||||||
        keys = user_cursor.get_access_keys(user=id, count=1)
 | 
					        keys = user_cursor.get_access_keys(user=_id, count=1)
 | 
				
			||||||
        links = user_cursor.gen_access_links(keys)
 | 
					    elif _id and count:
 | 
				
			||||||
        user_cursor.echo_access_links(links)
 | 
					        keys = user_cursor.get_access_keys(user=_id, count=count)
 | 
				
			||||||
    elif id and count:
 | 
					 | 
				
			||||||
        keys = user_cursor.get_access_keys(user=id, count=count)
 | 
					 | 
				
			||||||
        links = user_cursor.gen_access_links(keys)
 | 
					 | 
				
			||||||
        user_cursor.echo_access_links(links)
 | 
					 | 
				
			||||||
    elif interactive:
 | 
					 | 
				
			||||||
        pass
 | 
					 | 
				
			||||||
    elif random:
 | 
					    elif random:
 | 
				
			||||||
        admin = user_cursor.get_first_random_admin()
 | 
					        admin = user_cursor.get_first_random_admin()
 | 
				
			||||||
        keys = user_cursor.get_access_keys(user=admin.get('id', 3), count=1)
 | 
					        keys = user_cursor.get_access_keys(user=admin.get('id', 3))
 | 
				
			||||||
        links = user_cursor.gen_access_links(keys)
 | 
					    elif interactive:
 | 
				
			||||||
        user_cursor.echo_access_links(links)
 | 
					        user_cursor.gen_access_links_interactive()
 | 
				
			||||||
 | 
					        return  # exit from func
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        pass
 | 
					        pass
 | 
				
			||||||
 | 
					    links = user_cursor.gen_access_links(keys)
 | 
				
			||||||
 | 
					    user_cursor.echo_access_links(links)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@user.command(help='Generate API token for mgrctl user')
 | 
					@user.command(help='Generate API token for mgrctl user')
 | 
				
			||||||
 | 
				
			|||||||
@ -1,5 +1,6 @@
 | 
				
			|||||||
from requests.packages import urllib3
 | 
					from requests.packages import urllib3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from mgrctl.settings.environment import env
 | 
				
			||||||
from mgrctl.settings.platform import (
 | 
					from mgrctl.settings.platform import (
 | 
				
			||||||
    PLATFORM_TYPE,
 | 
					    PLATFORM_TYPE,
 | 
				
			||||||
    PLATFORM_VERIFY_SSL,
 | 
					    PLATFORM_VERIFY_SSL,
 | 
				
			||||||
@ -26,6 +27,9 @@ API_HEADERS = {"Internal-Auth": "on", "Accept": "application/json"}
 | 
				
			|||||||
# Alias for import:
 | 
					# Alias for import:
 | 
				
			||||||
API_VERIFY_SSL = PLATFORM_VERIFY_SSL
 | 
					API_VERIFY_SSL = PLATFORM_VERIFY_SSL
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# API 3004 Unavailable error handler:
 | 
				
			||||||
 | 
					API_COUNT_TRY_CONNECTIONS = env.int('API_COUNT_TRY_CONNECTIONS', 3)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Suppress warning from urllib3:
 | 
					# Suppress warning from urllib3:
 | 
				
			||||||
if not PLATFORM_VERIFY_SSL_WARNING:
 | 
					if not PLATFORM_VERIFY_SSL_WARNING:
 | 
				
			||||||
    # ! This is not recommended,
 | 
					    # ! This is not recommended,
 | 
				
			||||||
 | 
				
			|||||||
@ -10,38 +10,43 @@ class UserAPI(object):
 | 
				
			|||||||
        self.callback_class = callback_class
 | 
					        self.callback_class = callback_class
 | 
				
			||||||
        self.callback = callback_class()
 | 
					        self.callback = callback_class()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_users(self, role: str) -> dict:
 | 
					    def get_users(self, role: str) -> list:
 | 
				
			||||||
        data = {}
 | 
					        data = {}
 | 
				
			||||||
        if role == 'admin':
 | 
					        if role == 'admin':
 | 
				
			||||||
            data = {"where": "((roles+CP+'%@admin%')+AND+(state+EQ+'active'))"}
 | 
					            data = {"where": "((roles+CP+'%@admin%')+AND+(state+EQ+'active'))"}
 | 
				
			||||||
        return self.callback.call_api(
 | 
					        response = self.callback.call_api(
 | 
				
			||||||
            url='/user',
 | 
					            url='/user',
 | 
				
			||||||
            method='GET',
 | 
					            method='GET',
 | 
				
			||||||
            data=data
 | 
					            data=data
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					        users = self._extract_users(users=response)
 | 
				
			||||||
 | 
					        return users
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _format_users(self, users: dict) -> list:
 | 
					    def _extract_users(self, users: dict) -> list:
 | 
				
			||||||
 | 
					        return users.get('list', [])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _format_users(self, users: list) -> list:
 | 
				
			||||||
        output = []
 | 
					        output = []
 | 
				
			||||||
        for user in users.get('list', []):
 | 
					        for user in users:
 | 
				
			||||||
            output.append({
 | 
					            output.append({
 | 
				
			||||||
                'id': user.get('id', ''),
 | 
					                'id': user.get('id', ''),
 | 
				
			||||||
                'email': user.get('email', ''),
 | 
					                'email': user.get('email', ''),
 | 
				
			||||||
                'roles': user.get('roles', []),
 | 
					                'roles': user.get('roles', []),
 | 
				
			||||||
                'state': user.get('state', '')
 | 
					                'state': user.get('state', ''),
 | 
				
			||||||
 | 
					                # add more fields here...
 | 
				
			||||||
            })
 | 
					            })
 | 
				
			||||||
        return output
 | 
					        return output
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_first_random_admin(self):
 | 
					    def get_first_random_admin(self):
 | 
				
			||||||
        users = self.get_users(role='admin')
 | 
					        users = self.get_users(role='admin')
 | 
				
			||||||
        admin = {}
 | 
					        admin = {}
 | 
				
			||||||
        for user in users.get('list', []):
 | 
					        for user in users:
 | 
				
			||||||
            if '@admin' in admin.get('roles', []):
 | 
					            if '@admin' in user.get('roles', []):
 | 
				
			||||||
                admin = user
 | 
					                admin = user
 | 
				
			||||||
                break
 | 
					                break
 | 
				
			||||||
        return admin
 | 
					        return admin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def echo_users(self, role: str) -> None:
 | 
					    def echo_users(self, users: list) -> None:
 | 
				
			||||||
        users = self.get_users(role)
 | 
					 | 
				
			||||||
        output = self._format_users(users)
 | 
					        output = self._format_users(users)
 | 
				
			||||||
        click.echo(tabulate(output, headers='keys'))
 | 
					        click.echo(tabulate(output, headers='keys'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -66,6 +71,19 @@ class UserAPI(object):
 | 
				
			|||||||
    def echo_access_links(self, links: list) -> None:
 | 
					    def echo_access_links(self, links: list) -> None:
 | 
				
			||||||
        click.echo(tabulate(links, headers='keys'))
 | 
					        click.echo(tabulate(links, headers='keys'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def gen_access_links_interactive(self) -> None:
 | 
				
			||||||
 | 
					        users = self.get_users(role='admin')
 | 
				
			||||||
 | 
					        self.echo_users(users)
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            click.echo('Choose user id and count of keys')
 | 
				
			||||||
 | 
					            _id = int(input('User ID: '))
 | 
				
			||||||
 | 
					            count = int(input('Count of keys: '))
 | 
				
			||||||
 | 
					            keys = self.get_access_keys(user=_id, count=count)
 | 
				
			||||||
 | 
					            links = self.gen_access_links(keys)
 | 
				
			||||||
 | 
					            self.echo_access_links(links)
 | 
				
			||||||
 | 
					        except ValueError:
 | 
				
			||||||
 | 
					            click.echo('Error: Invalid, value is not a valid integer')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def gen_api_token(self, email=None, password=None):
 | 
					    def gen_api_token(self, email=None, password=None):
 | 
				
			||||||
        token = self.callback.get_auth_token(email, password)
 | 
					        token = self.callback.get_auth_token(email, password)
 | 
				
			||||||
        return token
 | 
					        return token
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user