Update main.py to pass quality assurance tests.

This commit is contained in:
giomba 2023-04-08 22:25:29 +02:00
parent 72dab0b374
commit 7eb3e946e5
1 changed files with 169 additions and 80 deletions

249
main.py
View File

@ -1,115 +1,157 @@
import sqlite3
import os
from flask import Flask, request, jsonify, render_template, Response
from flask_httpauth import HTTPBasicAuth
from flask_httpauth import HTTPBasicAuth # type: ignore
import configparser
PREFIX='/data'
if os.environ.get('PREFIX') is not None:
PREFIX=os.environ.get('PREFIX')
PREFIX = "/data"
if os.environ.get("PREFIX") is not None:
PREFIX = os.environ["PREFIX"]
app = Flask(__name__)
config = configparser.ConfigParser()
auth = HTTPBasicAuth()
config.read(os.path.join(PREFIX, 'vpnunit.config.ini'))
config.read(os.path.join(PREFIX, "vpnunit.config.ini"))
class Ex(Exception):
def __init__(self, code, message):
self._code = code
self._message = message
def getCode(self):
return self._code
def __str__(self):
return self._message
DATABASE=os.path.join(PREFIX, 'database.sqlite3')
DATABASE = os.path.join(PREFIX, "database.sqlite3")
db = sqlite3.connect(DATABASE)
cu = db.cursor()
db.execute('SELECT name FROM sqlite_master')
db.execute("SELECT name FROM sqlite_master")
if len(cu.fetchall()) == 0:
print('creating database schema...')
db.execute('CREATE TABLE IF NOT EXISTS user (id INTEGER PRIMARY KEY, name TEXT UNIQUE)')
db.execute('CREATE TABLE IF NOT EXISTS gateway (id INTEGER PRIMARY KEY, subid INTEGER NOT NULL, name TEXT UNIQUE, user INTEGER NOT NULL REFERENCES user(id))')
print("creating database schema...")
db.execute(
"CREATE TABLE \
IF NOT EXISTS \
user (id INTEGER PRIMARY KEY, name TEXT UNIQUE)"
)
db.execute(
"CREATE TABLE \
IF NOT EXISTS \
gateway \
(id INTEGER PRIMARY KEY, \
subid INTEGER NOT NULL, \
name TEXT UNIQUE, \
user INTEGER NOT NULL \
REFERENCES user(id))"
)
db.commit()
cu.close()
db.close()
os.environ['EASYRSA_PKI'] = os.path.join(PREFIX, 'pki')
os.environ['EASYRSA_BATCH'] = '1'
os.environ['PATH'] = os.environ['PATH'] + ':' + os.getcwd() + '/easy-rsa/easyrsa3'
os.environ["EASYRSA_PKI"] = os.path.join(PREFIX, "pki")
os.environ["EASYRSA_BATCH"] = "1"
os.environ["PATH"] = (
os.environ["PATH"] + ":" + os.getcwd() + "/easy-rsa/easyrsa3"
)
@auth.verify_password
def verify_user(username, password):
if username == config['DEFAULT']['username'] and password == config['DEFAULT']['password']:
if (
username == config["DEFAULT"]["username"]
and password == config["DEFAULT"]["password"]
):
return username
@app.route('/')
def hello_world():
return 'It works!'
@app.route('/users', methods=['GET'])
@app.route("/")
def hello_world():
return "It works!"
@app.route("/users", methods=["GET"])
@auth.login_required
def get_users():
db = sqlite3.connect(DATABASE)
cu = db.cursor()
users = []
for row in cu.execute('SELECT id, name FROM user'):
users.append({'id': row[0], 'name': row[1]})
for row in cu.execute("SELECT id, name FROM user"):
users.append({"id": row[0], "name": row[1]})
cu.close()
db.close()
return jsonify(users)
@app.route('/users', methods=['POST'])
@app.route("/users", methods=["POST"])
@auth.login_required
def post_users():
db = sqlite3.connect(DATABASE)
cu = db.cursor()
name = request.json['name']
print('creating ' + str(name))
name = request.json["name"]
print("creating " + str(name))
try:
cu.execute('INSERT INTO user (name) VALUES (?)', (name,))
return jsonify({'status': 'ok'})
cu.execute("INSERT INTO user (name) VALUES (?)", (name,))
return jsonify({"status": "ok"})
except sqlite3.Error as e:
return jsonify({'status': 'error', 'message': str(e)}), 409
return jsonify({"status": "error", "message": str(e)}), 409
finally:
db.commit()
cu.close()
db.close()
@app.route('/gateways', methods=['GET'])
@app.route("/gateways", methods=["GET"])
@auth.login_required
def get_gateways():
db = sqlite3.connect(DATABASE)
cu = db.cursor()
gateways = []
for row in cu.execute('SELECT g.id, g.subid, g.name, u.name FROM gateway AS g INNER JOIN user AS u ON u.id = g.user'):
gateways.append({'id': row[0], 'subid': row[1], 'name': row[2], 'user': row[3]})
for row in cu.execute(
"SELECT g.id, g.subid, g.name, u.name \
FROM gateway AS g \
INNER JOIN user AS u ON u.id = g.user"
):
gateways.append(
{"id": row[0], "subid": row[1], "name": row[2], "user": row[3]}
)
cu.close()
db.close()
return jsonify(gateways)
@app.route('/gateways', methods=['POST'])
@app.route("/gateways", methods=["POST"])
@auth.login_required
def post_gateways():
db = sqlite3.connect(DATABASE)
cu = db.cursor()
try:
name = request.json['name']
user = request.json['user']
name = request.json["name"]
user = request.json["user"]
# TODO sanitize name, it must be a FQDN
used_gateway_subids = []
for row in cu.execute('SELECT g.subid, u.id FROM user AS u LEFT JOIN gateway AS g ON g.user = u.id WHERE u.name = ?', [str(user,)]):
for row in cu.execute(
"SELECT g.subid, u.id \
FROM user AS u \
LEFT JOIN gateway AS g ON g.user = u.id WHERE u.name = ?",
[
str(
user,
)
],
):
used_gateway_subids.append(row[0])
userid = row[1]
@ -119,62 +161,82 @@ def post_gateways():
break
if subid in used_gateway_subids:
raise Ex(403, 'exit: maximum number of gateways reached for user')
raise Ex(403, "exit: maximum number of gateways reached for user")
cu.execute('INSERT INTO gateway (subid, name, user) VALUES (?, ?, (SELECT id FROM user WHERE name = ?))', [subid, str(name,), str(user,)])
cu.execute(
"INSERT INTO gateway (subid, name, user) \
VALUES (?, ?, (SELECT id FROM user WHERE name = ?))",
[
subid,
str(
name,
),
str(
user,
),
],
)
os.environ['EASYRSA_REQ_CN'] = name
os.environ["EASYRSA_REQ_CN"] = name
r = os.system('easyrsa gen-req {} nopass'.format(name))
r = os.system("easyrsa gen-req {} nopass".format(name))
if r != 0:
raise Ex(500, 'exit: {} cannot gen-req'.format(r))
r = os.system('easyrsa sign-req client {}'.format(name))
raise Ex(500, "exit: {} cannot gen-req".format(r))
r = os.system("easyrsa sign-req client {}".format(name))
if r != 0:
raise Ex(500, 'exit: {} cannot sign-req'.format(r))
raise Ex(500, "exit: {} cannot sign-req".format(r))
ipid = '{:x}{:x}'.format(userid, subid)
ipid = "{:x}{:x}".format(userid, subid)
address = '2001:470:c844::' + ipid + '0'
network = '2001:470:c844:' + ipid + '0::/60'
address = "2001:470:c844::" + ipid + "0"
network = "2001:470:c844:" + ipid + "0::/60"
staticclient = render_template('staticclient', address=address, network=network)
with open(os.path.join(PREFIX, 'ovpn/clients/', name), 'w') as f:
staticclient = render_template(
"staticclient", address=address, network=network
)
with open(os.path.join(PREFIX, "ovpn/clients/", name), "w") as f:
f.write(staticclient)
with open(os.path.join(PREFIX, 'ip/routes'), 'a') as f:
f.write(network + ' via ' + address + '\n')
with open(os.path.join(PREFIX, "ip/routes"), "a") as f:
f.write(network + " via " + address + "\n")
db.commit()
return jsonify({'status': 'ok'})
return jsonify({"status": "ok"})
except KeyError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
return jsonify({"status": "error", "message": str(e)}), 400
except sqlite3.Error as e:
return jsonify({'status': 'error', 'message': str(e)}), 409
return jsonify({"status": "error", "message": str(e)}), 409
except Ex as e:
return jsonify({'status': 'error', 'message': str(e)}), e.getCode()
return jsonify({"status": "error", "message": str(e)}), e.getCode()
finally:
cu.close()
db.close()
@app.route('/gateway/<fqdn>', methods=['GET'])
@app.route("/gateway/<fqdn>", methods=["GET"])
@auth.login_required
def get_gateway(fqdn):
db = sqlite3.connect(DATABASE)
cu = db.cursor()
gateway = dict()
for row in cu.execute('SELECT g.id, g.subid, g.name, u.name FROM gateway AS g INNER JOIN user AS u ON u.id = g.user'):
gateway['id'] = row[0]
gateway['subid'] = row[1]
gateway['name'] = row[2]
gateway['user'] = row[3]
for row in cu.execute(
"SELECT g.id, g.subid, g.name, u.name \
FROM gateway AS g \
INNER JOIN user AS u ON u.id = g.user"
):
gateway["id"] = row[0]
gateway["subid"] = row[1]
gateway["name"] = row[2]
gateway["user"] = row[3]
cu.close()
db.close()
return jsonify(gateway)
@app.route('/gateway/<fqdn>/config', methods=['GET'])
@app.route("/gateway/<fqdn>/config", methods=["GET"])
@auth.login_required
def get_gateway_config(fqdn):
# TODO sanity check FQDN
@ -182,57 +244,84 @@ def get_gateway_config(fqdn):
# or you have to trust the user of these API
# eg: he could retrieve the CA.key !!!
with open(os.environ['EASYRSA_PKI'] + '/issued/' + fqdn + '.crt') as f:
with open(os.environ["EASYRSA_PKI"] + "/issued/" + fqdn + ".crt") as f:
cert = f.read()
with open(os.environ['EASYRSA_PKI'] + '/private/' + fqdn + '.key') as f:
with open(os.environ["EASYRSA_PKI"] + "/private/" + fqdn + ".key") as f:
key = f.read()
with open(os.environ['EASYRSA_PKI'] + '/ca.crt') as f:
with open(os.environ["EASYRSA_PKI"] + "/ca.crt") as f:
ca = f.read()
return Response(render_template('config.ovpn', ca=ca, cert=cert, key=key), mimetype='text/plain')
return Response(
render_template("config.ovpn", ca=ca, cert=cert, key=key),
mimetype="text/plain",
)
@app.route('/gateway/<fqdn>', methods=['DELETE'])
@app.route("/gateway/<fqdn>", methods=["DELETE"])
@auth.login_required
def delete_gateway(fqdn):
# TODO sanity check for this parameter! Possible system command injection
db = sqlite3.connect(DATABASE)
cu = db.cursor()
for row in cu.execute('SELECT u.id, g.subid FROM gateway AS g INNER JOIN user AS u ON u.id = g.user WHERE g.name = ?', [str(fqdn,)]):
for row in cu.execute(
"SELECT u.id, g.subid \
FROM gateway AS g \
INNER JOIN user AS u ON u.id = g.user WHERE g.name = ?",
[
str(
fqdn,
)
],
):
userid = row[0]
subid = row[1]
ipid = '{:x}{:x}'.format(userid, subid)
ipid = "{:x}{:x}".format(userid, subid)
break
address = '2001:470:c844::' + ipid + '0/64'
network = '2001:470:c844:' + ipid + '0::/60'
address = "2001:470:c844::" + ipid + "0/64"
network = "2001:470:c844:" + ipid + "0::/60"
sedrm = "sed -i '\\_^" + network + " via " + address + "$_d' " + PREFIX + "/ip/routes"
print('[sedrm] ' + sedrm)
sedrm = (
"sed -i '\\_^"
+ network
+ " via "
+ address
+ "$_d' "
+ PREFIX
+ "/ip/routes"
)
print("[sedrm] " + sedrm)
r = os.system(sedrm)
if r != 0:
raise Ex(500, 'exit: {} cannot sed-out ip route'.format(r))
raise Ex(500, "exit: {} cannot sed-out ip route".format(r))
cu.execute('DELETE FROM gateway AS g WHERE g.name = ?', [str(fqdn,)])
cu.execute(
"DELETE FROM gateway AS g WHERE g.name = ?",
[
str(
fqdn,
)
],
)
try:
r = os.system('easyrsa revoke {}'.format(fqdn))
r = os.system("easyrsa revoke {}".format(fqdn))
if r != 0:
raise Ex(500, 'exit: {} cannot revoke'.format(r))
r = os.system('easyrsa gen-crl')
raise Ex(500, "exit: {} cannot revoke".format(r))
r = os.system("easyrsa gen-crl")
if r != 0:
raise Ex(500, 'exit: {} cannot gen-crl'.format(r))
raise Ex(500, "exit: {} cannot gen-crl".format(r))
except Ex as e:
return jsonify({'status': 'error', 'message': str(e)}), e.getCode()
return jsonify({"status": "error", "message": str(e)}), e.getCode()
os.remove(os.path.join(PREFIX, 'ovpn/clients', fqdn))
os.remove(os.path.join(PREFIX, "ovpn/clients", fqdn))
db.commit()
cu.close()
db.close()
return jsonify({'status': 'ok'})
return jsonify({"status": "ok"})
if __name__ == '__main__':
if __name__ == "__main__":
app.run(host="::", port=5000, debug=True)