Ionosphere.tv
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: index layers.pub records and rebuild materialized documents via Lens 3

Wire layers-indexer.ts into the existing Jetstream indexer to handle
pub.layers.expression, segmentation, and annotationLayer records.
On each create/update, rebuildDocument() re-runs Lens 3 to materialize
the talk document. Only processes records from the bot DID.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+273 -1
+2 -1
apps/ionosphere-appview/src/appview.ts
··· 1 1 import { serve } from "@hono/node-server"; 2 2 import { openDb, migrate, getCursor, setCursor } from "./db.js"; 3 3 import { createRoutes } from "./routes.js"; 4 - import { processEvent } from "./indexer.js"; 4 + import { processEvent, setBotDid } from "./indexer.js"; 5 5 import { JetstreamClient } from "./jetstream.js"; 6 6 import { backfill } from "./backfill.js"; 7 7 import { startPublicJetstream } from "./public-jetstream.js"; ··· 39 39 } 40 40 41 41 if (did) { 42 + setBotDid(did); 42 43 console.log(`Backfilling from ${did}`); 43 44 try { 44 45 await backfill(db, PDS_URL, did);
+59
apps/ionosphere-appview/src/indexer.ts
··· 1 1 import type Database from "better-sqlite3"; 2 2 import { ensureProfile } from "./profiles.js"; 3 + import { 4 + indexExpression, 5 + indexSegmentation, 6 + indexAnnotationLayer, 7 + deleteExpression, 8 + deleteSegmentation, 9 + deleteAnnotationLayer, 10 + rebuildDocument, 11 + } from "./layers-indexer.js"; 3 12 4 13 // ─── Types ──────────────────────────────────────────────────────────────────── 5 14 ··· 17 26 time_us: number; 18 27 } 19 28 29 + // ─── Bot DID filter ────────────────────────────────────────────────────────── 30 + 31 + let _botDid = ""; 32 + 33 + /** Set the bot DID for filtering layers.pub records. Called from appview.ts. */ 34 + export function setBotDid(did: string): void { 35 + _botDid = did; 36 + } 37 + 20 38 // ─── Constants ──────────────────────────────────────────────────────────────── 21 39 22 40 export const IONOSPHERE_COLLECTIONS = [ ··· 30 48 "tv.ionosphere.streamTranscript", 31 49 "tv.ionosphere.diarization", 32 50 "org.relationaltext.lens", 51 + "pub.layers.expression.expression", 52 + "pub.layers.segmentation.segmentation", 53 + "pub.layers.annotation.annotationLayer", 33 54 ]; 34 55 35 56 const COLLECTIONS_SET = new Set(IONOSPHERE_COLLECTIONS); 36 57 58 + const LAYERS_PUB_COLLECTIONS = new Set([ 59 + "pub.layers.expression.expression", 60 + "pub.layers.segmentation.segmentation", 61 + "pub.layers.annotation.annotationLayer", 62 + ]); 63 + 37 64 // ─── Event processor ────────────────────────────────────────────────────────── 38 65 39 66 export function processEvent(db: Database.Database, event: JetstreamEvent): void { ··· 44 71 45 72 const uri = `at://${event.did}/${collection}/${rkey}`; 46 73 74 + // Only process layers.pub records from the bot DID 75 + if (LAYERS_PUB_COLLECTIONS.has(collection) && _botDid && event.did !== _botDid) { 76 + return; 77 + } 78 + 47 79 // ── Deletes ─────────────────────────────────────────────────────────────── 48 80 49 81 if (operation === "delete") { ··· 83 115 case 
"org.relationaltext.lens": 84 116 db.prepare("DELETE FROM lenses WHERE uri = ?").run(uri); 85 117 break; 118 + case "pub.layers.expression.expression": 119 + deleteExpression(db, uri); 120 + break; 121 + case "pub.layers.segmentation.segmentation": 122 + deleteSegmentation(db, uri); 123 + break; 124 + case "pub.layers.annotation.annotationLayer": 125 + deleteAnnotationLayer(db, uri); 126 + break; 86 127 } 87 128 return; 88 129 } ··· 121 162 break; 122 163 case "org.relationaltext.lens": 123 164 indexLens(db, event.did, rkey, uri, record); 165 + break; 166 + case "pub.layers.expression.expression": 167 + indexExpression(db, event.did, rkey, uri, record); 168 + rebuildDocument(db, uri).catch((err) => 169 + console.error("rebuildDocument error:", err), 170 + ); 171 + break; 172 + case "pub.layers.segmentation.segmentation": 173 + indexSegmentation(db, event.did, rkey, uri, record); 174 + rebuildDocument(db, (record.expression as string) || "").catch((err) => 175 + console.error("rebuildDocument error:", err), 176 + ); 177 + break; 178 + case "pub.layers.annotation.annotationLayer": 179 + indexAnnotationLayer(db, event.did, rkey, uri, record); 180 + rebuildDocument(db, (record.expression as string) || "").catch((err) => 181 + console.error("rebuildDocument error:", err), 182 + ); 124 183 break; 125 184 } 126 185 }
+212
apps/ionosphere-appview/src/layers-indexer.ts
import type Database from "better-sqlite3";
import {
  layersPubToDocument,
  type ExpressionRecord,
  type SegmentationRecord,
  type AnnotationLayersResult,
  type AnnotationLayerRecord,
} from "@ionosphere/format/layers-pub";

