A system for building static webapps
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: data integrity issues

+252 -21
+4 -1
deno.json
··· 27 27 "jsxImportSource": "@hono/hono/jsx", 28 28 "types": ["./index.d.ts"] 29 29 }, 30 + "check": { 31 + "exclude": ["**/dist", "**/docs"] 32 + }, 30 33 "fmt": { 31 34 "singleQuote": true, 32 35 "proseWrap": "preserve", ··· 34 37 "exclude": ["**/dist"] 35 38 }, 36 39 "lint": { 37 - "exclude": ["**/dist"], 40 + "exclude": ["**/dist", "**/docs"], 38 41 "rules": { 39 42 "exclude": ["no-import-prefix", "verbatim-module-syntax"] 40 43 }
+105
packages/store/entities/__tests__/collection.test.ts
··· 596 596 }) 597 597 }) 598 598 599 + describe('Atomic writes', () => { 600 + it('set() on a new document writes both document and change entry', async () => { 601 + const backend = new MemoryStorage<Todo>() 602 + const col = new Collection<Todo>(backend, { name: 'todos' }) 603 + await col.set('1', { title: 'Task', done: false }) 604 + const doc = await backend.getDocument('todos/1') 605 + assertExists(doc) 606 + assertEquals(doc.data, { title: 'Task', done: false }) 607 + const changes = await backend.getChanges('todos/1') 608 + assertEquals(changes.length, 1) 609 + assertEquals(changes[0].documentId, 'todos/1') 610 + }) 611 + 612 + it('set() on existing doc writes both updated document and change entry', async () => { 613 + const backend = new MemoryStorage<Todo>() 614 + const col = new Collection<Todo>(backend, { name: 'todos' }) 615 + await col.set('1', { title: 'Task', done: false }) 616 + await col.set('1', { title: 'Task', done: true }) 617 + const doc = await backend.getDocument('todos/1') 618 + assertExists(doc) 619 + assertEquals(doc.data.done, true) 620 + const changes = await backend.getChanges('todos/1') 621 + assertEquals(changes.length, 2) 622 + }) 623 + 624 + it('set() uses backend setDocumentWithChange when available', async () => { 625 + const backend = new MemoryStorage<Todo>() 626 + let atomicCallCount = 0 627 + const original = backend.setDocumentWithChange!.bind(backend) 628 + backend.setDocumentWithChange = (id, doc, entry) => { 629 + atomicCallCount++ 630 + return original(id, doc, entry) 631 + } 632 + const col = new Collection<Todo>(backend, { name: 'todos' }) 633 + await col.set('1', { title: 'Task', done: false }) 634 + assertEquals(atomicCallCount, 1) 635 + }) 636 + 637 + it('set() falls back to sequential writes when setDocumentWithChange is absent', async () => { 638 + const backend = new MemoryStorage<Todo>() 639 + delete (backend as unknown as Record<string, unknown>)[ 640 + 'setDocumentWithChange' 641 + ] 642 + const col = new Collection<Todo>(backend, { name: 'todos' }) 643 + await col.set('1', { title: 'Task', done: false }) 644 + const doc = await backend.getDocument('todos/1') 645 + assertExists(doc) 646 + const changes = await backend.getChanges('todos/1') 647 + assertEquals(changes.length, 1) 648 + }) 649 + }) 650 + 651 + describe('Migration write failure', () => { 652 + it('get() surfaces write failure during migration with a descriptive error', async () => { 653 + interface V1 { 654 + name: string 655 + } 656 + interface V2 { 657 + name: string 658 + score: number 659 + } 660 + 661 + const backend = new MemoryStorage<V2>() 662 + const now = new Date().toISOString() 663 + await backend.setDocument('items/a', { 664 + id: 'items/a', 665 + data: { name: 'Alice' } as unknown as V2, 666 + version: '1.0.0', 667 + hlc: '000000000001:0000:test', 668 + lastOrigin: 'test', 669 + createdAt: now, 670 + updatedAt: now, 671 + }) 672 + 673 + const originalSet = backend.setDocument.bind(backend) 674 + let callCount = 0 675 + backend.setDocument = (id, doc) => { 676 + callCount++ 677 + if (callCount === 1) throw new Error('disk full') 678 + return originalSet(id, doc) 679 + } 680 + 681 + const col = new Collection<V2>(backend, { 682 + name: 'items', 683 + schema: { 684 + versions: [ 685 + { version: '1.0.0', schema: { type: 'object' } }, 686 + { 687 + version: '2.0.0', 688 + schema: { type: 'object' }, 689 + from: '1.0.0', 690 + migrate: (data) => ({ ...(data as V1), score: 0 }), 691 + }, 692 + ], 693 + }, 694 + }) 695 + 696 + await assertRejects( 697 + () => col.get('a'), 698 + Error, 699 + 'failed to persist migrated document', 700 + ) 701 + }) 702 + }) 703 + 599 704 describe('ready() and dispose()', () => { 600 705 it('ready() resolves', async () => { 601 706 const col = makeCollection()
+41 -20
packages/store/entities/collection.ts
··· 189 189 : '1.0.0' 190 190 } 191 191 192 + /** 193 + * Write a document snapshot and its change entry. Uses the backend's atomic 194 + * `setDocumentWithChange` when available; falls back to two sequential writes 195 + * for backends that don't implement it. 196 + */ 197 + async #setDocumentWithChange( 198 + id: string, 199 + doc: Document<T>, 200 + entry: ChangeEntry, 201 + ): Promise<void> { 202 + if (typeof this.#backend.setDocumentWithChange === 'function') { 203 + await this.#backend.setDocumentWithChange(id, doc, entry) 204 + } else { 205 + await this.#backend.setDocument(id, doc) 206 + await this.#backend.appendChange(entry) 207 + } 208 + } 209 + 192 210 // Lifecycle 193 211 194 212 /** Resolves once the collection is ready to accept operations. */ ··· 269 287 // Persist the migrated snapshot without creating a change entry 270 288 const hlcTs = increment(this.#clock) 271 289 const hlcStr = packHLC(hlcTs) 272 - await this.#backend.setDocument(scopedId, { 273 - ...doc, 274 - data: data as T, 275 - version, 276 - hlc: hlcStr, 277 - lastOrigin: this.#clientId, 278 - updatedAt: new Date().toISOString(), 279 - }) 290 + try { 291 + await this.#backend.setDocument(scopedId, { 292 + ...doc, 293 + data: data as T, 294 + version, 295 + hlc: hlcStr, 296 + lastOrigin: this.#clientId, 297 + updatedAt: new Date().toISOString(), 298 + }) 299 + } catch (cause) { 300 + throw new Error( 301 + `Collection.get: failed to persist migrated document "${id}". ` + 302 + `The next read will attempt migration again.`, 303 + { cause }, 304 + ) 305 + } 280 306 } 281 307 return data as T 282 308 } ··· 313 339 createdAt: now, 314 340 synced: false, 315 341 } 316 - await this.#backend.setDocument(scopedId, { 342 + await this.#setDocumentWithChange(scopedId, { 317 343 id: scopedId, 318 344 data: value, 319 345 version, ··· 321 347 lastOrigin: this.#clientId, 322 348 createdAt: now, 323 349 updatedAt: now, 324 - }) 325 - await this.#backend.appendChange(entry) 350 + }, entry) 326 351 this.#notify(id, value, entry) 327 352 await this.#maybeCompact(scopedId) 328 353 return ··· 341 366 createdAt: now, 342 367 synced: false, 343 368 } 344 - await this.#backend.setDocument(scopedId, { 369 + await this.#setDocumentWithChange(scopedId, { 345 370 ...existing, 346 371 data: value, 347 372 version, 348 373 hlc: hlcStr, 349 374 lastOrigin: this.#clientId, 350 375 updatedAt: now, 351 - }) 352 - await this.#backend.appendChange(entry) 376 + }, entry) 353 377 this.#notify(id, value, entry) 354 378 await this.#maybeCompact(scopedId) 355 379 } ··· 682 706 const base = existing?.data ?? ({} as T) 683 707 const newData = applyPatch(base, change.patch) as T 684 708 const now = new Date().toISOString() 685 - await this.#backend.setDocument(scopedId, { 709 + const remoteEntry: ChangeEntry = { ...change, documentId: scopedId } 710 + await this.#setDocumentWithChange(scopedId, { 686 711 id: scopedId, 687 712 data: newData, 688 713 version: this.#currentVersion(), ··· 690 715 lastOrigin: change.origin, 691 716 createdAt: existing?.createdAt ?? now, 692 717 updatedAt: now, 693 - }) 694 - await this.#backend.appendChange({ 695 - ...change, 696 - documentId: scopedId, 697 - }) 718 + }, remoteEntry) 698 719 this.#notify(docId, newData, change) 699 720 lastData = newData 700 721 lastOrigin = 'remote'
+12
packages/store/entities/storage.ts
··· 45 45 appendChange(entry: ChangeEntry): Promise<void> 46 46 47 47 /** 48 + * Atomically write a document snapshot and its change entry in one operation. 49 + * Prevents inconsistency if the process crashes between a `setDocument` and 50 + * `appendChange` call. Optional — {@linkcode Collection} falls back to two 51 + * sequential writes for backends that don't implement it. 52 + */ 53 + setDocumentWithChange?( 54 + id: string, 55 + doc: Document<T>, 56 + entry: ChangeEntry, 57 + ): Promise<void> 58 + 59 + /** 48 60 * Query changes for a document, ordered by HLC ascending. 49 61 * 50 62 * `since` and `until` are HLC strings (exclusive and inclusive respectively).
+29
packages/store/storage/__tests__/memory.test.ts
··· 144 144 }) 145 145 }) 146 146 147 + describe('setDocumentWithChange', () => { 148 + it('writes document and change in a single call', async () => { 149 + const s = new MemoryStorage() 150 + const doc = makeDoc('doc-1', { title: 'Hello' }) 151 + const change = makeChange('c1', 'doc-1') 152 + await s.setDocumentWithChange!('doc-1', doc, change) 153 + assertEquals((await s.getDocument('doc-1'))?.data, { title: 'Hello' }) 154 + const changes = await s.getChanges('doc-1') 155 + assertEquals(changes.length, 1) 156 + assertEquals(changes[0].id, 'c1') 157 + }) 158 + 159 + it('overwrites an existing document and appends change', async () => { 160 + const s = new MemoryStorage() 161 + await s.setDocument('doc-1', makeDoc('doc-1', { title: 'Old' })) 162 + const updated = makeDoc( 163 + 'doc-1', 164 + { title: 'New' }, 165 + '000000000002:0000:test', 166 + ) 167 + const change = makeChange('c1', 'doc-1', '000000000002:0000:test') 168 + await s.setDocumentWithChange!('doc-1', updated, change) 169 + assertEquals((await s.getDocument('doc-1'))?.data, { title: 'New' }) 170 + const changes = await s.getChanges('doc-1') 171 + assertEquals(changes.length, 1) 172 + assertEquals(changes[0].id, 'c1') 173 + }) 174 + }) 175 + 147 176 describe('checkpoints', () => { 148 177 it('getCheckpoint() returns null when none exist', async () => { 149 178 const s = new MemoryStorage()
+19
packages/store/storage/deno_fs.ts
··· 127 127 await this.#writeJSON(this.#changeFile(entry.documentId, entry.id), entry) 128 128 } 129 129 130 + /** 131 + * Best-effort atomic write: writes the document via temp file + rename 132 + * (atomic on POSIX), then writes the change file. A crash between the two 133 + * writes will leave the document updated but the change missing. Acceptable 134 + * for this backend, which is intended for development/testing only. 135 + */ 136 + async setDocumentWithChange( 137 + id: string, 138 + doc: Document<T>, 139 + entry: ChangeEntry, 140 + ): Promise<void> { 141 + const docPath = this.#docFile(id) 142 + const tmpPath = docPath + '.tmp' 143 + await ensureDir(dirname(docPath)) 144 + await Deno.writeTextFile(tmpPath, JSON.stringify(doc)) 145 + await Deno.rename(tmpPath, docPath) 146 + await this.#writeJSON(this.#changeFile(entry.documentId, entry.id), entry) 147 + } 148 + 130 149 async getChanges( 131 150 documentId: string, 132 151 options?: {
+15
packages/store/storage/deno_kv.ts
··· 154 154 .commit() 155 155 } 156 156 157 + async setDocumentWithChange( 158 + id: string, 159 + doc: Document<T>, 160 + entry: ChangeEntry, 161 + ): Promise<void> { 162 + await this.#kv.atomic() 163 + .set(this.#docKey(id), doc) 164 + .set(this.#chgKey(entry.documentId, entry.hlc, entry.id), entry) 165 + .set(this.#chgIdxKey(entry.id), { 166 + documentId: entry.documentId, 167 + hlc: entry.hlc, 168 + }) 169 + .commit() 170 + } 171 + 157 172 async getChanges( 158 173 documentId: string, 159 174 options?: {
+15
packages/store/storage/idb.ts
··· 217 217 }) 218 218 } 219 219 220 + async setDocumentWithChange( 221 + _id: string, 222 + doc: Document<T>, 223 + entry: ChangeEntry, 224 + ): Promise<void> { 225 + const db = await this.#getDb() 226 + await new Promise<void>((resolve, reject) => { 227 + const tx = db.transaction(['documents', 'changes'], 'readwrite') 228 + tx.oncomplete = () => resolve() 229 + tx.onerror = () => reject(tx.error) 230 + tx.objectStore('documents').put(doc) 231 + tx.objectStore('changes').add(entry) 232 + }) 233 + } 234 + 220 235 async getChanges( 221 236 documentId: string, 222 237 options?: {
+12
packages/store/storage/memory.ts
··· 68 68 return Promise.resolve() 69 69 } 70 70 71 + setDocumentWithChange( 72 + id: string, 73 + doc: Document<T>, 74 + entry: ChangeEntry, 75 + ): Promise<void> { 76 + this.documents.set(id, doc) 77 + const arr = this.changes.get(entry.documentId) ?? [] 78 + arr.push(entry) 79 + this.changes.set(entry.documentId, arr) 80 + return Promise.resolve() 81 + } 82 + 71 83 getChanges( 72 84 documentId: string, 73 85 options?: {