wg-bootstrap/wg_bootstrap/wg_start.py

200 lines
5.3 KiB
Python
Raw Normal View History

2023-07-22 18:52:51 +08:00
#!/usr/bin/env python3
import os
import sys
import argparse
import subprocess as sp
from pathlib import Path
from functools import wraps
__author__ = "MOIS3Y"
__version__ = "0.1.0"
# Select the directory that holds the WireGuard interface configs:
# when the WG_BOOTSTRAP environment variable is set to a non-empty value
# (development), use the "etc/wireguard" tree one level above this file's
# directory; otherwise (production) use the system-wide /etc/wireguard.
CONFIG_PATH = (
    Path(__file__).resolve().parents[1] / "etc" / "wireguard"
    if os.environ.get("WG_BOOTSTRAP")
    else Path("/etc/wireguard")
)
def sudo_required(func):
    """
    Decorator that refuses to run the wrapped function without root rights.

    Only the effective user id is checked: a process started through plain
    ``sudo`` (or directly as root) has an effective uid of 0. The
    ``SUDO_UID`` environment variable is deliberately NOT consulted, because
    it is also set by ``sudo -u <non-root user>`` and can be exported by any
    user, which would let an unprivileged process pass the check.

    Args:
        func (function): any function

    Returns:
        function: a wrapper that forwards all arguments to ``func`` and
        returns its result when the effective uid is 0.

    Raises:
        PermissionError: raised by the wrapper (at call time) when the
        effective uid is not 0.
    """
    @wraps(func)  # <- save meta info for wrapped func
    def is_root(*args, **kwargs):
        if os.geteuid() != 0:
            # hide the traceback so the user only sees the short message
            sys.tracebacklimit = 0
            prog = os.path.basename(__file__)
            error_message = f"You need to run {prog} with sudo or as root."
            raise PermissionError(error_message)
        return func(*args, **kwargs)
    return is_root
@sudo_required
def get_wg_conf() -> dict:
    """
    Interactively collect the values for a wg interface configuration.

    The user is prompted once per option, section by section, in this
    order: Interface (PrivateKey, Address, DNS), then Peer (PublicKey,
    AllowedIPs, Endpoint). Answers are stored as entered, without any
    validation.

    Returns:
        dict: settings for wg interface, mapping each section title to a
        dict of option name -> user input
    """
    layout = {
        "Interface": ("PrivateKey", "Address", "DNS"),
        "Peer": ("PublicKey", "AllowedIPs", "Endpoint"),
    }
    return {
        section: {
            option: input(f"[{section}] - {option}: ")
            for option in options
        }
        for section, options in layout.items()
    }
@sudo_required
def gen_wg_conf(config_path, **settings):
    """
    Generate a config file for the wireguard interface from the
    received dict (**settings) with the desired values.

    Each top-level key of ``settings`` becomes a ``[Section]`` header
    followed by one ``key = value`` line per option; sections are separated
    by a single blank line. Any existing file at ``config_path`` is
    overwritten.

    The file contains the interface private key, so it is created with
    mode 0600 and an already existing file is tightened to 0600 as well.

    Args:
        config_path (Path): path to wg interface configuration file
        **settings (dict): settings for wg interface,
            section title -> {option: value}
    """
    # make sure the interface parent dir exists:
    config_path.parent.mkdir(parents=True, exist_ok=True)

    sections = []
    for title, options in settings.items():
        body = "".join(f"{key} = {value}\n" for key, value in options.items())
        sections.append(f"[{title}]\n{body}")

    # os.open lets us pick the creation mode instead of relying on umask
    fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as file:
        # the mode passed to os.open only applies to newly created files,
        # so enforce it explicitly for a pre-existing config
        os.fchmod(fd, 0o600)
        file.write("\n".join(sections))
@sudo_required
def use_wg_conf(config_path, command):
    """
    Run an external program with the config file as its only argument.

    The exit status of the program is not inspected.

    Args:
        config_path (Path): path to wg interface configuration file
        command (str): executable to run (main() passes "cat" or "rm")
    """
    argv = [command, config_path]
    sp.run(argv)
def wg_quick(interface, command):
    """
    Invoke ``wg-quick <command> <interface>`` as a child process.

    The exit status of wg-quick is not inspected.

    Args:
        interface (str): WG interface name (wg0, wg1, ...)
        command (str): wg-quick action (main() passes "up" or "down")
    """
    argv = ["wg-quick", command, interface]
    sp.run(argv)
