diff options
| author | Emma Terzioglu <emreterzioglu49@gmail.com> | 2026-03-13 13:49:15 -0700 |
|---|---|---|
| committer | Emma Terzioglu <emreterzioglu49@gmail.com> | 2026-03-13 13:49:15 -0700 |
| commit | 7a33856a527aebbd8d2a624c6d5937c25c9a1d90 (patch) | |
| tree | 855c642394e7ba1de40b8bb8737be12bec0aedaf /src/admin.py | |
initial commit
new website repo yay!!!
Diffstat (limited to 'src/admin.py')
| -rw-r--r-- | src/admin.py | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/src/admin.py b/src/admin.py new file mode 100644 index 0000000..4dc8b6c --- /dev/null +++ b/src/admin.py @@ -0,0 +1,227 @@ +import datetime +import hashlib +import uuid + +import marko +import quart as q + +from . import db + +MAX_LOGIN_TIME = datetime.timedelta(days=1) + + +blueprint = q.Blueprint("admin", __name__, url_prefix="/admin") + + +def get_now() -> datetime.datetime: + return datetime.datetime.now(tz=datetime.timezone.utc) + + +async def login_check(): + if (token := q.session.get("login")) is None: + return q.redirect(f"/admin/login?return={q.request.path}") + + pool = db.get_db() + async with pool.acquire() as conn: + res = await conn.fetch( + "SELECT * FROM admin_logins WHERE admin_logins.id = $1;", token + ) + if not res: + return q.redirect(f"/admin/login?return={q.request.path}") + + login = db.AdminLogin.from_record(res[0]) + created_at = login.created_at.astimezone(datetime.timezone.utc) + now = get_now() + + if (now - created_at) > MAX_LOGIN_TIME: + return q.redirect(f"/admin/login?return={q.request.path}") + + return None + + +@blueprint.route("/") +async def index(): + resp = await login_check() + if resp is not None: + return resp + + return await q.render_template("admin/index.html", title="Admin Panel") + + +@blueprint.route("/login", methods=["GET", "POST"]) +async def login(): + if q.request.method == "GET": + return await q.render_template("admin/login.html", title="Admin Login") + + queries = q.request.args + ip_addr = q.request.remote_addr + form = await q.request.form + + if ip_addr is None: + return q.Response( + "<p>An IP address MUST be provided in order to log in. It is required for logging purposes.</p>", + status=401, + ) + + if (redirect_url := queries.get("return")) is None: + redirect_url = "/admin" # default route + + pool = db.get_db() + + token: str | None = q.session.get("login") + if token is not None: + async with pool.acquire() as conn: + logins = await conn.fetch( + "SELECT * FROM admin_logins WHERE admin_logins.id = $1;", token + ) + + if not logins: + del q.session["login"] + else: + login_info = db.AdminLogin.from_record(logins[0]) + created_at = login_info.created_at.astimezone(tz=datetime.timezone.utc) + now = get_now() + + if (now - created_at) > MAX_LOGIN_TIME: + del q.session["login"] + else: + return q.Response( + status=200, + headers={ + "HX-Redirect": redirect_url, + }, + ) + + if form: + password = hashlib.sha256(form["password"].encode()).hexdigest() + # FIXME: this is insecure due to timing attacks. use a library for this comparison. + if password == q.current_app.config["ADMIN_PASSWORD"]: + async with pool.acquire() as conn: + ids = await conn.fetch( + "INSERT INTO admin_logins (ipaddr) VALUES ($1) RETURNING id;", + ip_addr, + ) + id: uuid.UUID = ids[0]["id"] + q.session["login"] = id.hex + return q.Response( + status=200, + headers={ + "HX-Redirect": redirect_url, + }, + ) + else: + return q.Response( + "<p>Invalid username or password. You have 10 attempts left before a ban.</p>", + status=401, + ) + + return q.Response( + "<p>Your time has expired and you did not provide a form to log in. Please do that!</p>", + status=401, + ) + + +@blueprint.route("/logout") +async def logout(): + if q.session.get("login") is not None: + del q.session["login"] + + return q.redirect("/") + + +@blueprint.route("/blog") +async def blog_posts(): + resp = await login_check() + if resp is not None: + return resp + + posts = await db.get_posts(public_only=False) + return await q.render_template( + "admin/blog.html", + title="Blog Posts", + posts=posts, + ) + + +@blueprint.route("/blog/create", methods=["GET", "POST"]) +async def create_blog_post(): + resp = await login_check() + if resp is not None: + return resp + + if q.request.method == "GET": + return await q.render_template("admin/blog_create.html", title="Create a Post") + + data = await q.request.form + text: str = data["text"] + title: str = data["title"] + description: str | None = data.get("description") + public: bool = data.get("public") == "on" + + formatted = marko.convert(text) + + pool = db.get_db() + async with pool.acquire() as conn: + await conn.execute( + "INSERT INTO posts (text, title, description, original, public) VALUES ($1, $2, $3, $4, $5);", + formatted, + title, + description, + text, + public, + ) + + return q.redirect("/admin/blog") + + +@blueprint.route("/blog/<id>", methods=["GET", "POST"]) +async def edit_blog_post(id: str): + resp = await login_check() + if resp is not None: + return resp + + pool = db.get_db() + + if q.request.method == "GET": + post = await db.get_post(id, public_only=False) + if not post: + q.abort(404) + + return await q.render_template( + "admin/blog_edit.html", title="Edit Blog Post", post=post + ) + + data = await q.request.form + text: str = data["text"] + title: str = data["title"] + description: str | None = data.get("description") + public: bool = data.get("public") == "on" + + formatted = marko.convert(text) + + async with pool.acquire() as conn: + await conn.execute( + "UPDATE posts SET text = $1, title = $2, description = $3, original = $4, public = $5 WHERE posts.id = $6;", + formatted, + title, + description, + text, + public, + id, + ) + + return q.redirect("/admin/blog") + + +@blueprint.route("/blog/<id>", methods=["DELETE"]) +async def delete_blog_post(id: str): + resp = await login_check() + if resp is not None: + return resp + + pool = db.get_db() + + async with pool.acquire() as conn: + await conn.execute("DELETE FROM comments WHERE comments.post_id = $1;", id) + await conn.execute("DELETE FROM posts WHERE posts.id = $1;", id) + return q.Response(status=204) |
