summaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rw-r--r--scripts/generate_migration.py110
-rw-r--r--scripts/migrate_db.py80
2 files changed, 190 insertions, 0 deletions
diff --git a/scripts/generate_migration.py b/scripts/generate_migration.py
new file mode 100644
index 0000000..19552d9
--- /dev/null
+++ b/scripts/generate_migration.py
@@ -0,0 +1,110 @@
+import argparse
+import datetime
+import pathlib
+import re
+import uuid
+
+MIGRATION_FILE_PATT = re.compile(
+ r"^(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})_(?P<from>\w+)_(?P<direction>(:?up)|(:?down))_(?P<to>\w+).(?P<format>(:?sql)|(:?py))$"
+)
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(
+ description="Automatically generates a new migration file for asyncpg-trek."
+ )
+ parser.add_argument(
+ "-f", "--folder", default="migrations", help="the migration folder to check"
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="whether to perform a dry run of this script",
+ )
+ parser.add_argument(
+ "-t",
+ "--type",
+ choices=["sql", "py"],
+ default="sql",
+ help="the type of file to produce. defaults to .sql",
+ )
+ parser.add_argument(
+ "-d",
+ "--direction",
+ choices=["up", "down"],
+ default=None,
+ help="direction of migration to generate. defaults to up & down",
+ )
+ parser.add_argument(
+ "--preserve-env",
+ default=False,
+ action="store_true",
+ help="whether to not modify the .env with the new revision",
+ )
+ return parser.parse_args()
+
+
+def main():
+ args = parse_args()
+ migration_folder = pathlib.Path(args.folder)
+
+ revisions = {}
+ possible_unlinked = []
+ for file in migration_folder.iterdir():
+ if not file.is_file():
+ continue
+
+ match = MIGRATION_FILE_PATT.match(file.name)
+ if not match:
+ continue
+
+ if match.group("direction") != "up":
+ continue
+
+ from_rev = match.group("from")
+ to_rev = match.group("to")
+
+ revisions[from_rev] = to_rev
+ if to_rev not in revisions:
+ possible_unlinked.append(to_rev)
+ if from_rev in possible_unlinked:
+ possible_unlinked.remove(from_rev)
+
+ from_: str
+ to = uuid.uuid4().hex[0:6]
+ if len(revisions) == 0:
+ from_ = "initial"
+ elif (
+ len(possible_unlinked) == 1
+ ): # this most likely means the lone unlinked pair remains
+ from_ = possible_unlinked[0]
+ else:
+ keys = list(revisions.keys())
+ for revision in possible_unlinked:
+ if revision not in keys:
+ from_ = revision
+ break
+ else:
+ raise RuntimeError("last node in linked list not found")
+
+ today = datetime.datetime.now()
+ month = f"0{today.month}" if today.month < 10 else str(today.month)
+ day = f"0{today.day}" if today.day < 10 else str(today.day)
+ filename = f"{today.year}{month}{day}_{from_}_{{direction}}_{to}.{args.type}"
+
+ directions: tuple[str, ...] = (
+ (args.direction,) if args.direction is not None else ("up", "down")
+ )
+ for direction in directions:
+ file = migration_folder / filename.format(direction=direction)
+
+ if args.dry_run:
+ print("Made file", file)
+ else:
+ file.touch()
+
+ print("Done! New revision is now", to)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/migrate_db.py b/scripts/migrate_db.py
new file mode 100644
index 0000000..b389f60
--- /dev/null
+++ b/scripts/migrate_db.py
@@ -0,0 +1,80 @@
+import argparse
+import asyncio
+import traceback
+
+import asyncpg
+import asyncpg_trek as pgtrek
+import dotenv
+from asyncpg_trek.asyncpg import AsyncpgBackend
+
+env_cfg: dict[str, str | None] = {}
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(
+ description="Migrates the database from the current revision to a new specified one.",
+ )
+ parser.add_argument(
+ "revision",
+ default="NEWEST",
+ help="the revision to upgrade/downgrade to. defaults to the newest one via the constant 'NEWEST'",
+ )
+ parser.add_argument(
+ "-f",
+ "--folder",
+ default="migrations",
+ help="the folder where migrations are stored",
+ )
+ parser.add_argument(
+ "-e",
+ "--env",
+ default=".env",
+ help="the .env file for the database. this is used to log into the database",
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="whether to perform a dry run of this script",
+ )
+ parser.add_argument(
+ "-d",
+ "--direction",
+ default="up",
+ help="which direction the migration from the current to the specified goes in. defaults to up",
+ )
+ return parser.parse_args()
+
+
+async def main(args: argparse.Namespace):
+ if (db_url := env_cfg.get("DATABASE")) is None:
+ raise RuntimeError()
+
+ dir: pgtrek.Direction
+ if args.direction.lower() == "down":
+ dir = pgtrek.Direction.down
+ else:
+ dir = pgtrek.Direction.up
+
+ conn = await asyncpg.connect(db_url)
+ backend = AsyncpgBackend(conn)
+
+ try:
+ plans = await pgtrek.plan(
+ backend,
+ args.folder,
+ target_revision=args.revision,
+ direction=dir,
+ )
+ await pgtrek.execute(backend, plans)
+ except Exception as e:
+ print("oh nyo, something went wrong!")
+ traceback.print_exception(e)
+ return
+
+ print("all done! :D")
+
+
+if __name__ == "__main__":
+ args = parse_args()
+ env_cfg = dotenv.dotenv_values(args.env)
+ asyncio.run(main(args))