appview-less bluesky client
24
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 0eaf2dff795c9df008f40a99ca4e2714395bd79d 640 lines 20 kB view raw
1import { writable } from 'svelte/store'; 2import { 3 AtpClient, 4 type NotificationsStream, 5 type NotificationsStreamEvent 6} from './at/client.svelte'; 7import { SvelteMap, SvelteDate, SvelteSet } from 'svelte/reactivity'; 8import type { Did, Handle, Nsid, RecordKey, ResourceUri } from '@atcute/lexicons'; 9import { fetchPosts, hydratePosts, type PostWithUri } from './at/fetch'; 10import { parseCanonicalResourceUri, type AtprotoDid } from '@atcute/lexicons/syntax'; 11import { 12 AppBskyActorProfile, 13 AppBskyFeedPost, 14 AppBskyGraphBlock, 15 type AppBskyGraphFollow 16} from '@atcute/bluesky'; 17import type { JetstreamSubscription, JetstreamEvent } from '@atcute/jetstream'; 18import { expect, ok } from './result'; 19import type { Backlink, BacklinksSource } from './at/constellation'; 20import { now as tidNow } from '@atcute/tid'; 21import type { Records } from '@atcute/lexicons/ambient'; 22import { 23 blockSource, 24 extractDidFromUri, 25 likeSource, 26 replyRootSource, 27 replySource, 28 repostSource, 29 timestampFromCursor, 30 toCanonicalUri 31} from '$lib'; 32import { Router } from './router.svelte'; 33import type { Account } from './accounts'; 34 35export const notificationStream = writable<NotificationsStream | null>(null); 36export const jetstream = writable<JetstreamSubscription | null>(null); 37 38export const profiles = new SvelteMap<Did, AppBskyActorProfile.Main>(); 39export const handles = new SvelteMap<Did, Handle>(); 40 41// source -> subject -> did (who did the interaction) -> rkey 42export type BacklinksMap = SvelteMap< 43 BacklinksSource, 44 SvelteMap<ResourceUri, SvelteMap<Did, SvelteSet<RecordKey>>> 45>; 46export const allBacklinks: BacklinksMap = new SvelteMap(); 47 48export const addBacklinks = ( 49 subject: ResourceUri, 50 source: BacklinksSource, 51 links: Iterable<Backlink> 52) => { 53 let subjectMap = allBacklinks.get(source); 54 if (!subjectMap) { 55 subjectMap = new SvelteMap(); 56 allBacklinks.set(source, subjectMap); 57 } 58 59 let didMap = subjectMap.get(subject); 60 if (!didMap) { 61 didMap = new SvelteMap(); 62 subjectMap.set(subject, didMap); 63 } 64 65 for (const link of links) { 66 let rkeys = didMap.get(link.did); 67 if (!rkeys) { 68 rkeys = new SvelteSet(); 69 didMap.set(link.did, rkeys); 70 } 71 rkeys.add(link.rkey); 72 } 73}; 74 75export const removeBacklinks = ( 76 subject: ResourceUri, 77 source: BacklinksSource, 78 links: Iterable<Backlink> 79) => { 80 const didMap = allBacklinks.get(source)?.get(subject); 81 if (!didMap) return; 82 83 for (const link of links) { 84 const rkeys = didMap.get(link.did); 85 if (!rkeys) continue; 86 rkeys.delete(link.rkey); 87 if (rkeys.size === 0) didMap.delete(link.did); 88 } 89}; 90 91export const findBacklinksBy = (subject: ResourceUri, source: BacklinksSource, did: Did) => { 92 const rkeys = allBacklinks.get(source)?.get(subject)?.get(did) ?? []; 93 // reconstruct the collection from the source 94 const collection = source.split(':')[0] as Nsid; 95 return rkeys.values().map((rkey) => ({ did, collection, rkey })); 96}; 97 98export const hasBacklink = (subject: ResourceUri, source: BacklinksSource, did: Did): boolean => { 99 return allBacklinks.get(source)?.get(subject)?.has(did) ?? false; 100}; 101 102export const getAllBacklinksFor = (subject: ResourceUri, source: BacklinksSource): Backlink[] => { 103 const subjectMap = allBacklinks.get(source); 104 if (!subjectMap) return []; 105 106 const didMap = subjectMap.get(subject); 107 if (!didMap) return []; 108 109 const collection = source.split(':')[0] as Nsid; 110 const result: Backlink[] = []; 111 112 for (const [did, rkeys] of didMap) 113 for (const rkey of rkeys) result.push({ did, collection, rkey }); 114 115 return result; 116}; 117 118export const isBlockedBy = (subject: Did, blocker: Did): boolean => { 119 return hasBacklink(`at://${subject}`, 'app.bsky.graph.block:subject', blocker); 120}; 121 122// eslint-disable-next-line @typescript-eslint/no-explicit-any 123const getNestedValue = (obj: any, path: string[]): any => { 124 return path.reduce((current, key) => current?.[key], obj); 125}; 126 127// eslint-disable-next-line @typescript-eslint/no-explicit-any 128const setNestedValue = (obj: any, path: string[], value: any): void => { 129 const lastKey = path[path.length - 1]; 130 const parent = path.slice(0, -1).reduce((current, key) => { 131 if (current[key] === undefined) current[key] = {}; 132 return current[key]; 133 }, obj); 134 parent[lastKey] = value; 135}; 136 137export const backlinksCursors = new SvelteMap< 138 Did, 139 SvelteMap<BacklinksSource, string | undefined> 140>(); 141 142export const fetchLinksUntil = async ( 143 subject: Did, 144 client: AtpClient, 145 backlinkSource: BacklinksSource, 146 timestamp: number = -1 147) => { 148 let cursorMap = backlinksCursors.get(subject); 149 if (!cursorMap) { 150 cursorMap = new SvelteMap<BacklinksSource, string | undefined>(); 151 backlinksCursors.set(subject, cursorMap); 152 } 153 154 const [_collection, source] = backlinkSource.split(':'); 155 const collection = _collection as keyof Records; 156 const cursor = cursorMap.get(backlinkSource); 157 158 // if already fetched we dont need to fetch again 159 const cursorTimestamp = timestampFromCursor(cursor); 160 if (cursorTimestamp && cursorTimestamp <= timestamp) return; 161 162 console.log(`${subject}: fetchLinksUntil`, backlinkSource, cursor, timestamp); 163 const result = await client.listRecordsUntil(subject, collection, cursor, timestamp); 164 165 if (!result.ok) { 166 console.error('failed to fetch links until', result.error); 167 return; 168 } 169 cursorMap.set(backlinkSource, result.value.cursor); 170 171 const path = source.split('.'); 172 for (const record of result.value.records) { 173 const uri = getNestedValue(record.value, path); 174 const parsedUri = parseCanonicalResourceUri(record.uri); 175 if (!parsedUri.ok) continue; 176 addBacklinks(uri, `${collection}:${source}`, [ 177 { 178 did: parsedUri.value.repo, 179 collection: parsedUri.value.collection, 180 rkey: parsedUri.value.rkey 181 } 182 ]); 183 } 184}; 185 186export const deletePostBacklink = async ( 187 client: AtpClient, 188 post: PostWithUri, 189 source: BacklinksSource 190) => { 191 const did = client.user?.did; 192 if (!did) return; 193 const collection = source.split(':')[0] as Nsid; 194 const links = findBacklinksBy(post.uri, source, did); 195 removeBacklinks(post.uri, source, links); 196 await Promise.allSettled( 197 links.map((link) => 198 client.user?.atcute.post('com.atproto.repo.deleteRecord', { 199 input: { repo: did, collection, rkey: link.rkey! } 200 }) 201 ) 202 ); 203}; 204 205export const createPostBacklink = async ( 206 client: AtpClient, 207 post: PostWithUri, 208 source: BacklinksSource 209) => { 210 const did = client.user?.did; 211 if (!did) return; 212 const [_collection, subject] = source.split(':'); 213 const collection = _collection as Nsid; 214 const rkey = tidNow(); 215 addBacklinks(post.uri, source, [ 216 { 217 did, 218 collection, 219 rkey 220 } 221 ]); 222 const record = { 223 $type: collection, 224 // eslint-disable-next-line svelte/prefer-svelte-reactivity 225 createdAt: new Date().toISOString() 226 }; 227 const subjectPath = subject.split('.'); 228 setNestedValue(record, subjectPath, post.uri); 229 setNestedValue(record, [...subjectPath.slice(0, -1), 'cid'], post.cid); 230 await client.user?.atcute.post('com.atproto.repo.createRecord', { 231 input: { 232 repo: did, 233 collection, 234 rkey, 235 record 236 } 237 }); 238}; 239 240export const pulsingPostId = writable<string | null>(null); 241 242export const viewClient = new AtpClient(); 243export const clients = new SvelteMap<Did, AtpClient>(); 244 245export const follows = new SvelteMap<Did, SvelteMap<ResourceUri, AppBskyGraphFollow.Main>>(); 246 247export const addFollows = ( 248 did: Did, 249 followMap: Iterable<[ResourceUri, AppBskyGraphFollow.Main]> 250) => { 251 let map = follows.get(did)!; 252 if (!map) { 253 map = new SvelteMap(followMap); 254 follows.set(did, map); 255 return; 256 } 257 for (const [uri, record] of followMap) map.set(uri, record); 258}; 259 260export const fetchFollows = async ( 261 account: Account 262): Promise<IteratorObject<AppBskyGraphFollow.Main>> => { 263 const client = clients.get(account.did)!; 264 const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.follow'); 265 if (!res.ok) { 266 console.error("can't fetch follows:", res.error); 267 return [].values(); 268 } 269 addFollows( 270 account.did, 271 res.value.records.map((follow) => [follow.uri, follow.value as AppBskyGraphFollow.Main]) 272 ); 273 return res.value.records.values().map((follow) => follow.value as AppBskyGraphFollow.Main); 274}; 275 276// this fetches up to three days of posts and interactions for using in following list 277export const fetchForInteractions = async (client: AtpClient, subject: Did) => { 278 const threeDaysAgo = (Date.now() - 3 * 24 * 60 * 60 * 1000) * 1000; 279 280 const res = await client.listRecordsUntil(subject, 'app.bsky.feed.post', undefined, threeDaysAgo); 281 if (!res.ok) return; 282 const postsWithUri = res.value.records.map( 283 (post) => 284 ({ cid: post.cid, uri: post.uri, record: post.value as AppBskyFeedPost.Main }) as PostWithUri 285 ); 286 addPosts(postsWithUri); 287 288 const cursorTimestamp = timestampFromCursor(res.value.cursor) ?? -1; 289 const timestamp = Math.min(cursorTimestamp, threeDaysAgo); 290 console.log(`${subject}: fetchForInteractions`, res.value.cursor, timestamp); 291 await Promise.all([repostSource].map((s) => fetchLinksUntil(subject, client, s, timestamp))); 292}; 293 294// if did is in set, we have fetched blocks for them already (against logged in users) 295export const blockFlags = new SvelteMap<Did, SvelteSet<Did>>(); 296 297export const fetchBlocked = async (client: AtpClient, subject: Did, blocker: Did) => { 298 const subjectUri = `at://${subject}` as ResourceUri; 299 const res = await client.getBacklinks(subjectUri, blockSource, [blocker], 1); 300 if (!res.ok) return false; 301 if (res.value.total > 0) addBacklinks(subjectUri, blockSource, res.value.records); 302 303 // mark as fetched 304 let flags = blockFlags.get(subject); 305 if (!flags) { 306 flags = new SvelteSet(); 307 blockFlags.set(subject, flags); 308 } 309 flags.add(blocker); 310 311 return res.value.total > 0; 312}; 313 314export const fetchBlocks = async (account: Account) => { 315 const client = clients.get(account.did)!; 316 const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.block'); 317 if (!res.ok) return; 318 for (const block of res.value.records) { 319 const record = block.value as AppBskyGraphBlock.Main; 320 const parsedUri = expect(parseCanonicalResourceUri(block.uri)); 321 addBacklinks(`at://${record.subject}`, blockSource, [ 322 { 323 did: parsedUri.repo, 324 collection: parsedUri.collection, 325 rkey: parsedUri.rkey 326 } 327 ]); 328 } 329}; 330 331export const createBlock = async (client: AtpClient, targetDid: Did) => { 332 const userDid = client.user?.did; 333 if (!userDid) return; 334 335 const rkey = tidNow(); 336 const targetUri = `at://${targetDid}` as ResourceUri; 337 338 addBacklinks(targetUri, blockSource, [ 339 { 340 did: userDid, 341 collection: 'app.bsky.graph.block', 342 rkey 343 } 344 ]); 345 346 const record: AppBskyGraphBlock.Main = { 347 $type: 'app.bsky.graph.block', 348 subject: targetDid, 349 // eslint-disable-next-line svelte/prefer-svelte-reactivity 350 createdAt: new Date().toISOString() 351 }; 352 353 await client.user?.atcute.post('com.atproto.repo.createRecord', { 354 input: { 355 repo: userDid, 356 collection: 'app.bsky.graph.block', 357 rkey, 358 record 359 } 360 }); 361}; 362 363export const deleteBlock = async (client: AtpClient, targetDid: Did) => { 364 const userDid = client.user?.did; 365 if (!userDid) return; 366 367 const targetUri = `at://${targetDid}` as ResourceUri; 368 const links = findBacklinksBy(targetUri, blockSource, userDid); 369 370 removeBacklinks(targetUri, blockSource, links); 371 372 await Promise.allSettled( 373 links.map((link) => 374 client.user?.atcute.post('com.atproto.repo.deleteRecord', { 375 input: { 376 repo: userDid, 377 collection: 'app.bsky.graph.block', 378 rkey: link.rkey 379 } 380 }) 381 ) 382 ); 383}; 384 385export const isBlockedByUser = (targetDid: Did, userDid: Did): boolean => { 386 return isBlockedBy(targetDid, userDid); 387}; 388 389export const isUserBlockedBy = (userDid: Did, targetDid: Did): boolean => { 390 return isBlockedBy(userDid, targetDid); 391}; 392 393export const hasBlockRelationship = (did1: Did, did2: Did): boolean => { 394 return isBlockedBy(did1, did2) || isBlockedBy(did2, did1); 395}; 396 397export const getBlockRelationship = ( 398 userDid: Did, 399 targetDid: Did 400): { userBlocked: boolean; blockedByTarget: boolean } => { 401 return { 402 userBlocked: isBlockedBy(targetDid, userDid), 403 blockedByTarget: isBlockedBy(userDid, targetDid) 404 }; 405}; 406 407export const allPosts = new SvelteMap<Did, SvelteMap<ResourceUri, PostWithUri>>(); 408// did -> post uris that are replies to that did 409export const replyIndex = new SvelteMap<Did, SvelteSet<ResourceUri>>(); 410 411export const getPost = (did: Did, rkey: RecordKey) => 412 allPosts.get(did)?.get(toCanonicalUri({ did, collection: 'app.bsky.feed.post', rkey })); 413const hydrateCacheFn: Parameters<typeof hydratePosts>[3] = (did, rkey) => { 414 const cached = getPost(did, rkey); 415 return cached ? ok(cached) : undefined; 416}; 417 418export const addPosts = (newPosts: Iterable<PostWithUri>) => { 419 for (const post of newPosts) { 420 const parsedUri = expect(parseCanonicalResourceUri(post.uri)); 421 let posts = allPosts.get(parsedUri.repo); 422 if (!posts) { 423 posts = new SvelteMap(); 424 allPosts.set(parsedUri.repo, posts); 425 } 426 posts.set(post.uri, post); 427 if (post.record.reply) { 428 const link = { 429 did: parsedUri.repo, 430 collection: parsedUri.collection, 431 rkey: parsedUri.rkey 432 }; 433 addBacklinks(post.record.reply.parent.uri, replySource, [link]); 434 addBacklinks(post.record.reply.root.uri, replyRootSource, [link]); 435 436 // update reply index 437 const parentDid = extractDidFromUri(post.record.reply.parent.uri); 438 if (parentDid) { 439 let set = replyIndex.get(parentDid); 440 if (!set) { 441 set = new SvelteSet(); 442 replyIndex.set(parentDid, set); 443 } 444 set.add(post.uri); 445 } 446 } 447 } 448}; 449 450export const timelines = new SvelteMap<Did, SvelteSet<ResourceUri>>(); 451export const postCursors = new SvelteMap<Did, { value?: string; end: boolean }>(); 452 453const traversePostChain = (post: PostWithUri) => { 454 const result = [post.uri]; 455 const parentUri = post.record.reply?.parent.uri; 456 if (parentUri) { 457 const parentPost = allPosts.get(extractDidFromUri(parentUri)!)?.get(parentUri); 458 if (parentPost) result.push(...traversePostChain(parentPost)); 459 } 460 return result; 461}; 462export const addTimeline = (did: Did, uris: Iterable<ResourceUri>) => { 463 let timeline = timelines.get(did); 464 if (!timeline) { 465 timeline = new SvelteSet(); 466 timelines.set(did, timeline); 467 } 468 for (const uri of uris) { 469 const post = allPosts.get(did)?.get(uri); 470 // we need to traverse the post chain to add all posts in the chain to the timeline 471 // because the parent posts might not be in the timeline yet 472 const chain = post ? traversePostChain(post) : [uri]; 473 for (const uri of chain) timeline.add(uri); 474 } 475}; 476 477export const fetchTimeline = async ( 478 client: AtpClient, 479 subject: AtprotoDid, 480 limit: number = 6, 481 withBacklinks: boolean = true 482) => { 483 const cursor = postCursors.get(subject); 484 if (cursor && cursor.end) return; 485 486 const accPosts = await fetchPosts(subject, client, cursor?.value, limit, withBacklinks); 487 if (!accPosts.ok) throw `cant fetch posts ${subject}: ${accPosts.error}`; 488 489 // if the cursor is undefined, we've reached the end of the timeline 490 const newCursor = { value: accPosts.value.cursor, end: !accPosts.value.cursor }; 491 postCursors.set(subject, newCursor); 492 const hydrated = await hydratePosts(client, subject, accPosts.value.posts, hydrateCacheFn); 493 if (!hydrated.ok) throw `cant hydrate posts ${subject}: ${hydrated.error}`; 494 495 addPosts(hydrated.value.values()); 496 addTimeline(subject, hydrated.value.keys()); 497 498 if (client.user?.did) { 499 const userDid = client.user.did; 500 // check if any of the post authors block the user 501 // eslint-disable-next-line svelte/prefer-svelte-reactivity 502 let distinctDids = new Set(hydrated.value.keys().map((uri) => extractDidFromUri(uri)!)); 503 distinctDids.delete(userDid); // dont need to check if user blocks themselves 504 const alreadyFetched = blockFlags.get(userDid); 505 if (alreadyFetched) distinctDids = distinctDids.difference(alreadyFetched); 506 if (distinctDids.size > 0) 507 await Promise.all(distinctDids.values().map((did) => fetchBlocked(client, userDid, did))); 508 } 509 510 console.log(`${subject}: fetchTimeline`, accPosts.value.cursor); 511 return newCursor; 512}; 513 514export const fetchInteractionsToTimelineEnd = async (client: AtpClient, did: Did) => { 515 const cursor = postCursors.get(did); 516 if (!cursor) return; 517 const timestamp = timestampFromCursor(cursor.value); 518 await Promise.all( 519 [likeSource, repostSource].map((s) => fetchLinksUntil(did, client, s, timestamp)) 520 ); 521}; 522 523export const fetchInitial = async (account: Account) => { 524 const client = clients.get(account.did)!; 525 await Promise.all([ 526 fetchBlocks(account), 527 fetchForInteractions(client, account.did), 528 fetchFollows(account).then((follows) => 529 Promise.all(follows.map((follow) => fetchForInteractions(client, follow.subject)) ?? []) 530 ) 531 ]); 532}; 533 534export const handleJetstreamEvent = async (event: JetstreamEvent) => { 535 if (event.kind !== 'commit') return; 536 537 const { did, commit } = event; 538 const uri: ResourceUri = toCanonicalUri({ did, ...commit }); 539 if (commit.collection === 'app.bsky.feed.post') { 540 if (commit.operation === 'create') { 541 const posts = [ 542 { 543 record: commit.record as AppBskyFeedPost.Main, 544 uri, 545 cid: commit.cid 546 } 547 ]; 548 const client = clients.get(did) ?? viewClient; 549 const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn); 550 if (!hydrated.ok) { 551 console.error(`cant hydrate posts ${did}: ${hydrated.error}`); 552 return; 553 } 554 addPosts(hydrated.value.values()); 555 addTimeline(did, hydrated.value.keys()); 556 } else if (commit.operation === 'delete') { 557 const post = allPosts.get(did)?.get(uri); 558 if (post) { 559 allPosts.get(did)?.delete(uri); 560 // remove from timeline 561 timelines.get(did)?.delete(uri); 562 // remove reply from index 563 const subjectDid = extractDidFromUri(post.record.reply?.parent.uri ?? ''); 564 if (subjectDid) replyIndex.get(subjectDid)?.delete(uri); 565 } 566 } 567 } 568}; 569 570const handlePostNotification = async (event: NotificationsStreamEvent & { type: 'message' }) => { 571 const parsedSubjectUri = expect(parseCanonicalResourceUri(event.data.link.subject)); 572 const did = parsedSubjectUri.repo as AtprotoDid; 573 const client = clients.get(did); 574 if (!client) { 575 console.error(`${did}: cant handle post notification, client not found !?`); 576 return; 577 } 578 const subjectPost = await client.getRecord( 579 AppBskyFeedPost.mainSchema, 580 did, 581 parsedSubjectUri.rkey 582 ); 583 if (!subjectPost.ok) return; 584 585 const parsedSourceUri = expect(parseCanonicalResourceUri(event.data.link.source_record)); 586 const posts = [ 587 { 588 record: subjectPost.value.record, 589 uri: event.data.link.subject, 590 cid: subjectPost.value.cid, 591 replies: { 592 cursor: null, 593 total: 1, 594 records: [ 595 { 596 did: parsedSourceUri.repo, 597 collection: parsedSourceUri.collection, 598 rkey: parsedSourceUri.rkey 599 } 600 ] 601 } 602 } 603 ]; 604 const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn); 605 if (!hydrated.ok) { 606 console.error(`cant hydrate posts ${did}: ${hydrated.error}`); 607 return; 608 } 609 610 // console.log(hydrated); 611 addPosts(hydrated.value.values()); 612 addTimeline(did, hydrated.value.keys()); 613}; 614 615const handleBacklink = (event: NotificationsStreamEvent & { type: 'message' }) => { 616 const parsedSource = expect(parseCanonicalResourceUri(event.data.link.source_record)); 617 addBacklinks(event.data.link.subject, event.data.link.source, [ 618 { 619 did: parsedSource.repo, 620 collection: parsedSource.collection, 621 rkey: parsedSource.rkey 622 } 623 ]); 624}; 625 626export const handleNotification = async (event: NotificationsStreamEvent) => { 627 if (event.type === 'message') { 628 if (event.data.link.source.startsWith('app.bsky.feed.post')) handlePostNotification(event); 629 else handleBacklink(event); 630 } 631}; 632 633export const currentTime = new SvelteDate(); 634 635if (typeof window !== 'undefined') 636 setInterval(() => { 637 currentTime.setTime(Date.now()); 638 }, 1000); 639 640export const router = new Router();