Ionosphere.tv
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: NLP enrichment for stream transcripts

Export stream transcripts from DB, run sentence/paragraph/entity/topic
pipeline (without LLM pass — streams are too large). Overlay NLP facets
onto stream transcript documents at serve time in tracks.ts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+172 -1
+25 -1
apps/ionosphere-appview/src/tracks.ts
··· 325 325 || (hardcoded ? loadDiarizationFromFile(hardcoded.dirName) : []); 326 326 327 327 const streamRecordUri = dbStream ? `at://${dbStream.did}/tv.ionosphere.stream/${slug}` : ""; 328 - const transcript = getStreamTranscriptFromDb(db, streamRecordUri) 328 + let transcript = getStreamTranscriptFromDb(db, streamRecordUri) 329 329 ?? (hardcoded ? loadTranscriptFromFile(hardcoded.dirName) : null); 330 + 331 + // Overlay NLP structural facets if available 332 + if (transcript) { 333 + const nlpPath = path.join(path.resolve(import.meta.dirname, "../../../pipeline/data/stream-nlp"), `stream-${slug}.json`); 334 + if (existsSync(nlpPath)) { 335 + try { 336 + const nlp = JSON.parse(readFileSync(nlpPath, "utf-8")); 337 + for (const s of nlp.sentences || []) { 338 + transcript.facets.push({ index: { byteStart: s.byteStart, byteEnd: s.byteEnd }, features: [{ $type: "tv.ionosphere.facet#sentence" }] }); 339 + } 340 + for (const p of nlp.paragraphs || []) { 341 + transcript.facets.push({ index: { byteStart: p.byteStart, byteEnd: p.byteEnd }, features: [{ $type: "tv.ionosphere.facet#paragraph" }] }); 342 + } 343 + for (const e of nlp.entities || []) { 344 + if (e.conceptUri) { 345 + transcript.facets.push({ index: { byteStart: e.byteStart, byteEnd: e.byteEnd }, features: [{ $type: "tv.ionosphere.facet#concept-ref", conceptUri: e.conceptUri, conceptName: e.label }] }); 346 + } 347 + } 348 + for (const tb of nlp.topicBreaks || []) { 349 + transcript.facets.push({ index: { byteStart: tb.byteStart, byteEnd: tb.byteStart }, features: [{ $type: "tv.ionosphere.facet#topic-break" }] }); 350 + } 351 + } catch {} 352 + } 353 + } 330 354 const words = getStreamWordsFromDb(db, streamRecordUri).length > 0 331 355 ? getStreamWordsFromDb(db, streamRecordUri) 332 356 : (hardcoded ? loadWordsFromFile(hardcoded.dirName) : []);
+60
pipeline/nlp/export_streams.py
··· 1 + """Export stream transcripts from DB as compact JSON for the NLP pipeline.""" 2 + 3 + import json 4 + import sqlite3 5 + from pathlib import Path 6 + 7 + 8 + def export_stream_transcripts(db_path: str, output_dir: str): 9 + conn = sqlite3.connect(db_path) 10 + 11 + streams = conn.execute("SELECT uri, slug FROM streams").fetchall() 12 + print(f"Found {len(streams)} streams") 13 + 14 + out = Path(output_dir) 15 + out.mkdir(parents=True, exist_ok=True) 16 + 17 + exported = 0 18 + for stream_uri, slug in streams: 19 + chunks = conn.execute( 20 + "SELECT text, start_ms, timings FROM stream_transcripts WHERE stream_uri = ? ORDER BY chunk_index ASC", 21 + (stream_uri,) 22 + ).fetchall() 23 + 24 + if not chunks: 25 + continue 26 + 27 + # Reassemble into a single compact transcript 28 + full_text = "" 29 + all_timings = [] 30 + start_ms = chunks[0][1] 31 + 32 + for text, chunk_start_ms, timings_json in chunks: 33 + timings = json.loads(timings_json) 34 + 35 + if full_text: 36 + # Add a gap between chunks if needed 37 + full_text += " " 38 + 39 + full_text += text 40 + all_timings.extend(timings) 41 + 42 + result = { 43 + "text": full_text, 44 + "startMs": start_ms, 45 + "timings": all_timings, 46 + } 47 + 48 + out_path = out / f"stream-{slug}.json" 49 + out_path.write_text(json.dumps(result)) 50 + print(f" {slug}: {len(full_text)} chars, {len(all_timings)} timing values") 51 + exported += 1 52 + 53 + conn.close() 54 + print(f"Exported {exported} stream transcripts") 55 + 56 + 57 + if __name__ == "__main__": 58 + db_path = str(Path(__file__).resolve().parent.parent.parent / "apps" / "data" / "ionosphere.sqlite") 59 + output_dir = str(Path(__file__).resolve().parent.parent / "data" / "stream-transcripts") 60 + export_stream_transcripts(db_path, output_dir)
+87
pipeline/nlp/run_streams.py
··· 1 + """Run NLP pipeline on stream transcripts (without LLM pass — too large).""" 2 + 3 + import json 4 + import sqlite3 5 + import sys 6 + from pathlib import Path 7 + from nlp.sentences import detect_sentences 8 + from nlp.paragraphs import detect_paragraphs 9 + from nlp.entities import detect_entities 10 + from nlp.speaker_lookup import build_speaker_lookup 11 + from nlp.topics import detect_topic_breaks 12 + 13 + 14 + def main(): 15 + streams_dir = Path(__file__).resolve().parent.parent / "data" / "stream-transcripts" 16 + output_dir = Path(__file__).resolve().parent.parent / "data" / "stream-nlp" 17 + output_dir.mkdir(parents=True, exist_ok=True) 18 + 19 + if not streams_dir.exists(): 20 + print("Run export_streams.py first") 21 + sys.exit(1) 22 + 23 + # Load speaker/concept data 24 + db_path = Path(__file__).resolve().parent.parent.parent / "apps" / "data" / "ionosphere.sqlite" 25 + speaker_rows = [] 26 + concept_rows = [] 27 + if db_path.exists(): 28 + conn = sqlite3.connect(str(db_path)) 29 + speaker_rows = conn.execute("SELECT name, handle, speaker_did FROM speakers").fetchall() 30 + concept_rows = [ 31 + {"name": r[0], "uri": r[1], "aliases": r[2] or "[]"} 32 + for r in conn.execute("SELECT name, uri, aliases FROM concepts").fetchall() 33 + ] 34 + conn.close() 35 + print(f"Loaded {len(speaker_rows)} speakers, {len(concept_rows)} concepts", flush=True) 36 + 37 + speaker_lookup = build_speaker_lookup(speaker_rows) if speaker_rows else None 38 + 39 + for tf in sorted(streams_dir.glob("*.json")): 40 + slug = tf.stem.replace("stream-", "") 41 + transcript = json.loads(tf.read_text()) 42 + text = transcript["text"] 43 + timings = transcript["timings"] 44 + start_ms = transcript["startMs"] 45 + 46 + print(f"Processing {slug} ({len(text)} chars)...", flush=True) 47 + 48 + # Pass 1: Sentences 49 + sentences = detect_sentences(text) 50 + print(f" {len(sentences)} sentences", flush=True) 51 + 52 + # Pass 2: Paragraphs 53 + paragraphs = detect_paragraphs( 54 + text=text, timings=timings, start_ms=start_ms, 55 + sentences=sentences, 56 + ) 57 + print(f" {len(paragraphs)} paragraphs", flush=True) 58 + 59 + # Pass 3: NER + entity linking (no LLM — too large) 60 + entities = detect_entities(text, speaker_lookup=speaker_lookup, concept_rows=concept_rows) 61 + print(f" {len(entities)} entities", flush=True) 62 + 63 + # Pass 4: Topic segmentation 64 + sentences_with_text = [] 65 + for s in sentences: 66 + sent_text = text.encode("utf-8")[s["byteStart"]:s["byteEnd"]].decode("utf-8") 67 + sentences_with_text.append({**s, "text": sent_text}) 68 + topic_breaks = detect_topic_breaks(sentences_with_text) 69 + print(f" {len(topic_breaks)} topics", flush=True) 70 + 71 + result = { 72 + "slug": slug, 73 + "sentences": sentences, 74 + "paragraphs": paragraphs, 75 + "entities": entities, 76 + "topicBreaks": [{"byteStart": tb["byteStart"]} for tb in topic_breaks], 77 + } 78 + 79 + out_path = output_dir / f"stream-{slug}.json" 80 + out_path.write_text(json.dumps(result, indent=2)) 81 + print(f" Done: {slug}", flush=True) 82 + 83 + print("All streams processed.") 84 + 85 + 86 + if __name__ == "__main__": 87 + main()