[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { assert } from "@std/assert";
2import { AtUri } from "@atp/syntax";
3import { DataPlane } from "../data-plane/index.ts";
4import * as so from "../lex/so.ts";
5import { uriToDid as didFromUri } from "../utils/uris.ts";
6import {
7 ActivitySubscriptionStates,
8 ActorHydrator,
9 Actors,
10 KnownFollowersStates,
11 ProfileAggs,
12 ProfileRecord,
13 ProfileViewerStates,
14} from "./actor.ts";
15import {
16 FeedGenAggs,
17 FeedGens,
18 FeedGenViewerStates,
19 FeedHydrator,
20 FeedItem,
21 KnownInteractionsStates,
22 Likes,
23 Post,
24 PostAggs,
25 Posts,
26 PostViewerStates,
27 Replies,
28 Reply,
29 ReplyAggs,
30 Reposts,
31 SoundAggs,
32 Sounds,
33 ThreadContexts,
34 ThreadRef,
35} from "./feed.ts";
36import { Stories, StoryHydrator, StoryRecord } from "./story.ts";
37
38import {
39 BlockEntry,
40 Follows,
41 GraphHydrator,
42 RelationshipPair,
43} from "./graph.ts";
44import {
45 HydrationMap,
46 ItemRef,
47 mergeMaps,
48 parseRecord,
49 RecordInfo,
50 urisByCollection,
51} from "./util.ts";
52
53import {
54 LabelerAggs,
55 Labelers,
56 LabelerViewerStates,
57 LabelHydrator,
58 Labels,
59} from "./label.ts";
60import { ParsedLabelers } from "../util.ts";
61import { Notification } from "../data-plane/routes/notifs.ts";
62
/**
 * Per-request hydration context: which labelers apply, who is viewing, and
 * which takedown/block inclusion flags were requested.
 */
export class HydrateCtx {
  labelers: ParsedLabelers;
  viewer: string | null;
  includeTakedowns?: boolean;
  includeActorTakedowns?: boolean;
  include3pBlocks?: boolean;

  /**
   * @param vals Raw context values. They are kept privately so `copy` can
   *   rebuild a context from them.
   */
  constructor(private vals: HydrateCtxVals) {
    const {
      labelers,
      viewer,
      includeTakedowns,
      includeActorTakedowns,
      include3pBlocks,
    } = this.vals;
    this.labelers = labelers;
    // A viewer may be given as a service ref; keep only the DID part.
    this.viewer = viewer === null ? null : serviceRefToDid(viewer);
    this.includeTakedowns = includeTakedowns;
    this.includeActorTakedowns = includeActorTakedowns;
    this.include3pBlocks = include3pBlocks;
  }

  // Convenience for use with dataplane.getActors cache control
  get skipCacheForViewer() {
    return this.viewer ? [this.viewer] : undefined;
  }

  /**
   * Builds a new context from the original values with `vals` layered on top.
   */
  copy<V extends Partial<HydrateCtxVals>>(vals?: V): HydrateCtx & V {
    const merged = { ...this.vals, ...vals };
    return new HydrateCtx(merged) as HydrateCtx & V;
  }
}
88
/** Plain values used to construct (or copy) a `HydrateCtx`. */
export type HydrateCtxVals = {
  /** Labelers whose labels are requested during hydration. */
  labelers: ParsedLabelers;
  /** Viewer DID or service ref ("did#service_id"); null when unauthenticated. */
  viewer: string | null;
  /** Forwarded to the record getters when hydrating. */
  includeTakedowns?: boolean;
  /** Profile hydration treats this like `includeTakedowns` for actors. */
  includeActorTakedowns?: boolean;
  /** NOTE(review): not read in this file; presumably third-party blocks. Confirm with callers. */
  include3pBlocks?: boolean;
};
96
/**
 * Accumulated hydration results. Every field is optional because each
 * hydrator only fills in the parts it fetched; partial states are combined
 * with `mergeStates`.
 */
export type HydrationState = {
  ctx?: HydrateCtx;

  // actors / profiles
  actors?: Actors;
  profileViewers?: ProfileViewerStates;
  profileAggs?: ProfileAggs;
  knownFollowers?: KnownFollowersStates;
  activitySubscriptions?: ActivitySubscriptionStates;

  // posts / replies
  posts?: Posts;
  replies?: Replies;
  postAggs?: PostAggs;
  replyAggs?: ReplyAggs;
  postViewers?: PostViewerStates;
  postBlocks?: PostBlocks;
  threadContexts?: ThreadContexts;
  knownInteractions?: KnownInteractionsStates;

  // sounds
  sounds?: Sounds;
  soundAggs?: SoundAggs;

  // stories
  stories?: Stories;
  actorStoryRefs?: ActorStoryRefs;

  // interactions and graph
  reposts?: Reposts;
  follows?: Follows;
  followBlocks?: FollowBlocks;
  likes?: Likes;
  likeBlocks?: LikeBlocks;
  bidirectionalBlocks?: BidirectionalBlocks;

  // labels and labelers
  labels?: Labels;
  labelers?: Labelers;
  labelerViewers?: LabelerViewerStates;
  labelerAggs?: LabelerAggs;

  // feed generators
  feedgens?: FeedGens;
  feedgenViewers?: FeedGenViewerStates;
  feedgenAggs?: FeedGenAggs;
};
131
/** Whether a post is blocked relative to its embed, parent and root authors. */
export type PostBlock = { embed: boolean; parent: boolean; root: boolean };
export type PostBlocks = HydrationMap<PostBlock>;
/** Relationship pairs to check for each `PostBlock` slot; a missing slot means nothing to check. */
type PostBlockPairs = Partial<Record<keyof PostBlock, RelationshipPair>>;

/** True when the like's author and the subject's author block each other. */
export type LikeBlock = boolean;
export type LikeBlocks = HydrationMap<LikeBlock>;

/** True when follower and followed subject block each other. */
export type FollowBlock = boolean;
export type FollowBlocks = HydrationMap<FollowBlock>;

/** source DID -> (target DID -> whether a block exists between them). */
export type BidirectionalBlocks = HydrationMap<HydrationMap<boolean>>;
/** actor DID -> refs of that actor's stories. */
export type ActorStoryRefs = HydrationMap<ItemRef[]>;
148
149export class Hydrator {
150 actor: ActorHydrator;
151 feed: FeedHydrator;
152 graph: GraphHydrator;
153 story: StoryHydrator;
154 label: LabelHydrator;
155 serviceLabelers: Set<string>;
156
157 constructor(
158 public dataplane: DataPlane,
159 serviceLabelers: string[],
160 ) {
161 this.actor = new ActorHydrator(dataplane);
162 this.feed = new FeedHydrator(dataplane);
163 this.graph = new GraphHydrator(dataplane);
164 this.story = new StoryHydrator(dataplane);
165 this.label = new LabelHydrator(dataplane);
166 this.serviceLabelers = new Set(serviceLabelers);
167 }
168
169 // so.sprk.actor.defs#profileView
170 // - profile viewer
  // Note: thin wrapper over the naive profile viewer hydrator; returns an empty state when there is no viewer
172 async hydrateProfileViewers(
173 dids: string[],
174 ctx: HydrateCtx,
175 ): Promise<HydrationState> {
176 const viewer = ctx.viewer;
177 if (!viewer) return {};
178 const profileViewers = await this.actor.getProfileViewerStatesNaive(
179 dids,
180 viewer,
181 );
182
183 return {
184 profileViewers,
185 ctx,
186 };
187 }
188
189 // so.sprk.actor.defs#profileView
190 // - profile
191 // - list basic
192 async hydrateProfiles(
193 dids: string[],
194 ctx: HydrateCtx,
195 opts: {
196 includeStories?: boolean;
197 } = {},
198 ): Promise<HydrationState> {
199 const includeStories = opts.includeStories ?? true;
200 const includeTakedowns = ctx.includeTakedowns || ctx.includeActorTakedowns;
201 const [actors, labels, profileViewersState, actorStories] = await Promise
202 .all([
203 this.actor.getActors(dids, {
204 includeTakedowns,
205 }),
206 this.label.getLabelsForSubjects(
207 labelSubjectsForDid(dids),
208 ctx.labelers,
209 ),
210 this.hydrateProfileViewers(dids, ctx),
211 includeStories
212 ? this.story.getActorStories(dids)
213 : Promise.resolve(new HydrationMap<ItemRef[]>()),
214 ]);
215 let actorStoryRefs: ActorStoryRefs | undefined;
216 let storyState: HydrationState = {};
217 if (includeStories) {
218 actorStoryRefs = actorStories;
219 const storyUris = new Set<string>();
220 for (const [_did, stories] of actorStories) {
221 if (!stories) continue;
222 for (const story of stories) {
223 storyUris.add(story.uri);
224 }
225 }
226 if (storyUris.size > 0) {
227 storyState = await this.hydrateStories(Array.from(storyUris), ctx);
228 }
229 }
230 if (!includeTakedowns) {
231 actionTakedownLabels(dids, actors, labels);
232 }
233 return mergeManyStates(profileViewersState ?? {}, storyState, {
234 actors,
235 labels,
236 actorStoryRefs,
237 ctx,
238 });
239 }
240
241 // so.sprk.actor.defs#profileViewBasic
242 // - profile basic
243 // - profile
244 // - list basic
245 hydrateProfilesBasic(
246 dids: string[],
247 ctx: HydrateCtx,
248 ): Promise<HydrationState> {
249 return this.hydrateProfiles(dids, ctx, { includeStories: false });
250 }
251
252 // so.sprk.actor.defs#profileViewDetailed
253 // - profile detailed
254 // - profile
255 // - list basic
256 // - starterpack
257 // - profile
258 // - list basic
259 // - labels
260 async hydrateProfilesDetailed(
261 dids: string[],
262 ctx: HydrateCtx,
263 ): Promise<HydrationState> {
264 let knownFollowers: KnownFollowersStates = new HydrationMap();
265 try {
266 knownFollowers = await this.actor.getKnownFollowers(dids, ctx.viewer);
267 } catch (err) {
268 console.error(
269 "Failed to get known followers for profiles",
270 { err },
271 );
272 }
273
274 const subjectsToKnownFollowersMap = Array.from(
275 knownFollowers.keys(),
276 ).reduce((acc, did) => {
277 const known = knownFollowers.get(did);
278 if (known) {
279 acc.set(did, known.followers);
280 }
281 return acc;
282 }, new Map<string, string[]>());
283 const allKnownFollowerDids = Array.from(knownFollowers.values())
284 .filter(Boolean)
285 .flatMap((f) => f!.followers);
286 const [state, knownFollowerState, profileAggs, bidirectionalBlocks] =
287 await Promise.all([
288 this.hydrateProfiles(dids, ctx),
289 allKnownFollowerDids.length > 0
290 ? this.hydrateProfilesBasic(allKnownFollowerDids, ctx)
291 : Promise.resolve<HydrationState>({}),
292 this.actor.getProfileAggregates(dids),
293 this.hydrateBidirectionalBlocks(subjectsToKnownFollowersMap),
294 ]);
295 return mergeManyStates(state, knownFollowerState, {
296 profileAggs,
297 knownFollowers,
298 ctx,
299 bidirectionalBlocks,
300 });
301 }
302
303 // so.sprk.feed.defs#postView
304 // - post
305 // - profile
306 // - list basic
307 // - list
308 // - profile
309 // - list basic
310 // - feedgen
311 // - profile
312 // - list basic
313 // - mod service
314 // - profile
315 // - list basic
316 async hydratePosts(
317 refs: ItemRef[],
318 ctx: HydrateCtx,
319 state: HydrationState = {},
320 ): Promise<HydrationState> {
321 const postRefs = refs.filter((ref) =>
322 new AtUri(ref.uri).collection === so.sprk.feed.post.$type
323 );
324 const replyRefs = refs.filter((ref) =>
325 new AtUri(ref.uri).collection === so.sprk.feed.reply.$type
326 );
327
328 const allUris = refs.map((ref) => ref.uri);
329
330 state.posts ??= new HydrationMap<Post>();
331 state.replies ??= new HydrationMap<Reply>();
332
333 const [postsLayer0, repliesLayer0] = await Promise.all([
334 this.feed.getPosts(
335 postRefs.map((ref) => ref.uri),
336 ctx.includeTakedowns,
337 state.posts,
338 ),
339 this.feed.getReplies(
340 replyRefs.map((ref) => ref.uri),
341 ctx.includeTakedowns,
342 state.replies,
343 ),
344 ]);
345
346 postsLayer0.forEach((post, uri) => {
347 state.posts!.set(uri, post);
348 });
349 repliesLayer0.forEach((reply, uri) => {
350 state.replies!.set(uri, reply);
351 });
352
353 const additionalRootUris = rootUrisFromReplies(repliesLayer0).filter(
354 (uri) => !state.posts!.has(uri),
355 );
356
357 const postsLayer1 = await this.feed.getPosts(
358 additionalRootUris,
359 ctx.includeTakedowns,
360 state.posts,
361 );
362 postsLayer1.forEach((post, uri) => {
363 state.posts!.set(uri, post);
364 });
365
366 const threadRefs: ThreadRef[] = [];
367 for (const ref of refs) {
368 const collection = new AtUri(ref.uri).collection;
369 if (collection === so.sprk.feed.post.$type) {
370 const post = state.posts!.get(ref.uri);
371 if (!post) continue;
372 threadRefs.push({
373 uri: ref.uri,
374 cid: post.cid,
375 threadRoot: ref.uri,
376 });
377 } else if (collection === so.sprk.feed.reply.$type) {
378 const reply = state.replies!.get(ref.uri);
379 if (!reply) continue;
380 const rootUri = reply.record.reply?.root.uri ?? ref.uri;
381 threadRefs.push({
382 uri: ref.uri,
383 cid: reply.cid,
384 threadRoot: rootUri,
385 });
386 }
387 }
388
389 const authorUris = Array.from(
390 new Set<string>([
391 ...state.posts!.keys(),
392 ...state.replies!.keys(),
393 ]),
394 );
395 const authorDids = authorUris.map(didFromUri);
396
397 const soundUris = new Set<string>();
398 for (const post of state.posts!.values()) {
399 if (post && post.record.sound) {
400 soundUris.add(post.record.sound.uri);
401 }
402 }
403
404 // Fetch known interactions first so we can batch all profile hydration
405 const knownInteractions = await this.feed.getKnownInteractions(
406 refs,
407 ctx.viewer,
408 );
409
410 // Gather DIDs from known interactions for profile hydration
411 const knownInteractionDids = new Set<string>();
412 for (const interactions of knownInteractions.values()) {
413 if (interactions) {
414 for (const interaction of interactions) {
415 knownInteractionDids.add(interaction.by);
416 }
417 }
418 }
419
420 // Combine all DIDs for a single batched profile hydration call
421 const allProfileDids = Array.from(
422 new Set([...authorDids, ...knownInteractionDids]),
423 );
424
425 // Build map for bidirectional block checking between post authors and interactors
426 const subjectsToInteractorsMap = new Map<string, string[]>();
427 for (const [uri, interactions] of knownInteractions) {
428 if (interactions && interactions.length > 0) {
429 subjectsToInteractorsMap.set(
430 didFromUri(uri),
431 interactions.map((i) => i.by),
432 );
433 }
434 }
435
436 const [
437 postAggs,
438 replyAggs,
439 postViewers,
440 labels,
441 postBlocks,
442 profileState,
443 threadContexts,
444 soundState,
445 interactionBlocks,
446 ] = await Promise.all([
447 this.feed.getPostAggregates(postRefs),
448 this.feed.getReplyAggregates(replyRefs),
449 ctx.viewer
450 ? this.feed.getPostViewerStates(threadRefs, ctx.viewer)
451 : Promise.resolve<PostViewerStates | undefined>(undefined),
452 this.label.getLabelsForSubjects(allUris, ctx.labelers),
453 this.hydratePostBlocks(state.posts!, state.replies!),
454 this.hydrateProfilesBasic(allProfileDids, ctx),
455 this.feed.getThreadContexts(threadRefs),
456 this.hydrateSounds(Array.from(soundUris), ctx),
457 this.hydrateBidirectionalBlocks(subjectsToInteractorsMap),
458 ]);
459
460 return mergeManyStates(
461 profileState,
462 soundState,
463 {
464 posts: state.posts,
465 replies: state.replies,
466 postAggs,
467 replyAggs,
468 postViewers,
469 postBlocks,
470 labels,
471 threadContexts,
472 knownInteractions,
473 ctx,
474 bidirectionalBlocks: interactionBlocks,
475 },
476 );
477 }
478
479 private async hydratePostBlocks(
480 posts: Posts,
481 replies: Replies,
482 ): Promise<PostBlocks> {
483 const postBlocks = new HydrationMap<PostBlock>();
484 const postBlocksPairs = new Map<string, PostBlockPairs>();
485 const relationships: RelationshipPair[] = [];
486
487 for (const [uri, item] of posts) {
488 if (!item) continue;
489 postBlocksPairs.set(uri, {});
490 }
491
492 for (const [uri, item] of replies) {
493 if (!item) continue;
494 const reply = item.record;
495 const creator = didFromUri(uri);
496 const pairs = postBlocksPairs.get(uri) ?? {};
497 postBlocksPairs.set(uri, pairs);
498
499 const parentUri = reply.reply?.parent.uri;
500 const parentDid = parentUri && didFromUri(parentUri);
501 if (parentDid && parentDid !== creator) {
502 const pair: RelationshipPair = [creator, parentDid];
503 relationships.push(pair);
504 pairs.parent = pair;
505 }
506
507 const rootUri = reply.reply?.root.uri;
508 const rootDid = rootUri && didFromUri(rootUri);
509 if (rootDid && rootDid !== creator) {
510 const pair: RelationshipPair = [creator, rootDid];
511 relationships.push(pair);
512 pairs.root = pair;
513 }
514 }
515
516 const blocks = await this.hydrateBidirectionalBlocks(
517 pairsToMap(relationships),
518 );
519
520 for (const [uri, { embed, parent, root }] of postBlocksPairs) {
521 postBlocks.set(uri, {
522 embed: !!embed && !!isBlocked(blocks, embed),
523 parent: !!parent && !!isBlocked(blocks, parent),
524 root: !!root && !!isBlocked(blocks, root),
525 });
526 }
527
528 return postBlocks;
529 }
530
531 // so.sprk.feed.defs#feedViewPost
532 // - post (+ replies w/ reply parent author)
533 // - profile
534 // - list basic
535 // - list
536 // - profile
537 // - list basic
538 // - feedgen
539 // - profile
540 // - list basic
541 // - repost
542 // - profile
543 // - list basic
544 // - post
545 // - ...
546 async hydrateFeedItems(
547 items: FeedItem[],
548 ctx: HydrateCtx,
549 ): Promise<HydrationState> {
550 const postUris: string[] = [];
551 const replyUris: string[] = [];
552 const replyRefs: ItemRef[] = [];
553
554 for (const { post } of items) {
555 const collection = new AtUri(post.uri).collection;
556 if (collection === so.sprk.feed.post.$type) {
557 postUris.push(post.uri);
558 } else if (collection === so.sprk.feed.reply.$type) {
559 replyUris.push(post.uri);
560 replyRefs.push(post);
561 }
562 }
563
564 const [posts, replies] = await Promise.all([
565 this.feed.getPosts(postUris, ctx.includeTakedowns),
566 this.feed.getReplies(replyUris, ctx.includeTakedowns),
567 ]);
568
569 const postAndReplyRefsMap = new Map<string, ItemRef>();
570 items.forEach((item) => {
571 postAndReplyRefsMap.set(item.post.uri, item.post);
572 });
573
574 replies.forEach((reply) => {
575 if (!reply?.record.reply) return;
576 const { root, parent } = reply.record.reply;
577 postAndReplyRefsMap.set(root.uri, root);
578 postAndReplyRefsMap.set(parent.uri, parent);
579 });
580
581 const postAndReplyRefs = Array.from(postAndReplyRefsMap.values());
582
583 const postState = await this.hydratePosts(postAndReplyRefs, ctx, {
584 posts,
585 replies,
586 });
587
588 return mergeStates(postState, {
589 ctx,
590 });
591 }
592
593 // so.sprk.feed.defs#threadViewReply
594 // - reply
595 // - profile
596 // - list basic
597 // - list
598 // - profile
599 // - list basic
600 // - feedgen
601 // - profile
602 // - list basic
603 hydrateThreadPosts(
604 refs: ItemRef[],
605 ctx: HydrateCtx,
606 ): Promise<HydrationState> {
607 return this.hydratePosts(refs, ctx);
608 }
609
610 // so.sprk.story.defs#storyView
611 // - story
612 // - profile
613 // - list basic
614 // - embeds (story record)
615 // - mention
616 // - profile
617 // - list basic
618 // - post
619 // - postView / blockedPost / notFoundPost
620 // - profile
621 // - list basic
622 async hydrateStories(
623 uris: string[],
624 ctx: HydrateCtx,
625 ): Promise<HydrationState> {
626 const stories = await this.story.getStories(uris, ctx.includeTakedowns);
627
628 const storyAuthorDids = uris.map(didFromUri);
629 const embedPostUris = new Set<string>();
630 const mentionDids = new Set<string>();
631
632 for (const story of stories.values()) {
633 if (!story) continue;
634 const record = story.record as StoryRecord;
635 for (const embed of record.embeds ?? []) {
636 if (
637 embed &&
638 typeof embed === "object" &&
639 "$type" in embed &&
640 (embed as { $type?: string }).$type === "so.sprk.embed.record"
641 ) {
642 const postUri = (embed as { post?: { uri?: string } }).post?.uri;
643 if (postUri) {
644 embedPostUris.add(postUri);
645 }
646 } else if (
647 embed &&
648 typeof embed === "object" &&
649 "$type" in embed &&
650 (embed as { $type?: string }).$type === "so.sprk.embed.mention"
651 ) {
652 const did = (embed as { did?: string }).did;
653 if (did) {
654 mentionDids.add(did);
655 }
656 }
657 }
658 }
659
660 const postUris: string[] = [];
661 for (const postUri of embedPostUris) {
662 try {
663 didFromUri(postUri);
664 postUris.push(postUri);
665 } catch {
666 continue;
667 }
668 }
669 const profileDids = Array.from(
670 new Set<string>([
671 ...storyAuthorDids,
672 ...mentionDids,
673 ]),
674 );
675
676 const [postState, profileState] = await Promise.all([
677 postUris.length > 0
678 ? this.hydratePosts(postUris.map((uri) => ({ uri })), ctx)
679 : Promise.resolve<HydrationState>({}),
680 this.hydrateProfilesBasic(profileDids, ctx),
681 ]);
682
683 return mergeManyStates(profileState, postState, { stories, ctx });
684 }
685
686 // so.sprk.feed.defs#generatorView
687 // - feedgen
688 // - profile
689 // - list basic
690 async hydrateFeedGens(
691 uris: string[], // @TODO any way to get refs here?
692 ctx: HydrateCtx,
693 ): Promise<HydrationState> {
694 const [feedgens, feedgenAggs, feedgenViewers, profileState, labels] =
695 await Promise
696 .all([
697 this.feed.getFeedGens(uris, ctx.includeTakedowns),
698 this.feed.getFeedGenAggregates(
699 uris.map((uri) => ({ uri })),
700 ),
701 ctx.viewer
702 ? this.feed.getFeedGenViewerStates(uris, ctx.viewer)
703 : undefined,
704 this.hydrateProfiles(uris.map(didFromUri), ctx),
705 this.label.getLabelsForSubjects(uris, ctx.labelers),
706 ]);
707 return mergeStates(profileState, {
708 feedgens,
709 feedgenAggs,
710 feedgenViewers,
711 labels,
712 ctx,
713 });
714 }
715
716 // so.sprk.feed.getLikes#like
717 // - like
718 // - profile
719 // - list basic
720 async hydrateLikes(
721 authorDid: string,
722 uris: string[],
723 ctx: HydrateCtx,
724 ): Promise<HydrationState> {
725 const [likes, profileState] = await Promise.all([
726 this.feed.getLikes(uris, ctx.includeTakedowns),
727 this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
728 ]);
729
730 const pairs: RelationshipPair[] = [];
731 for (const [uri, like] of likes) {
732 if (like) {
733 pairs.push([authorDid, didFromUri(uri)]);
734 }
735 }
736 const blocks = await this.hydrateBidirectionalBlocks(
737 pairsToMap(pairs),
738 );
739 const likeBlocks = new HydrationMap<LikeBlock>();
740 for (const [uri, like] of likes) {
741 if (like) {
742 likeBlocks.set(uri, isBlocked(blocks, [authorDid, didFromUri(uri)]));
743 } else {
744 likeBlocks.set(uri, null);
745 }
746 }
747
748 return mergeStates(profileState, { likes, likeBlocks, ctx });
749 }
750
751 // so.sprk.feed.getRepostedBy#repostedBy
752 // - repost
753 // - profile
754 // - list basic
755 async hydrateReposts(uris: string[], ctx: HydrateCtx) {
756 const [reposts, profileState] = await Promise.all([
757 this.feed.getReposts(uris, ctx.includeTakedowns),
758 this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
759 ]);
760 return mergeStates(profileState, { reposts, ctx });
761 }
762
763 // so.sprk.notification.listNotifications#notification
764 // - notification
765 // - profile
  // - list basic
767 async hydrateNotifications(
768 notifs: Notification[],
769 ctx: HydrateCtx,
770 ): Promise<HydrationState> {
771 const uris = notifs.map((notif) => notif.uri);
772 const collections = urisByCollection(uris);
773 const postUris = collections.get(so.sprk.feed.post.$type) ?? [];
774 const replyUris = collections.get(so.sprk.feed.reply.$type) ?? [];
775 const likeUris = collections.get(so.sprk.feed.like.$type) ?? [];
776 const repostUris = collections.get(so.sprk.feed.repost.$type) ?? [];
777 const followUris = collections.get(so.sprk.graph.follow.$type) ?? [];
778
779 // Collect subject URIs for like/repost/reply notifications to hydrate their content
780 const subjectPostUris: string[] = [];
781 const subjectReplyUris: string[] = [];
782 for (const notif of notifs) {
783 if (
784 notif.reasonSubject &&
785 (notif.reason === "like" ||
786 notif.reason === "repost" ||
787 notif.reason === "reply")
788 ) {
789 const subjectUri = new AtUri(notif.reasonSubject);
790 if (subjectUri.collection === so.sprk.feed.post.$type) {
791 subjectPostUris.push(notif.reasonSubject);
792 } else if (subjectUri.collection === so.sprk.feed.reply.$type) {
793 subjectReplyUris.push(notif.reasonSubject);
794 }
795 }
796 }
797
798 const [
799 posts,
800 replies,
801 likes,
802 reposts,
803 follows,
804 labels,
805 profileState,
806 subjectPosts,
807 subjectReplies,
808 ] = await Promise.all([
809 this.feed.getPosts(postUris), // reason: mention, quote
810 this.feed.getReplies(replyUris), // reason: reply
811 this.feed.getLikes(likeUris), // reason: like
812 this.feed.getReposts(repostUris), // reason: repost
813 this.graph.getFollows(followUris), // reason: follow
814 this.label.getLabelsForSubjects(uris, ctx.labelers),
815 this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
816 this.feed.getPosts(subjectPostUris), // subjects of likes/reposts
817 this.feed.getReplies(subjectReplyUris), // subjects of likes/reposts
818 ]);
819 const viewerRootPostUris = new Set<string>();
820 for (const notif of notifs) {
821 if (notif.reason === "reply") {
822 // Check replies map for reply notifications
823 const reply = replies.get(notif.uri);
824 if (reply) {
825 const rootUri = reply.record.reply?.root.uri;
826 if (rootUri && didFromUri(rootUri) === ctx.viewer) {
827 viewerRootPostUris.add(rootUri);
828 }
829 }
830 }
831 }
832 actionTakedownLabels(postUris, posts, labels);
833 actionTakedownLabels(replyUris, replies, labels);
834 actionTakedownLabels(subjectPostUris, subjectPosts, labels);
835 actionTakedownLabels(subjectReplyUris, subjectReplies, labels);
836 return mergeStates(profileState, {
837 posts: mergeMaps(posts, subjectPosts),
838 replies: mergeMaps(replies, subjectReplies),
839 likes,
840 reposts,
841 follows,
842 labels,
843 ctx,
844 });
845 }
846
847 // so.sprk.sound.defs#audioView
848 // - sound
849 // - profile
850 // - list basic
851 async hydrateSounds(
852 uris: string[],
853 ctx: HydrateCtx,
854 ): Promise<HydrationState> {
855 const [sounds, soundAggs, profileState] = await Promise.all([
856 this.feed.getSounds(uris, ctx.includeTakedowns),
857 this.feed.getSoundAggregates(uris.map((uri) => ({ uri }))),
858 this.hydrateProfilesBasic(uris.map(didFromUri), ctx),
859 ]);
860 return mergeStates(profileState, { sounds, soundAggs, ctx });
861 }
862
863 // provides partial hydration state within getFollows / getFollowers, mainly for applying rules
864 async hydrateFollows(
865 uris: string[],
866 ctx: HydrateCtx,
867 ): Promise<HydrationState> {
868 const follows = await this.graph.getFollows(uris, ctx.includeTakedowns);
869 const pairs: RelationshipPair[] = [];
870 for (const [uri, follow] of follows) {
871 if (follow) {
872 pairs.push([didFromUri(uri), follow.record.subject]);
873 }
874 }
875 const blocks = await this.hydrateBidirectionalBlocks(
876 pairsToMap(pairs),
877 );
878 const followBlocks = new HydrationMap<FollowBlock>();
879 for (const [uri, follow] of follows) {
880 if (follow) {
881 followBlocks.set(
882 uri,
883 isBlocked(blocks, [didFromUri(uri), follow.record.subject]),
884 );
885 } else {
886 followBlocks.set(uri, null);
887 }
888 }
889 return { follows, followBlocks };
890 }
891
892 async hydrateBidirectionalBlocks(
893 didMap: Map<string, string[]>,
894 ): Promise<BidirectionalBlocks> {
895 const pairs: RelationshipPair[] = [];
896 for (const [source, targets] of didMap) {
897 for (const target of targets) {
898 pairs.push([source, target]);
899 }
900 }
901
902 const blocks = await this.graph.getBidirectionalBlocks(pairs);
903
904 const result: BidirectionalBlocks = new HydrationMap<
905 HydrationMap<boolean>
906 >();
907 for (const [source, targets] of didMap) {
908 const didBlocks = new HydrationMap<boolean>();
909 for (const target of targets) {
910 const block = blocks.get(source, target);
911
912 const blockEntry: BlockEntry = {
913 blockUri: block?.blockUri,
914 };
915
916 didBlocks.set(
917 target,
918 !!blockEntry.blockUri,
919 );
920 }
921 result.set(source, didBlocks);
922 }
923
924 return result;
925 }
926
927 // so.sprk.labeler.def#labelerViewDetailed
928 // - labeler
929 // - profile
930 // - list basic
931 async hydrateLabelers(
932 dids: string[],
933 ctx: HydrateCtx,
934 ): Promise<HydrationState> {
935 const [labelers, labelerAggs, labelerViewers, profileState] = await Promise
936 .all([
937 this.label.getLabelers(dids, ctx.includeTakedowns),
938 this.label.getLabelerAggregates(dids, ctx.viewer),
939 ctx.viewer
940 ? this.label.getLabelerViewerStates(dids, ctx.viewer)
941 : undefined,
942 this.hydrateProfiles(dids, ctx),
943 ]);
944 actionTakedownLabels(dids, labelers, profileState.labels ?? new Labels());
945 return mergeStates(profileState, {
946 labelers,
947 labelerAggs,
948 labelerViewers,
949 ctx,
950 });
951 }
952
953 // ad-hoc record hydration
954 // in com.atproto.repo.getRecord
955 async getRecord(uri: string, includeTakedowns = false) {
956 const parsed = new AtUri(uri);
957 const collection = parsed.collection;
958 if (collection === so.sprk.feed.post.$type) {
959 return (
960 (await this.feed.getPosts([uri], includeTakedowns)).get(uri) ??
961 undefined
962 );
963 } else if (collection === so.sprk.feed.reply.$type) {
964 return (
965 (await this.feed.getReplies([uri], includeTakedowns)).get(uri) ??
966 undefined
967 );
968 } else if (collection === so.sprk.feed.repost.$type) {
969 return (
970 (await this.feed.getReposts([uri], includeTakedowns)).get(uri) ??
971 undefined
972 );
973 } else if (collection === so.sprk.feed.like.$type) {
974 return (
975 (await this.feed.getLikes([uri], includeTakedowns)).get(uri) ??
976 undefined
977 );
978 } else if (collection === so.sprk.sound.audio.$type) {
979 return (
980 (await this.feed.getSounds([uri], includeTakedowns)).get(uri) ??
981 undefined
982 );
983 } else if (collection === so.sprk.graph.follow.$type) {
984 return (
985 (await this.graph.getFollows([uri], includeTakedowns)).get(uri) ??
986 undefined
987 );
988 } else if (collection === so.sprk.graph.block.$type) {
989 return (
990 (await this.graph.getBlocks([uri], includeTakedowns)).get(uri) ??
991 undefined
992 );
993 } else if (collection === so.sprk.feed.generator.$type) {
994 return (
995 (await this.feed.getFeedGens([uri], includeTakedowns)).get(uri) ??
996 undefined
997 );
998 } else if (collection === so.sprk.labeler.service.$type) {
999 if (parsed.rkey !== "self") return;
1000 const did = parsed.hostname;
1001 return (
1002 (await this.label.getLabelers([did], includeTakedowns)).get(did) ??
1003 undefined
1004 );
1005 } else if (collection === so.sprk.actor.profile.$type) {
1006 const did = parsed.hostname;
1007 const actor = (
1008 await this.actor.getActors([did], { includeTakedowns })
1009 ).get(did);
1010 if (!actor?.profile || !actor?.profileCid) return undefined;
1011 const recordInfo: RecordInfo<ProfileRecord> = {
1012 record: actor.profile,
1013 cid: actor.profileCid,
1014 sortedAt: actor.sortedAt ?? new Date(0),
1015 indexedAt: actor.indexedAt ?? new Date(0),
1016 takedownRef: actor.profileTakedownRef,
1017 };
1018
1019 return recordInfo;
1020 } else if (collection === so.sprk.story.post.$type) {
1021 // Get story records through dataplane
1022 const res = await this.dataplane.records.getStoryRecords([uri]);
1023 const storyRecord = res.records[0];
1024
1025 if (!storyRecord || !storyRecord.cid) return undefined;
1026
1027 return parseRecord<StoryRecord>(
1028 so.sprk.story.post.main,
1029 storyRecord,
1030 includeTakedowns,
1031 );
1032 }
1033 }
1034
1035 async createContext(vals: HydrateCtxVals) {
1036 // ensures we're only apply labelers that exist and are not taken down
1037 const labelers = vals.labelers.dids;
1038 const nonServiceLabelers = labelers.filter(
1039 (did) => !this.serviceLabelers.has(did),
1040 );
1041 const labelerActors = await this.actor.getActors(nonServiceLabelers, {
1042 includeTakedowns: vals.includeTakedowns,
1043 });
1044 const availableDids = labelers.filter(
1045 (did) => this.serviceLabelers.has(did) || !!labelerActors.get(did),
1046 );
1047 const availableLabelers = {
1048 dids: availableDids,
1049 redact: vals.labelers.redact,
1050 };
1051 return new HydrateCtx({
1052 labelers: availableLabelers,
1053 viewer: vals.viewer,
1054 includeTakedowns: vals.includeTakedowns,
1055 include3pBlocks: vals.include3pBlocks,
1056 });
1057 }
1058
1059 async resolveUri(uriStr: string) {
1060 const uri = new AtUri(uriStr);
1061 const [did] = await this.actor.getDids([uri.host]);
1062 if (!did) return uriStr;
1063 uri.host = did;
1064 return uri.toString();
1065 }
1066}
1067
/**
 * Extracts the DID from a service ref. Service refs may look like
 * "did:plc:example#service_id"; everything from the first "#" onward is
 * dropped. A value without "#" is returned unchanged.
 */
const serviceRefToDid = (serviceRef: string) => {
  const hashAt = serviceRef.indexOf("#");
  if (hashAt === -1) {
    return serviceRef;
  }
  return serviceRef.slice(0, hashAt);
};
1073
/**
 * Label subjects for a set of actors: the DIDs themselves, followed by the
 * AT URI of each actor's "self" profile record.
 */
const labelSubjectsForDid = (dids: string[]) => {
  const profileUris = dids.map((did) =>
    AtUri.make(did, so.sprk.actor.profile.$type, "self").toString()
  );
  return dids.concat(profileUris);
};
1082
/** Distinct thread-root URIs referenced by the given replies; missing replies are skipped. */
const rootUrisFromReplies = (replies: Replies): string[] => {
  const distinctRoots = new Set<string>();
  for (const reply of replies.values()) {
    if (!reply) continue;
    const rootUri = rootUriFromReply(reply);
    if (rootUri) distinctRoots.add(rootUri);
  }
  return [...distinctRoots];
};
1093
/** Thread-root URI from a reply's record, or undefined when the record has no reply ref. */
const rootUriFromReply = (reply: Reply): string | undefined => {
  const threadRef = reply.record.reply;
  return threadRef == null ? undefined : threadRef.root.uri;
};
1097
/**
 * Reads the block flag recorded for a (source, target) pair. A pair with no
 * entry counts as not blocked.
 */
const isBlocked = (
  blocks: BidirectionalBlocks,
  [source, target]: RelationshipPair,
) => {
  const forSource = blocks.get(source);
  const flag = forSource?.get(target);
  return flag ?? false;
};
1101
/**
 * Groups relationship pairs by their first element, keeping second
 * elements in input order (duplicates included).
 */
const pairsToMap = (pairs: RelationshipPair[]): Map<string, string[]> => {
  const grouped = new Map<string, string[]>();
  for (const [source, target] of pairs) {
    const targets = grouped.get(source);
    if (targets) {
      targets.push(target);
    } else {
      grouped.set(source, [target]);
    }
  }
  return grouped;
};
1111
/**
 * Merges two hydration states field by field using `mergeMaps`.
 *
 * Every map-valued field of `HydrationState` is merged, including
 * `activitySubscriptions`. The context is taken from `stateA`, falling back
 * to `stateB`.
 *
 * @throws when both states carry a viewer and the viewers differ.
 */
export const mergeStates = (
  stateA: HydrationState,
  stateB: HydrationState,
): HydrationState => {
  assert(
    !stateA.ctx?.viewer ||
      !stateB.ctx?.viewer ||
      stateA.ctx?.viewer === stateB.ctx?.viewer,
    "incompatible viewers",
  );
  return {
    ctx: stateA.ctx ?? stateB.ctx,
    actors: mergeMaps(stateA.actors, stateB.actors),
    profileAggs: mergeMaps(stateA.profileAggs, stateB.profileAggs),
    profileViewers: mergeMaps(stateA.profileViewers, stateB.profileViewers),
    posts: mergeMaps(stateA.posts, stateB.posts),
    replies: mergeMaps(stateA.replies, stateB.replies),
    postAggs: mergeMaps(stateA.postAggs, stateB.postAggs),
    replyAggs: mergeMaps(stateA.replyAggs, stateB.replyAggs),
    postViewers: mergeMaps(stateA.postViewers, stateB.postViewers),
    threadContexts: mergeMaps(stateA.threadContexts, stateB.threadContexts),
    sounds: mergeMaps(stateA.sounds, stateB.sounds),
    soundAggs: mergeMaps(stateA.soundAggs, stateB.soundAggs),
    stories: mergeMaps(stateA.stories, stateB.stories),
    actorStoryRefs: mergeMaps(stateA.actorStoryRefs, stateB.actorStoryRefs),
    postBlocks: mergeMaps(stateA.postBlocks, stateB.postBlocks),
    reposts: mergeMaps(stateA.reposts, stateB.reposts),
    follows: mergeMaps(stateA.follows, stateB.follows),
    followBlocks: mergeMaps(stateA.followBlocks, stateB.followBlocks),
    likes: mergeMaps(stateA.likes, stateB.likes),
    likeBlocks: mergeMaps(stateA.likeBlocks, stateB.likeBlocks),
    labels: mergeMaps(stateA.labels, stateB.labels),
    feedgens: mergeMaps(stateA.feedgens, stateB.feedgens),
    feedgenAggs: mergeMaps(stateA.feedgenAggs, stateB.feedgenAggs),
    feedgenViewers: mergeMaps(stateA.feedgenViewers, stateB.feedgenViewers),
    labelers: mergeMaps(stateA.labelers, stateB.labelers),
    labelerViewers: mergeMaps(stateA.labelerViewers, stateB.labelerViewers),
    labelerAggs: mergeMaps(stateA.labelerAggs, stateB.labelerAggs),
    knownFollowers: mergeMaps(stateA.knownFollowers, stateB.knownFollowers),
    knownInteractions: mergeMaps(
      stateA.knownInteractions,
      stateB.knownInteractions,
    ),
    // Declared on HydrationState, so it has to survive a merge as well.
    activitySubscriptions: mergeMaps(
      stateA.activitySubscriptions,
      stateB.activitySubscriptions,
    ),
    bidirectionalBlocks: mergeMaps(
      stateA.bidirectionalBlocks,
      stateB.bidirectionalBlocks,
    ),
  };
};
1161
/** Folds any number of states into one with `mergeStates`, left to right, starting from an empty state. */
export const mergeManyStates = (...states: HydrationState[]) => {
  let merged = {} as HydrationState;
  for (const state of states) {
    merged = mergeStates(merged, state);
  }
  return merged;
};
1165
/**
 * Sets the entry to null in `hydrationMap` for each key whose labels mark
 * it as taken down. The map is mutated in place.
 */
const actionTakedownLabels = <T>(
  keys: string[],
  hydrationMap: HydrationMap<T>,
  labels: Labels,
) => {
  keys.forEach((key) => {
    const takenDown = labels.get(key)?.isTakendown;
    if (takenDown) {
      hydrationMap.set(key, null);
    }
  });
};