import datetime import hashlib import secrets import uuid import marko import quart as q from . import db MAX_LOGIN_TIME = datetime.timedelta(days=1) blueprint = q.Blueprint("admin", __name__, url_prefix="/admin") def get_now() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc) async def login_check(): if (token := q.session.get("login")) is None: return q.redirect(f"/admin/login?return={q.request.path}") pool = db.get_db() async with pool.acquire() as conn: res = await conn.fetch( "SELECT * FROM admin_logins WHERE admin_logins.id = $1;", token ) if not res: return q.redirect(f"/admin/login?return={q.request.path}") login = db.AdminLogin.from_record(res[0]) created_at = login.created_at.astimezone(datetime.timezone.utc) now = get_now() if (now - created_at) > MAX_LOGIN_TIME: return q.redirect(f"/admin/login?return={q.request.path}") return None @blueprint.route("/") async def index(): resp = await login_check() if resp is not None: return resp return await q.render_template("admin/index.html", title="Admin Panel") @blueprint.route("/login", methods=["GET", "POST"]) async def login(): if q.request.method == "GET": return await q.render_template("admin/login.html", title="Admin Login") queries = q.request.args ip_addr = q.request.remote_addr form = await q.request.form if ip_addr is None: return q.Response( "
An IP address MUST be provided in order to log in. It is required for logging purposes.
", status=401, ) if (redirect_url := queries.get("return")) is None: redirect_url = "/admin" # default route pool = db.get_db() token: str | None = q.session.get("login") if token is not None: async with pool.acquire() as conn: logins = await conn.fetch( "SELECT * FROM admin_logins WHERE admin_logins.id = $1;", token ) if not logins: del q.session["login"] else: login_info = db.AdminLogin.from_record(logins[0]) created_at = login_info.created_at.astimezone(tz=datetime.timezone.utc) now = get_now() if (now - created_at) > MAX_LOGIN_TIME: del q.session["login"] else: return q.Response( status=200, headers={ "HX-Redirect": redirect_url, }, ) if form: password = hashlib.sha256(form["password"].encode()).hexdigest() if secrets.compare_digest(password, q.current_app.config["ADMIN_PASSWORD"]): async with pool.acquire() as conn: ids = await conn.fetch( "INSERT INTO admin_logins (ipaddr) VALUES ($1) RETURNING id;", ip_addr, ) id: uuid.UUID = ids[0]["id"] q.session["login"] = id.hex return q.Response( status=200, headers={ "HX-Redirect": redirect_url, }, ) else: return q.Response( "Invalid username or password. You have 10 attempts left before a ban.
", status=401, ) return q.Response( "Your time has expired and you did not provide a form to log in. Please do that!
", status=401, ) @blueprint.route("/logout") async def logout(): if q.session.get("login") is not None: del q.session["login"] return q.redirect("/") @blueprint.route("/blog") async def blog_posts(): resp = await login_check() if resp is not None: return resp posts = await db.get_posts(public_only=False) return await q.render_template( "admin/blog.html", title="Blog Posts", posts=posts, ) @blueprint.route("/blog/create", methods=["GET", "POST"]) async def create_blog_post(): resp = await login_check() if resp is not None: return resp if q.request.method == "GET": return await q.render_template("admin/blog_create.html", title="Create a Post") data = await q.request.form text: str = data["text"] title: str = data["title"] description: str | None = data.get("description") public: bool = data.get("public") == "on" formatted = marko.convert(text) pool = db.get_db() async with pool.acquire() as conn: await conn.execute( "INSERT INTO posts (text, title, description, original, public) VALUES ($1, $2, $3, $4, $5);", formatted, title, description, text, public, ) return q.redirect("/admin/blog") @blueprint.route("/blog/