[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf(pipeline): batch queries and remove unneccesary lookups and ops

+148 -137
+5 -3
api/so/sprk/actor/getProfile.ts
··· 22 22 includeTakedowns, 23 23 }); 24 24 25 - const result = await getProfile({ ...params, hydrateCtx }, ctx); 26 - 27 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 25 + // Parallelize pipeline execution with repoRev fetch 26 + const [result, repoRev] = await Promise.all([ 27 + getProfile({ ...params, hydrateCtx }, ctx), 28 + ctx.hydrator.actor.getRepoRevSafe(viewer), 29 + ]); 28 30 29 31 return { 30 32 encoding: "application/json",
+5 -3
api/so/sprk/actor/getProfiles.ts
··· 22 22 includeTakedowns, 23 23 }); 24 24 25 - const result = await getProfile({ ...params, hydrateCtx }, ctx); 26 - 27 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 25 + // Parallelize pipeline execution with repoRev fetch 26 + const [result, repoRev] = await Promise.all([ 27 + getProfile({ ...params, hydrateCtx }, ctx), 28 + ctx.hydrator.actor.getRepoRevSafe(viewer), 29 + ]); 28 30 29 31 return { 30 32 encoding: "application/json",
+11 -4
api/so/sprk/feed/getAuthorFeed.ts
··· 34 34 includeTakedowns, 35 35 }); 36 36 37 - const result = await getAuthorFeed({ ...params, hydrateCtx }, ctx); 38 - 39 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 37 + // Parallelize pipeline execution with repoRev fetch 38 + const [result, repoRev] = await Promise.all([ 39 + getAuthorFeed({ ...params, hydrateCtx }, ctx), 40 + ctx.hydrator.actor.getRepoRevSafe(viewer), 41 + ]); 40 42 41 43 return { 42 44 encoding: "application/json", ··· 54 56 params: Params; 55 57 }): Promise<Skeleton> => { 56 58 const { ctx, params } = inputs; 57 - const [did] = await ctx.hydrator.actor.getDids([params.actor]); 59 + 60 + // Skip DID lookup if params.actor is already a DID 61 + const did = params.actor.startsWith("did:") 62 + ? params.actor 63 + : (await ctx.hydrator.actor.getDids([params.actor]))[0]; 64 + 58 65 if (!did) { 59 66 throw new InvalidRequestError("Profile not found"); 60 67 }
+5 -7
api/so/sprk/feed/getFeedGenerator.ts
··· 12 12 includeTakedowns, 13 13 }); 14 14 15 - // Hydrate feed generator 16 - const hydrationState = await ctx.hydrator.hydrateFeedGens( 17 - [params.feed], 18 - hydrateCtx, 19 - ); 15 + // Parallelize hydration with repoRev fetch 16 + const [hydrationState, repoRev] = await Promise.all([ 17 + ctx.hydrator.hydrateFeedGens([params.feed], hydrateCtx), 18 + ctx.hydrator.actor.getRepoRevSafe(viewer), 19 + ]); 20 20 21 21 // Create generator view 22 22 const view = ctx.views.generator(params.feed, hydrationState); ··· 29 29 // In a real implementation, you might check service health 30 30 const isOnline = true; 31 31 const isValid = true; 32 - 33 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 34 32 35 33 return { 36 34 encoding: "application/json",
+5 -7
api/so/sprk/feed/getFeedGenerators.ts
··· 12 12 includeTakedowns, 13 13 }); 14 14 15 - // Hydrate feed generators 16 - const hydrationState = await ctx.hydrator.hydrateFeedGens( 17 - params.feeds, 18 - hydrateCtx, 19 - ); 15 + // Parallelize hydration with repoRev fetch 16 + const [hydrationState, repoRev] = await Promise.all([ 17 + ctx.hydrator.hydrateFeedGens(params.feeds, hydrateCtx), 18 + ctx.hydrator.actor.getRepoRevSafe(viewer), 19 + ]); 20 20 21 21 // Create generator views 22 22 const feeds = params.feeds 23 23 .map((uri) => ctx.views.generator(uri, hydrationState)) 24 24 .filter((view): view is NonNullable<typeof view> => view !== undefined); 25 - 26 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 27 25 28 26 return { 29 27 encoding: "application/json",
+5 -2
api/so/sprk/feed/getPostThread.ts
··· 38 38 include3pBlocks, 39 39 }); 40 40 41 + // Start repoRev fetch early so it runs in parallel with the pipeline 42 + const repoRevPromise = ctx.hydrator.actor.getRepoRevSafe(viewer); 43 + 41 44 let result: OutputSchema; 42 45 try { 43 46 result = await getPostThread({ ...params, hydrateCtx }, ctx); 44 47 } catch (err) { 45 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 48 + const repoRev = await repoRevPromise; 46 49 if (repoRev) { 47 50 res.headers.set(ATPROTO_REPO_REV, repoRev); 48 51 } 49 52 throw err; 50 53 } 51 54 52 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 55 + const repoRev = await repoRevPromise; 53 56 54 57 return { 55 58 encoding: "application/json",
+8 -6
api/so/sprk/feed/getTimeline.ts
··· 27 27 const viewer = auth.credentials.iss; 28 28 const hydrateCtx = ctx.hydrator.createContext({ viewer }); 29 29 30 - const result = await getTimeline( 31 - { ...params, hydrateCtx: hydrateCtx.copy({ viewer }) }, 32 - ctx, 33 - ); 34 - 35 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 30 + // Parallelize pipeline execution with repoRev fetch 31 + const [result, repoRev] = await Promise.all([ 32 + getTimeline( 33 + { ...params, hydrateCtx: hydrateCtx.copy({ viewer }) }, 34 + ctx, 35 + ), 36 + ctx.hydrator.actor.getRepoRevSafe(viewer), 37 + ]); 36 38 37 39 return { 38 40 encoding: "application/json",
+9 -16
api/so/sprk/graph/getFollowers.ts
··· 72 72 ) => { 73 73 const { ctx, params, skeleton } = input; 74 74 const { followUris, subjectDid } = skeleton; 75 - const followState = await ctx.hydrator.hydrateFollows( 76 - followUris, 77 - params.hydrateCtx, 78 - ); 79 - const dids = [subjectDid]; 80 - if (followState.follows) { 81 - for (const [uri, follow] of followState.follows) { 82 - if (follow) { 83 - dids.push(didFromUri(uri)); 84 - } 85 - } 86 - } 87 - const profileState = await ctx.hydrator.hydrateProfiles( 88 - dids, 89 - params.hydrateCtx, 90 - ); 75 + 76 + // DIDs can be derived from URIs directly, enabling parallel fetches 77 + const dids = [subjectDid, ...followUris.map(didFromUri)]; 78 + 79 + const [followState, profileState] = await Promise.all([ 80 + ctx.hydrator.hydrateFollows(followUris, params.hydrateCtx), 81 + ctx.hydrator.hydrateProfiles(dids, params.hydrateCtx), 82 + ]); 83 + 91 84 return mergeStates(followState, profileState); 92 85 }; 93 86
+17 -18
api/so/sprk/story/getTimeline.ts
··· 47 47 ); 48 48 } 49 49 50 - const result = await getTimeline( 51 - { ...params, limit, cursor, hydrateCtx: hydrateCtx.copy({ viewer }) }, 52 - ctx, 53 - ); 54 - 55 - const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 50 + // Parallelize pipeline execution with repoRev fetch 51 + const [result, repoRev] = await Promise.all([ 52 + getTimeline( 53 + { ...params, limit, cursor, hydrateCtx: hydrateCtx.copy({ viewer }) }, 54 + ctx, 55 + ), 56 + ctx.hydrator.actor.getRepoRevSafe(viewer), 57 + ]); 56 58 57 59 return { 58 60 encoding: "application/json", ··· 99 101 ): Promise<HydrationState> => { 100 102 const { ctx, params, skeleton } = inputs; 101 103 102 - // Hydrate stories 103 - const stories = await ctx.hydrator.story.getStories( 104 - skeleton.stories, 105 - params.hydrateCtx.includeTakedowns || false, 106 - ); 107 - 108 - // Get author DIDs for actor hydration 104 + // Get author DIDs for actor hydration (can be computed before fetching) 109 105 const authorDids = [ 110 106 ...new Set( 111 107 skeleton.stories.map((uri) => uriToDid(uri)), 112 108 ), 113 109 ]; 114 110 115 - // Hydrate actors (profiles) 116 - const actors = await ctx.hydrator.actor.getActors( 117 - authorDids, 118 - params.hydrateCtx, 119 - ); 111 + // Parallelize stories and actors hydration 112 + const [stories, actors] = await Promise.all([ 113 + ctx.hydrator.story.getStories( 114 + skeleton.stories, 115 + params.hydrateCtx.includeTakedowns || false, 116 + ), 117 + ctx.hydrator.actor.getActors(authorDids, params.hydrateCtx), 118 + ]); 120 119 121 120 return { 122 121 stories,
+78 -68
data-plane/routes/relationships.ts
··· 12 12 return { relationships: [] }; 13 13 } 14 14 15 - const relationships = await Promise.all( 16 - targetDids.map(async (targetDid) => { 17 - const [ 18 - blocking, 19 - blockedBy, 20 - following, 21 - followedBy, 22 - ] = await Promise.all([ 23 - // Check if actor blocks target 24 - this.db.models.Block.findOne({ 25 - authorDid: actorDid, 26 - subjectDid: targetDid, 27 - }), 28 - // Check if target blocks actor 29 - this.db.models.Block.findOne({ 30 - authorDid: targetDid, 31 - subjectDid: actorDid, 32 - }), 33 - // Check if actor follows target 34 - this.db.models.Follow.findOne({ 35 - authorDid: actorDid, 36 - subject: targetDid, 37 - }), 38 - // Check if target follows actor 39 - this.db.models.Follow.findOne({ 40 - authorDid: targetDid, 41 - subject: actorDid, 42 - }), 43 - ]); 15 + // Batch queries using $in to reduce N+1 to 4 total queries 16 + const [ 17 + blockingResults, 18 + blockedByResults, 19 + followingResults, 20 + followedByResults, 21 + ] = await Promise.all([ 22 + // All blocks where actor blocks any target 23 + this.db.models.Block.find({ 24 + authorDid: actorDid, 25 + subject: { $in: targetDids }, 26 + }), 27 + // All blocks where any target blocks actor 28 + this.db.models.Block.find({ 29 + authorDid: { $in: targetDids }, 30 + subject: actorDid, 31 + }), 32 + // All follows where actor follows any target 33 + this.db.models.Follow.find({ 34 + authorDid: actorDid, 35 + subject: { $in: targetDids }, 36 + }), 37 + // All follows where any target follows actor 38 + this.db.models.Follow.find({ 39 + authorDid: { $in: targetDids }, 40 + subject: actorDid, 41 + }), 42 + ]); 44 43 45 - return { 46 - muted: false, 47 - blockedBy: blockedBy?.uri || "", 48 - blocking: blocking?.uri || "", 49 - following: following?.uri || "", 50 - followedBy: followedBy?.uri || "", 51 - }; 52 - }), 44 + // Build lookup maps for O(1) access 45 + const blockingMap = new Map( 46 + blockingResults.map((b) => [b.subject, b.uri]), 47 + ); 48 + const blockedByMap = new Map( 49 + blockedByResults.map((b) => [b.authorDid, b.uri]), 53 50 ); 51 + const followingMap = new Map( 52 + followingResults.map((f) => [f.subject, f.uri]), 53 + ); 54 + const followedByMap = new Map( 55 + followedByResults.map((f) => [f.authorDid, f.uri]), 56 + ); 57 + 58 + // Build relationships from maps 59 + const relationships = targetDids.map((targetDid) => ({ 60 + muted: false, 61 + blockedBy: blockedByMap.get(targetDid) || "", 62 + blocking: blockingMap.get(targetDid) || "", 63 + following: followingMap.get(targetDid) || "", 64 + followedBy: followedByMap.get(targetDid) || "", 65 + })); 54 66 55 67 return { relationships }; 56 68 } ··· 60 72 return { exists: [], blocks: [] }; 61 73 } 62 74 63 - const results = await Promise.all( 64 - pairs.map(async (pair) => { 65 - const [ 66 - blocking, 67 - blockedBy, 68 - ] = await Promise.all([ 69 - // Check if A blocks B 70 - this.db.models.Block.findOne({ 71 - authorDid: pair.a, 72 - subjectDid: pair.b, 73 - }), 74 - // Check if B blocks A 75 - this.db.models.Block.findOne({ 76 - authorDid: pair.b, 77 - subjectDid: pair.a, 78 - }), 79 - ]); 75 + // Build all block query pairs for batch lookup 76 + const blockQueries = pairs.flatMap((pair) => [ 77 + { authorDid: pair.a, subject: pair.b }, 78 + { authorDid: pair.b, subject: pair.a }, 79 + ]); 80 80 81 - const hasBlocks = !!( 82 - blocking || 83 - blockedBy 84 - ); 81 + // Single batch query using $or 82 + const allBlocks = await this.db.models.Block.find({ 83 + $or: blockQueries, 84 + }); 85 85 86 - return { 87 - exists: hasBlocks, 88 - blocks: { 89 - blockedBy: blockedBy?.uri || undefined, 90 - blocking: blocking?.uri || undefined, 91 - blockedByList: undefined, 92 - blockingByList: undefined, 93 - }, 94 - }; 95 - }), 86 + // Build lookup map: "authorDid:subject" -> uri 87 + const blockMap = new Map( 88 + allBlocks.map((b) => [`${b.authorDid}:${b.subject}`, b.uri]), 96 89 ); 90 + 91 + // Build results from map 92 + const results = pairs.map((pair) => { 93 + const blockingUri = blockMap.get(`${pair.a}:${pair.b}`); 94 + const blockedByUri = blockMap.get(`${pair.b}:${pair.a}`); 95 + const hasBlocks = !!(blockingUri || blockedByUri); 96 + 97 + return { 98 + exists: hasBlocks, 99 + blocks: { 100 + blockedBy: blockedByUri || undefined, 101 + blocking: blockingUri || undefined, 102 + blockedByList: undefined, 103 + blockingByList: undefined, 104 + }, 105 + }; 106 + }); 97 107 98 108 return { 99 109 exists: results.map((r) => r.exists),
-3
hydration/index.ts
··· 341 341 postViewers, 342 342 postBlocks, 343 343 profileState, 344 - feedGenState, 345 344 threadContexts, 346 345 soundState, 347 346 ] = await Promise.all([ ··· 352 351 : Promise.resolve<PostViewerStates | undefined>(undefined), 353 352 this.hydratePostBlocks(state.posts!, state.replies!), 354 353 this.hydrateProfiles(authorDids, ctx), 355 - this.hydrateFeedGens([], ctx), 356 354 this.feed.getThreadContexts(threadRefs), 357 355 this.hydrateSounds(Array.from(soundUris), ctx), 358 356 ]); 359 357 360 358 return mergeManyStates( 361 359 profileState, 362 - feedGenState, 363 360 soundState, 364 361 { 365 362 posts: state.posts,