// ─── Row shapes ──────────────────────────────────────────────────────────────
//
// Only the columns this module reads back are listed. The column types mirror
// what the index functions below write (strings); the table schemas themselves
// live elsewhere — NOTE(review): confirm against the migrations.

interface ExpressionRow {
  rkey: string;
  transcript_uri: string;
  text: string;
  language: string;
  created_at: string;
}

interface SegmentationRow {
  tokens_json: string;
  created_at: string;
}

interface AnnotationRow {
  kind: string;
  subkind: string;
  annotations_json: string;
  created_at: string;
}

interface TranscriptRow {
  talk_uri: string;
}

// ─── Helpers ─────────────────────────────────────────────────────────────────

/**
 * Maps an annotation layer's `subkind` to the slot it fills in
 * AnnotationLayersResult. A Map (rather than an object literal) is used so
 * that a stored subkind such as "constructor" or "__proto__" cannot resolve
 * to an inherited Object.prototype member.
 */
const SUBKIND_TO_LAYER_KEY = new Map<string, keyof AnnotationLayersResult>([
  ["sentence-boundary", "sentences"],
  ["paragraph-boundary", "paragraphs"],
  ["ner", "entities"],
  ["topic-segment", "topics"],
]);

/** Returns `value` when it is a non-empty string, otherwise `fallback`. */
function textOr(value: unknown, fallback: string): string {
  return typeof value === "string" && value !== "" ? value : fallback;
}

/**
 * Serializes a record field for a `*_json` column.
 *
 * A missing (undefined/null) field is stored as "[]": JSON.stringify(undefined)
 * evaluates to undefined, which is not a bindable value, and "null" would later
 * parse back to a non-array. Presumably both tokenizations and annotations are
 * arrays (the empty annotation layer built in rebuildDocument uses `[]`) —
 * TODO confirm for tokenizations.
 */
function toJsonColumn(value: unknown): string {
  return JSON.stringify(value ?? []);
}

// ─── Index functions (create/update) ─────────────────────────────────────────

/**
 * Upserts a pub.layers.expression.expression record into `layers_expressions`.
 *
 * Missing or non-string fields fall back to: "" for sourceRef and text, "en"
 * for language, and the current time (ISO 8601) for createdAt.
 */
