[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eb947da8a3a8a7d485ff132b3ff0db4a8baaac19 311 lines 7.8 kB view raw
1import { CID } from "multiformats/cid"; 2import { AtUri } from "@atp/syntax"; 3import * as lex from "../../../lex/lexicons.ts"; 4import { 5 isMain as isMediaImages, 6 Main as MediaImages, 7} from "../../../lex/types/so/sprk/media/images.ts"; 8import { Main as MediaImage } from "../../../lex/types/so/sprk/media/image.ts"; 9import { 10 isMain as isMediaVideo, 11 Main as MediaVideo, 12} from "../../../lex/types/so/sprk/media/video.ts"; 13import { Record as PostRecord } from "../../../lex/types/so/sprk/feed/post.ts"; 14import { Record as GateRecord } from "../../../lex/types/so/sprk/feed/threadgate.ts"; 15import { 16 isLink, 17 isMention, 18} from "../../../lex/types/so/sprk/richtext/facet.ts"; 19import { BackgroundQueue } from "../../background.ts"; 20import { Database } from "../../db/index.ts"; 21import { PostDocument } from "../../db/models.ts"; 22import { getDescendents } from "../../util.ts"; 23import { RecordProcessor } from "../processor.ts"; 24 25type PostAncestor = { 26 uri: string; 27 height: number; 28}; 29type PostDescendent = { 30 uri: string; 31 depth: number; 32 cid: string; 33 creator: string; 34 sortAt: string; 35}; 36type IndexedPost = { 37 post: PostDocument; 38 facets?: { type: "mention" | "link"; value: string }[]; 39 medias?: Array<{ 40 position?: number; 41 imageCid?: string; 42 alt?: string | null; 43 thumbCid?: string | null; 44 videoCid?: string; 45 }>; 46 ancestors?: PostAncestor[]; 47 descendents?: PostDescendent[]; 48 threadgate?: GateRecord; 49}; 50 51const lexId = lex.ids.SoSprkFeedPost; 52 53const REPLY_NOTIF_DEPTH = 5; 54 55const insertFn = async ( 56 db: Database, 57 uri: AtUri, 58 cid: CID, 59 obj: PostRecord, 60 timestamp: string, 61): Promise<IndexedPost | null> => { 62 const post = { 63 uri: uri.toString(), 64 cid: cid.toString(), 65 authorDid: uri.host, 66 caption: obj.caption 67 ? { 68 text: obj.caption.text || "", 69 facets: obj.caption.facets || [], 70 } 71 : undefined, 72 media: obj.media || null, 73 sound: obj.sound || null, 74 langs: obj.langs || [], 75 labels: obj.labels || null, 76 tags: obj.tags || [], 77 createdAt: obj.createdAt, 78 indexedAt: timestamp, 79 }; 80 81 const insertedPost = await db.models.Post.findOneAndUpdate( 82 { uri: post.uri }, 83 { $set: post }, 84 { upsert: true, new: true }, 85 ); 86 87 const facets = (obj.caption?.facets || []) 88 .flatMap((facet) => facet.features) 89 .flatMap((feature) => { 90 if (isMention(feature)) { 91 return { 92 type: "mention" as const, 93 value: feature.did, 94 }; 95 } 96 if (isLink(feature)) { 97 return { 98 type: "link" as const, 99 value: feature.uri, 100 }; 101 } 102 return []; 103 }); 104 105 // Media processing - medias are stored inline in the Post model 106 const medias: Array<{ 107 position?: number; 108 imageCid?: string; 109 alt?: string | null; 110 thumbCid?: string | null; 111 videoCid?: string; 112 }> = []; 113 const postMedias = separateMedia(obj.media); 114 for (const postMedia of postMedias) { 115 if (isMediaImages(postMedia)) { 116 const { images } = postMedia as MediaImages; 117 const imagesMedia = images.map(( 118 img: MediaImage, 119 i: number, 120 ) => ({ 121 position: i, 122 imageCid: img.image.ref.toString(), 123 alt: img.alt, 124 })); 125 medias.push(...imagesMedia); 126 } else if (isMediaVideo(postMedia)) { 127 const media = postMedia as MediaVideo; 128 const videoMedia = { 129 postUri: uri.toString(), 130 videoCid: media.video.ref.toString(), 131 alt: media.alt ?? null, 132 }; 133 medias.push(videoMedia); 134 } 135 } 136 137 const descendents = await getDescendents(db, { 138 uri: post.uri, 139 depth: REPLY_NOTIF_DEPTH, 140 }); 141 142 return { 143 post: insertedPost, 144 facets, 145 medias, 146 descendents, 147 }; 148}; 149 150const findDuplicate = (): AtUri | null => { 151 return null; 152}; 153 154const notifsForInsert = (obj: IndexedPost) => { 155 const notifs: Array<{ 156 did: string; 157 reason: string; 158 author: string; 159 recordUri: string; 160 recordCid: string; 161 sortAt: string; 162 reasonSubject?: string; 163 }> = []; 164 const notified = new Set([obj.post.authorDid]); 165 const maybeNotify = (notif: { 166 did: string; 167 reason: string; 168 author: string; 169 recordUri: string; 170 recordCid: string; 171 sortAt: string; 172 reasonSubject?: string; 173 }) => { 174 if (!notified.has(notif.did)) { 175 notified.add(notif.did); 176 notifs.push(notif); 177 } 178 }; 179 for (const facet of obj.facets ?? []) { 180 if (facet.type === "mention") { 181 maybeNotify({ 182 did: facet.value, 183 reason: "mention", 184 author: obj.post.authorDid, 185 recordUri: obj.post.uri, 186 recordCid: obj.post.cid, 187 sortAt: obj.post.createdAt, 188 }); 189 } 190 } 191 192 const threadgateHiddenReplies = obj.threadgate?.hiddenReplies || []; 193 194 // reply notifications 195 for (const ancestor of obj.ancestors ?? []) { 196 if (ancestor.uri === obj.post.uri) continue; // no need to notify for own post 197 if (ancestor.height < REPLY_NOTIF_DEPTH) { 198 const ancestorUri = new AtUri(ancestor.uri); 199 maybeNotify({ 200 did: ancestorUri.host, 201 reason: "reply", 202 reasonSubject: ancestorUri.toString(), 203 author: obj.post.authorDid, 204 recordUri: obj.post.uri, 205 recordCid: obj.post.cid, 206 sortAt: obj.post.createdAt, 207 }); 208 // found hidden reply, don't notify any higher ancestors 209 if (threadgateHiddenReplies.includes(ancestorUri.toString())) break; 210 } 211 } 212 213 // descendents indicate out-of-order indexing: need to notify 214 // the current post and upwards. 215 for (const descendent of obj.descendents ?? []) { 216 for (const ancestor of obj.ancestors ?? []) { 217 const totalHeight = descendent.depth + ancestor.height; 218 if (totalHeight < REPLY_NOTIF_DEPTH) { 219 const ancestorUri = new AtUri(ancestor.uri); 220 maybeNotify({ 221 did: ancestorUri.host, 222 reason: "reply", 223 reasonSubject: ancestorUri.toString(), 224 author: descendent.creator, 225 recordUri: descendent.uri, 226 recordCid: descendent.cid, 227 sortAt: descendent.sortAt, 228 }); 229 } 230 } 231 } 232 233 return notifs; 234}; 235 236const deleteFn = async ( 237 db: Database, 238 uri: AtUri, 239): Promise<IndexedPost | null> => { 240 const uriStr = uri.toString(); 241 const deleted = await db.models.Post.findOneAndDelete({ uri: uriStr }); 242 243 if (!deleted) { 244 return null; 245 } 246 247 return { 248 post: deleted, 249 facets: [], 250 }; 251}; 252 253const notifsForDelete = ( 254 deleted: IndexedPost, 255 replacedBy: IndexedPost | null, 256) => { 257 const notifs = replacedBy ? notifsForInsert(replacedBy) : []; 258 return { 259 notifs, 260 toDelete: [deleted.post.uri], 261 }; 262}; 263 264const updateAggregates = async (db: Database, postIdx: IndexedPost) => { 265 // Update posts count for author 266 const postsCount = await db.models.Post.countDocuments({ 267 authorDid: postIdx.post.authorDid, 268 }); 269 270 // First check if profile exists to avoid creating one with null URI 271 const existingProfile = await db.models.Profile.findOne({ 272 authorDid: postIdx.post.authorDid, 273 }); 274 275 if (existingProfile) { 276 // Only update existing profiles 277 await db.models.Profile.findOneAndUpdate( 278 { authorDid: postIdx.post.authorDid }, 279 { $set: { postsCount } }, 280 { new: true }, 281 ); 282 } 283}; 284 285export type PluginType = RecordProcessor<PostRecord, IndexedPost>; 286 287export const makePlugin = ( 288 db: Database, 289 background: BackgroundQueue, 290): PluginType => { 291 return new RecordProcessor(db, background, { 292 lexId, 293 insertFn, 294 findDuplicate, 295 deleteFn, 296 notifsForInsert, 297 notifsForDelete, 298 updateAggregates, 299 }); 300}; 301 302export default makePlugin; 303 304function separateMedia( 305 media: PostRecord["media"], 306): Array<NonNullable<PostRecord["media"]>> { 307 if (!media) { 308 return []; 309 } 310 return [media]; 311}