def exist_wg_conf(config_path):
    """
    Tell whether the given configuration path exists on disk.

    Args:
        config_path (Path): path to wg interface configuration file

    Returns:
        bool: result of ``config_path.exists()``
    """
    found = config_path.exists()
    return found
@sudo_required
def change_allowed_ips(config_path, allowed_ips, action="replace"):
    """
    Change the AllowedIPs value in the passed configuration file.

    Every ``AllowedIPs`` line is rewritten as ``AllowedIPs = <value>``.
    The key is recognised regardless of the spacing around ``=``
    (``AllowedIPs=...`` and ``AllowedIPs = ...`` both match). Commented
    lines and all other lines are written back unchanged. If the file has
    no AllowedIPs line, it is left as it was.

    Args:
        config_path (Path): path to wg interface configuration file
        allowed_ips (str): comma and space separated ip addresses
        action (str, optional): "add" appends ``allowed_ips`` to the
            current value; any other value replaces it.
            Defaults to "replace".
    """
    with open(config_path, 'r') as source:
        lines = source.readlines()

    updated = []
    for line in lines:
        key, separator, current = line.partition("=")
        if separator and key.strip() == "AllowedIPs":
            current = current.strip()
            # appending to an empty value would produce a leading comma,
            # so in that case "add" behaves like "replace"
            if action == "add" and current:
                value = f"{current}, {allowed_ips}"
            else:
                value = allowed_ips
            line = f"AllowedIPs = {value}\n"
        updated.append(line)

    with open(config_path, 'w') as target:
        target.writelines(updated)
def create_parser():
    """
    Build the command line parser of the tool.

    Positional arguments: ``command`` (one of init, cat, rm, add, replace,
    up, down), ``interface`` and an optional ``allowedIPs``.
    ``-v/--version`` prints the program name and version.

    Returns:
        argparse.ArgumentParser: the configured parser
    """
    cli = argparse.ArgumentParser(
        description="CRUD WireGuard config file or UP/DOWN wg-interface"
    )
    actions = ["init", "cat", "rm", "add", "replace", "up", "down"]
    cli.add_argument(
        "command", type=str, choices=actions, help="Action with interface"
    )
    cli.add_argument(
        "interface",
        type=str,
        help="WG interface name (wg0, wg1, wgName etc...)",
    )
    # optional positional: only meaningful for the add/replace commands
    cli.add_argument(
        "allowedIPs",
        type=str,
        nargs="?",
        help="Set AllowedIPs must be a string",
    )
    version_text = f"%(prog)s - {__version__}"
    cli.add_argument(
        "-v", "--version", action="version", version=version_text
    )
    return cli
def main():
    """
    Entry point: parse the command line and dispatch the requested action.

    Actions:
        init         - ask for the settings and write a new config file
        cat / rm     - run the command on an existing config file
        add/replace  - change AllowedIPs in an existing config file
        up / down    - run wg-quick for the interface

    Invalid usage (missing config file, missing AllowedIPs) is reported
    through ``parser.error``, which prints the message and exits with
    status 2.
    """
    # init cmd args parser:
    parser = create_parser()
    args = parser.parse_args()
    # current wg interface (get from required arg from user):
    config_path = CONFIG_PATH / f"{args.interface}.conf"

    # interface config:
    if args.command == "init":
        settings = get_wg_conf()
        gen_wg_conf(config_path, **settings)
    elif args.command in ("cat", "rm"):
        if not exist_wg_conf(config_path):
            # parser.error() exits by itself, nothing to raise
            parser.error(f"{config_path} does not exist")
        use_wg_conf(config_path, args.command)
    elif args.command in ("add", "replace"):
        if not args.allowedIPs:
            parser.error("With add/replace you must pass AllowedIPs")
        if not exist_wg_conf(config_path):
            parser.error(f"{config_path} does not exist")
        change_allowed_ips(
            config_path,
            args.allowedIPs,
            args.command
        )
    # run wg-quick:
    elif args.command in ("up", "down"):
        try:
            wg_quick(args.interface, args.command)
        except KeyboardInterrupt:
            # Ctrl+C while wg-quick is running: leave quietly
            sys.exit(0)
        except OSError as err:
            # e.g. wg-quick is not installed; report it without a traceback
            sys.exit(f"Unable to run wg-quick: {err}")
# run the CLI only when the file is executed as a script, not on import
if __name__ == "__main__":
    main()