121 lines
3.2 KiB
Python
121 lines
3.2 KiB
Python
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
__author__ = 'MOIS3Y'
|
|
|
|
import requests
|
|
from time import sleep
|
|
from models_vm import VmAccount
|
|
from models_dci import AuthUser
|
|
from settings import CLIENT_HOST, PLATFORM
|
|
|
|
|
|
class Access(object):
|
|
"""
|
|
Allows you to get a link with an authorization key
|
|
For quick access to the web interface of the client platform.
|
|
The PLATFORM variable can be set as an environment variable.
|
|
Default PLATFORM='vm'
|
|
"""
|
|
def __init__(self, platform):
|
|
if platform == 'vm':
|
|
self.user_table = VmAccount
|
|
if platform == 'dci':
|
|
self.user_table = AuthUser
|
|
self.platform = platform
|
|
|
|
def get_admin(self):
|
|
"""
|
|
The method gets the first 10 active database users
|
|
and finds the admin among them. I couldn't create
|
|
a request filtering by the roles field because
|
|
peewee doesn't understand json and can't make a selection
|
|
|
|
Returns:
|
|
_dict_: admin data
|
|
"""
|
|
query_admin = (self.user_table.select(
|
|
self.user_table.id,
|
|
self.user_table.email,
|
|
self.user_table.state,
|
|
self.user_table.roles).where(
|
|
(self.user_table.id <= 10) &
|
|
(self.user_table.state == "active")
|
|
)
|
|
)
|
|
# !peewee doesn't work well with JSONfield MySQL
|
|
for field in query_admin:
|
|
if field.roles[0] == '@admin':
|
|
admin = dict(
|
|
id=field.id,
|
|
email=field.email,
|
|
state=field.state,
|
|
role=field.roles[0]
|
|
)
|
|
break
|
|
return admin
|
|
|
|
def get_key(self, admin):
|
|
"""
|
|
Makes a post request to the endpoint
|
|
passing the admin data and
|
|
returns the generated access key
|
|
|
|
Args:
|
|
admin (_dict_): _admin data_
|
|
|
|
Returns:
|
|
_dict_: _access key_
|
|
"""
|
|
host = 'input'
|
|
if self.platform == 'dci':
|
|
host = 'dci_input_1' # because this hostname
|
|
|
|
url = 'http://{}:1500/api/auth/v4/user/{}/key'.format(
|
|
host, admin['id']
|
|
)
|
|
headers = {"Internal-Auth": "on", "Accept": "application/json"}
|
|
|
|
check_key = 0
|
|
|
|
while check_key != 3:
|
|
try:
|
|
req = requests.post(url, headers=headers, data={})
|
|
answer = req.json()
|
|
key = answer['key']
|
|
print("Key will be used:")
|
|
print(answer, "\n")
|
|
break
|
|
except Exception:
|
|
sleep(2)
|
|
check_key += 1
|
|
key = 'not_found_try_again'
|
|
return key
|
|
|
|
|
|
def main():
|
|
"""
|
|
app entrypoint
|
|
"""
|
|
access = Access(PLATFORM)
|
|
print("="*47)
|
|
print("\nGetting an admin account, please waiting....\n")
|
|
|
|
account = access.get_admin()
|
|
|
|
print("Account will be used:")
|
|
print(account, "\n")
|
|
print("Getting an access key, please waiting...\n")
|
|
|
|
key = access.get_key(account)
|
|
|
|
access_link = 'https://{}/auth/key-v4/{}'.format(CLIENT_HOST, key)
|
|
|
|
print("Your access link: \n")
|
|
print(access_link, "\n")
|
|
print("ENJOY MY FRIEND! \n")
|
|
print("="*47)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|