export function indexExpression(
  db: Database.Database,
  did: string,
  rkey: string,
  uri: string,
  record: Record<string, unknown>,
): void {
  db.prepare(
    `INSERT OR REPLACE INTO layers_expressions
       (uri, rkey, did, transcript_uri, text, language, created_at)
     VALUES (?, ?, ?, ?, ?, ?, ?)`,
  ).run(
    uri,
    rkey,
    did,
    textOr(record.sourceRef, ""),
    textOr(record.text, ""),
    textOr(record.language, "en"),
    textOr(record.createdAt, new Date().toISOString()),
  );
}

/**
 * Upserts a pub.layers.segmentation.segmentation record into
 * `layers_segmentations`, storing `record.tokenizations` as JSON text.
 */
export function indexSegmentation(
  db: Database.Database,
  did: string,
  rkey: string,
  uri: string,
  record: Record<string, unknown>,
): void {
  db.prepare(
    `INSERT OR REPLACE INTO layers_segmentations
       (uri, rkey, did, expression_uri, tokens_json, created_at)
     VALUES (?, ?, ?, ?, ?, ?)`,
  ).run(
    uri,
    rkey,
    did,
    textOr(record.expression, ""),
    toJsonColumn(record.tokenizations),
    textOr(record.createdAt, new Date().toISOString()),
  );
}

/**
 * Upserts a pub.layers.annotation.annotationLayer record into
 * `layers_annotations`, storing `record.annotations` as JSON text.
 */
