Advertisement
4nd3rs0n

deno_postgres_migrate.ts

Oct 17th, 2024
241
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
TypeScript 11.28 KB | Source Code | 0 0
  1. /*
  2.   This code provides basic functionality to work with a database
  3.  
  4.   This code is published under the MIT license.
  5.   Posted: 17 Oct 2024  
  6.   Author: Dmitry Anderson
  7.   Git: https://github.com/4nd3r5on
  8. */
  9.  
  10. import * as pg from "https://deno.land/x/postgres/mod.ts";
  11. import * as path from "jsr:@std/path"
  12.  
  /** Bundle of dependencies shared by the migration routines in this module. */
  export interface MigrationsConfig {
    /** Connection pool every query is executed through. */
    pool: pg.Pool;
    /** Migrations known locally (see `LocalMigrations`). */
    mLocal: LocalMigrations;
  }
  /** Up and down migrations available locally, indexed by version. */
  export interface LocalMigrations {
    /** Versions that have an up migration, ordered from the smallest ID to the biggest. */
    versionsUP: bigint[];
    /** Up migrations keyed by version. */
    migrationsUP: Map<bigint, Migration>;
    /** Down migrations keyed by version. */
    migrationsDown: Map<bigint, Migration>;
  }
  23.  
  /** A migration version together with its optional human-readable label. */
  export interface VerAndLabel {
    /** Numeric migration version. */
    version: bigint;
    /** Label attached to the version, or `null` when there is none. */
    label: string | null;
  }
  /** A versioned migration together with the SQL text it runs. */
  export interface Migration extends VerAndLabel {
    /** SQL executed when the migration is run. */
    query: string;
  }
  31.  
  /**
   * Borrows a client from the pool, runs `callback` with it and hands the
   * client back to the pool once the callback has finished.
   *
   * @param pool     Pool the client is taken from.
   * @param callback Work to perform with the borrowed client; may be sync or async.
   * @returns A promise that resolves after the callback has completed and the
   *          client has been released. It rejects when connecting or the
   *          callback fails; the client is released in that case as well.
   */
  const withClient = async (
    pool: pg.Pool,
    callback: (client: pg.PoolClient) => Promise<void> | void,
  ): Promise<void> => {
    const client = await pool.connect();
    try {
      // Await the callback so the client is not released while a query is still running.
      await callback(client);
    } finally {
      client.release();
    }
  };
  38.  
  /**
   * Runs `callback` inside a database transaction.
   *
   * Issues `BEGIN`, invokes the callback and then `COMMIT`s. If anything in
   * that sequence throws, a `ROLLBACK` is attempted and the original error is
   * rethrown. The client is always released back to the pool.
   *
   * @param pool     Pool the transaction's client is taken from.
   * @param callback Work to perform inside the transaction.
   * @throws Whatever `BEGIN`, the callback or `COMMIT` threw.
   */
  const tx = async (
    pool: pg.Pool,
    callback: (client: pg.PoolClient) => Promise<void>,
  ): Promise<void> => {
    const client = await pool.connect();
    try {
      await client.queryArray("BEGIN");
      await callback(client);
      await client.queryArray("COMMIT");
    } catch (err) {
      try {
        await client.queryArray("ROLLBACK");
      } catch {
        // A failing ROLLBACK must not hide the error that caused it;
        // the original failure is the one worth reporting.
      }
      throw err;
    } finally {
      client.release();
    }
  };
  52.  
  /**
   * Splits a `<version>[_<label>]` string into its version and label.
   *
   * The version is everything before the first underscore and must consist of
   * decimal digits only. The label is everything after the first underscore,
   * or `null` when there is no underscore or nothing follows it.
   *
   * @param verAndLabelStr Text to parse, e.g. `"001_init"`.
   * @returns The parsed pair. When the version part is not a non-negative
   *          decimal integer the result is `{ version: -1n, label: null }`.
   */
  const parseIdAndLabel = (verAndLabelStr: string): VerAndLabel => {
    const underscoreIdx = verAndLabelStr.indexOf("_");
    const hasUnderscore = underscoreIdx !== -1;
    const idStr = hasUnderscore ? verAndLabelStr.substring(0, underscoreIdx) : verAndLabelStr;

    // Validate before converting: BigInt() throws on non-numeric text and
    // silently turns an empty string into 0n.
    if (!/^\d+$/.test(idStr)) {
      return { version: -1n, label: null };
    }

    const labelText = hasUnderscore ? verAndLabelStr.substring(underscoreIdx + 1) : "";
    return {
      version: BigInt(idStr),
      label: labelText.length > 0 ? labelText : null,
    };
  };
  76.  
  /**
   * Loads every migration file found directly inside `dirPath`.
   *
   * File names are expected to look like `<version>[_<label>].<action>.sql`.
   * Files whose third dot-separated part is not `sql`, and files whose version
   * does not parse, are ignored. An action of `up` registers an up migration;
   * any other action is registered as a down migration.
   *
   * @param dirPath Directory that contains the migration files.
   * @returns The loaded migrations, with `versionsUP` sorted ascending.
   * @throws Error when two files provide the same direction for one version.
   */
  const loadMigrationDir = (dirPath: string): LocalMigrations => {
    const result: LocalMigrations = {
      versionsUP: [],
      migrationsUP: new Map<bigint, Migration>(),
      migrationsDown: new Map<bigint, Migration>(),
    };
    const decoder = new TextDecoder("utf-8");

    for (const dirEntry of Deno.readDirSync(dirPath)) {
      if (dirEntry.isDirectory) continue;

      const fname = dirEntry.name;
      const [idAndLabel, action, ext] = fname.split(".", 3);
      if (ext !== "sql") continue;

      const { version, label } = parseIdAndLabel(idAndLabel);
      // A negative version marks a name that could not be parsed.
      if (version < 0n) continue;

      const isUp = action === "up";
      const target = isUp ? result.migrationsUP : result.migrationsDown;
      if (target.has(version)) {
        throw new Error(
          `Duplicate ${isUp ? "up" : "down"} migration files for version ${version}: ${fname}`,
        );
      }

      const query = decoder.decode(Deno.readFileSync(path.join(dirPath, fname)));
      target.set(version, { version, label, query });
      if (isUp) result.versionsUP.push(version);
    }

    // Directory listing order is not guaranteed, so sort explicitly.
    result.versionsUP.sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));
    return result;
  };
  116.  
  117.  
  /**
   * Creates the `applied_migrations` bookkeeping table if it does not exist yet.
   *
   * The statement uses `CREATE TABLE IF NOT EXISTS`, so it can be called
   * whenever it is unclear whether the table is already there.
   *
   * @param pool Pool used to run the statement.
   * @returns A promise that resolves once the statement has completed. Await it
   *          before querying the table.
   */
  const createMigrationTable = async (pool: pg.Pool): Promise<void> => {
    const client = await pool.connect();
    try {
      await client.queryArray(`
    CREATE TABLE IF NOT EXISTS applied_migrations (
      version BIGINT UNIQUE NOT NULL,
      label TEXT
    )`);
    } finally {
      client.release();
    }
  };
  127.  
  128.  
  /**
   * Reads the versions and labels recorded in `applied_migrations`, ordered
   * from the smallest to the biggest version.
   *
   * Make sure the migrations table exists before calling this function.
   *
   * The result is delivered through a promise because the query is
   * asynchronous; reading it synchronously is not possible.
   *
   * @param cfg Configuration whose `pool` is used for the query.
   * @returns The applied migrations in ascending version order.
   */
  export const getAppliedMigrations = async (cfg: MigrationsConfig): Promise<VerAndLabel[]> => {
    const client = await cfg.pool.connect();
    try {
      const qresult = await client.queryObject<VerAndLabel>(
        "SELECT version, label FROM applied_migrations ORDER BY version",
      );
      return qresult.rows;
    } finally {
      client.release();
    }
  };
  139.  
  140.  
  /**
   * Rolls back a single migration using the down migration stored in a file.
   *
   * The file's SQL and the removal of the version from `applied_migrations`
   * run in one transaction.
   *
   * @param pool    Pool used for the transaction.
   * @param fpath   Path of the file containing the down migration SQL (UTF-8).
   * @param version Version to remove from `applied_migrations`.
   * @throws When the file cannot be read or any statement fails.
   */
  export const rollbackMigration = async (
    pool: pg.Pool,
    fpath: string,
    version: number | bigint,
  ): Promise<void> => {
    // Read the file first so no connection or transaction is held during file I/O.
    const query = new TextDecoder("utf-8").decode(await Deno.readFile(fpath));
    await tx(pool, async (client) => {
      await client.queryArray(query);
      await client.queryArray("DELETE FROM applied_migrations WHERE version=$1", [version]);
    });
  };
  150.  
  151.  
  /**
   * Rolls the database back so that `targetVer` becomes the newest applied
   * version. Every applied migration with a version greater than `targetVer`
   * is reverted, newest first, inside a single transaction; `targetVer` itself
   * stays applied.
   *
   * Before rolling back, be sure that the applied migrations contain the
   * target version (or pass a version below every applied one to revert all).
   *
   * @param pool      Pool used for the transaction.
   * @param mLocal    Local migrations; supplies the down migration of each version.
   * @param mApplied  Applied migrations, sorted ascending by version.
   * @param targetVer Version to roll back to.
   * @throws Error when a version to revert has no local down migration or its
   *         label differs from the applied one.
   */
  export const rollbackToVer = async (
    pool: pg.Pool,
    mLocal: LocalMigrations,
    mApplied: VerAndLabel[],
    targetVer: bigint,
  ): Promise<void> => {
    const toRevert: Migration[] = [];
    for (let i = mApplied.length - 1; i >= 0; i--) {
      const applied = mApplied[i];
      // Stop at the target: it has to remain applied.
      if (applied.version <= targetVer) break;

      const mDown = mLocal.migrationsDown.get(applied.version);
      if (!mDown) {
        throw new Error(`No down migration for version ${applied.version}`);
      }
      if (mDown.label !== applied.label) {
        throw new Error(
          `Label of the applied migration (${applied.label}) doesn't match ` +
            `with local down migration (${mDown.label}). Migration version: ${applied.version}`,
        );
      }
      toRevert.push(mDown);
    }

    await tx(pool, async (client) => {
      for (const migration of toRevert) {
        await client.queryArray(migration.query);
        await client.queryArray(
          "DELETE FROM applied_migrations WHERE version=$1",
          [migration.version],
        );
      }
    });
  };
  184.  
  185.  
  /**
   * Compares the applied migrations with the local up versions and reports
   * the last version up to which both sequences agree.
   *
   * Both inputs are expected to be sorted ascending by version.
   *
   * @param localMigrationIDs Local up-migration versions.
   * @param appliedMigrations Migrations recorded in the database.
   * @returns `null` when every applied migration matches the local sequence
   *          (including when nothing is applied); `-1n` when not even the
   *          first applied migration matches; otherwise the last matching
   *          version.
   */
  export const findLastCleanVer = (
    localMigrationIDs: bigint[],
    appliedMigrations: VerAndLabel[],
  ): bigint | null => {
    const sharedLen = Math.min(localMigrationIDs.length, appliedMigrations.length);
    let matched = 0;
    while (
      matched < sharedLen &&
      localMigrationIDs[matched] === appliedMigrations[matched].version
    ) {
      matched++;
    }

    // Clean only if *all* applied migrations were matched; applied versions
    // beyond the end of the local list count as a mismatch too.
    if (matched === appliedMigrations.length) {
      return null;
    }
    return matched === 0 ? -1n : localMigrationIDs[matched - 1];
  };
  203.  
  /**
   * Rolls the database back to the last version at which the applied
   * migrations still agree with the local ones.
   *
   * Make sure the migrations table exists before calling this function.
   *
   * @param cfg Pool and local migrations to work with.
   * @returns `-1n` when nothing is applied; the newest applied version when the
   *          database is already clean (nothing is rolled back); otherwise the
   *          clean version the database was rolled back to.
   */
  export const rollbackToCleanVer = async (cfg: MigrationsConfig): Promise<bigint> => {
    // `await` so the rows are available before they are inspected.
    const mApplied = await getAppliedMigrations(cfg);
    if (mApplied.length < 1) {
      return -1n;
    }

    const cleanVer = findLastCleanVer(cfg.mLocal.versionsUP, mApplied);
    if (cleanVer === null) {
      return mApplied[mApplied.length - 1].version;
    }

    // Wait for the rollback so callers continue only after it has finished
    // and so its failures are not lost.
    await rollbackToVer(cfg.pool, cfg.mLocal, mApplied, cleanVer);
    return cleanVer;
  };
  217.  
  218.  
  /**
   * Applies a single migration from a file.
   *
   * The file's SQL and the insertion of the version into `applied_migrations`
   * run in one transaction.
   *
   * @param pool    Pool used for the transaction.
   * @param fpath   Path of the file containing the migration SQL (UTF-8).
   * @param version Version to record in `applied_migrations`.
   * @param label   Label to record with the version; the column accepts NULL.
   * @throws When the file cannot be read or any statement fails.
   */
  export const applyMigration = async (
    pool: pg.Pool,
    fpath: string,
    version: number | bigint,
    label: string | null,
  ): Promise<void> => {
    // Read the file first so no connection or transaction is held during file I/O.
    const query = new TextDecoder("utf-8").decode(await Deno.readFile(fpath));
    await tx(pool, async (client) => {
      await client.queryArray(query);
      await client.queryArray(
        "INSERT INTO applied_migrations (version, label) VALUES ($1, $2)",
        [version, label],
      );
    });
  };
  228.  
  /**
   * Applies, in one transaction, every local up migration after `currentVer`
   * up to and including `targetVer`, recording each one in
   * `applied_migrations`.
   *
   * Before upgrading you must be sure that the DB version is clean (the
   * current version and all versions below it exist in the local up
   * migrations).
   *
   * @param pool            Pool used for the transaction.
   * @param localMigrations Local migrations; `versionsUP` must be sorted ascending.
   * @param currentVer      Newest applied version, or a negative value when
   *                        nothing has been applied yet.
   * @param targetVer       Version to upgrade to.
   * @throws Error when `currentVer` (if non-negative) or `targetVer` is not a
   *         local version, or a listed version has no migration.
   */
  export const upgradeToVer = async (
    pool: pg.Pool,
    localMigrations: LocalMigrations,
    currentVer: bigint,
    targetVer: bigint,
  ): Promise<void> => {
    const { versionsUP, migrationsUP } = localMigrations;

    let idxCurrent = -1;
    if (currentVer >= 0n) {
      idxCurrent = versionsUP.indexOf(currentVer);
      if (idxCurrent < 0) {
        throw new Error("Current version isn't listed in local migrations");
      }
    }
    const idxTarget = versionsUP.indexOf(targetVer);
    if (idxTarget < 0) {
      throw new Error("Target version isn't listed in local migrations");
    }

    // slice() returns a new array and its end bound is exclusive, hence the +1
    // to include the target version itself.
    const pending = versionsUP.slice(idxCurrent + 1, idxTarget + 1).map((ver) => {
      const migration = migrationsUP.get(ver);
      if (!migration) {
        throw new Error(
          `Failed to get migration version ${ver}. Probably error in migrations loading`,
        );
      }
      return migration;
    });

    await tx(pool, async (client) => {
      for (const migration of pending) {
        await client.queryArray(migration.query);
        await client.queryArray(
          "INSERT INTO applied_migrations (version, label) VALUES ($1, $2)",
          [migration.version, migration.label],
        );
      }
    });
  };
  261.  
  262.  
  /**
   * Migrates the database to a specific version, upgrading or rolling back as
   * needed. Does nothing when the newest applied version already equals
   * `targetVer`.
   *
   * Make sure the migrations table exists before calling this function.
   * If you're migrating down, make sure a local down migration exists for
   * every version that has to be reverted.
   *
   * @param cfg       Pool and local migrations to work with.
   * @param targetVer Version the database should end up at.
   * @throws Error when an upgrade is requested while the database is dirty;
   *         also propagates errors from the upgrade / rollback routines.
   */
  export const migrateTo = async (cfg: MigrationsConfig, targetVer: bigint): Promise<void> => {
    const { pool, mLocal } = cfg;
    const mApplied = await getAppliedMigrations(cfg);

    // Compare against null explicitly: version 0n is a valid version but falsy.
    const currentVer = mApplied.length >= 1 ? mApplied[mApplied.length - 1].version : null;

    if (currentVer === targetVer) {
      // Already up to date
      return;
    }

    if (currentVer === null || currentVer < targetVer) {
      if (findLastCleanVer(mLocal.versionsUP, mApplied) !== null) {
        throw new Error("DB is dirty. Migrate to a clean version before upgrading DB");
      }
      await upgradeToVer(pool, mLocal, currentVer ?? -1n, targetVer);
      return;
    }

    await rollbackToVer(pool, mLocal, mApplied, targetVer);
  };
  289.  
  290.  
  /** Options consumed by `doMigrate`. */
  interface DoMigrateConfig {
    /** Pool all migration queries run through. */
    pool: pg.Pool;
    /** Directory the migration files are loaded from. */
    migrationsDir: string;
    /** Version to migrate to, or `null` to use the newest local version. */
    versionLimit: bigint | null;
  }
  /**
   * Automated migration routine meant to run on application start.
   *
   * Loads the local migrations, makes sure the bookkeeping table exists,
   * rolls the database back to a clean version, migrates to
   * `cfg.versionLimit` (or to the newest local version when the limit is
   * `null`) and finally logs a short status report.
   *
   * @param cfg Pool, migrations directory and optional version limit.
   * @throws Error when the directory contains no up migrations; also
   *         propagates errors from the individual migration steps.
   */
  const doMigrate = async (cfg: DoMigrateConfig): Promise<void> => {
    const mLocal = loadMigrationDir(cfg.migrationsDir);
    if (mLocal.versionsUP.length < 1) {
      throw new Error(`No migrations were found in ${cfg.migrationsDir}`);
    }
    const mCfg: MigrationsConfig = { pool: cfg.pool, mLocal };

    const maxVersion = mLocal.versionsUP[mLocal.versionsUP.length - 1];
    // `??` rather than `||`: a limit of 0n is a legitimate version but falsy.
    const migrateToVer = cfg.versionLimit ?? maxVersion;

    // Each step is awaited so the next one never races the previous one.
    await createMigrationTable(cfg.pool);
    await rollbackToCleanVer(mCfg);
    await migrateTo(mCfg, migrateToVer);

    const mApplied = await getAppliedMigrations(mCfg);
    const currentVer = mApplied.length > 0 ? mApplied[mApplied.length - 1].version : null;
    const mLastClean = findLastCleanVer(mLocal.versionsUP, mApplied);

    console.log(
      `DB Info:\n` +
        `\tDB Version Limit: ${cfg.versionLimit}\n` +
        `\tCurrent DB Version: ${currentVer}/${maxVersion}\n` +
        `\tDirty: ${mLastClean !== null}\n`,
    );
  };
  331.  
  332.  
  /** Options accepted by `setupDB`. */
  interface SetupDbCfg {
    /** Directory the migration files are loaded from. */
    migrationsDir: string;
    /** Version to migrate to, or `null` for no limit. */
    versionLimit: bigint | null;
    /** Connection options (or connection string) handed to the pool. */
    poolCfg: pg.ClientOptions | string;
    /** Requested number of connections in the pool. */
    poolSize: number;
  }
  /**
   * Creates the connection pool and brings the database schema up to date.
   *
   * @param cfg Pool options, pool size, migrations directory and version limit.
   * @returns The ready-to-use pool, sized according to `cfg.poolSize`.
   * @throws Whatever the migration run throws; the pool is closed before the
   *         error is rethrown so its connections are not leaked.
   */
  const setupDB = async (cfg: SetupDbCfg): Promise<pg.Pool> => {
    const { versionLimit, migrationsDir, poolCfg, poolSize } = cfg;
    const pool = new pg.Pool(poolCfg, poolSize);
    try {
      await doMigrate({ pool, versionLimit, migrationsDir });
    } catch (err) {
      try {
        await pool.end();
      } catch {
        // Closing is best effort; the migration failure is the error to report.
      }
      throw err;
    }
    return pool;
  };
  345.  
  346.  
  347. export default setupDB
  348. export { tx, withClient, createMigrationTable }
  349. export type { SetupDbCfg }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement