appview-less bluesky client
24
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1117 lines 33 kB view raw
1import { get, writable } from 'svelte/store'; 2import { 3 AtpClient, 4 setRecordCache, 5 type NotificationsStream, 6 type NotificationsStreamEvent 7} from './at/client.svelte'; 8import { SvelteMap, SvelteDate, SvelteSet } from 'svelte/reactivity'; 9import type { Did, Handle, Nsid, RecordKey, ResourceUri } from '@atcute/lexicons'; 10import { fetchPosts, hydratePosts, type HydrateOptions, type PostWithUri } from './at/fetch'; 11import { parseCanonicalResourceUri, type AtprotoDid } from '@atcute/lexicons/syntax'; 12import { 13 AppBskyActorProfile, 14 AppBskyFeedPost, 15 AppBskyGraphBlock, 16 type AppBskyGraphFollow 17} from '@atcute/bluesky'; 18import type { JetstreamSubscription, JetstreamEvent } from '@atcute/jetstream'; 19import { expect, ok } from './result'; 20import type { Backlink, BacklinksSource } from './at/constellation'; 21import { now as tidNow } from '@atcute/tid'; 22import type { Records } from '@atcute/lexicons/ambient'; 23import { 24 blockSource, 25 extractDidFromUri, 26 likeSource, 27 replyRootSource, 28 replySource, 29 repostSource, 30 timestampFromCursor, 31 toCanonicalUri 32} from '$lib'; 33import { Router } from './router.svelte'; 34import { accounts, type Account } from './accounts'; 35import { 36 getPreferences, 37 putPreferences, 38 type Preferences 39} from './at/pocket'; 40 41export const notificationStream = writable<NotificationsStream | null>(null); 42export const jetstream = writable<JetstreamSubscription | null>(null); 43 44export const profiles = new SvelteMap<Did, AppBskyActorProfile.Main>(); 45export const handles = new SvelteMap<Did, Handle>(); 46 47// source -> subject -> did (who did the interaction) -> rkey 48export type BacklinksMap = SvelteMap< 49 BacklinksSource, 50 SvelteMap<ResourceUri, SvelteMap<Did, SvelteSet<RecordKey>>> 51>; 52export const allBacklinks: BacklinksMap = new SvelteMap(); 53 54export const addBacklinks = ( 55 subject: ResourceUri, 56 source: BacklinksSource, 57 links: Iterable<Backlink> 58) => { 59 let subjectMap = allBacklinks.get(source); 60 if (!subjectMap) { 61 subjectMap = new SvelteMap(); 62 allBacklinks.set(source, subjectMap); 63 } 64 65 let didMap = subjectMap.get(subject); 66 if (!didMap) { 67 didMap = new SvelteMap(); 68 subjectMap.set(subject, didMap); 69 } 70 71 for (const link of links) { 72 let rkeys = didMap.get(link.did); 73 if (!rkeys) { 74 rkeys = new SvelteSet(); 75 didMap.set(link.did, rkeys); 76 } 77 rkeys.add(link.rkey); 78 } 79}; 80 81export const removeBacklinks = ( 82 subject: ResourceUri, 83 source: BacklinksSource, 84 links: Iterable<Backlink> 85) => { 86 const didMap = allBacklinks.get(source)?.get(subject); 87 if (!didMap) return; 88 89 for (const link of links) { 90 const rkeys = didMap.get(link.did); 91 if (!rkeys) continue; 92 rkeys.delete(link.rkey); 93 if (rkeys.size === 0) didMap.delete(link.did); 94 } 95}; 96 97export const findBacklinksBy = (subject: ResourceUri, source: BacklinksSource, did: Did) => { 98 const rkeys = allBacklinks.get(source)?.get(subject)?.get(did) ?? []; 99 // reconstruct the collection from the source 100 const collection = source.split(':')[0] as Nsid; 101 return rkeys.values().map((rkey) => ({ did, collection, rkey })); 102}; 103 104export const hasBacklink = (subject: ResourceUri, source: BacklinksSource, did: Did): boolean => { 105 return allBacklinks.get(source)?.get(subject)?.has(did) ?? false; 106}; 107 108export const getAllBacklinksFor = (subject: ResourceUri, source: BacklinksSource): Backlink[] => { 109 const subjectMap = allBacklinks.get(source); 110 if (!subjectMap) return []; 111 112 const didMap = subjectMap.get(subject); 113 if (!didMap) return []; 114 115 const collection = source.split(':')[0] as Nsid; 116 const result: Backlink[] = []; 117 118 for (const [did, rkeys] of didMap) 119 for (const rkey of rkeys) result.push({ did, collection, rkey }); 120 121 return result; 122}; 123 124export const isBlockedBy = (subject: Did, blocker: Did): boolean => { 125 return hasBacklink(`at://${subject}`, 'app.bsky.graph.block:subject', blocker); 126}; 127 128// eslint-disable-next-line @typescript-eslint/no-explicit-any 129const getNestedValue = (obj: any, path: string[]): any => { 130 return path.reduce((current, key) => current?.[key], obj); 131}; 132 133// eslint-disable-next-line @typescript-eslint/no-explicit-any 134const setNestedValue = (obj: any, path: string[], value: any): void => { 135 const lastKey = path[path.length - 1]; 136 const parent = path.slice(0, -1).reduce((current, key) => { 137 if (current[key] === undefined) current[key] = {}; 138 return current[key]; 139 }, obj); 140 parent[lastKey] = value; 141}; 142 143export const backlinksCursors = new SvelteMap< 144 Did, 145 SvelteMap<BacklinksSource, string | undefined> 146>(); 147 148export const fetchLinksUntil = async ( 149 subject: Did, 150 client: AtpClient, 151 backlinkSource: BacklinksSource, 152 timestamp: number = -1 153) => { 154 let cursorMap = backlinksCursors.get(subject); 155 if (!cursorMap) { 156 cursorMap = new SvelteMap<BacklinksSource, string | undefined>(); 157 backlinksCursors.set(subject, cursorMap); 158 } 159 160 const [_collection, source] = backlinkSource.split(':'); 161 const collection = _collection as keyof Records; 162 const cursor = cursorMap.get(backlinkSource); 163 164 // if already fetched we dont need to fetch again 165 const cursorTimestamp = timestampFromCursor(cursor); 166 if (cursorTimestamp && cursorTimestamp <= timestamp) return; 167 168 console.log(`${subject}: fetchLinksUntil`, backlinkSource, cursor, timestamp); 169 const result = await client.listRecordsUntil(subject, collection, cursor, timestamp); 170 171 if (!result.ok) { 172 console.error('failed to fetch links until', result.error); 173 return; 174 } 175 cursorMap.set(backlinkSource, result.value.cursor); 176 177 const path = source.split('.'); 178 for (const record of result.value.records) { 179 const uri = getNestedValue(record.value, path); 180 const parsedUri = parseCanonicalResourceUri(record.uri); 181 if (!parsedUri.ok) continue; 182 addBacklinks(uri, `${collection}:${source}`, [ 183 { 184 did: parsedUri.value.repo, 185 collection: parsedUri.value.collection, 186 rkey: parsedUri.value.rkey 187 } 188 ]); 189 } 190}; 191 192export const deletePostBacklink = async ( 193 client: AtpClient, 194 post: PostWithUri, 195 source: BacklinksSource 196) => { 197 const did = client.user?.did; 198 if (!did) return; 199 const collection = source.split(':')[0] as Nsid; 200 const links = findBacklinksBy(post.uri, source, did); 201 removeBacklinks(post.uri, source, links); 202 await Promise.allSettled( 203 links.map((link) => 204 client.user?.atcute.post('com.atproto.repo.deleteRecord', { 205 input: { repo: did, collection, rkey: link.rkey! } 206 }) 207 ) 208 ); 209}; 210 211export const createPostBacklink = async ( 212 client: AtpClient, 213 post: PostWithUri, 214 source: BacklinksSource 215) => { 216 const did = client.user?.did; 217 if (!did) return; 218 const [_collection, subject] = source.split(':'); 219 const collection = _collection as Nsid; 220 const rkey = tidNow(); 221 addBacklinks(post.uri, source, [ 222 { 223 did, 224 collection, 225 rkey 226 } 227 ]); 228 const record = { 229 $type: collection, 230 // eslint-disable-next-line svelte/prefer-svelte-reactivity 231 createdAt: new Date().toISOString() 232 }; 233 const subjectPath = subject.split('.'); 234 setNestedValue(record, subjectPath, post.uri); 235 setNestedValue(record, [...subjectPath.slice(0, -1), 'cid'], post.cid); 236 await client.user?.atcute.post('com.atproto.repo.createRecord', { 237 input: { 238 repo: did, 239 collection, 240 rkey, 241 record 242 } 243 }); 244}; 245 246export const pulsingPostId = writable<string | null>(null); 247 248export const viewClient = new AtpClient(); 249export const clients = new SvelteMap<Did, AtpClient>(); 250 251export const accountPreferences = new SvelteMap<Did, Preferences>(); 252 253const PREFS_STORAGE_KEY = 'accountPreferences'; 254 255const loadLocalPreferences = (): Map<Did, Preferences> => { 256 if (typeof localStorage === 'undefined') return new Map(); 257 try { 258 const stored = localStorage.getItem(PREFS_STORAGE_KEY); 259 if (!stored) return new Map(); 260 return new Map(Object.entries(JSON.parse(stored))) as Map<Did, Preferences>; 261 } catch { 262 return new Map(); 263 } 264}; 265 266const saveLocalPreferences = () => { 267 if (typeof localStorage === 'undefined') return; 268 const obj = Object.fromEntries(accountPreferences.entries()); 269 localStorage.setItem(PREFS_STORAGE_KEY, JSON.stringify(obj)); 270}; 271 272export const loadAccountPreferences = async (account: Account) => { 273 const client = clients.get(account.did); 274 if (!client) return; 275 276 const localPrefs = loadLocalPreferences().get(account.did); 277 const remoteResult = await getPreferences(client); 278 279 if (!remoteResult.ok) { 280 console.error('failed to load preferences from pocket:', remoteResult.error); 281 if (localPrefs) accountPreferences.set(account.did, localPrefs); 282 return; 283 } 284 285 const remotePrefs = remoteResult.value; 286 287 if (!remotePrefs && !localPrefs) { 288 return; 289 } 290 291 if (!remotePrefs && localPrefs) { 292 accountPreferences.set(account.did, localPrefs); 293 await putPreferences(client, localPrefs); 294 return; 295 } 296 297 if (remotePrefs && !localPrefs) { 298 accountPreferences.set(account.did, remotePrefs); 299 saveLocalPreferences(); 300 return; 301 } 302 303 // both exist - last modified wins 304 const localTime = new Date(localPrefs!.updatedAt).getTime(); 305 const remoteTime = new Date(remotePrefs!.updatedAt).getTime(); 306 307 if (localTime > remoteTime) { 308 accountPreferences.set(account.did, localPrefs!); 309 await putPreferences(client, localPrefs!); 310 } else { 311 accountPreferences.set(account.did, remotePrefs!); 312 saveLocalPreferences(); 313 } 314}; 315 316export const setAccountPreferences = ( 317 did: Did, 318 partial: Partial<Omit<Preferences, 'updatedAt'>> 319) => { 320 const existing = accountPreferences.get(did) ?? { updatedAt: '' }; 321 const updated: Preferences = { 322 ...existing, 323 ...partial, 324 updatedAt: new Date().toISOString() 325 }; 326 327 accountPreferences.set(did, updated); 328 saveLocalPreferences(); 329 return updated; 330}; 331 332export const syncAccountPreferences = async (did: Did) => { 333 const prefs = accountPreferences.get(did); 334 if (!prefs) return; 335 336 const client = clients.get(did); 337 if (client) { 338 const result = await putPreferences(client, prefs); 339 if (!result.ok) console.error('failed to sync preferences to pocket:', result.error); 340 } 341}; 342 343export const updateAccountPreferences = async ( 344 did: Did, 345 partial: Partial<Omit<Preferences, 'updatedAt'>> 346) => { 347 setAccountPreferences(did, partial); 348 await syncAccountPreferences(did); 349}; 350 351type FollowWithUri = { 352 uri: ResourceUri; 353 record: AppBskyGraphFollow.Main; 354}; 355export const follows = new SvelteMap<Did, SvelteMap<Did, FollowWithUri>>(); 356 357export const addFollows = ( 358 did: Did, 359 followList: Iterable<FollowWithUri> 360) => { 361 let map = follows.get(did)!; 362 if (!map) { 363 map = new SvelteMap(); 364 follows.set(did, map); 365 } 366 for (const follow of followList) map.set(follow.record.subject, follow); 367}; 368 369export const fetchFollows = async ( 370 account: Account 371): Promise<IteratorObject<AppBskyGraphFollow.Main>> => { 372 const client = clients.get(account.did)!; 373 const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.follow'); 374 if (!res.ok) { 375 console.error("can't fetch follows:", res.error); 376 return [].values(); 377 } 378 addFollows( 379 account.did, 380 res.value.records.map((follow) => ({ 381 uri: follow.uri, 382 record: follow.value as AppBskyGraphFollow.Main 383 })) 384 ); 385 return res.value.records.values().map((follow) => follow.value as AppBskyGraphFollow.Main); 386}; 387 388// this fetches up to three days of posts and interactions for using in following list 389export const fetchForInteractions = async (client: AtpClient, subject: Did) => { 390 const userDid = client.user?.did; 391 if (!userDid) return; 392 393 const threeDaysAgo = (Date.now() - 3 * 24 * 60 * 60 * 1000) * 1000; 394 395 const res = await client.listRecordsUntil(subject, 'app.bsky.feed.post', undefined, threeDaysAgo); 396 if (!res.ok) return; 397 const postsWithUri = res.value.records.map( 398 (post) => 399 ({ cid: post.cid, uri: post.uri, record: post.value as AppBskyFeedPost.Main }) as PostWithUri 400 ); 401 addPosts(postsWithUri); 402 403 // add to following buffer (not feed directly) 404 let buffer = followingBuffer.get(userDid); 405 if (!buffer) { 406 buffer = new SvelteSet(); 407 followingBuffer.set(userDid, buffer); 408 } 409 for (const post of postsWithUri) buffer.add(post.uri); 410 411 if (res.value.cursor) { 412 let userCursors = followingCursors.get(userDid); 413 if (!userCursors) { 414 userCursors = new SvelteMap(); 415 followingCursors.set(userDid, userCursors); 416 } 417 userCursors.set(subject, res.value.cursor); 418 } 419 420 const cursorTimestamp = timestampFromCursor(res.value.cursor) ?? -1; 421 const timestamp = Math.min(cursorTimestamp, threeDaysAgo); 422 console.log(`${subject}: fetchForInteractions`, res.value.cursor, timestamp); 423 await Promise.all([repostSource].map((s) => fetchLinksUntil(subject, client, s, timestamp))); 424}; 425 426export const fetchFollowingTimeline = async (client: AtpClient, targetDid?: Did, limit: number = 10) => { 427 // 1. Identify candidates (active follows + self) 428 const userDid = targetDid ?? client.user?.did; 429 if (!userDid) return; 430 431 let buffer = followingBuffer.get(userDid); 432 if (!buffer) { 433 buffer = new SvelteSet(); 434 followingBuffer.set(userDid, buffer); 435 } 436 437 let userFeed = followingFeed.get(userDid); 438 if (!userFeed) { 439 userFeed = new SvelteSet(); 440 followingFeed.set(userDid, userFeed); 441 } 442 443 let userCursors = followingCursors.get(userDid); 444 if (!userCursors) { 445 userCursors = new SvelteMap(); 446 followingCursors.set(userDid, userCursors); 447 } 448 449 // 0. Drain buffer first 450 if (buffer.size > 0) { 451 const sorted = Array.from(buffer).sort((a, b) => { 452 const postA = getPostFromUri(a); 453 const postB = getPostFromUri(b); 454 return ( 455 new Date(postB?.record.createdAt ?? 0).getTime() - 456 new Date(postA?.record.createdAt ?? 0).getTime() 457 ); 458 }); 459 460 const toAdd = sorted.slice(0, limit); 461 for (const uri of toAdd) { 462 userFeed.add(uri); 463 buffer.delete(uri); 464 } 465 466 return; 467 } 468 469 const subjects = new Set(follows.get(userDid)?.keys()); 470 subjects.add(userDid); 471 472 // 2. Find the "newest" cursor(s) 473 let maxCursor: string | undefined = undefined; 474 let candidates: Did[] = []; 475 476 for (const subject of subjects) { 477 const cursor = userCursors.get(subject); 478 479 // null means exhausted 480 if (cursor === null) continue; 481 482 // if we haven't fetched this user yet (undefined cursor), they are a candidate (newest) 483 if (cursor === undefined) { 484 if (maxCursor !== undefined) { 485 maxCursor = undefined; 486 candidates = [subject]; 487 } else { 488 candidates.push(subject); 489 } 490 continue; 491 } 492 493 // If maxCursor is undefined (meaning we have some 'now' candidates), skip checked cursors 494 if (maxCursor === undefined && candidates.length > 0) continue; 495 496 if (maxCursor === undefined || cursor > maxCursor) { 497 maxCursor = cursor; 498 candidates = [subject]; 499 } else if (cursor === maxCursor) { 500 candidates.push(subject); 501 } 502 } 503 504 if (candidates.length === 0) return; // Everyone exhausted? 505 506 // 3. Fetch from candidates 507 console.log('fetching following timeline from', candidates, maxCursor); 508 const results = await Promise.all( 509 candidates.map(async (did) => { 510 const cursor = userCursors!.get(did) ?? undefined; 511 // fetch limit is 4th argument, cursor is 3rd 512 const res = await client.listRecords(did, 'app.bsky.feed.post', cursor, limit); 513 if (!res.ok) { 514 console.error(`fetchFollowingTimeline failed for ${did}:`, res.error); 515 return null; 516 } 517 return { did, res: res.value }; 518 }) 519 ); 520 521 // 4. Update state - use records directly from listRecords instead of re-fetching 522 const validPosts: PostWithUri[] = []; 523 for (const result of results) { 524 if (!result) continue; 525 const { did, res } = result; 526 527 // update cursor 528 if (res.cursor) userCursors!.set(did, res.cursor); 529 else userCursors!.set(did, null); // null = exhausted 530 531 for (const record of res.records) { 532 validPosts.push({ 533 uri: record.uri, 534 cid: record.cid, 535 record: record.value as AppBskyFeedPost.Main 536 }); 537 } 538 } 539 540 if (validPosts.length === 0) return; 541 542 addPosts(validPosts); 543 544 for (const post of validPosts) userFeed.add(post.uri); 545 546 // check if any of the post authors block the user 547 await fetchBlocksForPosts( 548 client, 549 validPosts.map((p) => p.uri) 550 ); 551}; 552 553// if did is in set, we have fetched blocks for them already (against logged in users) 554export const blockFlags = new SvelteMap<Did, SvelteSet<Did>>(); 555 556export const fetchBlocked = async (client: AtpClient, subject: Did, blocker: Did) => { 557 const subjectUri = `at://${subject}` as ResourceUri; 558 const res = await client.getBacklinks(subjectUri, blockSource, [blocker], 1); 559 if (!res.ok) return false; 560 if (res.value.total > 0) addBacklinks(subjectUri, blockSource, res.value.records); 561 562 // mark as fetched 563 let flags = blockFlags.get(subject); 564 if (!flags) { 565 flags = new SvelteSet(); 566 blockFlags.set(subject, flags); 567 } 568 flags.add(blocker); 569 570 return res.value.total > 0; 571}; 572 573export const fetchBlocksForPosts = async (client: AtpClient, postUris: Iterable<ResourceUri>) => { 574 const userDid = client.user?.did; 575 if (!userDid) return; 576 577 // check if any of the post authors block the user 578 // eslint-disable-next-line svelte/prefer-svelte-reactivity 579 let distinctDids = new Set(Array.from(postUris).map((uri) => extractDidFromUri(uri)!)); 580 distinctDids.delete(userDid); // dont need to check if user blocks themselves 581 const alreadyFetched = blockFlags.get(userDid); 582 if (alreadyFetched) distinctDids = distinctDids.difference(alreadyFetched); 583 if (distinctDids.size > 0) 584 await Promise.all(distinctDids.values().map((did) => fetchBlocked(client, userDid, did))); 585}; 586 587export const fetchBlocks = async (account: Account) => { 588 const client = clients.get(account.did)!; 589 const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.block'); 590 if (!res.ok) return; 591 for (const block of res.value.records) { 592 const record = block.value as AppBskyGraphBlock.Main; 593 const parsedUri = expect(parseCanonicalResourceUri(block.uri)); 594 addBacklinks(`at://${record.subject}`, blockSource, [ 595 { 596 did: parsedUri.repo, 597 collection: parsedUri.collection, 598 rkey: parsedUri.rkey 599 } 600 ]); 601 } 602}; 603 604export const createBlock = async (client: AtpClient, targetDid: Did) => { 605 const userDid = client.user?.did; 606 if (!userDid) return; 607 608 const rkey = tidNow(); 609 const targetUri = `at://${targetDid}` as ResourceUri; 610 611 addBacklinks(targetUri, blockSource, [ 612 { 613 did: userDid, 614 collection: 'app.bsky.graph.block', 615 rkey 616 } 617 ]); 618 619 const record: AppBskyGraphBlock.Main = { 620 $type: 'app.bsky.graph.block', 621 subject: targetDid, 622 // eslint-disable-next-line svelte/prefer-svelte-reactivity 623 createdAt: new Date().toISOString() 624 }; 625 626 await client.user?.atcute.post('com.atproto.repo.createRecord', { 627 input: { 628 repo: userDid, 629 collection: 'app.bsky.graph.block', 630 rkey, 631 record 632 } 633 }); 634}; 635 636export const deleteBlock = async (client: AtpClient, targetDid: Did) => { 637 const userDid = client.user?.did; 638 if (!userDid) return; 639 640 const targetUri = `at://${targetDid}` as ResourceUri; 641 const links = findBacklinksBy(targetUri, blockSource, userDid); 642 643 removeBacklinks(targetUri, blockSource, links); 644 645 await Promise.allSettled( 646 links.map((link) => 647 client.user?.atcute.post('com.atproto.repo.deleteRecord', { 648 input: { 649 repo: userDid, 650 collection: 'app.bsky.graph.block', 651 rkey: link.rkey 652 } 653 }) 654 ) 655 ); 656}; 657 658export const isBlockedByUser = (targetDid: Did, userDid: Did): boolean => { 659 return isBlockedBy(targetDid, userDid); 660}; 661 662export const isUserBlockedBy = (userDid: Did, targetDid: Did): boolean => { 663 return isBlockedBy(userDid, targetDid); 664}; 665 666export const hasBlockRelationship = (did1: Did, did2: Did): boolean => { 667 return isBlockedBy(did1, did2) || isBlockedBy(did2, did1); 668}; 669 670export const getBlockRelationship = ( 671 userDid: Did, 672 targetDid: Did 673): { userBlocked: boolean; blockedByTarget: boolean } => { 674 return { 675 userBlocked: isBlockedBy(targetDid, userDid), 676 blockedByTarget: isBlockedBy(userDid, targetDid) 677 }; 678}; 679 680export const allPosts = new SvelteMap<ResourceUri, PostWithUri>(); 681export const postsByDid = new SvelteMap<Did, SvelteSet<ResourceUri>>(); 682export type DeletedPostInfo = { reply?: PostWithUri['record']['reply'] }; 683export const deletedPosts = new SvelteMap<ResourceUri, DeletedPostInfo>(); 684 685// posts grouped by root uri for efficient thread building 686export const postsByRootUri = new SvelteMap<ResourceUri, SvelteSet<ResourceUri>>(); 687// did -> post uris that are replies to that did 688export const replyIndex = new SvelteMap<Did, SvelteSet<ResourceUri>>(); 689 690export const getPost = (did: Did, rkey: RecordKey) => 691 allPosts.get(toCanonicalUri({ did, collection: 'app.bsky.feed.post', rkey })); 692 693export const getPostFromUri = (uri: ResourceUri) => allPosts.get(uri); 694 695const hydrateCacheFn: Parameters<typeof hydratePosts>[3] = (did, rkey) => { 696 const cached = getPost(did, rkey); 697 return cached ? ok(cached) : undefined; 698}; 699 700export const addPosts = (newPosts: Iterable<PostWithUri>) => { 701 for (const post of newPosts) { 702 const parsedUri = expect(parseCanonicalResourceUri(post.uri)); 703 allPosts.set(post.uri, post); 704 705 // update postsByDid index 706 let didPosts = postsByDid.get(parsedUri.repo); 707 if (!didPosts) { 708 didPosts = new SvelteSet(); 709 postsByDid.set(parsedUri.repo, didPosts); 710 } 711 didPosts.add(post.uri); 712 713 // update postsByRootUri grouping 714 const rootUri = (post.record.reply?.root.uri as ResourceUri) || post.uri; 715 let rootGroup = postsByRootUri.get(rootUri); 716 if (!rootGroup) { 717 rootGroup = new SvelteSet(); 718 postsByRootUri.set(rootUri, rootGroup); 719 } 720 rootGroup.add(post.uri); 721 722 if (post.record.reply) { 723 const link = { 724 did: parsedUri.repo, 725 collection: parsedUri.collection, 726 rkey: parsedUri.rkey 727 }; 728 addBacklinks(post.record.reply.parent.uri, replySource, [link]); 729 addBacklinks(post.record.reply.root.uri, replyRootSource, [link]); 730 731 // update reply index 732 const parentDid = extractDidFromUri(post.record.reply.parent.uri); 733 if (parentDid) { 734 let set = replyIndex.get(parentDid); 735 if (!set) { 736 set = new SvelteSet(); 737 replyIndex.set(parentDid, set); 738 } 739 set.add(post.uri); 740 } 741 } 742 } 743}; 744 745export const deletePost = (uri: ResourceUri) => { 746 const did = extractDidFromUri(uri)!; 747 const post = allPosts.get(uri); 748 if (!post) return; 749 allPosts.delete(uri); 750 postsByDid.get(did)?.delete(uri); 751 // remove reply from index 752 const subjectDid = extractDidFromUri(post.record.reply?.parent.uri ?? ''); 753 if (subjectDid) replyIndex.get(subjectDid)?.delete(uri); 754 deletedPosts.set(uri, { reply: post.record.reply }); 755}; 756 757export const timelines = new SvelteMap<Did, SvelteSet<ResourceUri>>(); 758export const postCursors = new SvelteMap<Did, { value?: string; end: boolean }>(); 759 760// feed state: Did -> feedUri -> post URIs 761export const feedTimelines = new SvelteMap<Did, SvelteMap<string, ResourceUri[]>>(); 762export const feedCursors = new SvelteMap<Did, SvelteMap<string, { value?: string; end: boolean }>>(); 763 764export const followingFeed = new SvelteMap<Did, SvelteSet<ResourceUri>>(); // merged timeline: UserDid -> Set<Uri> 765export const followingBuffer = new SvelteMap<Did, SvelteSet<ResourceUri>>(); // buffer: UserDid -> Set<Uri> 766export const followingCursors = new SvelteMap<Did, SvelteMap<Did, string | null>>(); // cursors: UserDid -> SubjectDid -> Cursor 767 768export const fetchFeed = async ( 769 client: AtpClient, 770 feedUri: string, 771 feedServiceDid: string, 772 limit: number = 10 773) => { 774 const userDid = client.user?.did; 775 if (!userDid) return; 776 777 let userFeedCursors = feedCursors.get(userDid); 778 if (!userFeedCursors) { 779 userFeedCursors = new SvelteMap(); 780 feedCursors.set(userDid, userFeedCursors); 781 } 782 783 const cursor = userFeedCursors.get(feedUri); 784 if (cursor?.end) return; 785 786 const skeleton = await import('./at/feeds').then((m) => 787 m.fetchFeedSkeleton(client, feedUri, feedServiceDid, cursor?.value, limit) 788 ); 789 if (!skeleton) throw `failed to fetch feed skeleton for ${feedUri}`; 790 791 const newCursor = { value: skeleton.cursor, end: skeleton.feed.length === 0 }; 792 userFeedCursors.set(feedUri, newCursor); 793 794 const uris = skeleton.feed.slice(0, limit).map((item) => item.post as ResourceUri); 795 796 let userFeedTimelines = feedTimelines.get(userDid); 797 if (!userFeedTimelines) { 798 userFeedTimelines = new SvelteMap(); 799 feedTimelines.set(userDid, userFeedTimelines); 800 } 801 802 const existing = userFeedTimelines.get(feedUri) ?? []; 803 userFeedTimelines.set(feedUri, [...existing, ...uris]); 804 805 // fetch each post record 806 const posts = await Promise.all( 807 uris.map(async (uri) => { 808 const result = await client.getRecordUri(AppBskyFeedPost.mainSchema, uri); 809 if (!result.ok) return null; 810 return { uri: result.value.uri, cid: result.value.cid, record: result.value.record }; 811 }) 812 ); 813 814 const validPosts = posts.filter((p): p is PostWithUri => p !== null); 815 addPosts(validPosts); 816 817 // check if any of the post authors block the user 818 await fetchBlocksForPosts( 819 client, 820 validPosts.map((p) => p.uri) 821 ); 822 823 return newCursor; 824}; 825 826export const checkForNewPosts = async (client: AtpClient, feedUri: string, feedServiceDid: string) => { 827 const userDid = client.user?.did; 828 if (!userDid) return false; 829 830 const currentFeed = feedTimelines.get(userDid)?.get(feedUri); 831 if (!currentFeed || currentFeed.length === 0) return false; 832 833 const skeleton = await import('./at/feeds').then((m) => 834 m.fetchFeedSkeleton(client, feedUri, feedServiceDid, undefined, 1) 835 ); 836 837 if (skeleton && skeleton.feed.length > 0) { 838 const latestPost = skeleton.feed[0].post; 839 return latestPost !== currentFeed[0]; 840 } 841 842 return false; 843}; 844 845export const resetFeed = (did: Did, feedUri: string) => { 846 feedTimelines.get(did)?.delete(feedUri); 847 feedCursors.get(did)?.delete(feedUri); 848}; 849 850 851const traversePostChain = (post: PostWithUri) => { 852 const result = [post.uri]; 853 const parentUri = post.record.reply?.parent.uri; 854 if (parentUri) { 855 const parentPost = allPosts.get(parentUri as ResourceUri); 856 if (parentPost) result.push(...traversePostChain(parentPost)); 857 } 858 return result; 859}; 860export const addTimeline = (did: Did, uris: Iterable<ResourceUri>) => { 861 let timeline = timelines.get(did); 862 if (!timeline) { 863 timeline = new SvelteSet(); 864 timelines.set(did, timeline); 865 } 866 for (const uri of uris) { 867 const post = allPosts.get(uri); 868 // we need to traverse the post chain to add all posts in the chain to the timeline 869 // because the parent posts might not be in the timeline yet 870 const chain = post ? traversePostChain(post) : [uri]; 871 for (const uri of chain) timeline.add(uri); 872 } 873}; 874 875export const fetchTimeline = async ( 876 client: AtpClient, 877 subject: Did, 878 limit: number = 6, 879 withBacklinks: boolean = true, 880 hydrateOptions?: Partial<HydrateOptions> 881) => { 882 const cursor = postCursors.get(subject); 883 if (cursor && cursor.end) return; 884 885 const accPosts = await fetchPosts(subject, client, cursor?.value, limit, withBacklinks); 886 if (!accPosts.ok) throw `cant fetch posts ${subject}: ${accPosts.error}`; 887 888 // if the cursor is undefined, we've reached the end of the timeline 889 const newCursor = { value: accPosts.value.cursor, end: !accPosts.value.cursor }; 890 postCursors.set(subject, newCursor); 891 const hydrated = await hydratePosts( 892 client, 893 subject, 894 accPosts.value.posts, 895 hydrateCacheFn, 896 hydrateOptions 897 ); 898 if (!hydrated.ok) throw `cant hydrate posts ${subject}: ${hydrated.error}`; 899 900 addPosts(hydrated.value.values()); 901 addTimeline(subject, hydrated.value.keys()); 902 903 await fetchBlocksForPosts(client, hydrated.value.keys()); 904 905 console.log(`${subject}: fetchTimeline`, accPosts.value.cursor); 906 return newCursor; 907}; 908 909export const fetchInteractionsToTimelineEnd = async ( 910 client: AtpClient, 911 interactor: Did, 912 subject: Did 913) => { 914 const cursor = postCursors.get(subject); 915 if (!cursor) return; 916 const timestamp = timestampFromCursor(cursor.value); 917 await Promise.all( 918 [likeSource, repostSource].map((s) => fetchLinksUntil(interactor, client, s, timestamp)) 919 ); 920}; 921 922export const fetchInteractionsToFollowingTimelineEnd = async ( 923 client: AtpClient, 924 userDid: Did 925) => { 926 const userCursors = followingCursors.get(userDid); 927 if (!userCursors) return; 928 929 const feed = followingFeed.get(userDid); 930 if (!feed || feed.size === 0) return; 931 932 let minTimestamp = Date.now() * 1000; 933 let found = false; 934 935 for (const uri of feed) { 936 const post = getPostFromUri(uri); 937 if (post) { 938 const ts = new Date(post.record.createdAt).getTime() * 1000; 939 if (ts < minTimestamp) minTimestamp = ts; 940 found = true; 941 } 942 } 943 944 if (!found) return; 945 946 await Promise.all( 947 [likeSource, repostSource].map((s) => fetchLinksUntil(userDid, client, s, minTimestamp)) 948 ); 949}; 950 951export const fetchInteractionsToFeedTimelineEnd = async ( 952 client: AtpClient, 953 userDid: Did, 954 feedUri: string 955) => { 956 const userFeedTimelines = feedTimelines.get(userDid); 957 if (!userFeedTimelines) return; 958 const posts = userFeedTimelines.get(feedUri); 959 if (!posts || posts.length === 0) return; 960 961 let minTimestamp = Date.now() * 1000; 962 let found = false; 963 964 for (const uri of posts) { 965 const post = getPostFromUri(uri); 966 if (post) { 967 const ts = new Date(post.record.createdAt).getTime() * 1000; 968 if (ts < minTimestamp) minTimestamp = ts; 969 found = true; 970 } 971 } 972 973 if (!found) return; 974 975 await Promise.all( 976 [likeSource, repostSource].map((s) => fetchLinksUntil(userDid, client, s, minTimestamp)) 977 ); 978}; 979 980export const initialDone = new SvelteSet<Did>(); 981export const fetchInitial = async (account: Account) => { 982 const start = Date.now(); 983 const client = clients.get(account.did)!; 984 const profile = await client.getProfile(); 985 if (profile.ok) profiles.set(account.did, profile.value); 986 await Promise.allSettled([ 987 fetchBlocks(account), 988 fetchForInteractions(client, account.did), 989 fetchFollows(account).then((follows) => 990 Promise.allSettled(follows.map((follow) => fetchForInteractions(client, follow.subject)) ?? []) 991 ) 992 ]); 993 initialDone.add(account.did); 994 console.log(`initial done for ${account.did} in ${Date.now() - start}ms`); 995}; 996 997export const handleJetstreamEvent = async (event: JetstreamEvent) => { 998 if (event.kind !== 'commit') return; 999 1000 const { did, commit } = event; 1001 const uri: ResourceUri = toCanonicalUri({ did, ...commit }); 1002 if (commit.collection === 'app.bsky.feed.post') { 1003 if (commit.operation === 'create') { 1004 const record = commit.record as AppBskyFeedPost.Main; 1005 const posts = [ 1006 { 1007 record, 1008 uri, 1009 cid: commit.cid 1010 } 1011 ]; 1012 await setRecordCache(uri, record); 1013 const client = clients.get(did) ?? viewClient; 1014 const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn); 1015 if (!hydrated.ok) { 1016 console.error(`cant hydrate posts ${did}: ${hydrated.error}`); 1017 return; 1018 } 1019 addPosts(hydrated.value.values()); 1020 addTimeline(did, hydrated.value.keys()); 1021 1022 if (record.reply) { 1023 const parentDid = extractDidFromUri(record.reply.parent.uri)!; 1024 addTimeline(parentDid, [uri]); 1025 } 1026 1027 // Broadcast to following feeds of local accounts 1028 for (const account of get(accounts)) { 1029 // does this account follow the author? 1030 let isFollowing = account.did === did; 1031 if (!isFollowing) { 1032 const accountFollows = follows.get(account.did); 1033 if (accountFollows?.has(did)) isFollowing = true; 1034 } 1035 1036 if (isFollowing) { 1037 const feed = followingFeed.get(account.did); 1038 if (feed) for (const uri of hydrated.value.keys()) feed.add(uri); 1039 } 1040 } 1041 } else if (commit.operation === 'delete') { 1042 deletePost(uri); 1043 } 1044 } 1045}; 1046 1047const handlePostNotification = async (event: NotificationsStreamEvent & { type: 'message' }) => { 1048 const parsedSubjectUri = expect(parseCanonicalResourceUri(event.data.link.subject)); 1049 const did = parsedSubjectUri.repo as AtprotoDid; 1050 const client = clients.get(did); 1051 if (!client) { 1052 console.error(`${did}: cant handle post notification, client not found !?`); 1053 return; 1054 } 1055 const subjectPost = await client.getRecord( 1056 AppBskyFeedPost.mainSchema, 1057 did, 1058 parsedSubjectUri.rkey 1059 ); 1060 if (!subjectPost.ok) return; 1061 1062 const parsedSourceUri = expect(parseCanonicalResourceUri(event.data.link.source_record)); 1063 const posts = [ 1064 { 1065 record: subjectPost.value.record, 1066 uri: event.data.link.subject, 1067 cid: subjectPost.value.cid, 1068 replies: { 1069 cursor: null, 1070 total: 1, 1071 records: [ 1072 { 1073 did: parsedSourceUri.repo, 1074 collection: parsedSourceUri.collection, 1075 rkey: parsedSourceUri.rkey 1076 } 1077 ] 1078 } 1079 } 1080 ]; 1081 const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn); 1082 if (!hydrated.ok) { 1083 console.error(`cant hydrate posts ${did}: ${hydrated.error}`); 1084 return; 1085 } 1086 1087 // console.log(hydrated); 1088 addPosts(hydrated.value.values()); 1089 addTimeline(did, hydrated.value.keys()); 1090}; 1091 1092const handleBacklink = (event: NotificationsStreamEvent & { type: 'message' }) => { 1093 const parsedSource = expect(parseCanonicalResourceUri(event.data.link.source_record)); 1094 addBacklinks(event.data.link.subject, event.data.link.source, [ 1095 { 1096 did: parsedSource.repo, 1097 collection: parsedSource.collection, 1098 rkey: parsedSource.rkey 1099 } 1100 ]); 1101}; 1102 1103export const handleNotification = async (event: NotificationsStreamEvent) => { 1104 if (event.type === 'message') { 1105 if (event.data.link.source.startsWith('app.bsky.feed.post')) handlePostNotification(event); 1106 else handleBacklink(event); 1107 } 1108}; 1109 1110export const currentTime = new SvelteDate(); 1111 1112if (typeof window !== 'undefined') 1113 setInterval(() => { 1114 currentTime.setTime(Date.now()); 1115 }, 1000); 1116 1117export const router = new Router();