Ionosphere.tv
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: batch script to fetch and upsert Bluesky mentions for talks

Searches speaker @-mentions during each talk window with pagination,
maps post times to transcript byte positions, fetches reply threads,
and searches domain mentions for ionosphere.tv and stream.place.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+456
+456
scripts/fetch-mentions.mjs
··· 1 + /** 2 + * Fetch Bluesky mentions for all talks and upsert into SQLite. 3 + * 4 + * For each talk: 5 + * 1. Searches @-mentions of each speaker handle during the talk window 6 + * (starts_at - 5min to ends_at + 30min) 7 + * 2. Paginates with cursors (up to 10 pages per query) 8 + * 3. Computes talk_offset_ms and maps to transcript byte_position 9 + * 4. Fetches threads (depth 2) for posts with replies 10 + * 5. Searches post-conference domain mentions (ionosphere.tv, stream.place) 11 + * 6. Upserts everything into the `mentions` SQLite table 12 + * 13 + * Usage: 14 + * BOT_PASSWORD=xxx node scripts/fetch-mentions.mjs 15 + */ 16 + 17 + import { createRequire } from 'module'; 18 + const require = createRequire( 19 + new URL('../apps/ionosphere-appview/package.json', import.meta.url).pathname 20 + ); 21 + const { BskyAgent } = require('@atproto/api'); 22 + const Database = require('better-sqlite3'); 23 + 24 + import { fileURLToPath } from 'url'; 25 + import { dirname, join } from 'path'; 26 + 27 + const __dirname = dirname(fileURLToPath(import.meta.url)); 28 + const DB_PATH = join(__dirname, '..', 'apps', 'data', 'ionosphere.sqlite'); 29 + 30 + const PRE_BUFFER_MS = 5 * 60 * 1000; // 5 min before talk 31 + const POST_BUFFER_MS = 30 * 60 * 1000; // 30 min after talk 32 + const MAX_PAGES = 10; 33 + const PAGE_LIMIT = 100; 34 + 35 + const agent = new BskyAgent({ service: 'https://bsky.social' }); 36 + 37 + function sleep(ms) { return new Promise(r => setTimeout(r, ms)); } 38 + 39 + function randomDelay() { return 150 + Math.random() * 50; } // 150–200ms 40 + 41 + // --------------------------------------------------------------------------- 42 + // Database helpers 43 + // --------------------------------------------------------------------------- 44 + 45 + function getTalksWithSpeakers(db) { 46 + const rows = db.prepare(` 47 + SELECT DISTINCT t.uri, t.title, t.starts_at, t.ends_at, t.room, 48 + s.name AS speaker_name, s.handle AS speaker_handle, 49 + s.did AS speaker_did 50 + FROM talks t 51 + JOIN talk_speakers ts ON ts.talk_uri = t.uri 52 + JOIN speakers s ON s.uri = ts.speaker_uri 53 + WHERE t.starts_at IS NOT NULL AND t.ends_at IS NOT NULL 54 + ORDER BY t.starts_at 55 + `).all(); 56 + 57 + const talks = new Map(); 58 + for (const r of rows) { 59 + if (!talks.has(r.uri)) { 60 + talks.set(r.uri, { 61 + uri: r.uri, 62 + title: r.title, 63 + starts_at: r.starts_at, 64 + ends_at: r.ends_at, 65 + room: r.room, 66 + speakers: [], 67 + }); 68 + } 69 + const t = talks.get(r.uri); 70 + if (r.speaker_handle && !t.speakers.find(s => s.handle === r.speaker_handle)) { 71 + t.speakers.push({ 72 + name: r.speaker_name, 73 + handle: r.speaker_handle, 74 + did: r.speaker_did, 75 + }); 76 + } 77 + } 78 + 79 + // Deduplicate by title + start time 80 + const seen = new Set(); 81 + return [...talks.values()].filter(t => { 82 + const key = `${t.title}|${t.starts_at}`; 83 + if (seen.has(key)) return false; 84 + seen.add(key); 85 + return true; 86 + }); 87 + } 88 + 89 + function getTranscriptForTalk(db, talkUri) { 90 + return db.prepare(` 91 + SELECT text, start_ms, timings FROM transcripts WHERE talk_uri = ? 92 + `).get(talkUri); 93 + } 94 + 95 + // --------------------------------------------------------------------------- 96 + // Transcript byte-position mapping 97 + // --------------------------------------------------------------------------- 98 + 99 + /** 100 + * Given a transcript (text, startMs, timings array) and a target offset in ms 101 + * (relative to the talk start), return the byte position in the transcript text. 102 + * 103 + * Walk the timings array: positive = word duration ms, negative = silence gap ms. 104 + * Track cursor in ms (starting at startMs) and a word index. When cursor crosses 105 + * the target, return the byte position of that word. 106 + */ 107 + function offsetToBytePosition(text, startMs, timings, targetOffsetMs) { 108 + if (!text || !timings || timings.length === 0) return null; 109 + 110 + const encoder = new TextEncoder(); 111 + // Split text on whitespace and find byte offset of each word 112 + const words = text.split(/\s+/); 113 + const wordByteOffsets = []; 114 + let searchFrom = 0; 115 + for (const word of words) { 116 + const idx = text.indexOf(word, searchFrom); 117 + if (idx === -1) { 118 + wordByteOffsets.push(encoder.encode(text.substring(0, searchFrom)).length); 119 + } else { 120 + wordByteOffsets.push(encoder.encode(text.substring(0, idx)).length); 121 + searchFrom = idx + word.length; 122 + } 123 + } 124 + 125 + // Walk timings to find which word corresponds to the target offset 126 + let cursorMs = startMs; 127 + let wordIdx = 0; 128 + 129 + for (const val of timings) { 130 + if (val < 0) { 131 + // Silence gap 132 + cursorMs += Math.abs(val); 133 + } else { 134 + // Word duration 135 + if (cursorMs + val >= targetOffsetMs) { 136 + // This word spans the target offset 137 + return wordIdx < wordByteOffsets.length ? wordByteOffsets[wordIdx] : null; 138 + } 139 + cursorMs += val; 140 + wordIdx++; 141 + } 142 + } 143 + 144 + // Past the end — return last word position 145 + return wordIdx > 0 && wordIdx - 1 < wordByteOffsets.length 146 + ? wordByteOffsets[wordIdx - 1] 147 + : null; 148 + } 149 + 150 + // --------------------------------------------------------------------------- 151 + // Bluesky API helpers 152 + // --------------------------------------------------------------------------- 153 + 154 + async function searchPostsPaginated(params) { 155 + const allPosts = []; 156 + let cursor = undefined; 157 + 158 + for (let page = 0; page < MAX_PAGES; page++) { 159 + const res = await agent.app.bsky.feed.searchPosts({ 160 + ...params, 161 + limit: PAGE_LIMIT, 162 + cursor, 163 + }); 164 + const posts = res.data?.posts || []; 165 + allPosts.push(...posts); 166 + 167 + cursor = res.data?.cursor; 168 + if (!cursor || posts.length < PAGE_LIMIT) break; 169 + 170 + await sleep(randomDelay()); 171 + } 172 + 173 + return allPosts; 174 + } 175 + 176 + async function searchMentionsOf(handle, since, until) { 177 + try { 178 + return await searchPostsPaginated({ 179 + q: '*', 180 + mentions: handle, 181 + since, 182 + until, 183 + sort: 'latest', 184 + }); 185 + } catch { 186 + // Fallback with broader query 187 + try { 188 + return await searchPostsPaginated({ 189 + q: 'atmosphere OR atproto', 190 + mentions: handle, 191 + since, 192 + until, 193 + sort: 'latest', 194 + }); 195 + } catch { 196 + return []; 197 + } 198 + } 199 + } 200 + 201 + async function fetchThread(uri) { 202 + try { 203 + const res = await agent.app.bsky.feed.getPostThread({ uri, depth: 2 }); 204 + return res.data?.thread; 205 + } catch { 206 + return null; 207 + } 208 + } 209 + 210 + /** 211 + * Extract reply posts from a thread tree (depth-first). 212 + */ 213 + function extractReplies(thread, maxDepth = 2, depth = 0) { 214 + const replies = []; 215 + if (!thread?.replies || depth >= maxDepth) return replies; 216 + for (const reply of thread.replies) { 217 + if (reply?.post) { 218 + replies.push(reply.post); 219 + } 220 + replies.push(...extractReplies(reply, maxDepth, depth + 1)); 221 + } 222 + return replies; 223 + } 224 + 225 + // --------------------------------------------------------------------------- 226 + // Upsert 227 + // --------------------------------------------------------------------------- 228 + 229 + function buildUpsertStmt(db) { 230 + return db.prepare(` 231 + INSERT INTO mentions ( 232 + uri, talk_uri, author_did, author_handle, text, created_at, 233 + talk_offset_ms, byte_position, likes, reposts, replies, 234 + parent_uri, mention_type, indexed_at 235 + ) VALUES ( 236 + @uri, @talk_uri, @author_did, @author_handle, @text, @created_at, 237 + @talk_offset_ms, @byte_position, @likes, @reposts, @replies, 238 + @parent_uri, @mention_type, @indexed_at 239 + ) ON CONFLICT(uri) DO UPDATE SET 240 + talk_uri = @talk_uri, 241 + author_did = @author_did, 242 + author_handle = @author_handle, 243 + text = @text, 244 + talk_offset_ms = @talk_offset_ms, 245 + byte_position = @byte_position, 246 + likes = @likes, 247 + reposts = @reposts, 248 + replies = @replies, 249 + parent_uri = @parent_uri, 250 + mention_type = @mention_type, 251 + indexed_at = @indexed_at 252 + `); 253 + } 254 + 255 + function postToRow(post, talkUri, talkStartMs, transcript, mentionType, parentUri) { 256 + const createdAt = post.record?.createdAt || post.indexedAt; 257 + const postMs = new Date(createdAt).getTime(); 258 + const talkOffsetMs = talkStartMs ? postMs - talkStartMs : null; 259 + 260 + let bytePosition = null; 261 + if (transcript && talkOffsetMs != null) { 262 + const timings = typeof transcript.timings === 'string' 263 + ? JSON.parse(transcript.timings) 264 + : transcript.timings; 265 + bytePosition = offsetToBytePosition( 266 + transcript.text, transcript.start_ms, timings, talkOffsetMs 267 + ); 268 + } 269 + 270 + return { 271 + uri: post.uri, 272 + talk_uri: talkUri, 273 + author_did: post.author.did, 274 + author_handle: post.author.handle || null, 275 + text: post.record?.text || null, 276 + created_at: createdAt, 277 + talk_offset_ms: talkOffsetMs, 278 + byte_position: bytePosition, 279 + likes: post.likeCount || 0, 280 + reposts: post.repostCount || 0, 281 + replies: post.replyCount || 0, 282 + parent_uri: parentUri || null, 283 + mention_type: mentionType, 284 + indexed_at: new Date().toISOString(), 285 + }; 286 + } 287 + 288 + // --------------------------------------------------------------------------- 289 + // Main 290 + // --------------------------------------------------------------------------- 291 + 292 + async function main() { 293 + if (!process.env.BOT_PASSWORD) { 294 + console.error('BOT_PASSWORD env var required'); 295 + process.exit(1); 296 + } 297 + 298 + console.log('=== Fetch Mentions ===\n'); 299 + 300 + await agent.login({ 301 + identifier: 'ionosphere.tv', 302 + password: process.env.BOT_PASSWORD, 303 + }); 304 + console.log('Authenticated as ionosphere.tv\n'); 305 + 306 + const db = new Database(DB_PATH); 307 + const upsert = buildUpsertStmt(db); 308 + const talks = getTalksWithSpeakers(db); 309 + console.log(`${talks.length} talks with scheduled times\n`); 310 + 311 + let totalUpserted = 0; 312 + 313 + // ---- Phase 1: Talk-aligned speaker mentions ---- 314 + console.log('--- Phase 1: Talk-aligned speaker mentions ---\n'); 315 + 316 + for (let i = 0; i < talks.length; i++) { 317 + const talk = talks[i]; 318 + const talkStart = new Date(talk.starts_at); 319 + const talkEnd = new Date(talk.ends_at); 320 + const talkStartMs = talkStart.getTime(); 321 + 322 + const since = new Date(talkStart.getTime() - PRE_BUFFER_MS).toISOString(); 323 + const until = new Date(talkEnd.getTime() + POST_BUFFER_MS).toISOString(); 324 + 325 + const transcript = getTranscriptForTalk(db, talk.uri); 326 + 327 + const allPosts = new Map(); 328 + 329 + for (const speaker of talk.speakers) { 330 + if (!speaker.handle) continue; 331 + const posts = await searchMentionsOf(speaker.handle, since, until); 332 + for (const p of posts) { 333 + if (!allPosts.has(p.uri)) allPosts.set(p.uri, p); 334 + } 335 + await sleep(randomDelay()); 336 + } 337 + 338 + // Upsert main posts 339 + let talkCount = 0; 340 + const postsWithReplies = []; 341 + 342 + for (const post of allPosts.values()) { 343 + const row = postToRow(post, talk.uri, talkStartMs, transcript, 'during_talk', null); 344 + upsert.run(row); 345 + talkCount++; 346 + if ((post.replyCount || 0) > 0) { 347 + postsWithReplies.push(post); 348 + } 349 + } 350 + 351 + // Fetch threads for posts with replies 352 + for (const post of postsWithReplies) { 353 + const thread = await fetchThread(post.uri); 354 + if (!thread) continue; 355 + 356 + const replies = extractReplies(thread); 357 + for (const reply of replies) { 358 + const row = postToRow( 359 + reply, talk.uri, talkStartMs, transcript, 'reply', post.uri 360 + ); 361 + upsert.run(row); 362 + talkCount++; 363 + } 364 + await sleep(randomDelay()); 365 + } 366 + 367 + totalUpserted += talkCount; 368 + if (talkCount > 0) { 369 + console.log(`[${i + 1}/${talks.length}] "${talk.title}" -- ${talkCount} mentions`); 370 + } else { 371 + console.log(`[${i + 1}/${talks.length}] "${talk.title}" -- no mentions`); 372 + } 373 + } 374 + 375 + // ---- Phase 2: Post-conference domain mentions ---- 376 + console.log('\n--- Phase 2: Post-conference domain mentions ---\n'); 377 + 378 + const domainQueries = [ 379 + { q: 'ionosphere.tv', label: 'ionosphere.tv' }, 380 + { q: 'stream.place', label: 'stream.place' }, 381 + ]; 382 + 383 + // Search across wider conference window 384 + const confSince = '2026-03-25T00:00:00Z'; 385 + const confUntil = '2026-04-30T00:00:00Z'; 386 + 387 + for (const { q, label } of domainQueries) { 388 + const posts = await searchPostsPaginated({ 389 + q, 390 + since: confSince, 391 + until: confUntil, 392 + sort: 'latest', 393 + }); 394 + 395 + let count = 0; 396 + for (const post of posts) { 397 + // Try to match to a talk by finding the closest talk time 398 + const createdAt = post.record?.createdAt || post.indexedAt; 399 + const postMs = new Date(createdAt).getTime(); 400 + let bestTalk = null; 401 + let bestDist = Infinity; 402 + 403 + for (const talk of talks) { 404 + const talkStartMs = new Date(talk.starts_at).getTime(); 405 + const talkEndMs = new Date(talk.ends_at).getTime(); 406 + // Only match if post is within the talk's extended window 407 + if (postMs >= talkStartMs - PRE_BUFFER_MS && postMs <= talkEndMs + POST_BUFFER_MS) { 408 + const dist = Math.abs(postMs - talkStartMs); 409 + if (dist < bestDist) { 410 + bestDist = dist; 411 + bestTalk = talk; 412 + } 413 + } 414 + } 415 + 416 + const talkUri = bestTalk?.uri || null; 417 + const talkStartMs = bestTalk ? new Date(bestTalk.starts_at).getTime() : null; 418 + const transcript = bestTalk ? getTranscriptForTalk(db, bestTalk.uri) : null; 419 + 420 + const row = postToRow( 421 + post, talkUri, talkStartMs, transcript, 'domain_mention', null 422 + ); 423 + upsert.run(row); 424 + count++; 425 + } 426 + 427 + // Fetch threads for domain mention posts that have replies 428 + for (const post of posts) { 429 + if ((post.replyCount || 0) === 0) continue; 430 + const thread = await fetchThread(post.uri); 431 + if (!thread) continue; 432 + 433 + const replies = extractReplies(thread); 434 + for (const reply of replies) { 435 + // Domain mention replies don't map to a specific talk 436 + const row = postToRow(reply, null, null, null, 'domain_reply', post.uri); 437 + upsert.run(row); 438 + count++; 439 + } 440 + await sleep(randomDelay()); 441 + } 442 + 443 + totalUpserted += count; 444 + console.log(`"${label}": ${posts.length} posts, ${count} total rows (with replies)`); 445 + await sleep(randomDelay()); 446 + } 447 + 448 + db.close(); 449 + 450 + console.log(`\n=== Done: ${totalUpserted} rows upserted ===`); 451 + } 452 + 453 + main().catch(err => { 454 + console.error('Fatal error:', err); 455 + process.exit(1); 456 + });