[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { Cid } from "@atp/lex";
2import { AtUri } from "@atp/syntax";
3import * as so from "../../../lex/so.ts";
4import { BackgroundQueue } from "../../background.ts";
5import { Database } from "../../db/index.ts";
6import { PostDocument } from "../../db/models.ts";
7import { getDescendents } from "../../util.ts";
8import { RecordProcessor } from "../processor.ts";
9
/** Entry of `IndexedPost.ancestors`: a post URI plus its height value. */
type PostAncestor = { height: number; uri: string };

/** Entry of `IndexedPost.descendents`, as returned by `getDescendents`. */
type PostDescendent = {
  uri: string;
  cid: string;
  creator: string;
  depth: number;
  sortAt: string;
};

// Short local names for the lexicon record types used in this module.
type PostRecord = so.sprk.feed.post.Main;
type GateRecord = so.sprk.feed.threadgate.Main;
type MediaImage = so.sprk.media.image.Main;
type MediaImages = so.sprk.media.images.Main;
type MediaVideo = so.sprk.media.video.Main;

/**
 * Value handed back by the insert/delete handlers of this plugin: the stored
 * post document plus whatever was derived from the record while indexing.
 * Everything except `post` is optional.
 */
type IndexedPost = {
  post: PostDocument;
  /** Mention / link features collected from the caption facets. */
  facets?: Array<{ type: "mention" | "link"; value: string }>;
  /** Flattened media entries (image entries carry `imageCid`, video entries `videoCid`). */
  medias?: {
    position?: number;
    imageCid?: string;
    videoCid?: string;
    thumbCid?: string | null;
    alt?: string | null;
  }[];
  ancestors?: PostAncestor[];
  descendents?: PostDescendent[];
  threadgate?: GateRecord;
};
40
// Lexicon schema of the record type handled by this plugin.
const { main: schema } = so.sprk.feed.post;

// `$matches` guards for the media and rich-text facet variants.
const { $matches: isMediaImages } = so.sprk.media.images;
const { $matches: isMediaVideo } = so.sprk.media.video;
const { $matches: isMention } = so.sprk.richtext.facet.mention;
const { $matches: isLink } = so.sprk.richtext.facet.link;

// Depth passed to the descendents lookup, and exclusive upper bound on the
// ancestor height for which reply notifications are produced.
const REPLY_NOTIF_DEPTH = 5;
48
/**
 * Indexes a post record.
 *
 * Upserts the post document keyed by its URI, then derives the mention/link
 * facets and the flattened media list from the record, and finally looks up
 * already-indexed descendents of the post.
 *
 * The returned value carries `post`, `facets`, `medias` and `descendents`;
 * `ancestors` and `threadgate` are not populated here.
 *
 * @param db - Database handle providing the `Post` model.
 * @param uri - AT-URI of the record; its host is stored as the author DID.
 * @param cid - CID of the record.
 * @param obj - The post record itself.
 * @param timestamp - Value stored as `indexedAt`.
 */
const insertFn = async (
  db: Database,
  uri: AtUri,
  cid: Cid,
  obj: PostRecord,
  timestamp: string,
): Promise<IndexedPost | null> => {
  const postUri = uri.toString();

  // Missing optional record fields are normalised to empty/neutral values.
  const doc = {
    uri: postUri,
    cid: cid.toString(),
    authorDid: uri.host,
    caption: obj.caption
      ? { text: obj.caption.text || "", facets: obj.caption.facets || [] }
      : undefined,
    media: obj.media || null,
    sound: obj.sound || null,
    langs: obj.langs || [],
    labels: obj.labels || null,
    tags: obj.tags || [],
    crossposts: obj.crossposts || [],
    createdAt: obj.createdAt,
    indexedAt: timestamp,
  };

  // Upsert so that re-indexing the same URI overwrites the stored document.
  const stored = await db.models.Post.findOneAndUpdate(
    { uri: doc.uri },
    { $set: doc },
    { upsert: true, returnDocument: "after" },
  );

  // Keep only mention and link features; any other feature kind is dropped.
  const facets: NonNullable<IndexedPost["facets"]> = [];
  const features = (obj.caption?.facets || []).flatMap((f) => f.features);
  for (const feature of features) {
    if (isMention(feature)) {
      facets.push({ type: "mention", value: feature.did });
    } else if (isLink(feature)) {
      facets.push({ type: "link", value: feature.uri });
    }
  }

  // Flatten the record's media into one entry per image or a single video entry.
  const medias: NonNullable<IndexedPost["medias"]> = [];
  for (const item of separateMedia(obj.media)) {
    if (isMediaImages(item)) {
      const gallery = item as MediaImages;
      gallery.images.forEach((img: MediaImage, position: number) => {
        medias.push({
          position,
          imageCid: img.image.ref.toString(),
          alt: img.alt,
        });
      });
      continue;
    }
    if (isMediaVideo(item)) {
      const video = item as MediaVideo;
      // Built as a separate value because it carries `postUri` in addition
      // to the declared media fields.
      const videoEntry = {
        postUri,
        videoCid: video.video.ref.toString(),
        alt: video.alt ?? null,
      };
      medias.push(videoEntry);
    }
  }

  const descendents = await getDescendents(db, {
    uri: doc.uri,
    depth: REPLY_NOTIF_DEPTH,
  });

  return { post: stored, facets, medias, descendents };
};
144
/** Duplicate lookup for posts: always reports that there is no duplicate. */
const findDuplicate = (): AtUri | null => null;
148
/**
 * Builds the notifications produced by an indexed post.
 *
 * At most one notification is emitted per recipient DID, and the post's own
 * author is never notified. Candidates are considered in this order, first
 * one wins: mentions, replies to ancestors, then replies by already-indexed
 * descendents to those ancestors.
 *
 * @param obj - Indexed post with its derived facets / ancestors / descendents.
 * @returns The notifications, in the order they were accepted.
 */
const notifsForInsert = (obj: IndexedPost) => {
  type Notif = {
    did: string;
    reason: string;
    author: string;
    recordUri: string;
    recordCid: string;
    sortAt: string;
    reasonSubject?: string;
  };

  const { post } = obj;
  const notifs: Notif[] = [];
  // Seeded with the author so nobody is notified about their own post.
  const alreadyNotified = new Set([post.authorDid]);
  const maybeNotify = (notif: Notif) => {
    if (alreadyNotified.has(notif.did)) return;
    alreadyNotified.add(notif.did);
    notifs.push(notif);
  };

  // mention notifications
  for (const { type, value } of obj.facets ?? []) {
    if (type !== "mention") continue;
    maybeNotify({
      did: value,
      reason: "mention",
      author: post.authorDid,
      recordUri: post.uri,
      recordCid: post.cid,
      sortAt: post.createdAt,
    });
  }

  const hiddenReplies = new Set(
    (obj.threadgate?.hiddenReplies || []).map(String),
  );

  // reply notifications
  for (const ancestor of obj.ancestors ?? []) {
    // the post itself is not an ancestor worth notifying
    if (ancestor.uri === post.uri) continue;
    if (!(ancestor.height < REPLY_NOTIF_DEPTH)) continue;
    const ancestorUri = new AtUri(ancestor.uri);
    const subject = ancestorUri.toString();
    maybeNotify({
      did: ancestorUri.host,
      reason: "reply",
      reasonSubject: subject,
      author: post.authorDid,
      recordUri: post.uri,
      recordCid: post.cid,
      sortAt: post.createdAt,
    });
    // once a hidden reply is reached, higher ancestors are not notified
    if (hiddenReplies.has(subject)) break;
  }

  // Descendents that already exist mean this post was indexed out of order:
  // notify this post's ancestors on behalf of each such descendent.
  for (const descendent of obj.descendents ?? []) {
    for (const ancestor of obj.ancestors ?? []) {
      const combined = descendent.depth + ancestor.height;
      if (!(combined < REPLY_NOTIF_DEPTH)) continue;
      const ancestorUri = new AtUri(ancestor.uri);
      maybeNotify({
        did: ancestorUri.host,
        reason: "reply",
        reasonSubject: ancestorUri.toString(),
        author: descendent.creator,
        recordUri: descendent.uri,
        recordCid: descendent.cid,
        sortAt: descendent.sortAt,
      });
    }
  }

  return notifs;
};
231
/**
 * Removes the post document stored under `uri`.
 *
 * @param db - Database handle providing the `Post` model.
 * @param uri - AT-URI of the post to delete.
 * @returns The deleted post with an empty facet list, or `null` when no
 *   document was stored for that URI.
 */
const deleteFn = async (
  db: Database,
  uri: AtUri,
): Promise<IndexedPost | null> => {
  const removed = await db.models.Post.findOneAndDelete({
    uri: uri.toString(),
  });
  return removed ? { post: removed, facets: [] } : null;
};
248
/**
 * Notification changes caused by deleting a post.
 *
 * @param deleted - The post that was removed; its URI is listed in `toDelete`.
 * @param replacedBy - Post taking its place, if any; its insert
 *   notifications are returned in `notifs`.
 */
const notifsForDelete = (
  deleted: IndexedPost,
  replacedBy: IndexedPost | null,
) => {
  let notifs: ReturnType<typeof notifsForInsert> = [];
  if (replacedBy) {
    notifs = notifsForInsert(replacedBy);
  }
  return { notifs, toDelete: [deleted.post.uri] };
};
259
/**
 * Recomputes the author's post count and stores it on their profile.
 *
 * The profile is only updated when it already exists: the update is issued
 * without `upsert`, so an author without a profile document is left without
 * one and no separate existence check is needed.
 *
 * @param db - Database handle providing the `Post` and `Profile` models.
 * @param postIdx - The indexed (or deleted) post whose author is refreshed.
 */
const updateAggregates = async (db: Database, postIdx: IndexedPost) => {
  const { authorDid } = postIdx.post;

  // Count from the source of truth instead of incrementing, so the value
  // stays correct for inserts and deletes alike.
  const postsCount = await db.models.Post.countDocuments({ authorDid });

  // No `upsert` here on purpose: a missing profile must not be created.
  await db.models.Profile.findOneAndUpdate(
    { authorDid },
    { $set: { postsCount } },
    { returnDocument: "after" },
  );
};
280
/** Record processor specialised for post records and their indexed form. */
export type PluginType = RecordProcessor<typeof schema, IndexedPost>;

/**
 * Creates the post indexing plugin by wiring this module's handlers into a
 * `RecordProcessor`.
 *
 * @param db - Database handle passed through to the processor.
 * @param background - Background queue passed through to the processor.
 */
export const makePlugin = (
  db: Database,
  background: BackgroundQueue,
): PluginType =>
  new RecordProcessor(db, background, {
    schema,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  });

export default makePlugin;
299
/**
 * Wraps a post's media value in a list so callers can iterate over it.
 *
 * @param media - The record's `media` value, possibly absent.
 * @returns A one-element array holding `media`, or an empty array when
 *   `media` is falsy.
 */
function separateMedia(
  media: PostRecord["media"],
): Array<NonNullable<PostRecord["media"]>> {
  return media ? [media] : [];
}