[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1176 lines 35 kB view raw
import { assert } from "@std/assert";
import { AtUri } from "@atp/syntax";
import { DataPlane } from "../data-plane/index.ts";
import * as so from "../lex/so.ts";
import { uriToDid as didFromUri } from "../utils/uris.ts";
import {
  ActivitySubscriptionStates,
  ActorHydrator,
  Actors,
  KnownFollowersStates,
  ProfileAggs,
  ProfileRecord,
  ProfileViewerStates,
} from "./actor.ts";
import {
  FeedGenAggs,
  FeedGens,
  FeedGenViewerStates,
  FeedHydrator,
  FeedItem,
  KnownInteractionsStates,
  Likes,
  Post,
  PostAggs,
  Posts,
  PostViewerStates,
  Replies,
  Reply,
  ReplyAggs,
  Reposts,
  SoundAggs,
  Sounds,
  ThreadContexts,
  ThreadRef,
} from "./feed.ts";
import { Stories, StoryHydrator, StoryRecord } from "./story.ts";

import {
  BlockEntry,
  Follows,
  GraphHydrator,
  RelationshipPair,
} from "./graph.ts";
import {
  HydrationMap,
  ItemRef,
  mergeMaps,
  parseRecord,
  RecordInfo,
  urisByCollection,
} from "./util.ts";

import {
  LabelerAggs,
  Labelers,
  LabelerViewerStates,
  LabelHydrator,
  Labels,
} from "./label.ts";
import { ParsedLabelers } from "../util.ts";
import { Notification } from "../data-plane/routes/notifs.ts";

/**
 * Per-request hydration settings: which labelers apply, who is viewing, and
 * which takedown / block filters are relaxed.
 *
 * The raw constructor values are retained so that `copy()` can derive a new
 * context with selected fields overridden.
 */
export class HydrateCtx {
  labelers: ParsedLabelers;
  // DID of the viewer with any "#service_id" suffix stripped, or null when
  // there is no viewer.
  viewer: string | null;
  includeTakedowns?: boolean;
  includeActorTakedowns?: boolean;
  include3pBlocks?: boolean;

  constructor(private vals: HydrateCtxVals) {
    const {
      labelers,
      viewer,
      includeTakedowns,
      includeActorTakedowns,
      include3pBlocks,
    } = vals;
    this.labelers = labelers;
    // The viewer may arrive as a service ref; only the DID part is kept.
    this.viewer = viewer === null ? null : serviceRefToDid(viewer);
    this.includeTakedowns = includeTakedowns;
    this.includeActorTakedowns = includeActorTakedowns;
    this.include3pBlocks = include3pBlocks;
  }

  // Convenience for use with dataplane.getActors cache control
  get skipCacheForViewer() {
    return this.viewer ? [this.viewer] : undefined;
  }

  /**
   * Returns a new context built from the original constructor values with
   * `vals` layered on top. The receiver is not modified.
   */
  copy<V extends Partial<HydrateCtxVals>>(vals?: V): HydrateCtx & V {
    const merged = { ...this.vals, ...vals };
    return new HydrateCtx(merged) as HydrateCtx & V;
  }
}

/** Plain-value input accepted by the `HydrateCtx` constructor. */
export type HydrateCtxVals = {
  labelers: ParsedLabelers;
  viewer: string | null;
  includeTakedowns?: boolean;
  includeActorTakedowns?: boolean;
  include3pBlocks?: boolean;
};

/**
 * Accumulated result of one or more hydration passes. Every field is
 * optional: each hydrator fills in only the maps it is responsible for.
 */
export type HydrationState = {
  ctx?: HydrateCtx;
  actors?: Actors;
  profileViewers?: ProfileViewerStates;
  profileAggs?: ProfileAggs;
  posts?: Posts;
  replies?: Replies;
  postAggs?: PostAggs;
  replyAggs?: ReplyAggs;
  postViewers?: PostViewerStates;
  threadContexts?: ThreadContexts;
  sounds?: Sounds;
  soundAggs?: SoundAggs;
  stories?: Stories;
  actorStoryRefs?: ActorStoryRefs;

  postBlocks?: PostBlocks;
  reposts?: Reposts;
  follows?: Follows;
  followBlocks?: FollowBlocks;
  likes?: Likes;
  likeBlocks?: LikeBlocks;
  labels?: Labels;
  feedgens?: FeedGens;
  feedgenViewers?: FeedGenViewerStates;
  feedgenAggs?: FeedGenAggs;
  labelers?: Labelers;
  labelerViewers?: LabelerViewerStates;
  labelerAggs?: LabelerAggs;
  knownFollowers?: KnownFollowersStates;
  knownInteractions?: KnownInteractionsStates;
  activitySubscriptions?: ActivitySubscriptionStates;
  bidirectionalBlocks?: BidirectionalBlocks;
};

// Block flags for a post/reply, one per relationship that was checked.
export type PostBlock = { embed: boolean; parent: boolean; root: boolean };
export type PostBlocks = HydrationMap<PostBlock>;
// The [creator, other] DID pairs behind each PostBlock flag, when applicable.
type PostBlockPairs = {
  embed?: RelationshipPair;
  parent?: RelationshipPair;
  root?: RelationshipPair;
};

export type LikeBlock = boolean;
export type LikeBlocks = HydrationMap<LikeBlock>;

export type FollowBlock = boolean;
export type FollowBlocks = HydrationMap<FollowBlock>;

// Outer key: source DID. Inner key: target DID. Value: whether a block entry
// was found for that pair.
export type BidirectionalBlocks = HydrationMap<HydrationMap<boolean>>;
// Keyed by actor DID; values are the refs returned by StoryHydrator.getActorStories.
export type ActorStoryRefs = HydrationMap<ItemRef[]>;

/**
 * Composes the actor / feed / graph / story / label hydrators into the
 * higher-level `hydrate*` routines used to build views. Each routine returns
 * a partial `HydrationState`; partial states are combined with `mergeStates`.
 */
export class Hydrator {
  actor: ActorHydrator;
  feed: FeedHydrator;
  graph: GraphHydrator;
  story: StoryHydrator;
  label: LabelHydrator;
  serviceLabelers: Set<string>;

  /**
   * @param dataplane backing data source shared by all sub-hydrators
   * @param serviceLabelers labeler DIDs that `createContext` accepts without
   *   looking up their actor record
   */
  constructor(
    public dataplane: DataPlane,
    serviceLabelers: string[],
  ) {
    this.actor = new ActorHydrator(dataplane);
    this.feed = new FeedHydrator(dataplane);
    this.graph = new GraphHydrator(dataplane);
    this.story = new StoryHydrator(dataplane);
    this.label = new LabelHydrator(dataplane);
    this.serviceLabelers = new Set(serviceLabelers);
  }

  /**
   * so.sprk.actor.defs#profileView — viewer state only.
   *
   * Returns an empty state when the context has no viewer; otherwise the
   * naive profile viewer states for `dids` as seen by the viewer.
   */
  async hydrateProfileViewers(
    dids: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const viewer = ctx.viewer;
    if (!viewer) return {};
    const profileViewers = await this.actor.getProfileViewerStatesNaive(
      dids,
      viewer,
    );

    return {
      profileViewers,
      ctx,
    };
  }

  /**
   * so.sprk.actor.defs#profileView
   *
   * Hydrates actors, their labels (on both the DID and the profile record
   * URI), viewer state and — unless `opts.includeStories` is false — each
   * actor's story refs together with the hydrated stories they point to.
   *
   * Actors carrying a takedown label are nulled out unless the context sets
   * `includeTakedowns` or `includeActorTakedowns`.
   *
   * @param opts.includeStories defaults to true
   */
  async hydrateProfiles(
    dids: string[],
    ctx: HydrateCtx,
    opts: {
      includeStories?: boolean;
    } = {},
  ): Promise<HydrationState> {
    const includeStories = opts.includeStories ?? true;
    const includeTakedowns = ctx.includeTakedowns || ctx.includeActorTakedowns;
    const [actors, labels, profileViewersState, actorStories] = await Promise
      .all([
        this.actor.getActors(dids, {
          includeTakedowns,
        }),
        this.label.getLabelsForSubjects(
          labelSubjectsForDid(dids),
          ctx.labelers,
        ),
        this.hydrateProfileViewers(dids, ctx),
        includeStories
          ? this.story.getActorStories(dids)
          : Promise.resolve(new HydrationMap<ItemRef[]>()),
      ]);

    let actorStoryRefs: ActorStoryRefs | undefined;
    let storyState: HydrationState = {};
    if (includeStories) {
      actorStoryRefs = actorStories;
      // De-duplicate story URIs across actors before hydrating them.
      const storyUris = new Set<string>();
      for (const [_did, stories] of actorStories) {
        if (!stories) continue;
        for (const story of stories) {
          storyUris.add(story.uri);
        }
      }
      if (storyUris.size > 0) {
        storyState = await this.hydrateStories(Array.from(storyUris), ctx);
      }
    }

    if (!includeTakedowns) {
      actionTakedownLabels(dids, actors, labels);
    }
    return mergeManyStates(profileViewersState, storyState, {
      actors,
      labels,
      actorStoryRefs,
      ctx,
    });
  }

  /**
   * so.sprk.actor.defs#profileViewBasic
   *
   * Same as `hydrateProfiles` but without stories.
   */
  hydrateProfilesBasic(
    dids: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    return this.hydrateProfiles(dids, ctx, { includeStories: false });
  }

  /**
   * so.sprk.actor.defs#profileViewDetailed
   *
   * `hydrateProfiles` plus profile aggregates, known followers (with basic
   * profiles for each of them) and block state between every subject and its
   * known followers.
   *
   * A failure to load known followers is logged and treated as "none"; it
   * does not fail the whole hydration.
   */
  async hydrateProfilesDetailed(
    dids: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    let knownFollowers: KnownFollowersStates = new HydrationMap();
    try {
      knownFollowers = await this.actor.getKnownFollowers(dids, ctx.viewer);
    } catch (err) {
      console.error(
        "Failed to get known followers for profiles",
        { err },
      );
    }

    const subjectsToKnownFollowersMap = new Map<string, string[]>();
    const allKnownFollowerDids: string[] = [];
    for (const [did, known] of knownFollowers) {
      if (!known) continue;
      subjectsToKnownFollowersMap.set(did, known.followers);
      for (const follower of known.followers) {
        allKnownFollowerDids.push(follower);
      }
    }

    const [state, knownFollowerState, profileAggs, bidirectionalBlocks] =
      await Promise.all([
        this.hydrateProfiles(dids, ctx),
        allKnownFollowerDids.length > 0
          ? this.hydrateProfilesBasic(allKnownFollowerDids, ctx)
          : Promise.resolve<HydrationState>({}),
        this.actor.getProfileAggregates(dids),
        this.hydrateBidirectionalBlocks(subjectsToKnownFollowersMap),
      ]);
    return mergeManyStates(state, knownFollowerState, {
      profileAggs,
      knownFollowers,
      ctx,
      bidirectionalBlocks,
    });
  }

  /**
   * so.sprk.feed.defs#postView
   *
   * Hydrates the posts and replies named by `refs`, the root posts of those
   * replies, and everything attached to them: aggregates, viewer state,
   * labels, parent/root block flags, thread contexts, sounds, known
   * interactions, and basic profiles for every author and interactor.
   *
   * @param state optional pre-fetched `posts` / `replies`. NOTE: these maps
   *   are updated in place with whatever this call fetches.
   */
  async hydratePosts(
    refs: ItemRef[],
    ctx: HydrateCtx,
    state: HydrationState = {},
  ): Promise<HydrationState> {
    const postRefs = refs.filter((ref) =>
      new AtUri(ref.uri).collection === so.sprk.feed.post.$type
    );
    const replyRefs = refs.filter((ref) =>
      new AtUri(ref.uri).collection === so.sprk.feed.reply.$type
    );

    const allUris = refs.map((ref) => ref.uri);

    const posts = (state.posts ??= new HydrationMap<Post>());
    const replies = (state.replies ??= new HydrationMap<Reply>());

    const [postsLayer0, repliesLayer0] = await Promise.all([
      this.feed.getPosts(
        postRefs.map((ref) => ref.uri),
        ctx.includeTakedowns,
        posts,
      ),
      this.feed.getReplies(
        replyRefs.map((ref) => ref.uri),
        ctx.includeTakedowns,
        replies,
      ),
    ]);

    postsLayer0.forEach((post, uri) => {
      posts.set(uri, post);
    });
    repliesLayer0.forEach((reply, uri) => {
      replies.set(uri, reply);
    });

    // Second layer: root posts referenced by the fetched replies that are not
    // already present.
    const additionalRootUris = rootUrisFromReplies(repliesLayer0).filter(
      (uri) => !posts.has(uri),
    );

    const postsLayer1 = await this.feed.getPosts(
      additionalRootUris,
      ctx.includeTakedowns,
      posts,
    );
    postsLayer1.forEach((post, uri) => {
      posts.set(uri, post);
    });

    // A post is its own thread root; a reply uses its declared root, falling
    // back to itself when the record carries no reply ref.
    const threadRefs: ThreadRef[] = [];
    for (const ref of refs) {
      const collection = new AtUri(ref.uri).collection;
      if (collection === so.sprk.feed.post.$type) {
        const post = posts.get(ref.uri);
        if (!post) continue;
        threadRefs.push({
          uri: ref.uri,
          cid: post.cid,
          threadRoot: ref.uri,
        });
      } else if (collection === so.sprk.feed.reply.$type) {
        const reply = replies.get(ref.uri);
        if (!reply) continue;
        const rootUri = reply.record.reply?.root.uri ?? ref.uri;
        threadRefs.push({
          uri: ref.uri,
          cid: reply.cid,
          threadRoot: rootUri,
        });
      }
    }

    const authorUris = Array.from(
      new Set<string>([
        ...posts.keys(),
        ...replies.keys(),
      ]),
    );
    const authorDids = authorUris.map(didFromUri);

    const soundUris = new Set<string>();
    for (const post of posts.values()) {
      if (post && post.record.sound) {
        soundUris.add(post.record.sound.uri);
      }
    }

    // Fetch known interactions first so we can batch all profile hydration
    const knownInteractions = await this.feed.getKnownInteractions(
      refs,
      ctx.viewer,
    );

    // Gather DIDs from known interactions for profile hydration
    const knownInteractionDids = new Set<string>();
    for (const interactions of knownInteractions.values()) {
      if (interactions) {
        for (const interaction of interactions) {
          knownInteractionDids.add(interaction.by);
        }
      }
    }

    // Combine all DIDs for a single batched profile hydration call
    const allProfileDids = Array.from(
      new Set([...authorDids, ...knownInteractionDids]),
    );

    // Map each author to everyone who interacted with any of their posts, for
    // bidirectional block checking. Interactors are merged per author so that
    // several posts by the same author do not overwrite one another.
    const subjectsToInteractorsMap = groupInteractorsByAuthor(
      knownInteractions,
    );

    const [
      postAggs,
      replyAggs,
      postViewers,
      labels,
      postBlocks,
      profileState,
      threadContexts,
      soundState,
      interactionBlocks,
    ] = await Promise.all([
      this.feed.getPostAggregates(postRefs),
      this.feed.getReplyAggregates(replyRefs),
      ctx.viewer
        ? this.feed.getPostViewerStates(threadRefs, ctx.viewer)
        : Promise.resolve<PostViewerStates | undefined>(undefined),
      this.label.getLabelsForSubjects(allUris, ctx.labelers),
      this.hydratePostBlocks(posts, replies),
      this.hydrateProfilesBasic(allProfileDids, ctx),
      this.feed.getThreadContexts(threadRefs),
      this.hydrateSounds(Array.from(soundUris), ctx),
      this.hydrateBidirectionalBlocks(subjectsToInteractorsMap),
    ]);

    return mergeManyStates(
      profileState,
      soundState,
      {
        posts,
        replies,
        postAggs,
        replyAggs,
        postViewers,
        postBlocks,
        labels,
        threadContexts,
        knownInteractions,
        ctx,
        bidirectionalBlocks: interactionBlocks,
      },
    );
  }

  /**
   * Computes block flags for every non-null post and reply.
   *
   * For replies, `parent` / `root` are true when a block exists between the
   * reply's creator and the parent / root author (self-replies are skipped).
   * Posts get an all-false entry. No embed pairs are collected here, so
   * `embed` is always false.
   */
  private async hydratePostBlocks(
    posts: Posts,
    replies: Replies,
  ): Promise<PostBlocks> {
    const postBlocks = new HydrationMap<PostBlock>();
    const postBlocksPairs = new Map<string, PostBlockPairs>();
    const relationships: RelationshipPair[] = [];

    for (const [uri, item] of posts) {
      if (!item) continue;
      postBlocksPairs.set(uri, {});
    }

    for (const [uri, item] of replies) {
      if (!item) continue;
      const reply = item.record;
      const creator = didFromUri(uri);
      const pairs = postBlocksPairs.get(uri) ?? {};
      postBlocksPairs.set(uri, pairs);

      const parentUri = reply.reply?.parent.uri;
      const parentDid = parentUri && didFromUri(parentUri);
      if (parentDid && parentDid !== creator) {
        const pair: RelationshipPair = [creator, parentDid];
        relationships.push(pair);
        pairs.parent = pair;
      }

      const rootUri = reply.reply?.root.uri;
      const rootDid = rootUri && didFromUri(rootUri);
      if (rootDid && rootDid !== creator) {
        const pair: RelationshipPair = [creator, rootDid];
        relationships.push(pair);
        pairs.root = pair;
      }
    }

    const blocks = await this.hydrateBidirectionalBlocks(
      pairsToMap(relationships),
    );

    for (const [uri, { embed, parent, root }] of postBlocksPairs) {
      postBlocks.set(uri, {
        embed: !!embed && !!isBlocked(blocks, embed),
        parent: !!parent && !!isBlocked(blocks, parent),
        root: !!root && !!isBlocked(blocks, root),
      });
    }

    return postBlocks;
  }

  /**
   * so.sprk.feed.defs#feedViewPost
   *
   * Fetches the posts and replies in `items`, then runs `hydratePosts` over
   * those items plus the root and parent of every fetched reply, reusing the
   * already-fetched records.
   */
  async hydrateFeedItems(
    items: FeedItem[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const postUris: string[] = [];
    const replyUris: string[] = [];

    for (const { post } of items) {
      const collection = new AtUri(post.uri).collection;
      if (collection === so.sprk.feed.post.$type) {
        postUris.push(post.uri);
      } else if (collection === so.sprk.feed.reply.$type) {
        replyUris.push(post.uri);
      }
    }

    const [posts, replies] = await Promise.all([
      this.feed.getPosts(postUris, ctx.includeTakedowns),
      this.feed.getReplies(replyUris, ctx.includeTakedowns),
    ]);

    // Keyed by URI so that an item which is also some reply's root/parent is
    // only hydrated once.
    const postAndReplyRefsMap = new Map<string, ItemRef>();
    items.forEach((item) => {
      postAndReplyRefsMap.set(item.post.uri, item.post);
    });

    replies.forEach((reply) => {
      if (!reply?.record.reply) return;
      const { root, parent } = reply.record.reply;
      postAndReplyRefsMap.set(root.uri, root);
      postAndReplyRefsMap.set(parent.uri, parent);
    });

    const postAndReplyRefs = Array.from(postAndReplyRefsMap.values());

    const postState = await this.hydratePosts(postAndReplyRefs, ctx, {
      posts,
      replies,
    });

    return mergeStates(postState, {
      ctx,
    });
  }

  /**
   * so.sprk.feed.defs#threadViewReply
   *
   * Thin alias for `hydratePosts` with no pre-fetched state.
   */
  hydrateThreadPosts(
    refs: ItemRef[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    return this.hydratePosts(refs, ctx);
  }

  /**
   * so.sprk.story.defs#storyView
   *
   * Hydrates the stories at `uris`, basic profiles for the story authors and
   * for every DID in a "so.sprk.embed.mention" embed, and full post state for
   * every post referenced by a "so.sprk.embed.record" embed.
   */
  async hydrateStories(
    uris: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const stories = await this.story.getStories(uris, ctx.includeTakedowns);

    const storyAuthorDids = uris.map(didFromUri);
    const embedPostUris = new Set<string>();
    const mentionDids = new Set<string>();

    for (const story of stories.values()) {
      if (!story) continue;
      const record = story.record as StoryRecord;
      for (const embed of record.embeds ?? []) {
        // Only typed object embeds are inspected.
        if (!embed || typeof embed !== "object" || !("$type" in embed)) {
          continue;
        }
        const embedType = (embed as { $type?: string }).$type;
        if (embedType === "so.sprk.embed.record") {
          const postUri = (embed as { post?: { uri?: string } }).post?.uri;
          if (postUri) {
            embedPostUris.add(postUri);
          }
        } else if (embedType === "so.sprk.embed.mention") {
          const did = (embed as { did?: string }).did;
          if (did) {
            mentionDids.add(did);
          }
        }
      }
    }

    // Drop embedded post URIs that didFromUri rejects, so a bad embed cannot
    // fail the whole story hydration.
    const postUris: string[] = [];
    for (const postUri of embedPostUris) {
      try {
        didFromUri(postUri);
        postUris.push(postUri);
      } catch {
        continue;
      }
    }
    const profileDids = Array.from(
      new Set<string>([
        ...storyAuthorDids,
        ...mentionDids,
      ]),
    );

    const [postState, profileState] = await Promise.all([
      postUris.length > 0
        ? this.hydratePosts(postUris.map((uri) => ({ uri })), ctx)
        : Promise.resolve<HydrationState>({}),
      this.hydrateProfilesBasic(profileDids, ctx),
    ]);

    return mergeManyStates(profileState, postState, { stories, ctx });
  }

  /**
   * so.sprk.feed.defs#generatorView
   *
   * Hydrates feed generators, their aggregates and labels, the viewer state
   * (only when there is a viewer) and the creators' profiles.
   */
  async hydrateFeedGens(
    uris: string[], // @TODO any way to get refs here?
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const [feedgens, feedgenAggs, feedgenViewers, profileState, labels] =
      await Promise
        .all([
          this.feed.getFeedGens(uris, ctx.includeTakedowns),
          this.feed.getFeedGenAggregates(
            uris.map((uri) => ({ uri })),
          ),
          ctx.viewer
            ? this.feed.getFeedGenViewerStates(uris, ctx.viewer)
            : undefined,
          this.hydrateProfiles(uris.map(didFromUri), ctx),
          this.label.getLabelsForSubjects(uris, ctx.labelers),
        ]);
    return mergeStates(profileState, {
      feedgens,
      feedgenAggs,
      feedgenViewers,
      labels,
      ctx,
    });
  }

  /**
   * so.sprk.feed.getLikes#like
   *
   * Hydrates like records and the likers' basic profiles, and flags each
   * existing like whose author has a block relationship with `authorDid`.
   * Missing likes get a null entry in `likeBlocks`.
   *
   * @param authorDid DID the likers are block-checked against
   */
  async hydrateLikes(
    authorDid: string,
    uris: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const [likes, profileState] = await Promise.all([
      this.feed.getLikes(uris, ctx.includeTakedowns),
      this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
    ]);

    const pairs: RelationshipPair[] = [];
    for (const [uri, like] of likes) {
      if (like) {
        pairs.push([authorDid, didFromUri(uri)]);
      }
    }
    const blocks = await this.hydrateBidirectionalBlocks(
      pairsToMap(pairs),
    );
    const likeBlocks = new HydrationMap<LikeBlock>();
    for (const [uri, like] of likes) {
      if (like) {
        likeBlocks.set(uri, isBlocked(blocks, [authorDid, didFromUri(uri)]));
      } else {
        likeBlocks.set(uri, null);
      }
    }

    return mergeStates(profileState, { likes, likeBlocks, ctx });
  }

  /**
   * so.sprk.feed.getRepostedBy#repostedBy
   *
   * Hydrates repost records and the reposters' basic profiles.
   */
  async hydrateReposts(
    uris: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const [reposts, profileState] = await Promise.all([
      this.feed.getReposts(uris, ctx.includeTakedowns),
      this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
    ]);
    return mergeStates(profileState, { reposts, ctx });
  }

  /**
   * so.sprk.notification.listNotifications#notification
   *
   * Hydrates the record behind each notification (post, reply, like, repost
   * or follow), the notification authors' basic profiles, and — for like /
   * repost / reply notifications — the post or reply named by
   * `reasonSubject`. Posts and replies carrying a takedown label are nulled.
   */
  async hydrateNotifications(
    notifs: Notification[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const uris = notifs.map((notif) => notif.uri);
    const collections = urisByCollection(uris);
    const postUris = collections.get(so.sprk.feed.post.$type) ?? [];
    const replyUris = collections.get(so.sprk.feed.reply.$type) ?? [];
    const likeUris = collections.get(so.sprk.feed.like.$type) ?? [];
    const repostUris = collections.get(so.sprk.feed.repost.$type) ?? [];
    const followUris = collections.get(so.sprk.graph.follow.$type) ?? [];

    // Collect subject URIs for like/repost/reply notifications to hydrate their content
    const subjectPostUris: string[] = [];
    const subjectReplyUris: string[] = [];
    for (const notif of notifs) {
      if (
        notif.reasonSubject &&
        (notif.reason === "like" ||
          notif.reason === "repost" ||
          notif.reason === "reply")
      ) {
        const subjectUri = new AtUri(notif.reasonSubject);
        if (subjectUri.collection === so.sprk.feed.post.$type) {
          subjectPostUris.push(notif.reasonSubject);
        } else if (subjectUri.collection === so.sprk.feed.reply.$type) {
          subjectReplyUris.push(notif.reasonSubject);
        }
      }
    }

    const [
      posts,
      replies,
      likes,
      reposts,
      follows,
      labels,
      profileState,
      subjectPosts,
      subjectReplies,
    ] = await Promise.all([
      this.feed.getPosts(postUris), // reason: mention, quote
      this.feed.getReplies(replyUris), // reason: reply
      this.feed.getLikes(likeUris), // reason: like
      this.feed.getReposts(repostUris), // reason: repost
      this.graph.getFollows(followUris), // reason: follow
      this.label.getLabelsForSubjects(uris, ctx.labelers),
      this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
      this.feed.getPosts(subjectPostUris), // subjects of likes/reposts/replies
      this.feed.getReplies(subjectReplyUris), // subjects of likes/reposts/replies
    ]);

    actionTakedownLabels(postUris, posts, labels);
    actionTakedownLabels(replyUris, replies, labels);
    // NOTE(review): labels are only requested for the notification URIs, so
    // these two calls only affect subjects that are also notification URIs —
    // confirm whether subject labels should be fetched as well.
    actionTakedownLabels(subjectPostUris, subjectPosts, labels);
    actionTakedownLabels(subjectReplyUris, subjectReplies, labels);
    return mergeStates(profileState, {
      posts: mergeMaps(posts, subjectPosts),
      replies: mergeMaps(replies, subjectReplies),
      likes,
      reposts,
      follows,
      labels,
      ctx,
    });
  }

  /**
   * so.sprk.sound.defs#audioView
   *
   * Hydrates sounds, their aggregates and the creators' basic profiles.
   */
  async hydrateSounds(
    uris: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const [sounds, soundAggs, profileState] = await Promise.all([
      this.feed.getSounds(uris, ctx.includeTakedowns),
      this.feed.getSoundAggregates(uris.map((uri) => ({ uri }))),
      this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
    ]);
    return mergeStates(profileState, { sounds, soundAggs, ctx });
  }

  /**
   * Provides partial hydration state within getFollows / getFollowers, mainly
   * for applying rules: the follow records plus, per follow, whether a block
   * exists between the follower and the followed subject. Missing follows get
   * a null entry in `followBlocks`.
   */
  async hydrateFollows(
    uris: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const follows = await this.graph.getFollows(uris, ctx.includeTakedowns);
    const pairs: RelationshipPair[] = [];
    for (const [uri, follow] of follows) {
      if (follow) {
        pairs.push([didFromUri(uri), follow.record.subject]);
      }
    }
    const blocks = await this.hydrateBidirectionalBlocks(
      pairsToMap(pairs),
    );
    const followBlocks = new HydrationMap<FollowBlock>();
    for (const [uri, follow] of follows) {
      if (follow) {
        followBlocks.set(
          uri,
          isBlocked(blocks, [didFromUri(uri), follow.record.subject]),
        );
      } else {
        followBlocks.set(uri, null);
      }
    }
    return { follows, followBlocks };
  }

  /**
   * Looks up block state for every (source, target) pair in `didMap`.
   *
   * @param didMap source DID -> target DIDs to check against it
   * @returns for each source, a map of target DID -> true when the graph
   *   lookup returned an entry with a `blockUri` for that pair
   */
  async hydrateBidirectionalBlocks(
    didMap: Map<string, string[]>,
  ): Promise<BidirectionalBlocks> {
    const pairs: RelationshipPair[] = [];
    for (const [source, targets] of didMap) {
      for (const target of targets) {
        pairs.push([source, target]);
      }
    }

    const blocks = await this.graph.getBidirectionalBlocks(pairs);

    const result: BidirectionalBlocks = new HydrationMap<
      HydrationMap<boolean>
    >();
    for (const [source, targets] of didMap) {
      const didBlocks = new HydrationMap<boolean>();
      for (const target of targets) {
        const block = blocks.get(source, target);

        const blockEntry: BlockEntry = {
          blockUri: block?.blockUri,
        };

        didBlocks.set(
          target,
          !!blockEntry.blockUri,
        );
      }
      result.set(source, didBlocks);
    }

    return result;
  }

  /**
   * so.sprk.labeler.def#labelerViewDetailed
   *
   * Hydrates labeler records, aggregates, viewer state (only when there is a
   * viewer) and the labelers' profiles. Labelers whose DID carries a takedown
   * label in the profile state are nulled.
   */
  async hydrateLabelers(
    dids: string[],
    ctx: HydrateCtx,
  ): Promise<HydrationState> {
    const [labelers, labelerAggs, labelerViewers, profileState] = await Promise
      .all([
        this.label.getLabelers(dids, ctx.includeTakedowns),
        this.label.getLabelerAggregates(dids, ctx.viewer),
        ctx.viewer
          ? this.label.getLabelerViewerStates(dids, ctx.viewer)
          : undefined,
        this.hydrateProfiles(dids, ctx),
      ]);
    actionTakedownLabels(dids, labelers, profileState.labels ?? new Labels());
    return mergeStates(profileState, {
      labelers,
      labelerAggs,
      labelerViewers,
      ctx,
    });
  }

  /**
   * Ad-hoc record hydration, used in com.atproto.repo.getRecord.
   *
   * Dispatches on the URI's collection and returns the matching record info,
   * or undefined when the record is missing or the collection is not handled
   * here. Labeler service records are only resolved for rkey "self".
   */
  async getRecord(uri: string, includeTakedowns = false) {
    const parsed = new AtUri(uri);
    switch (parsed.collection) {
      case so.sprk.feed.post.$type:
        return (
          (await this.feed.getPosts([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.feed.reply.$type:
        return (
          (await this.feed.getReplies([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.feed.repost.$type:
        return (
          (await this.feed.getReposts([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.feed.like.$type:
        return (
          (await this.feed.getLikes([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.sound.audio.$type:
        return (
          (await this.feed.getSounds([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.graph.follow.$type:
        return (
          (await this.graph.getFollows([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.graph.block.$type:
        return (
          (await this.graph.getBlocks([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.feed.generator.$type:
        return (
          (await this.feed.getFeedGens([uri], includeTakedowns)).get(uri) ??
            undefined
        );
      case so.sprk.labeler.service.$type: {
        if (parsed.rkey !== "self") return;
        const did = parsed.hostname;
        return (
          (await this.label.getLabelers([did], includeTakedowns)).get(did) ??
            undefined
        );
      }
      case so.sprk.actor.profile.$type: {
        const did = parsed.hostname;
        const actor = (
          await this.actor.getActors([did], { includeTakedowns })
        ).get(did);
        if (!actor?.profile || !actor?.profileCid) return undefined;
        const recordInfo: RecordInfo<ProfileRecord> = {
          record: actor.profile,
          cid: actor.profileCid,
          sortedAt: actor.sortedAt ?? new Date(0),
          indexedAt: actor.indexedAt ?? new Date(0),
          takedownRef: actor.profileTakedownRef,
        };

        return recordInfo;
      }
      case so.sprk.story.post.$type: {
        // Get story records through dataplane
        const res = await this.dataplane.records.getStoryRecords([uri]);
        const storyRecord = res.records[0];

        if (!storyRecord || !storyRecord.cid) return undefined;

        return parseRecord<StoryRecord>(
          so.sprk.story.post.main,
          storyRecord,
          includeTakedowns,
        );
      }
      default:
        return undefined;
    }
  }

  /**
   * Builds a `HydrateCtx` whose labeler list is restricted to labelers that
   * are either configured service labelers or have an actor record available
   * (respecting `includeTakedowns`). All other supplied values are forwarded
   * as given, including `includeActorTakedowns`.
   */
  async createContext(vals: HydrateCtxVals) {
    // ensures we only apply labelers that exist and are not taken down
    const labelers = vals.labelers.dids;
    const nonServiceLabelers = labelers.filter(
      (did) => !this.serviceLabelers.has(did),
    );
    const labelerActors = await this.actor.getActors(nonServiceLabelers, {
      includeTakedowns: vals.includeTakedowns,
    });
    const availableDids = labelers.filter(
      (did) => this.serviceLabelers.has(did) || !!labelerActors.get(did),
    );
    const availableLabelers = {
      dids: availableDids,
      redact: vals.labelers.redact,
    };
    return new HydrateCtx({
      labelers: availableLabelers,
      viewer: vals.viewer,
      includeTakedowns: vals.includeTakedowns,
      includeActorTakedowns: vals.includeActorTakedowns,
      include3pBlocks: vals.include3pBlocks,
    });
  }

  /**
   * Rewrites the host of an AT URI to the DID returned by
   * `ActorHydrator.getDids` for it. Returns the input string unchanged when
   * no DID is found.
   */
  async resolveUri(uriStr: string) {
    const uri = new AtUri(uriStr);
    const [did] = await this.actor.getDids([uri.host]);
    if (!did) return uriStr;
    uri.host = did;
    return uri.toString();
  }
}

// service refs may look like "did:plc:example#service_id". we want to extract the did part "did:plc:example".
const serviceRefToDid = (serviceRef: string) => {
  const idx = serviceRef.indexOf("#");
  return idx !== -1 ? serviceRef.slice(0, idx) : serviceRef;
};

// Label subjects for an actor: the DID itself plus its "self" profile record URI.
const labelSubjectsForDid = (dids: string[]) => {
  return [
    ...dids,
    ...dids.map((did) =>
      AtUri.make(did, so.sprk.actor.profile.$type, "self").toString()
    ),
  ];
};

// Distinct root URIs declared by the non-null replies in the map.
const rootUrisFromReplies = (replies: Replies): string[] => {
  const uris = new Set<string>();
  for (const item of replies.values()) {
    const rootUri = item && rootUriFromReply(item);
    if (rootUri) {
      uris.add(rootUri);
    }
  }
  return Array.from(uris);
};

const rootUriFromReply = (reply: Reply): string | undefined => {
  return reply.record.reply?.root.uri;
};

// Looks up the pair in source -> target direction; absent entries read as "not blocked".
const isBlocked = (blocks: BidirectionalBlocks, [a, b]: RelationshipPair) => {
  return blocks.get(a)?.get(b) ?? false;
};

// Groups [source, target] pairs into source -> targets, preserving order.
const pairsToMap = (pairs: RelationshipPair[]): Map<string, string[]> => {
  const map = new Map<string, string[]>();
  for (const [a, b] of pairs) {
    const list = map.get(a) ?? [];
    list.push(b);
    map.set(a, list);
  }
  return map;
};

/**
 * Groups interactor DIDs by the author of the subject they interacted with.
 *
 * Interactors for several subjects by the same author are merged (and
 * de-duplicated, first occurrence first) rather than the last subject
 * replacing the earlier ones. Subjects with no interactions are skipped.
 *
 * @param knownInteractions entries of subject URI -> interactions
 * @param authorOf maps a subject URI to its author DID
 */
const groupInteractorsByAuthor = (
  knownInteractions: Iterable<
    [string, readonly { by: string }[] | null | undefined]
  >,
  authorOf: (uri: string) => string = didFromUri,
): Map<string, string[]> => {
  const grouped = new Map<string, Set<string>>();
  for (const [uri, interactions] of knownInteractions) {
    if (!interactions || interactions.length === 0) continue;
    const author = authorOf(uri);
    const interactors = grouped.get(author) ?? new Set<string>();
    for (const interaction of interactions) {
      interactors.add(interaction.by);
    }
    grouped.set(author, interactors);
  }
  const result = new Map<string, string[]>();
  for (const [author, interactors] of grouped) {
    result.set(author, Array.from(interactors));
  }
  return result;
};

/**
 * Merges two hydration states field by field using `mergeMaps`. The context
 * of `stateA` wins when both are present.
 *
 * Asserts that the two states were not produced for different viewers.
 */
export const mergeStates = (
  stateA: HydrationState,
  stateB: HydrationState,
): HydrationState => {
  assert(
    !stateA.ctx?.viewer ||
      !stateB.ctx?.viewer ||
      stateA.ctx?.viewer === stateB.ctx?.viewer,
    "incompatible viewers",
  );
  return {
    ctx: stateA.ctx ?? stateB.ctx,
    actors: mergeMaps(stateA.actors, stateB.actors),
    profileAggs: mergeMaps(stateA.profileAggs, stateB.profileAggs),
    profileViewers: mergeMaps(stateA.profileViewers, stateB.profileViewers),
    posts: mergeMaps(stateA.posts, stateB.posts),
    replies: mergeMaps(stateA.replies, stateB.replies),
    postAggs: mergeMaps(stateA.postAggs, stateB.postAggs),
    replyAggs: mergeMaps(stateA.replyAggs, stateB.replyAggs),
    postViewers: mergeMaps(stateA.postViewers, stateB.postViewers),
    threadContexts: mergeMaps(stateA.threadContexts, stateB.threadContexts),
    sounds: mergeMaps(stateA.sounds, stateB.sounds),
    soundAggs: mergeMaps(stateA.soundAggs, stateB.soundAggs),
    stories: mergeMaps(stateA.stories, stateB.stories),
    actorStoryRefs: mergeMaps(stateA.actorStoryRefs, stateB.actorStoryRefs),
    postBlocks: mergeMaps(stateA.postBlocks, stateB.postBlocks),
    reposts: mergeMaps(stateA.reposts, stateB.reposts),
    follows: mergeMaps(stateA.follows, stateB.follows),
    followBlocks: mergeMaps(stateA.followBlocks, stateB.followBlocks),
    likes: mergeMaps(stateA.likes, stateB.likes),
    likeBlocks: mergeMaps(stateA.likeBlocks, stateB.likeBlocks),
    labels: mergeMaps(stateA.labels, stateB.labels),
    feedgens: mergeMaps(stateA.feedgens, stateB.feedgens),
    feedgenAggs: mergeMaps(stateA.feedgenAggs, stateB.feedgenAggs),
    feedgenViewers: mergeMaps(stateA.feedgenViewers, stateB.feedgenViewers),
    labelers: mergeMaps(stateA.labelers, stateB.labelers),
    labelerViewers: mergeMaps(stateA.labelerViewers, stateB.labelerViewers),
    labelerAggs: mergeMaps(stateA.labelerAggs, stateB.labelerAggs),
    knownFollowers: mergeMaps(stateA.knownFollowers, stateB.knownFollowers),
    knownInteractions: mergeMaps(
      stateA.knownInteractions,
      stateB.knownInteractions,
    ),
    bidirectionalBlocks: mergeMaps(
      stateA.bidirectionalBlocks,
      stateB.bidirectionalBlocks,
    ),
  };
};

/**
 * Folds any number of hydration states into one, left to right, starting
 * from an empty state. With no arguments the result is an empty state.
 */
export const mergeManyStates = (...states: HydrationState[]) => {
  let merged: HydrationState = {};
  for (const state of states) {
    merged = mergeStates(merged, state);
  }
  return merged;
};

/**
 * Nulls out the entry of every key whose label set is marked as taken down.
 * Keys without labels, or whose labels are not a takedown, are left as-is.
 * `hydrationMap` is modified in place.
 */
const actionTakedownLabels = <T>(
  keys: string[],
  hydrationMap: HydrationMap<T>,
  labels: Labels,
) => {
  keys.forEach((key) => {
    const takenDown = labels.get(key)?.isTakendown;
    if (takenDown) {
      hydrationMap.set(key, null);
    }
  });
};