200 lines
5.3 KiB
Python
Executable File
200 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import subprocess as sp
|
|
from pathlib import Path
|
|
from functools import wraps
|
|
|
|
__author__ = "MOIS3Y"
|
|
__version__ = "0.1.0"
|
|
|
|
|
|
# check development or production environment:
|
|
if os.environ.get("WG_BOOTSTRAP"):
|
|
CONFIG_PATH = Path(__file__).resolve().parents[1] / "etc" / "wireguard"
|
|
else:
|
|
CONFIG_PATH = Path("/etc/wireguard")
|
|
|
|
|
|
def sudo_required(func):
|
|
"""
|
|
the decorator raise an exception PermissionError
|
|
when trying to call a function without superuser privileges (sudo)
|
|
or not from under the root.
|
|
|
|
Args:
|
|
func (_function_): any function
|
|
"""
|
|
@wraps(func) # <- save meta info for wrapped func
|
|
def is_root(*args, **kwargs):
|
|
if not os.environ.get("SUDO_UID") and os.geteuid() != 0:
|
|
sys.tracebacklimit = 0
|
|
prog = os.path.basename(__file__)
|
|
error_message = f"You need to run {prog} with sudo or as root."
|
|
raise PermissionError(error_message)
|
|
result = func(*args, **kwargs)
|
|
return result
|
|
|
|
return is_root
|
|
|
|
|
|
@sudo_required
|
|
def get_wg_conf() -> dict:
|
|
"""
|
|
gets the values for the settings dict interactively from the user input
|
|
|
|
Returns:
|
|
dict: settings for wg interface
|
|
"""
|
|
settings: dict = {
|
|
"Interface": {
|
|
"PrivateKey": None,
|
|
"Address": None,
|
|
"DNS": None
|
|
},
|
|
"Peer": {
|
|
"PublicKey": None,
|
|
"AllowedIPs": None,
|
|
"Endpoint": None
|
|
}
|
|
}
|
|
for title in settings:
|
|
for key, value in settings[title].items():
|
|
settings[title][key] = input(f"[{title}] - {key}: ")
|
|
|
|
return settings
|
|
|
|
|
|
@sudo_required
|
|
def gen_wg_conf(config_path, **settings):
|
|
"""
|
|
generates a config file for the wireguard interface,
|
|
receiving a dict (**settings) with the desired values
|
|
|
|
Args:
|
|
config_path (Path): path to wg interface configuration file
|
|
**settings (dict): settings for wg interface
|
|
"""
|
|
# check if interface parent dir exist:
|
|
config_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(config_path, 'w') as file:
|
|
for title in settings:
|
|
if title == "Peer":
|
|
file.write(f"\n[{title}]\n")
|
|
else:
|
|
file.write(f"[{title}]\n")
|
|
|
|
for key, value in settings[title].items():
|
|
file.write(f"{key} = {value}\n")
|
|
|
|
|
|
@sudo_required
|
|
def use_wg_conf(config_path, command):
|
|
sp.run([command, config_path])
|
|
|
|
|
|
def wg_quick(interface, command):
|
|
sp.run(["wg-quick", command, interface])
|
|
|
|
|
|
def exist_wg_conf(config_path):
|
|
return config_path.exists()
|
|
|
|
|
|
@sudo_required
|
|
def change_allowed_ips(config_path, allowed_ips, action="replace"):
|
|
"""
|
|
the function changes the AllowedIPs value in
|
|
the passed configuration file.
|
|
|
|
Args:
|
|
config_file (Path): path to wg interface configuration file
|
|
allowed_ips (str): comma and space separated ip addresses
|
|
action (str, optional): replace or add. Defaults to "replace".
|
|
"""
|
|
with open(config_path, 'r') as source:
|
|
lines = source.readlines()
|
|
|
|
with open(config_path, 'w') as target:
|
|
for line in lines:
|
|
if line.strip().startswith('AllowedIPs = '):
|
|
if action == "add":
|
|
old = line.strip('\n')
|
|
line = f"{old}, {allowed_ips}\n"
|
|
else:
|
|
line = f"AllowedIPs = {allowed_ips}\n"
|
|
target.write(line)
|
|
|
|
|
|
def create_parser():
|
|
parser = argparse.ArgumentParser(
|
|
description="CRUD WireGuard config file or UP/DOWN wg-interface"
|
|
)
|
|
parser.add_argument(
|
|
"command",
|
|
type=str,
|
|
choices=["init", "cat", "rm", "add", "replace", "up", "down"],
|
|
help="Action with interface"
|
|
)
|
|
parser.add_argument(
|
|
"interface",
|
|
type=str,
|
|
help="WG interface name (wg0, wg1, wgName etc...)"
|
|
)
|
|
parser.add_argument(
|
|
"allowedIPs",
|
|
type=str,
|
|
nargs="?",
|
|
help="Set AllowedIPs must be a string"
|
|
)
|
|
parser.add_argument(
|
|
"-v",
|
|
"--version",
|
|
action='version',
|
|
version=f"%(prog)s - {__version__}")
|
|
|
|
return parser
|
|
|
|
|
|
def main():
|
|
# init cmd args parser:
|
|
parser = create_parser()
|
|
args = parser.parse_args()
|
|
|
|
# current wg interface (get from required arg from user):
|
|
config_path = CONFIG_PATH / f"{args.interface}.conf"
|
|
|
|
# interface config:
|
|
if args.command == "init":
|
|
settings = get_wg_conf()
|
|
gen_wg_conf(config_path, **settings)
|
|
|
|
if args.command == "cat" or args.command == "rm":
|
|
if not exist_wg_conf(config_path):
|
|
raise parser.error(f"{config_path} does not exist")
|
|
use_wg_conf(config_path, args.command)
|
|
|
|
if args.command == "add" or args.command == "replace":
|
|
if not args.allowedIPs:
|
|
raise parser.error("With add/replace you must pass AllowedIPs")
|
|
if not exist_wg_conf(config_path):
|
|
raise parser.error(f"{config_path} does not exist")
|
|
change_allowed_ips(
|
|
config_path,
|
|
args.allowedIPs,
|
|
args.command
|
|
)
|
|
|
|
# run wg-quick:
|
|
if args.command == "up" or args.command == "down":
|
|
try:
|
|
wg_quick(args.interface, args.command)
|
|
except KeyboardInterrupt or Exception:
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|