[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { Cid, lexParse } from "@atp/lex";
2import { AtUri } from "@atp/syntax";
3import * as so from "../../../lex/so.ts";
4import { BackgroundQueue } from "../../background.ts";
5import { Database } from "../../db/index.ts";
6import { ReplyDocument } from "../../db/models.ts";
7import {
8 getAncestorsAndSelf,
9 getDescendents,
10 invalidReplyRoot as checkInvalidReplyRoot,
11} from "../../util.ts";
12import { RecordProcessor } from "../processor.ts";
13
/**
 * One entry of the ancestor chain produced by `getAncestorsAndSelf`.
 * The chain may contain the indexed reply itself (it is skipped by URI when
 * direct reply notifications are built).
 */
type Ancestor = {
  /** AT-URI of the ancestor record. */
  uri: string;
  // NOTE(review): presumably the number of hops above the indexed reply
  // (self = 0) — confirm against getAncestorsAndSelf.
  height: number;
};

/**
 * One entry of the descendant list produced by `getDescendents`; carries
 * everything needed to emit a notification on behalf of that descendant.
 */
type Descendent = {
  /** AT-URI of the descendant record. */
  uri: string;
  // NOTE(review): presumably the number of hops below the indexed reply —
  // confirm against getDescendents.
  depth: number;
  /** CID of the descendant record. */
  cid: string;
  /** Used as the notification author for this descendant. */
  creator: string;
  /** Used as the notification sort key for this descendant. */
  sortAt: string;
};

type ReplyRecord = so.sprk.feed.reply.Main;
type ReplyRef = so.sprk.feed.reply.ReplyRef;
type GateRecord = so.sprk.feed.threadgate.Main;

/**
 * Result of indexing (or deleting) a reply: the stored document plus the
 * optional context consumed when notifications are generated.
 */
type IndexedReply = {
  /** The reply document as stored in (or removed from) the database. */
  reply: ReplyDocument;
  /** Mention and link features extracted from the record's rich-text facets. */
  facets?: Array<{ type: "mention" | "link"; value: string }>;
  /** Image media details, when the record carries image media. */
  media?: {
    cid?: string;
    alt?: string | null;
  };
  /** Ancestor chain used for reply notifications. */
  ancestors?: Array<Ancestor>;
  /** Already-indexed descendants (out-of-order indexing). */
  descendents?: Array<Descendent>;
  /** Thread gate of the thread; only `hiddenReplies` is read in this file. */
  threadgate?: GateRecord;
};
39
// Schema object handed to the RecordProcessor for reply records.
const schema = so.sprk.feed.reply.main;

// Type guards used to classify media and rich-text facet features.
const { $matches: isMediaImage } = so.sprk.media.image;
const { $matches: isMention } = so.sprk.richtext.facet.mention;
const { $matches: isLink } = so.sprk.richtext.facet.link;

// Bound used both when loading thread context (ancestor height / descendant
// depth) and as the exclusive limit on how far up a reply notification travels.
const REPLY_NOTIF_DEPTH = 5;
46
/**
 * Indexes a reply record.
 *
 * Upserts the reply document keyed by URI, marks it with `invalidReplyRoot`
 * when reply validation fails, and collects the facets, image media and
 * thread context (ancestors / descendants) used later for notifications.
 *
 * @param db - Database handle exposing the models.
 * @param uri - AT-URI of the reply record; its host is stored as `authorDid`.
 * @param cid - CID of the record being indexed.
 * @param obj - The reply record.
 * @param timestamp - Value stored as `indexedAt`.
 * @returns The stored reply together with the extracted context. This
 *   implementation always resolves to an object, never `null`.
 */
const insertFn = async (
  db: Database,
  uri: AtUri,
  cid: Cid,
  obj: ReplyRecord,
  timestamp: string,
): Promise<IndexedReply | null> => {
  const replyUri = uri.toString();

  // Keep only the uri/cid pair of each strong ref.
  const replyRef = obj.reply
    ? {
      root: { uri: obj.reply.root.uri, cid: obj.reply.root.cid },
      parent: { uri: obj.reply.parent.uri, cid: obj.reply.parent.cid },
    }
    : null;

  const doc = {
    uri: replyUri,
    cid: cid.toString(),
    authorDid: uri.host,
    text: obj.text || "",
    facets: obj.facets || [],
    reply: replyRef,
    media: obj.media,
    langs: obj.langs || [],
    labels: obj.labels || null,
    tags: (obj as { tags?: string[] }).tags || [],
    createdAt: obj.createdAt,
    indexedAt: timestamp,
  };

  // Upsert rather than insert so re-indexing the same URI does not fail on a
  // duplicate key.
  const stored = await db.models.Reply.findOneAndUpdate(
    { uri: replyUri },
    { $set: doc },
    { upsert: true, returnDocument: "after" },
  );

  if (obj.reply) {
    const validation = await validateReply(db, obj.reply);
    if (validation.invalidReplyRoot) {
      // Reflect the flag on the in-memory document and persist it.
      const flag = { invalidReplyRoot: validation.invalidReplyRoot };
      Object.assign(stored, flag);
      await db.models.Reply.updateOne({ uri: replyUri }, { $set: flag });
    }
  }

  // Flatten all facet features and keep mentions and links only.
  const features = (obj.facets || []).flatMap((facet) => facet.features);
  const facets: NonNullable<IndexedReply["facets"]> = [];
  for (const feature of features) {
    if (isMention(feature)) {
      facets.push({ type: "mention", value: feature.did });
    } else if (isLink(feature)) {
      facets.push({ type: "link", value: feature.uri });
    }
  }

  // Only image media is summarised here; anything else yields an empty object.
  const media: { postUri?: string; cid?: string; alt?: string } =
    isMediaImage(obj.media)
      ? {
        postUri: uri.toString(),
        cid: obj.media.image.ref.toString(),
        alt: obj.media.alt as string,
      }
      : {};

  // Thread context around the reply, bounded by the notification depth.
  const ancestors = await getAncestorsAndSelf(db, {
    uri: replyUri,
    parentHeight: REPLY_NOTIF_DEPTH,
  });
  const descendents = await getDescendents(db, {
    uri: replyUri,
    depth: REPLY_NOTIF_DEPTH,
  });

  return { reply: stored, facets, media, ancestors, descendents };
};
151
/**
 * Duplicate lookup hook for the RecordProcessor. Replies never report a
 * duplicate: this always returns `null`.
 */
const findDuplicate = (): AtUri | null => null;
155
/**
 * Builds the notifications triggered by an indexed reply.
 *
 * Produces, in order: "mention" notifications for mentioned DIDs, "reply"
 * notifications for ancestors within `REPLY_NOTIF_DEPTH`, and "reply"
 * notifications on behalf of already-indexed descendants. Each DID is
 * notified at most once, and the reply's own author is never notified.
 *
 * @param obj - The indexed reply with its facets and thread context.
 * @returns The notifications to create, in the order they were generated.
 */
const notifsForInsert = (obj: IndexedReply) => {
  type Notif = {
    did: string;
    reason: string;
    author: string;
    recordUri: string;
    recordCid: string;
    sortAt: string;
    reasonSubject?: string;
  };
  type NotifSource = Pick<
    Notif,
    "author" | "recordUri" | "recordCid" | "sortAt"
  >;

  const { reply } = obj;
  const notifs: Notif[] = [];
  // Seeded with the author so they never receive a notification for their own reply.
  const notified = new Set([reply.authorDid]);

  const maybeNotify = (notif: Notif) => {
    if (notified.has(notif.did)) return;
    notified.add(notif.did);
    notifs.push(notif);
  };

  // Reply notification addressed to the owner of `subject`.
  const notifyReplyTo = (subject: AtUri, source: NotifSource) =>
    maybeNotify({
      did: subject.host,
      reason: "reply",
      reasonSubject: subject.toString(),
      ...source,
    });

  // The indexed reply itself as the source of a notification.
  const self: NotifSource = {
    author: reply.authorDid,
    recordUri: reply.uri,
    recordCid: reply.cid,
    sortAt: reply.createdAt,
  };

  // mention notifications
  for (const facet of obj.facets ?? []) {
    if (facet.type !== "mention") continue;
    maybeNotify({ did: facet.value, reason: "mention", ...self });
  }

  const hiddenReplies = new Set(
    (obj.threadgate?.hiddenReplies || []).map(String),
  );

  // reply notifications
  for (const ancestor of obj.ancestors ?? []) {
    if (ancestor.uri === reply.uri) continue; // the reply itself
    if (!(ancestor.height < REPLY_NOTIF_DEPTH)) continue;
    const ancestorUri = new AtUri(ancestor.uri);
    notifyReplyTo(ancestorUri, self);
    // found hidden reply, don't notify any higher ancestors
    if (hiddenReplies.has(ancestorUri.toString())) break;
  }

  // descendents indicate out-of-order indexing: need to notify
  // everything upwards of the current reply
  for (const descendent of obj.descendents ?? []) {
    const source: NotifSource = {
      author: descendent.creator,
      recordUri: descendent.uri,
      recordCid: descendent.cid,
      sortAt: descendent.sortAt,
    };
    for (const ancestor of obj.ancestors ?? []) {
      const totalHeight = descendent.depth + ancestor.height;
      if (totalHeight < REPLY_NOTIF_DEPTH) {
        notifyReplyTo(new AtUri(ancestor.uri), source);
      }
    }
  }

  return notifs;
};
238
/**
 * Removes the reply document stored under `uri`.
 *
 * @param db - Database handle exposing the models.
 * @param uri - AT-URI of the reply to delete.
 * @returns The removed reply wrapped as an `IndexedReply`, or `null` when no
 *   document matched the URI.
 */
const deleteFn = async (
  db: Database,
  uri: AtUri,
): Promise<IndexedReply | null> => {
  const removed = await db.models.Reply.findOneAndDelete({
    uri: uri.toString(),
  });

  // `facets` is not used for deleted replies; an empty list is returned.
  return removed ? { reply: removed, facets: [] } : null;
};
255
/**
 * Computes the notification changes for a deleted reply.
 *
 * @param deleted - The reply that was removed.
 * @param replacedBy - A reply taking its place, if any.
 * @returns `toDelete` holding the deleted reply's URI, and `notifs` holding
 *   the insert notifications of `replacedBy` (empty when there is none).
 */
const notifsForDelete = (
  deleted: IndexedReply,
  replacedBy: IndexedReply | null,
) => {
  let notifs: ReturnType<typeof notifsForInsert> = [];
  if (replacedBy) {
    notifs = notifsForInsert(replacedBy);
  }
  const toDelete = [deleted.reply.uri];
  return { notifs, toDelete };
};
266
/**
 * Recomputes `replyCount` on the parent of the given reply.
 *
 * The count is the number of `Reply` documents whose `reply.parent.uri` is the
 * parent URI. It is written to the first collection that contains the parent,
 * checked in the order Post, Reply, CrosspostReply. Nothing is written when
 * the reply has no parent URI or the parent is not found in any of them.
 *
 * @param db - Database handle exposing the models.
 * @param replyIdx - The indexed (or deleted) reply whose parent is updated.
 */
const updateAggregates = async (db: Database, replyIdx: IndexedReply) => {
  const parentUri = replyIdx.reply.reply?.parent?.uri;
  if (!parentUri) return;

  // The three lookups and the count are independent of each other, so they
  // are issued together instead of awaiting the Post lookup on its own first.
  const [parentPost, parentReply, parentCrosspostReply, replyCount] =
    await Promise.all([
      db.models.Post.findOne({ uri: parentUri }),
      db.models.Reply.findOne({ uri: parentUri }),
      db.models.CrosspostReply.findOne({ uri: parentUri }),
      db.models.Reply.countDocuments({ "reply.parent.uri": parentUri }),
    ]);

  // Only the first matching collection is updated.
  if (parentPost) {
    await db.models.Post.findOneAndUpdate(
      { uri: parentUri },
      { $set: { replyCount } },
      { returnDocument: "after" },
    );
  } else if (parentReply) {
    await db.models.Reply.findOneAndUpdate(
      { uri: parentUri },
      { $set: { replyCount } },
      { returnDocument: "after" },
    );
  } else if (parentCrosspostReply) {
    await db.models.CrosspostReply.findOneAndUpdate(
      { uri: parentUri },
      { $set: { replyCount } },
      { returnDocument: "after" },
    );
  }
};
307
/** Record processor specialised for reply records. */
export type PluginType = RecordProcessor<typeof schema, IndexedReply>;

/**
 * Creates the reply indexing plugin by wiring this module's handlers into a
 * `RecordProcessor`.
 *
 * @param db - Database handle passed to the processor.
 * @param background - Background queue passed to the processor.
 * @returns The configured reply processor.
 */
export const makePlugin = (
  db: Database,
  background: BackgroundQueue,
): PluginType =>
  new RecordProcessor(db, background, {
    schema,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  });

export default makePlugin;
326
/**
 * Validates a reply reference against the stored parent record.
 *
 * @param db - Database handle exposing the models.
 * @param reply - The root/parent reference carried by the reply record.
 * @returns `invalidReplyRoot`, which is `true` when the parent record cannot
 *   be resolved, and otherwise the result of `checkInvalidReplyRoot`.
 */
async function validateReply(
  db: Database,
  reply: ReplyRef,
) {
  const { parent } = await getReplyRefs(db, reply);
  // A missing parent makes the reply root invalid by definition here.
  const invalidReplyRoot = parent
    ? checkInvalidReplyRoot(reply, parent)
    : true;
  return { invalidReplyRoot };
}
338
/**
 * Loads the stored records referenced as root and parent of a reply.
 *
 * @param db - Database handle exposing the models.
 * @param reply - The root/parent reference carried by the reply record.
 * @returns For each of `root` and `parent`: the record's URI, its stored
 *   `invalidReplyRoot` flag (`null` when unset) and the parsed record, or
 *   `null` when the record is missing or has no JSON payload.
 */
async function getReplyRefs(db: Database, reply: ReplyRef) {
  const [root, parent] = await Promise.all([
    db.models.Record.findOne({ uri: reply.root.uri }).lean(),
    db.models.Record.findOne({ uri: reply.parent.uri }).lean(),
  ]);

  // Shared mapping for both lookups; records without JSON count as missing.
  const toRef = (stored: typeof root) =>
    stored && stored.json
      ? {
        uri: stored.uri,
        invalidReplyRoot: stored.invalidReplyRoot ?? null,
        record: lexParse(stored.json, { strict: false }) as ReplyRecord,
      }
      : null;

  return {
    root: toRef(root),
    parent: toRef(parent),
  };
}