export function indexAnnotationLayer(
  db: Database.Database,
  did: string,
  rkey: string,
  uri: string,
  record: Record<string, unknown>,
): void {
  db.prepare(
    `INSERT OR REPLACE INTO layers_annotations
       (uri, rkey, did, expression_uri, kind, subkind, annotations_json, created_at)
     VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
  ).run(
    uri,
    rkey,
    did,
    textOr(record.expression, ""),
    textOr(record.kind, ""),
    textOr(record.subkind, ""),
    toJsonColumn(record.annotations),
    textOr(record.createdAt, new Date().toISOString()),
  );
}

// ─── Delete functions ────────────────────────────────────────────────────────

/**
 * Deletes an expression together with every segmentation and annotation layer
 * that references it.
 *
 * The three deletes run inside one transaction so a failure part-way through
 * cannot leave the dependents removed while the expression row survives (or
 * the reverse).
 */
export function deleteExpression(db: Database.Database, uri: string): void {
  const cascade = db.transaction(() => {
    db.prepare("DELETE FROM layers_segmentations WHERE expression_uri = ?").run(uri);
    db.prepare("DELETE FROM layers_annotations WHERE expression_uri = ?").run(uri);
    db.prepare("DELETE FROM layers_expressions WHERE uri = ?").run(uri);
  });
  cascade();
}

/** Deletes a single segmentation row by its AT URI. */
export function deleteSegmentation(db: Database.Database, uri: string): void {
  db.prepare("DELETE FROM layers_segmentations WHERE uri = ?").run(uri);
}

/** Deletes a single annotation layer row by its AT URI. */
export function deleteAnnotationLayer(db: Database.Database, uri: string): void {
  db.prepare("DELETE FROM layers_annotations WHERE uri = ?").run(uri);
}

// ─── Document rebuild ────────────────────────────────────────────────────────

/**
 * Rebuild the materialized document for a talk by re-running Lens 3
 * (layersPubToDocument) from the stored layers.pub records.
 *
 * This is a no-op when any required piece is missing: an empty
 * `expressionUri`, no stored expression, no stored segmentation, no annotation
 * layer with a recognized subkind, or no transcript row for the expression's
 * `transcript_uri`. In those cases the document is left untouched and will be
 * rebuilt by a later call once the remaining records have been indexed.
 *
 * @param db            Open database handle.
 * @param expressionUri AT URI of the expression whose talk document should be
 *                      rebuilt.
 * @throws If stored JSON cannot be parsed or layersPubToDocument rejects; the
 *         promise rejects and the talk row is not modified.
 */
export async function rebuildDocument(
  db: Database.Database,
  expressionUri: string,
): Promise<void> {
  // No expression row is stored under an empty URI, so skip the lookups.
  if (!expressionUri) return;

  // 1. Look up expression
  const expr = db
    .prepare("SELECT * FROM layers_expressions WHERE uri = ?")
    .get(expressionUri) as ExpressionRow | undefined;
  if (!expr) return;

  // 2. Look up segmentation
  const seg = db
    .prepare("SELECT * FROM layers_segmentations WHERE expression_uri = ?")
    .get(expressionUri) as SegmentationRow | undefined;
  if (!seg) return;

  // 3. Look up all annotation layers and slot the recognized ones by subkind
  const annRows = db
    .prepare("SELECT * FROM layers_annotations WHERE expression_uri = ?")
    .all(expressionUri) as AnnotationRow[];

  const storedLayers: Partial<AnnotationLayersResult> = {};
  for (const row of annRows) {
    const key = SUBKIND_TO_LAYER_KEY.get(row.subkind);
    if (key === undefined) continue; // unrecognized subkind: not part of the document
    storedLayers[key] = {
      $type: "pub.layers.annotation.annotationLayer",
      expression: expressionUri,
      kind: row.kind,
      subkind: row.subkind,
      sourceMethod: "automatic",
      metadata: { tool: "ionosphere-pipeline", timestamp: row.created_at },
      annotations: JSON.parse(row.annotations_json),
      createdAt: row.created_at,
    } as AnnotationLayerRecord;
  }

  // Need at least one annotation layer to produce a useful document
  if (Object.keys(storedLayers).length === 0) return;

  // 4. Build typed records for layersPubToDocument
  const expressionRecord: ExpressionRecord = {
    $type: "pub.layers.expression.expression",
    id: expr.rkey,
    kind: "transcript",
    text: expr.text,
    language: expr.language,
    sourceRef: expr.transcript_uri,
    metadata: { tool: "ionosphere-pipeline", timestamp: expr.created_at },
    createdAt: expr.created_at,
  };

  const segmentationRecord: SegmentationRecord = {
    $type: "pub.layers.segmentation.segmentation",
    expression: expressionUri,
    tokenizations: JSON.parse(seg.tokens_json),
    createdAt: seg.created_at,
  };

  // Fill missing layers with empty annotations so layersPubToDocument gets
  // the full AnnotationLayersResult shape it expects
  const emptyLayer = (kind: string, subkind: string): AnnotationLayerRecord => ({
    $type: "pub.layers.annotation.annotationLayer",
    expression: expressionUri,
    kind,
    subkind,
    sourceMethod: "automatic",
    metadata: { tool: "ionosphere-pipeline", timestamp: "" },
    annotations: [],
    createdAt: "",
  });

  const fullLayers: AnnotationLayersResult = {
    sentences: storedLayers.sentences ?? emptyLayer("span", "sentence-boundary"),
    paragraphs: storedLayers.paragraphs ?? emptyLayer("span", "paragraph-boundary"),
    entities: storedLayers.entities ?? emptyLayer("span", "ner"),
    topics: storedLayers.topics ?? emptyLayer("span", "topic-segment"),
  };

  // 5. Run Lens 3
  const document = await layersPubToDocument(
    expressionRecord,
    segmentationRecord,
    fullLayers,
  );

  // 6. Find the talk_uri from the transcript table. This is read after the
  //    await so that it reflects the database state at write time.
  const transcript = db
    .prepare("SELECT talk_uri FROM transcripts WHERE uri = ?")
    .get(expr.transcript_uri) as TranscriptRow | undefined;
  if (!transcript || !transcript.talk_uri) return;

  // 7. Update the talk's document field
  db.prepare("UPDATE talks SET document = ? WHERE uri = ?").run(
    JSON.stringify(document),
    transcript.talk_uri,
  );
}