summaryrefslogtreecommitdiff
path: root/src/db.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/db.py')
-rw-r--r--src/db.py108
1 files changed, 108 insertions, 0 deletions
diff --git a/src/db.py b/src/db.py
new file mode 100644
index 0000000..78b1031
--- /dev/null
+++ b/src/db.py
@@ -0,0 +1,108 @@
+import dataclasses
+import datetime
+import ipaddress as ipaddr
+import uuid
+
+import asyncpg
+import quart as q
+
+
+def get_db() -> asyncpg.Pool:
+ return q.current_app.pool # pyright: ignore[reportAttributeAccessIssue]
+
+
+@dataclasses.dataclass
+class BlogPost:
+ id: uuid.UUID
+ title: str
+ description: str | None
+ text: str
+ original: str
+ created_at: datetime.datetime
+ public: bool
+
+ @classmethod
+ def from_record(cls, record: asyncpg.Record):
+ return cls(
+ id=record["id"],
+ title=record["title"],
+ description=record.get("description"),
+ text=record["text"],
+ original=record["original"],
+ created_at=record["created_at"],
+ public=record["public"],
+ )
+
+
+@dataclasses.dataclass
+class BlogComment:
+ id: uuid.UUID
+ author: str
+ text: str
+ created_at: datetime.datetime
+ post: uuid.UUID | None = None
+ video: uuid.UUID | None = None
+
+ @classmethod
+ def from_record(cls, record: asyncpg.Record):
+ return cls(
+ id=record["id"],
+ author=record["author"],
+ text=record["text"],
+ created_at=record["created_at"],
+ post=record.get("post_id"),
+ video=record.get("video_id"),
+ )
+
+
+@dataclasses.dataclass
+class AdminLogin:
+ id: int
+ addr: ipaddr.IPv4Interface | ipaddr.IPv6Interface
+ created_at: datetime.datetime
+
+ @classmethod
+ def from_record(cls, record: asyncpg.Record):
+ return cls(
+ id=record["id"],
+ addr=record["ipaddr"],
+ created_at=record["created_at"],
+ )
+
+
+async def get_post(post: str, *, public_only: bool = True):
+ query = "SELECT * FROM posts WHERE posts.id = $1"
+ if public_only:
+ query += " AND posts.public = true"
+ query += ";"
+
+ pool = get_db()
+ async with pool.acquire() as conn:
+ res = await conn.fetch(query, post)
+ if not res:
+ return None
+
+ return BlogPost.from_record(res[0])
+
+
+async def get_posts(*, public_only: bool = True):
+ query = "SELECT * FROM posts"
+ if public_only:
+ query += " WHERE posts.public = true"
+ query += ";"
+
+ pool = get_db()
+ async with pool.acquire() as conn:
+ res = await conn.fetch(query)
+ return [BlogPost.from_record(r) for r in res]
+
+
+async def get_post_comments(post: str):
+ pool = get_db()
+ async with pool.acquire() as conn:
+ res = await conn.fetch(
+ "SELECT * FROM comments WHERE comments.post_id = $1 ORDER BY comments.created_at",
+ post,
+ )
+ finals = [BlogComment.from_record(r) for r in res]
+ return finals