···11-import { createDiff } from "../../lib/diff.ts";
11+import { applyDiff, createDiff } from "../../lib/diff.ts";
22import { extractWikilinks } from "../../lib/markdown.ts";
33import { generateTid } from "../../lib/tid.ts";
44import { getDb } from "./index.ts";
···260260261261 return { revisionAtUri };
262262}
263263+264264+// --- Firehose ingestion queries ---
265265+266266+export function getWikiByAtUri(atUri: string): WikiRow | null {
267267+ const db = getDb();
268268+ return (
269269+ (db.query("SELECT * FROM wikis WHERE at_uri = ?").get(atUri) as WikiRow) ??
270270+ null
271271+ );
272272+}
273273+274274+export function getNoteByAtUri(
275275+ atUri: string,
276276+): (NoteRow & { wiki_slug: string }) | null {
277277+ const db = getDb();
278278+ return (
279279+ (db.query("SELECT * FROM notes WHERE at_uri = ?").get(atUri) as
280280+ | (NoteRow & { wiki_slug: string })
281281+ | null) ?? null
282282+ );
283283+}
284284+285285+export function upsertWiki(
286286+ slug: string,
287287+ did: string,
288288+ name: string,
289289+ visibility: string,
290290+ atUri: string,
291291+ createdAt: string,
292292+): void {
293293+ const db = getDb();
294294+ db.run(
295295+ `INSERT INTO wikis (slug, did, name, visibility, at_uri, created_at, updated_at)
296296+ VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
297297+ ON CONFLICT(slug) DO UPDATE SET
298298+ name = excluded.name,
299299+ visibility = excluded.visibility,
300300+ updated_at = datetime('now')`,
301301+ [slug, did, name, visibility, atUri, createdAt],
302302+ );
303303+}
304304+305305+export function deleteWikiByAtUri(atUri: string): void {
306306+ const db = getDb();
307307+ const wiki = db
308308+ .query("SELECT slug FROM wikis WHERE at_uri = ?")
309309+ .get(atUri) as { slug: string } | null;
310310+ if (!wiki) return;
311311+312312+ db.transaction(() => {
313313+ const notes = db
314314+ .query("SELECT at_uri FROM notes WHERE wiki_slug = ?")
315315+ .all(wiki.slug) as { at_uri: string }[];
316316+ for (const note of notes) {
317317+ db.run("DELETE FROM current_note WHERE note_at_uri = ?", [note.at_uri]);
318318+ db.run("DELETE FROM revisions WHERE note_at_uri = ?", [note.at_uri]);
319319+ db.run("DELETE FROM snapshots WHERE note_at_uri = ?", [note.at_uri]);
320320+ db.run("DELETE FROM backlinks WHERE source_note_uri = ?", [note.at_uri]);
321321+ }
322322+ db.run("DELETE FROM notes WHERE wiki_slug = ?", [wiki.slug]);
323323+ db.run("DELETE FROM memberships WHERE wiki_slug = ?", [wiki.slug]);
324324+ db.run("DELETE FROM requests WHERE wiki_slug = ?", [wiki.slug]);
325325+ db.run("DELETE FROM wikis WHERE slug = ?", [wiki.slug]);
326326+ })();
327327+}
328328+329329+export function upsertNote(
330330+ wikiSlug: string,
331331+ slug: string,
332332+ title: string,
333333+ did: string,
334334+ atUri: string,
335335+ createdAt: string,
336336+): void {
337337+ const db = getDb();
338338+ db.run(
339339+ `INSERT INTO notes (slug, wiki_slug, title, did, at_uri, created_at)
340340+ VALUES (?, ?, ?, ?, ?, ?)
341341+ ON CONFLICT(wiki_slug, slug) DO UPDATE SET
342342+ title = excluded.title`,
343343+ [slug, wikiSlug, title, did, atUri, createdAt],
344344+ );
345345+}
346346+347347+export function deleteNoteByAtUri(atUri: string): void {
348348+ const db = getDb();
349349+ db.transaction(() => {
350350+ db.run("DELETE FROM current_note WHERE note_at_uri = ?", [atUri]);
351351+ db.run("DELETE FROM revisions WHERE note_at_uri = ?", [atUri]);
352352+ db.run("DELETE FROM snapshots WHERE note_at_uri = ?", [atUri]);
353353+ db.run("DELETE FROM backlinks WHERE source_note_uri = ?", [atUri]);
354354+ db.run("DELETE FROM notes WHERE at_uri = ?", [atUri]);
355355+ })();
356356+}
357357+358358+export function insertRevisionFromFirehose(
359359+ noteAtUri: string,
360360+ did: string,
361361+ revisionAtUri: string,
362362+ parentRevisionUri: string | null,
363363+ diff: string,
364364+ diffFormat: string,
365365+ message: string | null,
366366+): string {
367367+ const db = getDb();
368368+369369+ const current = db
370370+ .query("SELECT content FROM current_note WHERE note_at_uri = ?")
371371+ .get(noteAtUri) as { content: string } | null;
372372+373373+ const oldContent = current?.content ?? "";
374374+ const newContent = applyDiff(oldContent, diff);
375375+376376+ db.transaction(() => {
377377+ db.run(
378378+ `INSERT INTO revisions (note_at_uri, did, at_uri, parent_revision_uri, diff, diff_format, message)
379379+ VALUES (?, ?, ?, ?, ?, ?, ?)`,
380380+ [
381381+ noteAtUri,
382382+ did,
383383+ revisionAtUri,
384384+ parentRevisionUri,
385385+ diff,
386386+ diffFormat,
387387+ message,
388388+ ],
389389+ );
390390+ db.run(
391391+ `INSERT INTO current_note (note_at_uri, content, latest_revision_uri, updated_at)
392392+ VALUES (?, ?, ?, datetime('now'))
393393+ ON CONFLICT(note_at_uri) DO UPDATE SET
394394+ content = excluded.content,
395395+ latest_revision_uri = excluded.latest_revision_uri,
396396+ updated_at = excluded.updated_at`,
397397+ [noteAtUri, newContent, revisionAtUri],
398398+ );
399399+ db.run(
400400+ "INSERT INTO snapshots (note_at_uri, revision_at_uri, content) VALUES (?, ?, ?)",
401401+ [noteAtUri, revisionAtUri, newContent],
402402+ );
403403+ })();
404404+405405+ capSnapshots(noteAtUri);
406406+407407+ return newContent;
408408+}
409409+410410+export function upsertMembership(
411411+ wikiSlug: string,
412412+ did: string,
413413+ role: string,
414414+ atUri: string,
415415+ createdAt: string,
416416+): void {
417417+ const db = getDb();
418418+ db.run(
419419+ `INSERT INTO memberships (wiki_slug, did, role, at_uri, created_at)
420420+ VALUES (?, ?, ?, ?, ?)
421421+ ON CONFLICT(wiki_slug, did) DO UPDATE SET
422422+ role = excluded.role`,
423423+ [wikiSlug, did, role, atUri, createdAt],
424424+ );
425425+}
426426+427427+export function deleteMembershipByAtUri(atUri: string): void {
428428+ const db = getDb();
429429+ db.run("DELETE FROM memberships WHERE at_uri = ?", [atUri]);
430430+}
431431+432432+export function upsertRequest(
433433+ wikiSlug: string,
434434+ did: string,
435435+ atUri: string,
436436+ createdAt: string,
437437+): void {
438438+ const db = getDb();
439439+ db.run(
440440+ `INSERT INTO requests (wiki_slug, did, at_uri, created_at)
441441+ VALUES (?, ?, ?, ?)
442442+ ON CONFLICT(wiki_slug, did) DO UPDATE SET
443443+ at_uri = excluded.at_uri`,
444444+ [wikiSlug, did, atUri, createdAt],
445445+ );
446446+}
447447+448448+export function deleteRequestByAtUri(atUri: string): void {
449449+ const db = getDb();
450450+ db.run("DELETE FROM requests WHERE at_uri = ?", [atUri]);
451451+}
452452+453453+// --- Cursor queries ---
454454+455455+export function getCursor(): number | null {
456456+ const db = getDb();
457457+ const row = db
458458+ .query("SELECT cursor FROM firehose_cursor WHERE id = 1")
459459+ .get() as { cursor: number } | null;
460460+ return row?.cursor ?? null;
461461+}
462462+463463+export function setCursor(cursor: number): void {
464464+ const db = getDb();
465465+ db.run(
466466+ `INSERT INTO firehose_cursor (id, cursor, updated_at)
467467+ VALUES (1, ?, datetime('now'))
468468+ ON CONFLICT(id) DO UPDATE SET
469469+ cursor = excluded.cursor,
470470+ updated_at = excluded.updated_at`,
471471+ [cursor],
472472+ );
473473+}
+8
src/server/db/schema.ts
···104104 `);
105105106106 db.run(`
107107+ CREATE TABLE IF NOT EXISTS firehose_cursor (
108108+ id INTEGER PRIMARY KEY CHECK (id = 1),
109109+ cursor INTEGER NOT NULL,
110110+ updated_at TEXT NOT NULL DEFAULT (datetime('now'))
111111+ )
112112+ `);
113113+114114+ db.run(`
107115 CREATE TABLE IF NOT EXISTS oauth_sessions (
108116 did TEXT PRIMARY KEY,
109117 session_data TEXT NOT NULL,
···11-// Set DB_PATH before any imports that trigger getDb()
22-process.env.DB_PATH = `:memory:`;
33-41import { afterAll, beforeAll, describe, expect, test } from "bun:test";
52import { applyDiff } from "../../../src/lib/diff.ts";
63import { getDb } from "../../../src/server/db/index.ts";