Fork of github.com/did-method-plc/did-method-plc
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Row locks (#12)

* v0.0.2

* add row locks for users

authored by

Daniel Holmgren and committed by
GitHub
cee78371 19527154

+151 -40
+49 -40
packages/server/src/db/index.ts
··· 101 101 const { nullified, prev } = await plc.assureValidNextOp(did, ops, proposed) 102 102 const cid = await cidForCbor(proposed) 103 103 104 - await this.db 105 - .transaction() 106 - .setIsolationLevel('serializable') 107 - .execute(async (tx) => { 108 - await tx 109 - .insertInto('operations') 110 - .values({ 111 - did, 112 - operation: proposed, 113 - cid: cid.toString(), 114 - nullified: false, 115 - }) 116 - .execute() 104 + await this.db.transaction().execute(async (tx) => { 105 + // grab a row lock on user table 106 + const userLock = await tx 107 + .selectFrom('dids') 108 + .forUpdate() 109 + .selectAll() 110 + .where('did', '=', did) 111 + .executeTakeFirst() 112 + 113 + if (!userLock) { 114 + await tx.insertInto('dids').values({ did }).execute() 115 + } 117 116 118 - if (nullified.length > 0) { 119 - const nullfiedStrs = nullified.map((cid) => cid.toString()) 120 - await tx 121 - .updateTable('operations') 122 - .set({ nullified: true }) 123 - .where('did', '=', did) 124 - .where('cid', 'in', nullfiedStrs) 125 - .execute() 126 - } 117 + await tx 118 + .insertInto('operations') 119 + .values({ 120 + did, 121 + operation: proposed, 122 + cid: cid.toString(), 123 + nullified: false, 124 + }) 125 + .execute() 127 126 128 - // verify that the 2nd to last tx matches the proposed prev 129 - // otherwise rollback to prevent forks in history 130 - const mostRecent = await tx 131 - .selectFrom('operations') 132 - .select('cid') 127 + if (nullified.length > 0) { 128 + const nullfiedStrs = nullified.map((cid) => cid.toString()) 129 + await tx 130 + .updateTable('operations') 131 + .set({ nullified: true }) 133 132 .where('did', '=', did) 134 - .where('nullified', '=', false) 135 - .orderBy('createdAt', 'desc') 136 - .limit(2) 133 + .where('cid', 'in', nullfiedStrs) 137 134 .execute() 138 - const isMatch = 139 - (prev === null && !mostRecent[1]) || 140 - (prev && prev.equals(CID.parse(mostRecent[1].cid))) 141 - if (!isMatch) { 142 - throw new ServerError( 143 - 409, 144 - `Proposed prev does not match the most recent operation: ${mostRecent?.toString()}`, 145 - ) 146 - } 147 - }) 135 + } 136 + 137 + // verify that the 2nd to last tx matches the proposed prev 138 + // otherwise rollback to prevent forks in history 139 + const mostRecent = await tx 140 + .selectFrom('operations') 141 + .select('cid') 142 + .where('did', '=', did) 143 + .where('nullified', '=', false) 144 + .orderBy('createdAt', 'desc') 145 + .limit(2) 146 + .execute() 147 + const isMatch = 148 + (prev === null && !mostRecent[1]) || 149 + (prev && prev.equals(CID.parse(mostRecent[1].cid))) 150 + if (!isMatch) { 151 + throw new ServerError( 152 + 409, 153 + `Proposed prev does not match the most recent operation: ${mostRecent?.toString()}`, 154 + ) 155 + } 156 + }) 148 157 } 149 158 150 159 async mostRecentCid(did: string, notIncluded: CID[]): Promise<CID | null> {
+5
packages/server/src/db/types.ts
··· 17 17 exportOps(count: number, after?: Date): Promise<plc.ExportedOp[]> 18 18 } 19 19 20 + export interface DidsTable { 21 + did: string 22 + } 23 + 20 24 export interface OperationsTable { 21 25 did: string 22 26 operation: plc.CompatibleOpOrTombstone ··· 26 30 } 27 31 28 32 export interface DatabaseSchema { 33 + dids: DidsTable 29 34 operations: OperationsTable 30 35 }
+20
packages/server/src/migrations/20230406T174552885Z-did-locks.ts
··· 1 + import { Kysely } from 'kysely' 2 + 3 + export async function up(db: Kysely<any>): Promise<void> { 4 + await db.schema 5 + .createTable('dids') 6 + .addColumn('did', 'text', (col) => col.primaryKey()) 7 + .execute() 8 + 9 + await db 10 + .insertInto('dids') 11 + .columns(['did']) 12 + .expression((qb) => qb.selectFrom('operations').select(['did']).distinct()) 13 + .execute() 14 + // Migration code 15 + } 16 + 17 + export async function down(db: Kysely<unknown>): Promise<void> { 18 + await db.schema.dropTable('dids').execute() 19 + // Migration code 20 + }
+1
packages/server/src/migrations/index.ts
··· 4 4 5 5 export * as _20221020T204908820Z from './20221020T204908820Z-operations-init' 6 6 export * as _20230223T215019669Z from './20230223T215019669Z-refactor' 7 + export * as _20230406T174552885Z from './20230406T174552885Z-did-locks'
+76
packages/server/tests/migrations/did-locks.test.ts
··· 1 + import { cidForCbor, DAY } from '@atproto/common' 2 + import { Secp256k1Keypair } from '@atproto/crypto' 3 + import * as plc from '@did-plc/lib' 4 + import { Kysely } from 'kysely' 5 + import { Database } from '../../src' 6 + 7 + describe('did-locks migration', () => { 8 + let db: Database 9 + let rawDb: Kysely<any> 10 + 11 + beforeAll(async () => { 12 + const dbUrl = process.env.DATABASE_URL 13 + if (!dbUrl) { 14 + throw new Error('No postgres url provided') 15 + } 16 + db = Database.postgres({ 17 + url: dbUrl, 18 + schema: 'migration_did_locks', 19 + }) 20 + 21 + await db.migrateToOrThrow('_20230223T215019669Z') 22 + rawDb = db.db 23 + }) 24 + 25 + afterAll(async () => { 26 + await db.close() 27 + }) 28 + 29 + const dids: string[] = [] 30 + 31 + it('fills the database with some operations', async () => { 32 + const ops: any[] = [] 33 + for (let i = 0; i < 100; i++) { 34 + const signingKey = await Secp256k1Keypair.create() 35 + const recoveryKey = await Secp256k1Keypair.create() 36 + const { op, did } = await plc.createOp({ 37 + signingKey: signingKey.did(), 38 + rotationKeys: [recoveryKey.did()], 39 + handle: `user${i}.test`, 40 + pds: 'https://example.com', 41 + signer: recoveryKey, 42 + }) 43 + const cid = await cidForCbor(op) 44 + const randomOffset = Math.floor(Math.random() * DAY * 60) 45 + const time = new Date(Date.now() - randomOffset).toISOString() 46 + ops.push({ 47 + did, 48 + operation: JSON.stringify(op), 49 + cid: cid.toString(), 50 + nullified: 0, 51 + createdAt: time, 52 + }) 53 + dids.push(did) 54 + const op2 = await plc.updateHandleOp(op, recoveryKey, `user${i}-2.test`) 55 + const cid2 = await cidForCbor(op2) 56 + ops.push({ 57 + did, 58 + operation: JSON.stringify(op2), 59 + cid: cid2.toString(), 60 + nullified: 0, 61 + createdAt: new Date().toISOString(), 62 + }) 63 + } 64 + await rawDb.insertInto('operations').values(ops).execute() 65 + }) 66 + 67 + it('migrates', async () => { 68 + await db.migrateToOrThrow('_20230406T174552885Z') 69 + }) 70 + 71 + it('correctly filled in dids', async () => { 72 + const migrated = await rawDb.selectFrom('dids').selectAll().execute() 73 + const sorted = migrated.map((row) => row.did).sort() 74 + expect(sorted).toEqual(dids.sort()) 75 + }) 76 + })