[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: crosspost replies

+792 -40
+1 -1
compose.dev.yaml
··· 70 70 build: 71 71 context: . 72 72 dockerfile: Dockerfile 73 - command: ["deno", "run", "-A", "--watch", "ingest.ts"] 73 + command: ["deno", "run", "-A", "--watch", "ingest/index.ts"] 74 74 environment: 75 75 NODE_ENV: development 76 76 HOST: 0.0.0.0
+13 -6
data-plane/db/index.ts
··· 71 71 "Reply", 72 72 models.replySchema, 73 73 ), 74 + CrosspostReply: this.connection.model<models.CrosspostReplyDocument>( 75 + "CrosspostReply", 76 + models.crosspostReplySchema, 77 + ), 74 78 Story: this.connection.model<models.StoryDocument>( 75 79 "Story", 76 80 models.storySchema, ··· 203 207 return getResultFromDoc(doc); 204 208 } 205 209 206 - async getCursorState(): Promise<number | null> { 210 + async getCursorState(identifier = "last_processed_cursor"): Promise<number | null> { 207 211 try { 208 212 const cursorState = await this.models.CursorState.findOne({ 209 - identifier: "last_processed_cursor", 213 + identifier, 210 214 }); 211 215 return cursorState?.cursorValue || null; 212 216 } catch (error) { 213 - console.error("Failed to get cursor state", { error }); 217 + console.error("Failed to get cursor state", { error, identifier }); 214 218 return null; 215 219 } 216 220 } 217 221 218 - async saveCursorState(cursorPosition: number): Promise<void> { 222 + async saveCursorState( 223 + cursorPosition: number, 224 + identifier = "last_processed_cursor", 225 + ): Promise<void> { 219 226 try { 220 227 await this.models.CursorState.findOneAndUpdate( 221 - { identifier: "last_processed_cursor" }, 228 + { identifier }, 222 229 { 223 230 cursorValue: cursorPosition, 224 231 updatedAt: new Date(), ··· 228 235 } catch (error) { 229 236 console.error( 230 237 "Failed to save cursor state", 231 - { error, cursorPosition }, 238 + { error, cursorPosition, identifier }, 232 239 ); 233 240 } 234 241 }
+50
data-plane/db/models.ts
··· 282 282 langs?: string[]; 283 283 labels?: Label[]; 284 284 tags?: string[]; 285 + crossposts?: RecordRef[]; 285 286 likeCount: number; 286 287 replyCount: number; 287 288 repostCount: number; ··· 306 307 langs: { type: [String], required: false, default: [] }, 307 308 labels: { type: [Object], required: false, default: [] }, 308 309 tags: { type: [String], required: false, default: [] }, 310 + crossposts: { type: [Object], required: false, default: [] }, 309 311 likeCount: { type: Number, required: true, default: 0 }, 310 312 replyCount: { type: Number, required: true, default: 0 }, 311 313 repostCount: { type: Number, required: true, default: 0 }, ··· 325 327 media?: ImageMedia | { images?: ImageMedia[]; [key: string]: unknown }; 326 328 langs?: string[]; 327 329 labels?: Label[]; 330 + invalidReplyRoot?: boolean; 328 331 likeCount: number; 329 332 replyCount: number; 330 333 } ··· 348 351 media: { type: Object, required: false }, 349 352 langs: { type: [String], required: false, default: [] }, 350 353 labels: { type: [Object], required: false, default: [] }, 354 + invalidReplyRoot: { type: Boolean, required: false }, 355 + likeCount: { type: Number, required: true, default: 0 }, 356 + replyCount: { type: Number, required: true, default: 0 }, 357 + }) 358 + .index({ reply: 1, createdAt: -1 }) 359 + .index({ "reply.parent.uri": 1, authorDid: 1 }) 360 + .index({ "reply.root.uri": 1, createdAt: -1 }); 361 + 362 + // crosspost replies 363 + 364 + export interface CrosspostReplyDocument extends AuthoredDocument { 365 + text?: string; 366 + facets?: Facet[]; 367 + reply?: { 368 + root: RecordRef; 369 + parent: RecordRef; 370 + }; 371 + langs?: string[]; 372 + labels?: Label[]; 373 + tags?: string[]; 374 + invalidReplyRoot?: boolean; 375 + likeCount: number; 376 + replyCount: number; 377 + } 378 + export const crosspostReplySchema = new Schema<CrosspostReplyDocument>({ 379 + ...authoredSchema, 380 + text: { type: String, required: false }, 381 + facets: { type: [Object], required: false, default: [] }, 382 + reply: { 383 + type: { 384 + root: { 385 + uri: { type: String, required: true }, 386 + cid: { type: String, required: true }, 387 + }, 388 + parent: { 389 + uri: { type: String, required: true }, 390 + cid: { type: String, required: true }, 391 + }, 392 + }, 393 + required: false, 394 + }, 395 + langs: { type: [String], required: false, default: [] }, 396 + labels: { type: [Object], required: false, default: [] }, 397 + tags: { type: [String], required: false, default: [] }, 398 + invalidReplyRoot: { type: Boolean, required: false }, 351 399 likeCount: { type: Number, required: true, default: 0 }, 352 400 replyCount: { type: Number, required: true, default: 0 }, 353 401 }) ··· 655 703 likeSchema, 656 704 postSchema, 657 705 replySchema, 706 + crosspostReplySchema, 658 707 repostSchema, 659 708 followSchema, 660 709 blockSchema, ··· 670 719 Like: Model<LikeDocument>; 671 720 Post: Model<PostDocument>; 672 721 Reply: Model<ReplyDocument>; 722 + CrosspostReply: Model<CrosspostReplyDocument>; 673 723 Story: Model<StoryDocument>; 674 724 Follow: Model<FollowDocument>; 675 725 Block: Model<BlockDocument>;
+1 -1
data-plane/index.ts
··· 22 22 import { Labels } from "./routes/labels.ts"; 23 23 import { PushTokens } from "./routes/push-tokens.ts"; 24 24 25 - export { RepoSubscription } from "./subscription.ts"; 25 + export { RepoSubscription } from "./subscription/index.ts"; 26 26 27 27 export type ServerContext = { 28 28 db: Database;
+4
data-plane/indexing/index.ts
··· 26 26 import * as Story from "./plugins/story.ts"; 27 27 import * as Audio from "./plugins/audio.ts"; 28 28 import * as Labeler from "./plugins/labeler.ts"; 29 + import * as CrosspostReply from "./plugins/crosspost/reply.ts"; 29 30 import { RecordProcessor } from "./processor.ts"; 30 31 import { ServerConfig } from "../../config.ts"; 31 32 import { PushService } from "../../utils/push.ts"; ··· 43 44 story: Story.PluginType; 44 45 audio: Audio.PluginType; 45 46 labeler: Labeler.PluginType; 47 + crosspostReply: CrosspostReply.PluginType; 46 48 }; 47 49 private pushService?: PushService; 48 50 ··· 66 68 story: Story.makePlugin(this.db, this.background), 67 69 audio: Audio.makePlugin(this.db, this.background), 68 70 labeler: Labeler.makePlugin(this.db, this.background), 71 + crosspostReply: CrosspostReply.makePlugin(this.db, this.background), 69 72 }; 70 73 71 74 // Set push service on all processors ··· 321 324 await this.db.models.Post.deleteMany({ authorDid: did }); 322 325 await this.db.models.Reply.deleteMany({ authorDid: did }); 323 326 await this.db.models.Labeler.deleteMany({ authorDid: did }); 327 + await this.db.models.CrosspostReply.deleteMany({ authorDid: did }); 324 328 } 325 329 } 326 330
+371
data-plane/indexing/plugins/crosspost/reply.ts
··· 1 + import { CID } from "multiformats/cid"; 2 + import { AtUri } from "@atp/syntax"; 3 + import * as lex from "../../../../lex/lexicons.ts"; 4 + import { Record as BskyPostRecord } from "../../../../lex/types/app/bsky/feed/post.ts"; 5 + import { 6 + isLink, 7 + isMention, 8 + } from "../../../../lex/types/app/bsky/richtext/facet.ts"; 9 + import { BackgroundQueue } from "../../../background.ts"; 10 + import { Database } from "../../../db/index.ts"; 11 + import { CrosspostReplyDocument } from "../../../db/models.ts"; 12 + import { getAncestorsAndSelf, getDescendents } from "../../../util.ts"; 13 + import { RecordProcessor } from "../../processor.ts"; 14 + 15 + type Ancestor = { 16 + uri: string; 17 + height: number; 18 + }; 19 + type Descendent = { 20 + uri: string; 21 + depth: number; 22 + cid: string; 23 + creator: string; 24 + sortAt: string; 25 + }; 26 + type IndexedReply = { 27 + reply: CrosspostReplyDocument; 28 + facets?: { type: "mention" | "link"; value: string }[]; 29 + media?: { 30 + cid?: string; 31 + alt?: string | null; 32 + }; 33 + ancestors?: Ancestor[]; 34 + descendents?: Descendent[]; 35 + }; 36 + 37 + const lexId = lex.ids.AppBskyFeedPost; 38 + 39 + const REPLY_NOTIF_DEPTH = 5; 40 + 41 + const insertFn = async ( 42 + db: Database, 43 + uri: AtUri, 44 + cid: CID, 45 + obj: BskyPostRecord, 46 + timestamp: string, 47 + ): Promise<IndexedReply | null> => { 48 + if (!obj.reply) { 49 + return null; 50 + } 51 + 52 + const sparkPost = await db.models.Post.findOne({ 53 + "crossposts.uri": obj.reply.root.uri, 54 + }); 55 + if (!sparkPost) { 56 + return null; 57 + } 58 + 59 + const mappedRoot = { 60 + uri: sparkPost.uri, 61 + cid: sparkPost.cid, 62 + }; 63 + 64 + let mappedParent = { 65 + uri: obj.reply.parent.uri, 66 + cid: obj.reply.parent.cid, 67 + }; 68 + 69 + if ( 70 + sparkPost.crossposts?.some((crosspost) => 71 + crosspost.uri === obj.reply?.parent.uri 72 + ) 73 + ) { 74 + mappedParent = { 75 + uri: sparkPost.uri, 76 + cid: sparkPost.cid, 77 + }; 78 + } else { 79 + const [parentReply, parentCrosspostReply] = await Promise.all([ 80 + db.models.Reply.findOne({ 81 + uri: obj.reply.parent.uri, 82 + }), 83 + db.models.CrosspostReply.findOne({ 84 + uri: obj.reply.parent.uri, 85 + }), 86 + ]); 87 + const parent = parentReply || parentCrosspostReply; 88 + if (parent) { 89 + mappedParent = { 90 + uri: parent.uri, 91 + cid: parent.cid, 92 + }; 93 + } 94 + } 95 + 96 + const reply = { 97 + uri: uri.toString(), 98 + cid: cid.toString(), 99 + authorDid: uri.host, 100 + text: obj.text || "", 101 + facets: obj.facets || [], 102 + reply: { 103 + root: mappedRoot, 104 + parent: mappedParent, 105 + }, 106 + langs: obj.langs || [], 107 + labels: obj.labels || null, 108 + tags: obj.tags || [], 109 + createdAt: obj.createdAt, 110 + indexedAt: timestamp, 111 + }; 112 + 113 + const insertedReply = await db.models.CrosspostReply.findOneAndUpdate( 114 + { uri: reply.uri }, 115 + { $set: reply }, 116 + { upsert: true, new: true }, 117 + ); 118 + 119 + const { invalidReplyRoot } = await validateCrosspostReply(db, insertedReply); 120 + if (invalidReplyRoot) { 121 + Object.assign(insertedReply, { invalidReplyRoot }); 122 + await db.models.CrosspostReply.updateOne( 123 + { uri: reply.uri }, 124 + { $set: { invalidReplyRoot } }, 125 + ); 126 + } 127 + 128 + const facets = (obj.facets || []) 129 + .flatMap((facet) => facet.features) 130 + .flatMap((feature) => { 131 + if (isMention(feature)) { 132 + return { 133 + type: "mention" as const, 134 + value: feature.did, 135 + }; 136 + } 137 + if (isLink(feature)) { 138 + return { 139 + type: "link" as const, 140 + value: feature.uri, 141 + }; 142 + } 143 + return []; 144 + }); 145 + 146 + const ancestors = await getAncestorsAndSelf(db, { 147 + uri: reply.uri, 148 + parentHeight: REPLY_NOTIF_DEPTH, 149 + }); 150 + const descendents = await getDescendents(db, { 151 + uri: reply.uri, 152 + depth: REPLY_NOTIF_DEPTH, 153 + }); 154 + 155 + return { 156 + reply: insertedReply, 157 + facets, 158 + media: {}, 159 + ancestors, 160 + descendents, 161 + }; 162 + }; 163 + 164 + const findDuplicate = (): AtUri | null => { 165 + return null; 166 + }; 167 + 168 + const notifsForInsert = (obj: IndexedReply) => { 169 + const notifs: Array<{ 170 + did: string; 171 + reason: string; 172 + author: string; 173 + recordUri: string; 174 + recordCid: string; 175 + sortAt: string; 176 + reasonSubject?: string; 177 + }> = []; 178 + const notified = new Set([obj.reply.authorDid]); 179 + const maybeNotify = (notif: { 180 + did: string; 181 + reason: string; 182 + author: string; 183 + recordUri: string; 184 + recordCid: string; 185 + sortAt: string; 186 + reasonSubject?: string; 187 + }) => { 188 + if (!notified.has(notif.did)) { 189 + notified.add(notif.did); 190 + notifs.push(notif); 191 + } 192 + }; 193 + for (const facet of obj.facets ?? []) { 194 + if (facet.type === "mention") { 195 + maybeNotify({ 196 + did: facet.value, 197 + reason: "mention", 198 + author: obj.reply.authorDid, 199 + recordUri: obj.reply.uri, 200 + recordCid: obj.reply.cid, 201 + sortAt: obj.reply.createdAt, 202 + }); 203 + } 204 + } 205 + 206 + for (const ancestor of obj.ancestors ?? []) { 207 + if (ancestor.uri === obj.reply.uri) continue; 208 + if (ancestor.height < REPLY_NOTIF_DEPTH) { 209 + const ancestorUri = new AtUri(ancestor.uri); 210 + maybeNotify({ 211 + did: ancestorUri.host, 212 + reason: "reply", 213 + reasonSubject: ancestorUri.toString(), 214 + author: obj.reply.authorDid, 215 + recordUri: obj.reply.uri, 216 + recordCid: obj.reply.cid, 217 + sortAt: obj.reply.createdAt, 218 + }); 219 + } 220 + } 221 + 222 + for (const descendent of obj.descendents ?? []) { 223 + for (const ancestor of obj.ancestors ?? []) { 224 + const totalHeight = descendent.depth + ancestor.height; 225 + if (totalHeight < REPLY_NOTIF_DEPTH) { 226 + const ancestorUri = new AtUri(ancestor.uri); 227 + maybeNotify({ 228 + did: ancestorUri.host, 229 + reason: "reply", 230 + reasonSubject: ancestorUri.toString(), 231 + author: descendent.creator, 232 + recordUri: descendent.uri, 233 + recordCid: descendent.cid, 234 + sortAt: descendent.sortAt, 235 + }); 236 + } 237 + } 238 + } 239 + 240 + return notifs; 241 + }; 242 + 243 + const deleteFn = async ( 244 + db: Database, 245 + uri: AtUri, 246 + ): Promise<IndexedReply | null> => { 247 + const uriStr = uri.toString(); 248 + const deleted = await db.models.CrosspostReply.findOneAndDelete({ 249 + uri: uriStr, 250 + }); 251 + 252 + if (!deleted) { 253 + return null; 254 + } 255 + 256 + return { 257 + reply: deleted, 258 + facets: [], 259 + }; 260 + }; 261 + 262 + const notifsForDelete = ( 263 + deleted: IndexedReply, 264 + replacedBy: IndexedReply | null, 265 + ) => { 266 + const notifs = replacedBy ? notifsForInsert(replacedBy) : []; 267 + return { 268 + notifs, 269 + toDelete: [deleted.reply.uri], 270 + }; 271 + }; 272 + 273 + const updateAggregates = async (db: Database, replyIdx: IndexedReply) => { 274 + if (replyIdx.reply.reply?.parent?.uri) { 275 + const parentPost = await db.models.Post.findOne({ 276 + uri: replyIdx.reply.reply?.parent.uri, 277 + }); 278 + const [ 279 + parentReply, 280 + parentCrosspostReply, 281 + nativeReplyCount, 282 + crosspostReplyCount, 283 + ] = await Promise.all([ 284 + db.models.Reply.findOne({ 285 + uri: replyIdx.reply.reply?.parent.uri, 286 + }), 287 + db.models.CrosspostReply.findOne({ 288 + uri: replyIdx.reply.reply?.parent.uri, 289 + }), 290 + db.models.Reply.countDocuments({ 291 + "reply.parent.uri": replyIdx.reply.reply.parent.uri, 292 + }), 293 + db.models.CrosspostReply.countDocuments({ 294 + "reply.parent.uri": replyIdx.reply.reply.parent.uri, 295 + }), 296 + ]); 297 + const replyCount = nativeReplyCount + crosspostReplyCount; 298 + 299 + if (parentPost) { 300 + await db.models.Post.findOneAndUpdate( 301 + { uri: replyIdx.reply.reply?.parent.uri }, 302 + { $set: { replyCount } }, 303 + { new: true }, 304 + ); 305 + } else if (parentReply) { 306 + await db.models.Reply.findOneAndUpdate( 307 + { uri: replyIdx.reply.reply?.parent.uri }, 308 + { $set: { replyCount } }, 309 + { new: true }, 310 + ); 311 + } else if (parentCrosspostReply) { 312 + await db.models.CrosspostReply.findOneAndUpdate( 313 + { uri: replyIdx.reply.reply?.parent.uri }, 314 + { $set: { replyCount } }, 315 + { new: true }, 316 + ); 317 + } 318 + } 319 + }; 320 + 321 + export type PluginType = RecordProcessor<BskyPostRecord, IndexedReply>; 322 + 323 + export const makePlugin = ( 324 + db: Database, 325 + background: BackgroundQueue, 326 + ): PluginType => { 327 + return new RecordProcessor(db, background, { 328 + lexId, 329 + insertFn, 330 + findDuplicate, 331 + deleteFn, 332 + notifsForInsert, 333 + notifsForDelete, 334 + updateAggregates, 335 + }); 336 + }; 337 + 338 + export default makePlugin; 339 + 340 + async function validateCrosspostReply( 341 + db: Database, 342 + reply: CrosspostReplyDocument, 343 + ) { 344 + const parentUri = reply.reply?.parent?.uri; 345 + const rootUri = reply.reply?.root?.uri; 346 + if (!parentUri || !rootUri) { 347 + return { invalidReplyRoot: true }; 348 + } 349 + 350 + const [parentPost, parentReply, parentCrosspostReply] = await Promise.all([ 351 + db.models.Post.findOne({ uri: parentUri }).lean(), 352 + db.models.Reply.findOne({ uri: parentUri }).lean(), 353 + db.models.CrosspostReply.findOne({ uri: parentUri }).lean(), 354 + ]); 355 + const parent = parentReply || parentCrosspostReply; 356 + 357 + if (!parentPost && !parent) { 358 + return { invalidReplyRoot: true }; 359 + } 360 + 361 + if (parentPost) { 362 + return { 363 + invalidReplyRoot: parentUri !== rootUri, 364 + }; 365 + } 366 + 367 + return { 368 + invalidReplyRoot: !!parent?.invalidReplyRoot || 369 + parent?.reply?.root?.uri !== rootUri, 370 + }; 371 + }
+12
data-plane/indexing/plugins/like.ts
··· 167 167 { new: true }, 168 168 ); 169 169 } 170 + 171 + const existingCrosspostReply = await db.models.CrosspostReply.findOne({ 172 + uri: like.subject, 173 + }); 174 + 175 + if (existingCrosspostReply) { 176 + await db.models.CrosspostReply.findOneAndUpdate( 177 + { uri: like.subject }, 178 + { $set: { likeCount } }, 179 + { new: true }, 180 + ); 181 + } 170 182 } 171 183 }; 172 184
+1
data-plane/indexing/plugins/post.ts
··· 74 74 langs: obj.langs || [], 75 75 labels: obj.labels || null, 76 76 tags: obj.tags || [], 77 + crossposts: obj.crossposts || [], 77 78 createdAt: obj.createdAt, 78 79 indexedAt: timestamp, 79 80 };
+21 -7
data-plane/indexing/plugins/reply.ts
··· 273 273 const parentPost = await db.models.Post.findOne({ 274 274 uri: replyIdx.reply.reply?.parent.uri, 275 275 }); 276 - const parentReply = await db.models.Reply.findOne({ 277 - uri: replyIdx.reply.reply?.parent.uri, 278 - }); 279 - 280 - const replyCount = await db.models.Reply.countDocuments({ 281 - "reply.parent.uri": replyIdx.reply.reply.parent.uri, 282 - }); 276 + const [parentReply, parentCrosspostReply, nativeReplyCount, crosspostReplyCount] = await Promise.all([ 277 + db.models.Reply.findOne({ 278 + uri: replyIdx.reply.reply?.parent.uri, 279 + }), 280 + db.models.CrosspostReply.findOne({ 281 + uri: replyIdx.reply.reply?.parent.uri, 282 + }), 283 + db.models.Reply.countDocuments({ 284 + "reply.parent.uri": replyIdx.reply.reply.parent.uri, 285 + }), 286 + db.models.CrosspostReply.countDocuments({ 287 + "reply.parent.uri": replyIdx.reply.reply.parent.uri, 288 + }), 289 + ]); 290 + const replyCount = nativeReplyCount + crosspostReplyCount; 283 291 284 292 if (parentPost) { 285 293 await db.models.Post.findOneAndUpdate( ··· 289 297 ); 290 298 } else if (parentReply) { 291 299 await db.models.Reply.findOneAndUpdate( 300 + { uri: replyIdx.reply.reply?.parent.uri }, 301 + { $set: { replyCount } }, 302 + { new: true }, 303 + ); 304 + } else if (parentCrosspostReply) { 305 + await db.models.CrosspostReply.findOneAndUpdate( 292 306 { uri: replyIdx.reply.reply?.parent.uri }, 293 307 { $set: { replyCount } }, 294 308 { new: true },
+34 -2
data-plane/routes/interactions.ts
··· 29 29 } 30 30 31 31 // Get pre-computed counts from Post, Reply, and Generator documents 32 - const [posts, replies, generators] = await Promise.all([ 32 + const [posts, replies, crosspostReplies, generators] = await Promise.all([ 33 33 this.db.models.Post.find( 34 34 { uri: { $in: uris } }, 35 35 { uri: 1, likeCount: 1, replyCount: 1, repostCount: 1 }, 36 36 ), 37 37 this.db.models.Reply.find( 38 + { uri: { $in: uris } }, 39 + { uri: 1, likeCount: 1, replyCount: 1 }, 40 + ), 41 + this.db.models.CrosspostReply.find( 38 42 { uri: { $in: uris } }, 39 43 { uri: 1, likeCount: 1, replyCount: 1 }, 40 44 ), ··· 56 60 } 57 61 58 62 for (const reply of replies) { 63 + likesMap.set(reply.uri, reply.likeCount ?? 0); 64 + repliesMap.set(reply.uri, reply.replyCount ?? 0); 65 + } 66 + 67 + for (const reply of crosspostReplies) { 59 68 likesMap.set(reply.uri, reply.likeCount ?? 0); 60 69 repliesMap.set(reply.uri, reply.replyCount ?? 0); 61 70 } ··· 172 181 173 182 // Query likes, reposts, and replies by followed users on the subject URIs 174 183 // All queries are batched and parallelized for optimal performance 175 - const [likes, reposts, replies] = await Promise.all([ 184 + const [likes, reposts, replies, crosspostReplies] = await Promise.all([ 176 185 this.db.models.Like.find({ 177 186 subject: { $in: subjectUris }, 178 187 authorDid: { $in: followedDids }, ··· 194 203 .select("uri cid reply.parent.uri authorDid indexedAt text") 195 204 .sort({ indexedAt: -1 }) 196 205 .lean(), 206 + this.db.models.CrosspostReply.find({ 207 + "reply.parent.uri": { $in: subjectUris }, 208 + authorDid: { $in: followedDids }, 209 + }) 210 + .select("uri cid reply.parent.uri authorDid indexedAt text") 211 + .sort({ indexedAt: -1 }) 212 + .lean(), 197 213 ]); 198 214 199 215 // Build result map keyed by subject URI - pre-initialize for all subject URIs ··· 233 249 234 250 // Add replies 235 251 for (const reply of replies) { 252 + const parentUri = reply.reply?.parent?.uri; 253 + if (!parentUri) continue; 254 + const interactions = results.get(parentUri); 255 + if (interactions) { 256 + interactions.push({ 257 + type: "reply", 258 + uri: reply.uri, 259 + cid: reply.cid, 260 + authorDid: reply.authorDid, 261 + indexedAt: String(reply.indexedAt), 262 + text: reply.text, 263 + }); 264 + } 265 + } 266 + 267 + for (const reply of crosspostReplies) { 236 268 const parentUri = reply.reply?.parent?.uri; 237 269 if (!parentUri) continue; 238 270 const interactions = results.get(parentUri);
+6 -6
data-plane/subscription.ts data-plane/subscription/index.ts
··· 1 1 import { IdResolver } from "@atp/identity"; 2 2 import { WriteOpAction } from "@atp/repo"; 3 3 import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync"; 4 - import { BackgroundQueue } from "./background.ts"; 5 - import { Database } from "./db/index.ts"; 6 - import { IndexingService } from "./indexing/index.ts"; 7 - import { ServerConfig } from "../config.ts"; 8 - import { PushService } from "../utils/push.ts"; 9 - import { PushTokens } from "./routes/push-tokens.ts"; 4 + import { BackgroundQueue } from "../background.ts"; 5 + import { Database } from "../db/index.ts"; 6 + import { IndexingService } from "../indexing/index.ts"; 7 + import { ServerConfig } from "../../config.ts"; 8 + import { PushService } from "../../utils/push.ts"; 9 + import { PushTokens } from "../routes/push-tokens.ts"; 10 10 11 11 export class RepoSubscription { 12 12 firehose: Firehose;
+191
data-plane/subscription/crosspost.ts
··· 1 + import { IdResolver } from "@atp/identity"; 2 + import { WriteOpAction } from "@atp/repo"; 3 + import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync"; 4 + import { BackgroundQueue } from "../background.ts"; 5 + import { Database } from "../db/index.ts"; 6 + import { IndexingService } from "../indexing/index.ts"; 7 + import { ServerConfig } from "../../config.ts"; 8 + import { PushService } from "../../utils/push.ts"; 9 + import { PushTokens } from "../routes/push-tokens.ts"; 10 + 11 + const CURSOR_STATE_IDENTIFIER = "crosspost_comments_cursor"; 12 + 13 + export class CrosspostRepoSubscription { 14 + firehose: Firehose; 15 + runner: MemoryRunner; 16 + background: BackgroundQueue; 17 + indexingSvc: IndexingService; 18 + pushService: PushService; 19 + private firehoseRunning = false; 20 + 21 + constructor( 22 + public opts: { 23 + db: Database; 24 + idResolver: IdResolver; 25 + startCursor?: number; 26 + cfg: ServerConfig; 27 + }, 28 + ) { 29 + const { db, idResolver, startCursor, cfg } = opts; 30 + this.background = new BackgroundQueue(db); 31 + 32 + const pushTokens = new PushTokens(db); 33 + this.pushService = new PushService(pushTokens, db, { 34 + enabled: cfg.pushEnabled, 35 + fcmServiceAccount: cfg.fcmServiceAccount, 36 + }); 37 + 38 + this.indexingSvc = new IndexingService( 39 + db, 40 + cfg, 41 + idResolver, 42 + this.background, 43 + this.pushService, 44 + ); 45 + 46 + const { runner, firehose } = createCrosspostFirehose({ 47 + idResolver, 48 + service: cfg.relayUrl, 49 + indexingSvc: this.indexingSvc, 50 + db, 51 + startCursor, 52 + }); 53 + this.runner = runner; 54 + this.firehose = firehose; 55 + } 56 + 57 + start() { 58 + console.info("Starting crosspost firehose subscription"); 59 + this.firehoseRunning = true; 60 + this.firehose.start(); 61 + } 62 + 63 + async restart() { 64 + await this.destroy(); 65 + 66 + const savedCursor = await this.opts.db.getCursorState( 67 + CURSOR_STATE_IDENTIFIER, 68 + ); 69 + const startCursor = savedCursor !== null ? savedCursor : undefined; 70 + 71 + const { runner, firehose } = createCrosspostFirehose({ 72 + idResolver: this.opts.idResolver, 73 + service: this.opts.cfg.relayUrl, 74 + indexingSvc: this.indexingSvc, 75 + db: this.opts.db, 76 + startCursor, 77 + }); 78 + this.runner = runner; 79 + this.firehose = firehose; 80 + this.start(); 81 + } 82 + 83 + async processAll() { 84 + await this.runner.processAll(); 85 + await this.background.processAll(); 86 + } 87 + 88 + async destroy() { 89 + try { 90 + if (this.firehoseRunning) { 91 + await this.firehose.destroy(); 92 + this.firehoseRunning = false; 93 + } 94 + console.info("Processing remaining runner tasks..."); 95 + if (this.opts.cfg.debugMode) { 96 + const timeoutMs = 10000; 97 + let destroyTimeoutId: number | undefined; 98 + try { 99 + const timeoutPromise = new Promise<never>((_, reject) => { 100 + destroyTimeoutId = setTimeout( 101 + () => reject(new Error(`Timeout after ${timeoutMs}ms`)), 102 + timeoutMs, 103 + ); 104 + }); 105 + await Promise.race([this.runner.destroy(), timeoutPromise]); 106 + } catch (e) { 107 + console.warn("Runner destroy timed out; continuing shutdown", { 108 + e, 109 + }); 110 + } finally { 111 + if (destroyTimeoutId !== undefined) { 112 + clearTimeout(destroyTimeoutId); 113 + destroyTimeoutId = undefined; 114 + } 115 + } 116 + 117 + let bgTimeoutId: number | undefined; 118 + try { 119 + const timeoutPromise = new Promise<never>((_, reject) => { 120 + bgTimeoutId = setTimeout( 121 + () => reject(new Error(`Timeout after ${timeoutMs}ms`)), 122 + timeoutMs, 123 + ); 124 + }); 125 + await Promise.race([this.background.processAll(), timeoutPromise]); 126 + } catch (e) { 127 + console.warn("Runner destroy timed out; continuing shutdown", { 128 + e, 129 + }); 130 + } finally { 131 + if (bgTimeoutId !== undefined) { 132 + clearTimeout(bgTimeoutId); 133 + bgTimeoutId = undefined; 134 + } 135 + } 136 + } else { 137 + await this.runner.processAll(); 138 + await this.background.processAll(); 139 + } 140 + } catch (error) { 141 + console.error("Error during subscription destroy", { error }); 142 + throw error; 143 + } 144 + } 145 + } 146 + 147 + function createCrosspostFirehose(opts: { 148 + idResolver: IdResolver; 149 + service?: string; 150 + indexingSvc: IndexingService; 151 + db: Database; 152 + startCursor?: number; 153 + }): { firehose: Firehose; runner: MemoryRunner } { 154 + const { idResolver, service, indexingSvc, db, startCursor } = opts; 155 + 156 + const runner = new MemoryRunner({ 157 + startCursor, 158 + setCursorInterval: 30000, 159 + setCursor: async (cursor: number) => { 160 + await db.saveCursorState(cursor, CURSOR_STATE_IDENTIFIER); 161 + console.info("Crosspost cursor saved to database", { cursor }); 162 + }, 163 + }); 164 + 165 + const firehose = new Firehose({ 166 + idResolver, 167 + runner, 168 + service, 169 + onError: (err: Error) => 170 + console.error("error in crosspost subscription", { err }), 171 + excludeAccount: true, 172 + excludeIdentity: true, 173 + excludeSync: true, 174 + handleEvent: async (evt: FirehoseEvent) => { 175 + if (evt.event === "create" || evt.event === "update") { 176 + await indexingSvc.indexRecord( 177 + evt.uri, 178 + evt.cid, 179 + evt.record, 180 + evt.event === "create" ? WriteOpAction.Create : WriteOpAction.Update, 181 + evt.time, 182 + ); 183 + } else if (evt.event === "delete") { 184 + await indexingSvc.deleteRecord(evt.uri); 185 + } 186 + }, 187 + filterCollections: ["app.bsky.feed.post"], 188 + }); 189 + 190 + return { firehose, runner }; 191 + }
+36 -12
data-plane/util.ts
··· 23 23 }> = []; 24 24 25 25 // Get direct replies (depth 1) 26 - const directReplies = await db.models.Reply.find({ 27 - "reply.parent.uri": uri, 28 - }).lean(); 26 + const [directReplies, directCrosspostReplies] = await Promise.all([ 27 + db.models.Reply.find({ 28 + "reply.parent.uri": uri, 29 + }).lean(), 30 + db.models.CrosspostReply.find({ 31 + "reply.parent.uri": uri, 32 + }).lean(), 33 + ]); 34 + const directChildren = [...directReplies, ...directCrosspostReplies]; 29 35 30 - for (const reply of directReplies) { 36 + for (const reply of directChildren) { 31 37 descendents.push({ 32 38 uri: reply.uri, 33 39 depth: 1, ··· 39 45 40 46 // Get nested replies (depth > 1) 41 47 if (depth > 1) { 42 - const processedUris = new Set(directReplies.map((r) => r.uri)); 43 - const toProcess = [...directReplies.map((r) => ({ uri: r.uri, depth: 1 }))]; 48 + const processedUris = new Set(directChildren.map((r) => r.uri)); 49 + const toProcess = [...directChildren.map((r) => ({ uri: r.uri, depth: 1 }))]; 44 50 45 51 while (toProcess.length > 0) { 46 52 const current = toProcess.shift()!; 47 53 if (current.depth >= depth) continue; 48 54 49 - const nestedReplies = await db.models.Reply.find({ 50 - "reply.parent.uri": current.uri, 51 - }).lean(); 55 + const [nestedReplies, nestedCrosspostReplies] = await Promise.all([ 56 + db.models.Reply.find({ 57 + "reply.parent.uri": current.uri, 58 + }).lean(), 59 + db.models.CrosspostReply.find({ 60 + "reply.parent.uri": current.uri, 61 + }).lean(), 62 + ]); 63 + const nestedChildren = [...nestedReplies, ...nestedCrosspostReplies]; 52 64 53 - for (const reply of nestedReplies) { 65 + for (const reply of nestedChildren) { 54 66 if (processedUris.has(reply.uri)) continue; 55 67 processedUris.add(reply.uri); 56 68 ··· 84 96 }> = []; 85 97 86 98 // Start with the current post 87 - const currentPost = await db.models.Reply.findOne({ uri }).lean(); 99 + const [currentReply, currentCrosspostReply] = await Promise.all([ 100 + db.models.Reply.findOne({ uri }).lean(), 101 + db.models.CrosspostReply.findOne({ uri }).lean(), 102 + ]); 103 + const currentPost = currentReply || currentCrosspostReply; 88 104 if (!currentPost) return ancestors; 89 105 90 106 ancestors.push({ ··· 98 114 99 115 while (currentUri && height <= parentHeight) { 100 116 // Check if parent is a Post (root) or Reply 101 - const [parentPost, parentReply] = await Promise.all([ 117 + const [parentPost, parentReply, parentCrosspostReply] = await Promise.all([ 102 118 db.models.Post.findOne({ uri: currentUri }).lean(), 103 119 db.models.Reply.findOne({ uri: currentUri }).lean(), 120 + db.models.CrosspostReply.findOne({ uri: currentUri }).lean(), 104 121 ]); 105 122 106 123 if (parentPost) { ··· 117 134 height, 118 135 }); 119 136 currentUri = parentReply.reply?.parent?.uri; 137 + height++; 138 + } else if (parentCrosspostReply) { 139 + ancestors.push({ 140 + uri: parentCrosspostReply.uri, 141 + height, 142 + }); 143 + currentUri = parentCrosspostReply.reply?.parent?.uri; 120 144 height++; 121 145 } else { 122 146 // Parent not found - stop traversing
+3 -2
deno.json
··· 2 2 "tasks": { 3 3 "dev:db": "mongod --dbpath ./devdb", 4 4 "dev:api": "deno run -A --watch main.ts", 5 - "dev:ingest": "deno run -A --watch ingest.ts", 5 + "dev:ingest": "deno run -A --watch ingest/index.ts", 6 + "dev:ingest:crosspost": "deno run -A --watch ingest/crosspost.ts", 6 7 "dev": { 7 8 "dependencies": ["dev:db", "dev:api", "dev:ingest"] 8 9 }, 9 10 "codegen": "deno run -A jsr:@atp/lex-gen@^0.1.0-alpha.2 server -o ./lex -i ./lexicons", 10 - "start": "deno run -A --env-file main.ts", 11 + "start": "deno run -A main.ts", 11 12 "docker-dev": "docker compose -f compose.dev.yaml up --build --watch" 12 13 }, 13 14 "imports": {
+3 -3
ingest.ts ingest/index.ts
··· 1 - import { RepoSubscription } from "./data-plane/subscription.ts"; 1 + import { RepoSubscription } from "../data-plane/subscription/index.ts"; 2 2 import { IdResolver } from "@atp/identity"; 3 - import { ServerConfig } from "./config.ts"; 4 - import { Database } from "./data-plane/db/index.ts"; 3 + import { ServerConfig } from "../config.ts"; 4 + import { Database } from "../data-plane/db/index.ts"; 5 5 6 6 const cfg = ServerConfig.readEnv(); 7 7
+25
ingest/crosspost.ts
··· 1 + import { CrosspostRepoSubscription } from "../data-plane/subscription/crosspost.ts"; 2 + import { IdResolver } from "@atp/identity"; 3 + import { ServerConfig } from "../config.ts"; 4 + import { Database } from "../data-plane/db/index.ts"; 5 + 6 + const cfg = ServerConfig.readEnv(); 7 + 8 + const idResolver = new IdResolver({ plcUrl: cfg.plcUrl }); 9 + const db = new Database(cfg); 10 + db.connect(); 11 + 12 + const savedCursor = cfg.debugMode 13 + ? null 14 + : await db.getCursorState("crosspost_comments_cursor"); 15 + const startCursor = savedCursor !== null ? savedCursor : undefined; 16 + 17 + const sub = new CrosspostRepoSubscription({ 18 + cfg, 19 + db, 20 + idResolver, 21 + startCursor, 22 + }); 23 + 24 + sub.start(); 25 + console.info("Crosspost subscription started");
+9
lex/lexicons.ts
··· 19182 19182 "maxGraphemes": 64, 19183 19183 }, 19184 19184 }, 19185 + "crossposts": { 19186 + "type": "array", 19187 + "description": 19188 + "Records created for external services for this post", 19189 + "items": { 19190 + "type": "ref", 19191 + "ref": "lex:com.atproto.repo.strongRef", 19192 + }, 19193 + }, 19185 19194 "createdAt": { 19186 19195 "type": "string", 19187 19196 "format": "datetime",
+2
lex/types/so/sprk/feed/post.ts
··· 24 24 labels?: $Typed<ComAtprotoLabelDefs.SelfLabels> | { $type: string }; 25 25 /** Additional hashtags, in addition to any included in post text and facets. */ 26 26 tags?: (string)[]; 27 + /** Records created for external services for this post */ 28 + crossposts?: (ComAtprotoRepoStrongRef.Main)[]; 27 29 /** Client-declared timestamp when this post was originally created. */ 28 30 createdAt: string; 29 31 [k: string]: unknown;
+5
lexicons/so/sprk/feed/post.json
··· 36 36 "maxLength": 8, 37 37 "items": { "type": "string", "maxLength": 640, "maxGraphemes": 64 } 38 38 }, 39 + "crossposts": { 40 + "type": "array", 41 + "description": "Records created for external services for this post", 42 + "items": { "type": "ref", "ref": "com.atproto.repo.strongRef" } 43 + }, 39 44 "createdAt": { 40 45 "type": "string", 41 46 "format": "datetime",
+4
tests/util.ts
··· 147 147 Like: connection.model<models.LikeDocument>("Like", models.likeSchema), 148 148 Post: connection.model<models.PostDocument>("Post", models.postSchema), 149 149 Reply: connection.model<models.ReplyDocument>("Reply", models.replySchema), 150 + CrosspostReply: connection.model<models.CrosspostReplyDocument>( 151 + "CrosspostReply", 152 + models.crosspostReplySchema, 153 + ), 150 154 Story: connection.model<models.StoryDocument>("Story", models.storySchema), 151 155 Follow: connection.model<models.FollowDocument>( 152 156 "Follow",