[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 307 lines 7.7 kB view raw
import { Cid } from "@atp/lex";
import { AtUri } from "@atp/syntax";
import * as so from "../../../lex/so.ts";
import { BackgroundQueue } from "../../background.ts";
import { Database } from "../../db/index.ts";
import { PostDocument } from "../../db/models.ts";
import { getDescendents } from "../../util.ts";
import { RecordProcessor } from "../processor.ts";

/** A post above the indexed post in its thread, `height` levels up. */
type PostAncestor = {
  uri: string;
  height: number;
};

/** A post below the indexed post in its thread, `depth` levels down. */
type PostDescendent = {
  uri: string;
  depth: number;
  cid: string;
  creator: string;
  sortAt: string;
};

type MediaImages = so.sprk.media.images.Main;
type MediaImage = so.sprk.media.image.Main;
type MediaVideo = so.sprk.media.video.Main;
type PostRecord = so.sprk.feed.post.Main;
type GateRecord = so.sprk.feed.threadgate.Main;

/** One flattened media entry (an image or a video) extracted from a post. */
type IndexedMedia = {
  position?: number;
  imageCid?: string;
  alt?: string | null;
  thumbCid?: string | null;
  videoCid?: string;
};

/** A notification produced while indexing a post. */
type PostNotif = {
  did: string;
  reason: string;
  author: string;
  recordUri: string;
  recordCid: string;
  sortAt: string;
  reasonSubject?: string;
};

/** Everything the notification and aggregate hooks need about a post. */
type IndexedPost = {
  post: PostDocument;
  facets?: { type: "mention" | "link"; value: string }[];
  medias?: IndexedMedia[];
  ancestors?: PostAncestor[];
  descendents?: PostDescendent[];
  threadgate?: GateRecord;
};

const schema = so.sprk.feed.post.main;
const isMediaImages = so.sprk.media.images.$matches;
const isMediaVideo = so.sprk.media.video.$matches;
const isMention = so.sprk.richtext.facet.mention.$matches;
const isLink = so.sprk.richtext.facet.link.$matches;

/**
 * Depth passed to `getDescendents` and the exclusive height cutoff used
 * when deciding which ancestors receive a reply notification.
 */
const REPLY_NOTIF_DEPTH = 5;

/**
 * Upserts the post document for `uri` and collects the derived data used by
 * the notification hooks.
 *
 * @param db - Database handle whose `models.Post` receives the upsert.
 * @param uri - AT URI of the record; its host is stored as the author DID.
 * @param cid - CID of the record.
 * @param obj - The post record being indexed.
 * @param timestamp - Value stored as `indexedAt`.
 * @returns The stored post with its facets, flattened media and descendents.
 */
const insertFn = async (
  db: Database,
  uri: AtUri,
  cid: Cid,
  obj: PostRecord,
  timestamp: string,
): Promise<IndexedPost | null> => {
  const post = {
    uri: uri.toString(),
    cid: cid.toString(),
    authorDid: uri.host,
    caption: obj.caption
      ? {
        text: obj.caption.text || "",
        facets: obj.caption.facets || [],
      }
      : undefined,
    media: obj.media || null,
    sound: obj.sound || null,
    langs: obj.langs || [],
    labels: obj.labels || null,
    tags: obj.tags || [],
    crossposts: obj.crossposts || [],
    createdAt: obj.createdAt,
    indexedAt: timestamp,
  };

  // Keyed on uri so re-indexing the same record overwrites it in place.
  const insertedPost = await db.models.Post.findOneAndUpdate(
    { uri: post.uri },
    { $set: post },
    { upsert: true, returnDocument: "after" },
  );

  // Keep only mention and link features; any other feature kind is dropped.
  const facets = (obj.caption?.facets || [])
    .flatMap((facet) => facet.features)
    .flatMap((feature) => {
      if (isMention(feature)) {
        return {
          type: "mention" as const,
          value: feature.did,
        };
      }
      if (isLink(feature)) {
        return {
          type: "link" as const,
          value: feature.uri,
        };
      }
      return [];
    });

  // Media processing - medias are stored inline in the Post model
  const medias: IndexedMedia[] = [];
  for (const postMedia of separateMedia(obj.media)) {
    if (isMediaImages(postMedia)) {
      const { images } = postMedia as MediaImages;
      const imagesMedia = images.map((
        img: MediaImage,
        i: number,
      ) => ({
        position: i,
        imageCid: img.image.ref.toString(),
        alt: img.alt,
      }));
      medias.push(...imagesMedia);
    } else if (isMediaVideo(postMedia)) {
      const media = postMedia as MediaVideo;
      // NOTE(review): `postUri` is not part of IndexedMedia and image entries
      // do not carry it; it is kept so the returned value is unchanged.
      const videoMedia = {
        postUri: uri.toString(),
        videoCid: media.video.ref.toString(),
        alt: media.alt ?? null,
      };
      medias.push(videoMedia);
    }
  }

  const descendents = await getDescendents(db, {
    uri: post.uri,
    depth: REPLY_NOTIF_DEPTH,
  });

  // NOTE(review): `ancestors` and `threadgate` are never populated here, so
  // the ancestor-driven loops in notifsForInsert see empty lists for results
  // produced by this function — confirm whether that is intended.
  return {
    post: insertedPost,
    facets,
    medias,
    descendents,
  };
};

/** Duplicate detection is not used for posts; always returns `null`. */
const findDuplicate = (): AtUri | null => {
  return null;
};

/**
 * Builds the notifications for a newly indexed post: mentions first, then
 * replies to ancestors, then replies by already-indexed descendents.
 *
 * Each DID is notified at most once, and the post's own author never is.
 *
 * @param obj - The indexed post and its derived thread data.
 * @returns Notifications in the order they were generated.
 */
const notifsForInsert = (obj: IndexedPost) => {
  const notifs: PostNotif[] = [];
  // Seeded with the author so they are never notified about their own post.
  const notified = new Set([obj.post.authorDid]);
  const maybeNotify = (notif: PostNotif) => {
    if (!notified.has(notif.did)) {
      notified.add(notif.did);
      notifs.push(notif);
    }
  };

  for (const facet of obj.facets ?? []) {
    if (facet.type === "mention") {
      maybeNotify({
        did: facet.value,
        reason: "mention",
        author: obj.post.authorDid,
        recordUri: obj.post.uri,
        recordCid: obj.post.cid,
        sortAt: obj.post.createdAt,
      });
    }
  }

  // Built once so the per-ancestor lookup below is a constant-time check.
  const threadgateHiddenReplies = new Set(
    (obj.threadgate?.hiddenReplies || []).map(String),
  );

  // reply notifications
  for (const ancestor of obj.ancestors ?? []) {
    if (ancestor.uri === obj.post.uri) continue; // no need to notify for own post
    if (ancestor.height < REPLY_NOTIF_DEPTH) {
      const ancestorUri = new AtUri(ancestor.uri);
      maybeNotify({
        did: ancestorUri.host,
        reason: "reply",
        reasonSubject: ancestorUri.toString(),
        author: obj.post.authorDid,
        recordUri: obj.post.uri,
        recordCid: obj.post.cid,
        sortAt: obj.post.createdAt,
      });
      // found hidden reply, don't notify any higher ancestors
      if (threadgateHiddenReplies.has(ancestorUri.toString())) break;
    }
  }

  // descendents indicate out-of-order indexing: need to notify
  // the current post and upwards.
  for (const descendent of obj.descendents ?? []) {
    for (const ancestor of obj.ancestors ?? []) {
      const totalHeight = descendent.depth + ancestor.height;
      if (totalHeight < REPLY_NOTIF_DEPTH) {
        const ancestorUri = new AtUri(ancestor.uri);
        maybeNotify({
          did: ancestorUri.host,
          reason: "reply",
          reasonSubject: ancestorUri.toString(),
          author: descendent.creator,
          recordUri: descendent.uri,
          recordCid: descendent.cid,
          sortAt: descendent.sortAt,
        });
      }
    }
  }

  return notifs;
};

/**
 * Deletes the post document stored for `uri`.
 *
 * @param db - Database handle whose `models.Post` is queried.
 * @param uri - AT URI of the record to remove.
 * @returns The deleted post with empty facets, or `null` if none was stored.
 */
const deleteFn = async (
  db: Database,
  uri: AtUri,
): Promise<IndexedPost | null> => {
  const uriStr = uri.toString();
  const deleted = await db.models.Post.findOneAndDelete({ uri: uriStr });

  if (!deleted) {
    return null;
  }

  return {
    post: deleted,
    facets: [],
  };
};

/**
 * Computes notification changes for a deleted post.
 *
 * @param deleted - The post that was removed.
 * @param replacedBy - A post taking its place, if any.
 * @returns Notifications for the replacement (none when there is no
 *   replacement) and the deleted post's URI under `toDelete`.
 */
const notifsForDelete = (
  deleted: IndexedPost,
  replacedBy: IndexedPost | null,
) => {
  const notifs = replacedBy ? notifsForInsert(replacedBy) : [];
  return {
    notifs,
    toDelete: [deleted.post.uri],
  };
};

/**
 * Recounts the author's posts and stores the total on their profile.
 *
 * @param db - Database handle providing the `Post` and `Profile` models.
 * @param postIdx - The indexed post whose author's count is refreshed.
 */
const updateAggregates = async (db: Database, postIdx: IndexedPost) => {
  const { authorDid } = postIdx.post;

  // Update posts count for author
  const postsCount = await db.models.Post.countDocuments({ authorDid });

  // No `upsert` is passed, so this only updates a profile that already
  // exists and never creates one; a separate existence query is unnecessary.
  await db.models.Profile.findOneAndUpdate(
    { authorDid },
    { $set: { postsCount } },
    { returnDocument: "after" },
  );
};

export type PluginType = RecordProcessor<typeof schema, IndexedPost>;

/**
 * Creates the record processor that indexes post records.
 *
 * @param db - Database handle passed to the processor and its hooks.
 * @param background - Background queue passed to the processor.
 * @returns A `RecordProcessor` wired with this module's post hooks.
 */
export const makePlugin = (
  db: Database,
  background: BackgroundQueue,
): PluginType => {
  return new RecordProcessor(db, background, {
    schema,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  });
};

export default makePlugin;

/**
 * Normalises a post's optional `media` field into a list.
 *
 * @param media - The post's media value, possibly absent.
 * @returns An empty list when `media` is falsy, otherwise a one-element list.
 */
function separateMedia(
  media: PostRecord["media"],
): Array<NonNullable<PostRecord["media"]>> {
  if (!media) {
    return [];
  }
  return [media];
}