[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 364 lines 9.3 kB view raw
1import { Cid, lexParse } from "@atp/lex"; 2import { AtUri } from "@atp/syntax"; 3import * as so from "../../../lex/so.ts"; 4import { BackgroundQueue } from "../../background.ts"; 5import { Database } from "../../db/index.ts"; 6import { ReplyDocument } from "../../db/models.ts"; 7import { 8 getAncestorsAndSelf, 9 getDescendents, 10 invalidReplyRoot as checkInvalidReplyRoot, 11} from "../../util.ts"; 12import { RecordProcessor } from "../processor.ts"; 13 14type Ancestor = { 15 uri: string; 16 height: number; 17}; 18type Descendent = { 19 uri: string; 20 depth: number; 21 cid: string; 22 creator: string; 23 sortAt: string; 24}; 25type ReplyRecord = so.sprk.feed.reply.Main; 26type ReplyRef = so.sprk.feed.reply.ReplyRef; 27type GateRecord = so.sprk.feed.threadgate.Main; 28type IndexedReply = { 29 reply: ReplyDocument; 30 facets?: { type: "mention" | "link"; value: string }[]; 31 media?: { 32 cid?: string; 33 alt?: string | null; 34 }; 35 ancestors?: Ancestor[]; 36 descendents?: Descendent[]; 37 threadgate?: GateRecord; 38}; 39 40const schema = so.sprk.feed.reply.main; 41const isMediaImage = so.sprk.media.image.$matches; 42const isMention = so.sprk.richtext.facet.mention.$matches; 43const isLink = so.sprk.richtext.facet.link.$matches; 44 45const REPLY_NOTIF_DEPTH = 5; 46 47const insertFn = async ( 48 db: Database, 49 uri: AtUri, 50 cid: Cid, 51 obj: ReplyRecord, 52 timestamp: string, 53): Promise<IndexedReply | null> => { 54 const reply = { 55 uri: uri.toString(), 56 cid: cid.toString(), 57 authorDid: uri.host, 58 text: obj.text || "", 59 facets: obj.facets || [], 60 reply: obj.reply 61 ? { 62 root: { 63 uri: obj.reply.root.uri, 64 cid: obj.reply.root.cid, 65 }, 66 parent: { 67 uri: obj.reply.parent.uri, 68 cid: obj.reply.parent.cid, 69 }, 70 } 71 : null, 72 media: obj.media, 73 langs: obj.langs || [], 74 labels: obj.labels || null, 75 tags: (obj as { tags?: string[] }).tags || [], 76 createdAt: obj.createdAt, 77 indexedAt: timestamp, 78 }; 79 80 // Use findOneAndUpdate with upsert to handle potential duplicate key errors 81 const insertedReply = await db.models.Reply.findOneAndUpdate( 82 { uri: reply.uri }, 83 { $set: reply }, 84 { upsert: true, returnDocument: "after" }, 85 ); 86 87 if (obj.reply) { 88 const { invalidReplyRoot } = await validateReply( 89 db, 90 obj.reply, 91 ); 92 if (invalidReplyRoot) { 93 Object.assign(insertedReply, { invalidReplyRoot }); 94 await db.models.Reply.updateOne( 95 { uri: reply.uri }, 96 { $set: { invalidReplyRoot } }, 97 ); 98 } 99 } 100 101 const facets = (obj.facets || []) 102 .flatMap((facet) => facet.features) 103 .flatMap((feature) => { 104 if (isMention(feature)) { 105 return { 106 type: "mention" as const, 107 value: feature.did, 108 }; 109 } 110 if (isLink(feature)) { 111 return { 112 type: "link" as const, 113 value: feature.uri, 114 }; 115 } 116 return []; 117 }); 118 119 // Embed processing - embeds are stored inline in the Post model 120 let media: { 121 postUri?: string; 122 cid?: string; 123 alt?: string; 124 } = {}; 125 if (isMediaImage(obj.media)) { 126 const imageMedia = { 127 postUri: uri.toString(), 128 cid: obj.media.image.ref.toString(), 129 alt: obj.media.alt as string, 130 }; 131 media = imageMedia; 132 } 133 134 const ancestors = await getAncestorsAndSelf(db, { 135 uri: reply.uri, 136 parentHeight: REPLY_NOTIF_DEPTH, 137 }); 138 const descendents = await getDescendents(db, { 139 uri: reply.uri, 140 depth: REPLY_NOTIF_DEPTH, 141 }); 142 143 return { 144 reply: insertedReply, 145 facets, 146 media, 147 ancestors, 148 descendents, 149 }; 150}; 151 152const findDuplicate = (): AtUri | null => { 153 return null; 154}; 155 156const notifsForInsert = (obj: IndexedReply) => { 157 const notifs: Array<{ 158 did: string; 159 reason: string; 160 author: string; 161 recordUri: string; 162 recordCid: string; 163 sortAt: string; 164 reasonSubject?: string; 165 }> = []; 166 const notified = new Set([obj.reply.authorDid]); 167 const maybeNotify = (notif: { 168 did: string; 169 reason: string; 170 author: string; 171 recordUri: string; 172 recordCid: string; 173 sortAt: string; 174 reasonSubject?: string; 175 }) => { 176 if (!notified.has(notif.did)) { 177 notified.add(notif.did); 178 notifs.push(notif); 179 } 180 }; 181 for (const facet of obj.facets ?? []) { 182 if (facet.type === "mention") { 183 maybeNotify({ 184 did: facet.value, 185 reason: "mention", 186 author: obj.reply.authorDid, 187 recordUri: obj.reply.uri, 188 recordCid: obj.reply.cid, 189 sortAt: obj.reply.createdAt, 190 }); 191 } 192 } 193 194 const threadgateHiddenReplies = (obj.threadgate?.hiddenReplies || []) 195 .map(String); 196 197 // reply notifications 198 for (const ancestor of obj.ancestors ?? []) { 199 if (ancestor.uri === obj.reply.uri) continue; 200 if (ancestor.height < REPLY_NOTIF_DEPTH) { 201 const ancestorUri = new AtUri(ancestor.uri); 202 maybeNotify({ 203 did: ancestorUri.host, 204 reason: "reply", 205 reasonSubject: ancestorUri.toString(), 206 author: obj.reply.authorDid, 207 recordUri: obj.reply.uri, 208 recordCid: obj.reply.cid, 209 sortAt: obj.reply.createdAt, 210 }); 211 // found hidden reply, don't notify any higher ancestors 212 if (threadgateHiddenReplies.includes(ancestorUri.toString())) break; 213 } 214 } 215 216 // descendents indicate out-of-order indexing: need to notify 217 // everything upwards of the current reply 218 for (const descendent of obj.descendents ?? []) { 219 for (const ancestor of obj.ancestors ?? []) { 220 const totalHeight = descendent.depth + ancestor.height; 221 if (totalHeight < REPLY_NOTIF_DEPTH) { 222 const ancestorUri = new AtUri(ancestor.uri); 223 maybeNotify({ 224 did: ancestorUri.host, 225 reason: "reply", 226 reasonSubject: ancestorUri.toString(), 227 author: descendent.creator, 228 recordUri: descendent.uri, 229 recordCid: descendent.cid, 230 sortAt: descendent.sortAt, 231 }); 232 } 233 } 234 } 235 236 return notifs; 237}; 238 239const deleteFn = async ( 240 db: Database, 241 uri: AtUri, 242): Promise<IndexedReply | null> => { 243 const uriStr = uri.toString(); 244 const deleted = await db.models.Reply.findOneAndDelete({ uri: uriStr }); 245 246 if (!deleted) { 247 return null; 248 } 249 250 return { 251 reply: deleted, 252 facets: [], // Not used 253 }; 254}; 255 256const notifsForDelete = ( 257 deleted: IndexedReply, 258 replacedBy: IndexedReply | null, 259) => { 260 const notifs = replacedBy ? notifsForInsert(replacedBy) : []; 261 return { 262 notifs, 263 toDelete: [deleted.reply.uri], 264 }; 265}; 266 267const updateAggregates = async (db: Database, replyIdx: IndexedReply) => { 268 if (replyIdx.reply.reply?.parent?.uri) { 269 const parentPost = await db.models.Post.findOne({ 270 uri: replyIdx.reply.reply?.parent.uri, 271 }); 272 const [parentReply, parentCrosspostReply, nativeReplyCount] = await Promise 273 .all([ 274 db.models.Reply.findOne({ 275 uri: replyIdx.reply.reply?.parent.uri, 276 }), 277 db.models.CrosspostReply.findOne({ 278 uri: replyIdx.reply.reply?.parent.uri, 279 }), 280 db.models.Reply.countDocuments({ 281 "reply.parent.uri": replyIdx.reply.reply.parent.uri, 282 }), 283 ]); 284 const replyCount = nativeReplyCount; 285 286 if (parentPost) { 287 await db.models.Post.findOneAndUpdate( 288 { uri: replyIdx.reply.reply?.parent.uri }, 289 { $set: { replyCount } }, 290 { returnDocument: "after" }, 291 ); 292 } else if (parentReply) { 293 await db.models.Reply.findOneAndUpdate( 294 { uri: replyIdx.reply.reply?.parent.uri }, 295 { $set: { replyCount } }, 296 { returnDocument: "after" }, 297 ); 298 } else if (parentCrosspostReply) { 299 await db.models.CrosspostReply.findOneAndUpdate( 300 { uri: replyIdx.reply.reply?.parent.uri }, 301 { $set: { replyCount } }, 302 { returnDocument: "after" }, 303 ); 304 } 305 } 306}; 307 308export type PluginType = RecordProcessor<typeof schema, IndexedReply>; 309 310export const makePlugin = ( 311 db: Database, 312 background: BackgroundQueue, 313): PluginType => { 314 return new RecordProcessor(db, background, { 315 schema, 316 insertFn, 317 findDuplicate, 318 deleteFn, 319 notifsForInsert, 320 notifsForDelete, 321 updateAggregates, 322 }); 323}; 324 325export default makePlugin; 326 327async function validateReply( 328 db: Database, 329 reply: ReplyRef, 330) { 331 const replyRefs = await getReplyRefs(db, reply); 332 const invalidReplyRoot = !replyRefs.parent || 333 checkInvalidReplyRoot(reply, replyRefs.parent); 334 return { 335 invalidReplyRoot, 336 }; 337} 338 339async function getReplyRefs(db: Database, reply: ReplyRef) { 340 const replyRoot = reply.root.uri; 341 const replyParent = reply.parent.uri; 342 343 const [root, parent] = await Promise.all([ 344 db.models.Record.findOne({ uri: replyRoot }).lean(), 345 db.models.Record.findOne({ uri: replyParent }).lean(), 346 ]); 347 348 return { 349 root: root && root.json 350 ? { 351 uri: root.uri, 352 invalidReplyRoot: root.invalidReplyRoot ?? null, 353 record: lexParse(root.json, { strict: false }) as ReplyRecord, 354 } 355 : null, 356 parent: parent && parent.json 357 ? { 358 uri: parent.uri, 359 invalidReplyRoot: parent.invalidReplyRoot ?? null, 360 record: lexParse(parent.json, { strict: false }) as ReplyRecord, 361 } 362 : null, 363 }; 364}