[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

stories pipeline (#49)

* stories pipeline

* fmt and fix

* Update index.ts

* pluh

authored by

Roscoe Rubin-Rottenberg and committed by
GitHub
4655a892 1ef6a969

+707 -511
+151 -221
api/so/sprk/story/getStories.ts
··· 1 + import { dedupeStrs, mapDefined } from "@atp/common"; 2 + import { AppContext } from "../../../../context.ts"; 3 + import { 4 + HydrateCtx, 5 + HydrationState, 6 + Hydrator, 7 + } from "../../../../hydration/index.ts"; 1 8 import { Server } from "../../../../lex/index.ts"; 2 - import { AppContext } from "../../../../context.ts"; 3 - import { OutputSchema } from "../../../../lex/types/so/sprk/story/getStories.ts"; 4 - import { transformStoriesToStoryViews } from "../../../../utils/story-transformer.ts"; 5 - import { StoryDocument } from "../../../../data-plane/db/models.ts"; 9 + import { 10 + OutputSchema, 11 + QueryParams, 12 + } from "../../../../lex/types/so/sprk/story/getStories.ts"; 13 + import { 14 + createPipeline, 15 + HydrationFnInput, 16 + PresentationFnInput, 17 + RulesFnInput, 18 + SkeletonFnInput, 19 + } from "../../../../pipeline.ts"; 20 + import { uriToDid } from "../../../../utils/uris.ts"; 21 + import { Views } from "../../../../views/index.ts"; 22 + import { resHeaders } from "../../../util.ts"; 6 23 7 24 // Constants 8 25 const MAX_STORIES_LIMIT = 25; ··· 36 53 return { valid, invalid }; 37 54 } 38 55 39 - // Helper function to deduplicate URIs while preserving order 40 - function deduplicateUris(uris: string[]): string[] { 41 - const seen = new Set<string>(); 42 - return uris.filter((uri) => { 43 - if (seen.has(uri)) { 44 - return false; 45 - } 46 - seen.add(uri); 47 - return true; 48 - }); 49 - } 50 - 51 - // Helper function to check for blocked relationships 52 - async function checkBlockedStories( 53 - ctx: AppContext, 54 - stories: StoryDocument[], 55 - userDid?: string, 56 - ): Promise<Set<string>> { 57 - if (!userDid || stories.length === 0) { 58 - return new Set(); 59 - } 60 - 61 - const authorDids = [...new Set(stories.map((s) => s.authorDid))]; 62 - 63 - // Check if user is blocking any of the authors or is blocked by them 64 - const [userBlocking, userBlocked] = await Promise.all([ 65 - ctx.db.models.Block.find({ 66 - authorDid: userDid, 67 - subject: { $in: authorDids }, 68 - }).lean(), 69 - ctx.db.models.Block.find({ 70 - authorDid: { $in: authorDids }, 71 - subject: userDid, 72 - }).lean(), 73 - ]); 74 - 75 - const blockedAuthorDids = new Set([ 76 - ...userBlocking.map((b) => b.subject), 77 - ...userBlocked.map((b) => b.authorDid), 78 - ]); 79 - 80 - // Return URIs of stories from blocked authors 81 - return new Set( 82 - stories 83 - .filter((s) => blockedAuthorDids.has(s.authorDid)) 84 - .map((s) => s.uri), 85 - ); 86 - } 87 - 88 - // Helper function to sort stories by original URI order 89 - function sortStoriesByUriOrder( 90 - stories: StoryDocument[], 91 - originalUris: string[], 92 - ): StoryDocument[] { 93 - const storyMap = new Map(stories.map((story) => [story.uri, story])); 94 - const sortedStories: StoryDocument[] = []; 95 - 96 - for (const uri of originalUris) { 97 - const story = storyMap.get(uri); 98 - if (story) { 99 - sortedStories.push(story); 100 - } 101 - } 102 - 103 - return sortedStories; 104 - } 105 - 106 - function filterExpiredStories( 107 - stories: StoryDocument[], 108 - ownerDid?: string, 109 - ): StoryDocument[] { 110 - const twentyFourHoursAgo = new Date(); 111 - twentyFourHoursAgo.setHours(twentyFourHoursAgo.getHours() - 24); 112 - 113 - return stories.filter((story) => { 114 - // If the authenticated user is the author, not apply the 24h expiration filter 115 - if (ownerDid && story.authorDid === ownerDid) return true; 116 - const storyDate = new Date(story.indexedAt); 117 - return storyDate >= twentyFourHoursAgo; 118 - }); 119 - } 120 - 121 56 export default function (server: Server, ctx: AppContext) { 57 + const getStories = createPipeline(skeleton, hydration, rules, presentation); 122 58 server.so.sprk.story.getStories({ 123 59 auth: ctx.authVerifier.standardOptional, 124 60 handler: async ({ params, auth }) => { 125 - try { 126 - const { uris } = params; 127 - const userDid = auth.credentials.type === "standard" 128 - ? auth.credentials.iss 129 - : undefined; 61 + const viewer = auth.credentials.type === "standard" 62 + ? auth.credentials.iss 63 + : null; 64 + const hydrateCtx = ctx.hydrator.createContext({ viewer }); 65 + 66 + // Validate input 67 + if (!params.uris) { 68 + return { 69 + status: 400, 70 + message: "URIs parameter is required", 71 + }; 72 + } 130 73 131 - // Validate input 132 - if (!uris) { 133 - return { 134 - status: 400, 135 - message: "URIs parameter is required", 136 - }; 137 - } 74 + // Ensure uris is an array 75 + const uriArray = Array.isArray(params.uris) ? params.uris : [params.uris]; 138 76 139 - // Ensure uris is an array 140 - const uriArray = Array.isArray(uris) ? uris : [uris]; 77 + // Check if empty array 78 + if (uriArray.length === 0) { 79 + return { 80 + encoding: "application/json", 81 + body: { stories: [] } as OutputSchema, 82 + }; 83 + } 141 84 142 - // Check if empty array 143 - if (uriArray.length === 0) { 144 - return { 145 - encoding: "application/json", 146 - body: { stories: [] } as OutputSchema, 147 - }; 148 - } 85 + // Enforce maximum limit 86 + if (uriArray.length > MAX_STORIES_LIMIT) { 87 + return { 88 + status: 400, 89 + message: `Too many URIs requested. Maximum is ${MAX_STORIES_LIMIT}`, 90 + }; 91 + } 149 92 150 - // Enforce maximum limit 151 - if (uriArray.length > MAX_STORIES_LIMIT) { 152 - return { 153 - status: 400, 154 - message: `Too many URIs requested. Maximum is ${MAX_STORIES_LIMIT}`, 155 - }; 156 - } 93 + // Validate URIs 94 + const { valid: validUris, invalid: invalidUris } = validateUris(uriArray); 157 95 158 - // Validate URIs 159 - const { valid: validUris, invalid: invalidUris } = validateUris( 160 - uriArray, 96 + if (invalidUris.length > 0) { 97 + console.warn( 98 + `Invalid story URIs provided: ${invalidUris.slice(0, 5).join(", ")}${ 99 + invalidUris.length > 5 ? "..." : "" 100 + }`, 161 101 ); 102 + } 162 103 163 - if (invalidUris.length > 0) { 164 - console.warn( 165 - `Invalid story URIs provided: ${ 166 - invalidUris.slice(0, 5).join(", ") 167 - }${invalidUris.length > 5 ? "..." : ""}`, 168 - ); 169 - } 104 + if (validUris.length === 0) { 105 + return { 106 + encoding: "application/json", 107 + body: { stories: [] } as OutputSchema, 108 + }; 109 + } 170 110 171 - if (validUris.length === 0) { 172 - return { 173 - encoding: "application/json", 174 - body: { stories: [] } as OutputSchema, 175 - }; 176 - } 111 + const result = await getStories( 112 + { ...params, uris: validUris, hydrateCtx, viewer: viewer || null }, 113 + ctx, 114 + ); 177 115 178 - // Deduplicate URIs while preserving order 179 - const uniqueUris = deduplicateUris(validUris); 116 + return { 117 + encoding: "application/json", 118 + body: result, 119 + headers: resHeaders({}), 120 + }; 121 + }, 122 + }); 123 + } 180 124 181 - // Fetch stories from database with optimized query 182 - const dbStories = await ctx.db.models.Story.find({ 183 - uri: { $in: uniqueUris }, 184 - }) 185 - .exec(); 125 + const skeleton = (inputs: SkeletonFnInput<Context, Params>): Skeleton => { 126 + const { params } = inputs; 127 + // Deduplicate URIs while preserving order 128 + const uniqueUris = dedupeStrs(params.uris); 129 + return { stories: uniqueUris }; 130 + }; 186 131 187 - if (dbStories.length === 0) { 188 - return { 189 - encoding: "application/json", 190 - body: { stories: [] } as OutputSchema, 191 - }; 192 - } 132 + const hydration = async ( 133 + inputs: HydrationFnInput<Context, Params, Skeleton>, 134 + ): Promise<HydrationState> => { 135 + const { ctx, params, skeleton } = inputs; 136 + // Hydrate stories 137 + const stories = await ctx.hydrator.story.getStories( 138 + skeleton.stories, 139 + params.hydrateCtx.includeTakedowns || false, 140 + ); 193 141 194 - // Filter out expired stories (older than 24 hours) 195 - const activeStories = filterExpiredStories(dbStories, userDid); 142 + // Get author DIDs for actor hydration 143 + const authorDids = [ 144 + ...new Set( 145 + skeleton.stories.map((uri) => uriToDid(uri)), 146 + ), 147 + ]; 196 148 197 - if (activeStories.length === 0) { 198 - return { 199 - encoding: "application/json", 200 - body: { stories: [] } as OutputSchema, 201 - }; 202 - } 149 + // Hydrate actors (profiles) 150 + const actors = await ctx.hydrator.actor.getActors( 151 + authorDids, 152 + params.hydrateCtx, 153 + ); 203 154 204 - // Check for blocked relationships 205 - const blockedStoryUris = await checkBlockedStories( 206 - ctx, 207 - activeStories, 208 - userDid, 209 - ); 155 + return { 156 + stories, 157 + actors, 158 + }; 159 + }; 210 160 211 - // Filter out blocked stories 212 - const accessibleStories = activeStories.filter((story) => 213 - !blockedStoryUris.has(story.uri) 214 - ); 161 + const rules = (inputs: RulesFnInput<Context, Params, Skeleton>): Skeleton => { 162 + const { ctx, params, skeleton, hydration } = inputs; 163 + const viewer = params.viewer; 215 164 216 - if (accessibleStories.length === 0) { 217 - return { 218 - encoding: "application/json", 219 - body: { stories: [] } as OutputSchema, 220 - }; 221 - } 165 + // Filter out expired stories (24 hours, except for owner's stories) 166 + const activeStories = skeleton.stories.filter((uri) => { 167 + const storyInfo = hydration.stories?.get(uri); 168 + if (!storyInfo) return false; 222 169 223 - // Sort stories to match the original URI order 224 - const sortedStories = sortStoriesByUriOrder( 225 - accessibleStories, 226 - uniqueUris, 227 - ); 170 + // If the authenticated user is the author, don't apply the 24h expiration filter 171 + const authorDid = uriToDid(uri); 172 + if (viewer && authorDid === viewer) return true; 228 173 229 - // Transform stories to StoryView format using batch transformer 230 - const storyViews = await transformStoriesToStoryViews( 231 - sortedStories, 232 - ctx, 233 - ); 174 + // Check if story is expired (older than 24 hours) 175 + const twentyFourHoursAgo = new Date(); 176 + twentyFourHoursAgo.setHours(twentyFourHoursAgo.getHours() - 24); 177 + const storyDate = storyInfo.indexedAt; 178 + return storyDate >= twentyFourHoursAgo; 179 + }); 234 180 235 - const response: OutputSchema = { 236 - stories: storyViews, 237 - }; 181 + // Filter out blocked stories 182 + const accessibleStories = activeStories.filter((uri) => { 183 + const authorDid = uriToDid(uri); 184 + return !ctx.views.viewerBlockExists(authorDid, hydration); 185 + }); 238 186 239 - return { 240 - encoding: "application/json", 241 - body: response, 242 - }; 243 - } catch (error) { 244 - // Log error for debugging 245 - console.error("Error in getStories:", error); 187 + return { stories: accessibleStories }; 188 + }; 246 189 247 - // Handle specific error cases 248 - if (error instanceof Error) { 249 - const message = error.message; 190 + const presentation = ( 191 + inputs: PresentationFnInput<Context, Params, Skeleton>, 192 + ): OutputSchema => { 193 + const { ctx, skeleton, hydration } = inputs; 194 + const storyViews = mapDefined( 195 + skeleton.stories, 196 + (uri) => ctx.views.story(uri, hydration), 197 + ); 250 198 251 - // MongoDB connection errors 252 - if (message.includes("connection") || message.includes("timeout")) { 253 - return { 254 - status: 503, 255 - message: "Database temporarily unavailable", 256 - }; 257 - } 199 + return { stories: storyViews }; 200 + }; 258 201 259 - // Validation errors 260 - if (message.includes("validation") || message.includes("invalid")) { 261 - return { 262 - status: 400, 263 - message: "Invalid request parameters", 264 - }; 265 - } 202 + type Context = { 203 + hydrator: Hydrator; 204 + views: Views; 205 + }; 266 206 267 - // Rate limiting or resource errors 268 - if (message.includes("limit") || message.includes("quota")) { 269 - return { 270 - status: 429, 271 - message: "Rate limit exceeded", 272 - }; 273 - } 274 - } 207 + type Params = QueryParams & { 208 + hydrateCtx: HydrateCtx; 209 + viewer: string | null; 210 + }; 275 211 276 - // Generic server error for unexpected cases 277 - return { 278 - status: 500, 279 - message: "Internal server error", 280 - }; 281 - } 282 - }, 283 - }); 284 - } 212 + type Skeleton = { 213 + stories: string[]; 214 + };
+142 -243
api/so/sprk/story/getTimeline.ts
··· 1 1 import { InvalidRequestError } from "@atp/xrpc-server"; 2 + import { AppContext } from "../../../../context.ts"; 3 + import { HydrateCtx, HydrationState } from "../../../../hydration/index.ts"; 4 + import { parseString } from "../../../../hydration/util.ts"; 2 5 import { Server } from "../../../../lex/index.ts"; 3 - import { AppContext } from "../../../../context.ts"; 4 - import { transformStoriesToStoryViews } from "../../../../utils/story-transformer.ts"; 5 - import { decodeBase64, encodeBase64 } from "@std/encoding"; 6 - import type { ProfileViewBasic } from "../../../../lex/types/so/sprk/actor/defs.ts"; 7 - import type * as SoSprkStoryDefs from "../../../../lex/types/so/sprk/story/defs.ts"; 6 + import { 7 + OutputSchema, 8 + QueryParams, 9 + } from "../../../../lex/types/so/sprk/story/getTimeline.ts"; 10 + import { 11 + createPipeline, 12 + HydrationFnInput, 13 + PresentationFnInput, 14 + RulesFnInput, 15 + SkeletonFnInput, 16 + } from "../../../../pipeline.ts"; 17 + import { uriToDid } from "../../../../utils/uris.ts"; 18 + import { resHeaders } from "../../../util.ts"; 8 19 9 20 // Constants 10 21 const MAX_LIMIT = 100; 11 22 const DEFAULT_LIMIT = 50; 12 - const STORIES_EXPIRY_HOURS = 24; 13 23 14 - interface CursorData { 15 - indexedAt: string; 16 - id: string; 17 - } 24 + export default function (server: Server, ctx: AppContext) { 25 + const getTimeline = createPipeline( 26 + skeleton, 27 + hydration, 28 + rules, 29 + presentation, 30 + ); 31 + server.so.sprk.story.getTimeline({ 32 + auth: ctx.authVerifier.standard, 33 + handler: async ({ params, auth }) => { 34 + const viewer = auth.credentials.iss; 35 + const hydrateCtx = ctx.hydrator.createContext({ viewer }); 18 36 19 - interface AuthorStoryGroup { 20 - author: ProfileViewBasic; 21 - stories: SoSprkStoryDefs.StoryView[]; 22 - } 37 + const { limit: limitParam = DEFAULT_LIMIT, cursor } = params; 23 38 24 - // Helper function to parse cursor 25 - function parseCursor(cursor: string): CursorData { 26 - try { 27 - const decodedCursor = new TextDecoder().decode(decodeBase64(cursor)); 28 - const [timestamp, id] = decodedCursor.split("::"); 39 + // Validate and sanitize limit 40 + const limit = typeof limitParam === "string" 41 + ? parseInt(limitParam, 10) 42 + : limitParam; 29 43 30 - if (!timestamp || !id) { 31 - throw new Error("Invalid cursor format"); 32 - } 33 - 34 - return { indexedAt: timestamp, id }; 35 - } catch { 36 - throw new InvalidRequestError("Invalid cursor format"); 37 - } 38 - } 44 + if (isNaN(limit) || limit < 1 || limit > MAX_LIMIT) { 45 + throw new InvalidRequestError( 46 + `Invalid limit: must be between 1 and ${MAX_LIMIT}`, 47 + ); 48 + } 39 49 40 - // Helper function to generate cursor 41 - function generateCursor(indexedAt: string, id: string): string { 42 - return encodeBase64( 43 - new TextEncoder().encode(`${indexedAt}::${id}`), 44 - ); 45 - } 50 + const result = await getTimeline( 51 + { ...params, limit, cursor, hydrateCtx: hydrateCtx.copy({ viewer }) }, 52 + ctx, 53 + ); 46 54 47 - // Helper function to get follows with caching optimization 48 - async function getUserFollows( 49 - ctx: AppContext, 50 - userDid: string, 51 - ): Promise<string[]> { 52 - const follows = await ctx.db.models.Follow.find({ 53 - authorDid: userDid, 54 - }) 55 - .select("subject") 56 - .lean() 57 - .exec(); 55 + const repoRev = await ctx.hydrator.actor.getRepoRevSafe(viewer); 58 56 59 - return follows.map((follow) => follow.subject); 57 + return { 58 + encoding: "application/json", 59 + body: result, 60 + headers: resHeaders({ repoRev }), 61 + }; 62 + }, 63 + }); 60 64 } 61 65 62 - // Batch check blocked relationships for all authors 63 - async function batchCheckBlockedAuthors( 64 - ctx: AppContext, 65 - authorDids: string[], 66 - userDid: string, 67 - ): Promise<Set<string>> { 68 - if (authorDids.length === 0) { 69 - return new Set(); 70 - } 66 + const skeleton = async ( 67 + inputs: SkeletonFnInput<Context, Params>, 68 + ): Promise<Skeleton> => { 69 + const { ctx, params } = inputs; 70 + const viewer = params.hydrateCtx.viewer!; 71 71 72 - // Single query to get all block relationships 73 - const [userBlocking, userBlocked] = await Promise.all([ 74 - ctx.db.models.Block.find({ 75 - authorDid: userDid, 76 - subject: { $in: authorDids }, 77 - }).select("subject").lean(), 78 - ctx.db.models.Block.find({ 79 - authorDid: { $in: authorDids }, 80 - subject: userDid, 81 - }).select("authorDid").lean(), 82 - ]); 72 + // Get accounts that the viewer follows 73 + const followsRes = await ctx.dataplane.follows.getFollows(viewer); 74 + const followedDids = followsRes.follows.map((f) => f.subjectDid); 83 75 84 - const blockedAuthorDids = new Set([ 85 - ...userBlocking.map((b) => b.subject), 86 - ...userBlocked.map((b) => b.authorDid), 87 - ]); 76 + // Include the user's own stories in the timeline 77 + const timelineDids = [...followedDids, viewer]; 88 78 89 - return blockedAuthorDids; 90 - } 79 + if (timelineDids.length === 0) { 80 + return { stories: [], cursor: undefined }; 81 + } 91 82 92 - // Build optimized query for stories 93 - function buildStoriesQuery( 94 - followedDids: string[], 95 - cursor?: CursorData, 96 - ): Record<string, unknown> { 97 - const twentyFourHoursAgo = new Date(); 98 - twentyFourHoursAgo.setHours( 99 - twentyFourHoursAgo.getHours() - STORIES_EXPIRY_HOURS, 83 + // Get timeline stories from dataplane 84 + const res = await ctx.dataplane.stories.getTimeline( 85 + viewer, 86 + followedDids, 87 + params.limit, 88 + params.cursor, 100 89 ); 101 90 102 - const query: Record<string, unknown> = { 103 - authorDid: { $in: followedDids }, 104 - indexedAt: { $gte: twentyFourHoursAgo.toISOString() }, 91 + return { 92 + stories: res.stories.map((story: { uri: string }) => story.uri), 93 + cursor: parseString(res.cursor), 105 94 }; 95 + }; 106 96 107 - // Add cursor-based pagination 108 - if (cursor) { 109 - query.$or = [ 110 - { indexedAt: { $lt: cursor.indexedAt } }, 111 - { indexedAt: cursor.indexedAt, _id: { $lt: cursor.id } }, 112 - ]; 113 - } 97 + const hydration = async ( 98 + inputs: HydrationFnInput<Context, Params, Skeleton>, 99 + ): Promise<HydrationState> => { 100 + const { ctx, params, skeleton } = inputs; 114 101 115 - return query; 116 - } 102 + // Hydrate stories 103 + const stories = await ctx.hydrator.story.getStories( 104 + skeleton.stories, 105 + params.hydrateCtx.includeTakedowns || false, 106 + ); 117 107 118 - // Efficiently group stories by author with proper sorting 119 - function groupStoriesByAuthor( 120 - storyViews: SoSprkStoryDefs.StoryView[], 121 - ): SoSprkStoryDefs.StoriesByAuthor[] { 122 - if (storyViews.length === 0) { 123 - return []; 124 - } 108 + // Get author DIDs for actor hydration 109 + const authorDids = [ 110 + ...new Set( 111 + skeleton.stories.map((uri) => uriToDid(uri)), 112 + ), 113 + ]; 125 114 126 - // Use Map for efficient grouping 127 - const storiesGroupedByAuthor = new Map<string, AuthorStoryGroup>(); 115 + // Hydrate actors (profiles) 116 + const actors = await ctx.hydrator.actor.getActors( 117 + authorDids, 118 + params.hydrateCtx, 119 + ); 128 120 129 - for (const storyView of storyViews) { 130 - const authorDid = storyView.author.did; 121 + return { 122 + stories, 123 + actors, 124 + }; 125 + }; 131 126 132 - if (!storiesGroupedByAuthor.has(authorDid)) { 133 - storiesGroupedByAuthor.set(authorDid, { 134 - author: storyView.author, 135 - stories: [], 136 - }); 137 - } 127 + const rules = (inputs: RulesFnInput<Context, Params, Skeleton>): Skeleton => { 128 + const { ctx, params, skeleton, hydration } = inputs; 129 + const viewer = params.hydrateCtx.viewer!; 138 130 139 - storiesGroupedByAuthor.get(authorDid)!.stories.push(storyView); 140 - } 131 + // Filter out expired stories (24 hours, except for owner's stories) 132 + // Note: The dataplane already filters expired stories, but we do an additional 133 + // check here for stories from the viewer (which shouldn't be filtered) 134 + const activeStories = skeleton.stories.filter((uri) => { 135 + const storyInfo = hydration.stories?.get(uri); 136 + if (!storyInfo) return false; 141 137 142 - // Convert to array and sort stories within each group 143 - const storiesByAuthor = Array.from(storiesGroupedByAuthor.values()).map( 144 - (group) => ({ 145 - author: group.author, 146 - stories: group.stories.sort( 147 - (a, b) => 148 - new Date(a.indexedAt).getTime() - new Date(b.indexedAt).getTime(), 149 - ), 150 - }), 151 - ); 138 + // If the authenticated user is the author, don't apply the 24h expiration filter 139 + const authorDid = uriToDid(uri); 140 + if (authorDid === viewer) return true; 152 141 153 - // Sort author groups by the latest story from each author (newest first) 154 - storiesByAuthor.sort((a, b) => { 155 - const latestA = Math.max( 156 - ...a.stories.map((s) => new Date(s.indexedAt).getTime()), 157 - ); 158 - const latestB = Math.max( 159 - ...b.stories.map((s) => new Date(s.indexedAt).getTime()), 160 - ); 161 - return latestB - latestA; 142 + // The dataplane already filtered expired stories, so we just check if it exists 143 + return true; 162 144 }); 163 145 164 - return storiesByAuthor; 165 - } 166 - 167 - export default function (server: Server, ctx: AppContext) { 168 - server.so.sprk.story.getTimeline({ 169 - auth: ctx.authVerifier.standard, 170 - handler: async ({ params, auth }) => { 171 - const { limit: limitParam = DEFAULT_LIMIT, cursor } = params; 172 - const userDid = auth.credentials.iss; 173 - 174 - // Validate and sanitize limit 175 - const limit = typeof limitParam === "string" 176 - ? parseInt(limitParam, 10) 177 - : limitParam; 178 - 179 - if (isNaN(limit) || limit < 1 || limit > MAX_LIMIT) { 180 - throw new InvalidRequestError( 181 - `Invalid limit: must be between 1 and ${MAX_LIMIT}`, 182 - ); 183 - } 184 - 185 - // Parse cursor if provided 186 - let cursorData: CursorData | undefined; 187 - if (cursor) { 188 - cursorData = parseCursor(cursor); 189 - } 190 - 191 - // Get accounts that the viewer follows (with optimization) 192 - const followedDids = await getUserFollows(ctx, userDid); 193 - 194 - if (followedDids.length === 0) { 195 - return { 196 - encoding: "application/json", 197 - body: { 198 - storiesByAuthor: [], 199 - }, 200 - }; 201 - } 202 - 203 - // Build optimized query 204 - const query = buildStoriesQuery(followedDids, cursorData); 205 - 206 - // Get stories from database with optimized query 207 - const stories = await ctx.db.models.Story.find(query) 208 - .sort({ indexedAt: -1, _id: -1 }) 209 - .limit(limit + 1) // Get one extra for hasMore check 210 - .exec(); 211 - 212 - if (stories.length === 0) { 213 - return { 214 - encoding: "application/json", 215 - body: { 216 - storiesByAuthor: [], 217 - }, 218 - }; 219 - } 220 - 221 - // Check if we have more results (for cursor) 222 - const hasMore = stories.length > limit; 223 - if (hasMore) { 224 - stories.pop(); // Remove the extra item 225 - } 226 - 227 - // Get all unique author DIDs for batch block checking 228 - const authorDids = [ 229 - ...new Set(stories.map((story) => story.authorDid)), 230 - ]; 231 - 232 - // Batch check all block relationships 233 - const blockedAuthorDids = await batchCheckBlockedAuthors( 234 - ctx, 235 - authorDids, 236 - userDid, 237 - ); 146 + // Filter out blocked stories 147 + const accessibleStories = activeStories.filter((uri) => { 148 + const authorDid = uriToDid(uri); 149 + return !ctx.views.viewerBlockExists(authorDid, hydration); 150 + }); 238 151 239 - // Filter out stories from blocked authors 240 - const accessibleStories = stories.filter( 241 - (story) => !blockedAuthorDids.has(story.authorDid), 242 - ); 152 + return { stories: accessibleStories, cursor: skeleton.cursor }; 153 + }; 243 154 244 - if (accessibleStories.length === 0) { 245 - return { 246 - encoding: "application/json", 247 - body: { 248 - storiesByAuthor: [], 249 - }, 250 - }; 251 - } 155 + const presentation = ( 156 + inputs: PresentationFnInput<Context, Params, Skeleton>, 157 + ): OutputSchema => { 158 + const { ctx, skeleton, hydration } = inputs; 159 + const storyViews = skeleton.stories 160 + .map((uri) => ctx.views.story(uri, hydration)) 161 + .filter((view): view is NonNullable<typeof view> => view !== undefined); 252 162 253 - // Transform stories to story views using batch transformer 254 - const storyViews = await transformStoriesToStoryViews( 255 - accessibleStories, 256 - ctx, 257 - ); 163 + // Group stories by author 164 + const storiesByAuthor = ctx.views.storiesByAuthor(storyViews); 258 165 259 - // Group stories by author efficiently 260 - const storiesByAuthor = groupStoriesByAuthor(storyViews); 166 + return { 167 + storiesByAuthor, 168 + ...(skeleton.cursor && { cursor: skeleton.cursor }), 169 + }; 170 + }; 261 171 262 - // Generate next cursor if there are more results 263 - let nextCursor: string | undefined; 264 - if (hasMore && accessibleStories.length > 0) { 265 - const lastStory = accessibleStories[accessibleStories.length - 1]; 266 - nextCursor = generateCursor( 267 - lastStory.indexedAt, 268 - String(lastStory._id), 269 - ); 270 - } 172 + type Context = AppContext; 271 173 272 - const response = { 273 - storiesByAuthor, 274 - ...(nextCursor && { cursor: nextCursor }), 275 - }; 174 + type Params = QueryParams & { 175 + hydrateCtx: HydrateCtx & { viewer: string }; 176 + limit: number; 177 + }; 276 178 277 - return { 278 - encoding: "application/json", 279 - body: response, 280 - }; 281 - }, 282 - }); 283 - } 179 + type Skeleton = { 180 + stories: string[]; 181 + cursor?: string; 182 + };
+3
data-plane/index.ts
··· 12 12 import { Relationships } from "./routes/relationships.ts"; 13 13 import { Interactions } from "./routes/interactions.ts"; 14 14 import { Reposts } from "./routes/reposts.ts"; 15 + import { Stories } from "./routes/stories.ts"; 15 16 import { Sync } from "./routes/sync.ts"; 16 17 import { Threads } from "./routes/threads.ts"; 17 18 ··· 39 40 public relationships: Relationships; 40 41 public interactions: Interactions; 41 42 public reposts: Reposts; 43 + public stories: Stories; 42 44 public sync: Sync; 43 45 public threads: Threads; 44 46 ··· 62 64 this.relationships = new Relationships(db); 63 65 this.interactions = new Interactions(db); 64 66 this.reposts = new Reposts(db); 67 + this.stories = new Stories(db); 65 68 this.sync = new Sync(db); 66 69 this.threads = new Threads(db); 67 70 }
+28 -5
data-plane/indexing/processor.ts
··· 103 103 ) { 104 104 this.assertValidRecord(obj); 105 105 106 + // Extract createdAt from the record object if available 107 + const recordObj = obj as Record<string, unknown>; 108 + const createdAt = typeof recordObj.createdAt === "string" 109 + ? recordObj.createdAt 110 + : timestamp; 111 + 106 112 // Insert or update record 107 113 await this.db.models.Record.findOneAndUpdate( 108 114 { uri: uri.toString() }, ··· 113 119 collectionName: uri.collection, 114 120 rkey: uri.rkey, 115 121 json: stringifyLex(obj), 122 + createdAt, 116 123 indexedAt: timestamp, 117 124 }, 118 125 { upsert: true, new: true }, ··· 161 168 ) { 162 169 this.assertValidRecord(obj); 163 170 171 + // Extract createdAt from the record object if available 172 + const recordObj = obj as Record<string, unknown>; 173 + const createdAt = typeof recordObj.createdAt === "string" 174 + ? recordObj.createdAt 175 + : undefined; 176 + 164 177 // Update record 178 + const updateData: { 179 + cid: string; 180 + json: string; 181 + indexedAt: string; 182 + createdAt?: string; 183 + } = { 184 + cid: cid.toString(), 185 + json: stringifyLex(obj), 186 + indexedAt: timestamp, 187 + }; 188 + if (createdAt) { 189 + updateData.createdAt = createdAt; 190 + } 191 + 165 192 await this.db.models.Record.findOneAndUpdate( 166 193 { uri: uri.toString() }, 167 - { 168 - cid: cid.toString(), 169 - json: stringifyLex(obj), 170 - indexedAt: timestamp, 171 - }, 194 + updateData, 172 195 { new: true }, 173 196 ); 174 197
+5
data-plane/routes/records.ts
··· 161 161 const result = await getRecords(this.db, uris); 162 162 return result; 163 163 } 164 + 165 + async getStoryRecords(uris: string[]) { 166 + const result = await getRecords(this.db, uris, ids.SoSprkStoryPost); 167 + return result; 168 + } 164 169 }
+165
data-plane/routes/stories.ts
··· 1 + import { Database } from "../db/index.ts"; 2 + import { TimeCidKeyset } from "../db/pagination.ts"; 3 + import { compositeTime } from "./records.ts"; 4 + 5 + const STORIES_EXPIRY_HOURS = 24; 6 + 7 + export interface StoryItem { 8 + uri: string; 9 + cid: string; 10 + authorDid: string; 11 + createdAt: string; 12 + indexedAt: string; 13 + sortAt: string; 14 + } 15 + 16 + export class Stories { 17 + private db: Database; 18 + private timeCidKeyset: TimeCidKeyset; 19 + 20 + constructor(db: Database) { 21 + this.db = db; 22 + this.timeCidKeyset = new TimeCidKeyset(); 23 + } 24 + 25 + /** 26 + * Get stories by URIs 27 + */ 28 + async getStories(uris: string[]): Promise<StoryItem[]> { 29 + if (!uris.length) return []; 30 + 31 + const stories = await this.db.models.Story.find({ 32 + uri: { $in: uris }, 33 + }).lean(); 34 + 35 + return stories.map((story) => ({ 36 + uri: story.uri, 37 + cid: story.cid, 38 + authorDid: story.authorDid, 39 + createdAt: story.createdAt, 40 + indexedAt: story.indexedAt, 41 + sortAt: compositeTime(story.createdAt, story.indexedAt) || 42 + story.createdAt, 43 + })); 44 + } 45 + 46 + /** 47 + * Get timeline stories from followed users (including the viewer's own stories) 48 + */ 49 + async getTimeline( 50 + actorDid: string, 51 + followedDids: string[], 52 + limit = 50, 53 + cursor?: string, 54 + ): Promise<{ stories: StoryItem[]; cursor?: string }> { 55 + const timelineDids = [...followedDids, actorDid]; 56 + 57 + if (timelineDids.length === 0) { 58 + return { stories: [] }; 59 + } 60 + 61 + // Calculate 24-hour expiry threshold 62 + const twentyFourHoursAgo = new Date(); 63 + twentyFourHoursAgo.setHours( 64 + twentyFourHoursAgo.getHours() - STORIES_EXPIRY_HOURS, 65 + ); 66 + const minDate = twentyFourHoursAgo.toISOString(); 67 + 68 + // Build query with expiry filter 69 + const storiesQuery = this.db.models.Story.find({ 70 + authorDid: { $in: timelineDids }, 71 + indexedAt: { $gte: minDate }, 72 + }); 73 + 74 + // Apply pagination 75 + const paginatedQuery = this.timeCidKeyset.paginate(storiesQuery, { 76 + limit: limit + 1, // Get one extra for cursor check 77 + cursor, 78 + direction: "desc", 79 + }); 80 + 81 + const stories = await paginatedQuery.exec(); 82 + 83 + // Check if we have more results 84 + const hasMore = stories.length > limit; 85 + const resultStories = hasMore ? stories.slice(0, limit) : stories; 86 + 87 + // Transform stories 88 + const transformedStories: StoryItem[] = resultStories.map((story) => ({ 89 + uri: story.uri, 90 + cid: story.cid, 91 + authorDid: story.authorDid, 92 + createdAt: story.createdAt, 93 + indexedAt: story.indexedAt, 94 + sortAt: compositeTime(story.createdAt, story.indexedAt) || 95 + story.createdAt, 96 + })); 97 + 98 + // Generate cursor from last item if we have more results 99 + let nextCursor: string | undefined; 100 + if (hasMore && transformedStories.length > 0) { 101 + const lastStory = transformedStories[transformedStories.length - 1]; 102 + nextCursor = this.timeCidKeyset.pack({ 103 + primary: lastStory.sortAt, 104 + secondary: lastStory.cid, 105 + }); 106 + } 107 + 108 + return { 109 + stories: transformedStories, 110 + cursor: nextCursor, 111 + }; 112 + } 113 + 114 + /** 115 + * Filter out expired stories (older than 24 hours) 116 + * Stories from the owner are not filtered 117 + */ 118 + filterExpiredStories( 119 + stories: StoryItem[], 120 + ownerDid?: string, 121 + ): StoryItem[] { 122 + const twentyFourHoursAgo = new Date(); 123 + twentyFourHoursAgo.setHours( 124 + twentyFourHoursAgo.getHours() - STORIES_EXPIRY_HOURS, 125 + ); 126 + 127 + return stories.filter((story) => { 128 + // If the authenticated user is the author, don't apply the 24h expiration filter 129 + if (ownerDid && story.authorDid === ownerDid) return true; 130 + const storyDate = new Date(story.indexedAt); 131 + return storyDate >= twentyFourHoursAgo; 132 + }); 133 + } 134 + 135 + /** 136 + * Get blocked author DIDs for a viewer 137 + */ 138 + async getBlockedAuthors( 139 + viewerDid: string, 140 + authorDids: string[], 141 + ): Promise<Set<string>> { 142 + if (authorDids.length === 0) { 143 + return new Set(); 144 + } 145 + 146 + // Single query to get all block relationships 147 + const [viewerBlocking, viewerBlocked] = await Promise.all([ 148 + this.db.models.Block.find({ 149 + authorDid: viewerDid, 150 + subject: { $in: authorDids }, 151 + }).select("subject").lean(), 152 + this.db.models.Block.find({ 153 + authorDid: { $in: authorDids }, 154 + subject: viewerDid, 155 + }).select("authorDid").lean(), 156 + ]); 157 + 158 + const blockedAuthorDids = new Set([ 159 + ...viewerBlocking.map((b) => b.subject), 160 + ...viewerBlocked.map((b) => b.authorDid), 161 + ]); 162 + 163 + return blockedAuthorDids; 164 + } 165 + }
+41 -40
deno.lock
··· 10 10 "jsr:@atp/lexicon@~0.1.0-alpha.1": "0.1.0-alpha.2", 11 11 "jsr:@atp/lexicon@~0.1.0-alpha.2": "0.1.0-alpha.2", 12 12 "jsr:@atp/repo@~0.1.0-alpha.2": "0.1.0-alpha.2", 13 - "jsr:@atp/sync@~0.1.0-alpha.3": "0.1.0-alpha.3", 14 - "jsr:@atp/syntax@~0.1.0-alpha.1": "0.1.0-alpha.1", 13 + "jsr:@atp/sync@~0.1.0-alpha.3": "0.1.0-alpha.4", 14 + "jsr:@atp/syntax@~0.1.0-alpha.1": "0.1.0-alpha.2", 15 15 "jsr:@atp/xrpc-server@~0.1.0-alpha.2": "0.1.0-alpha.2", 16 16 "jsr:@atp/xrpc@~0.1.0-alpha.2": "0.1.0-alpha.2", 17 - "jsr:@hono/hono@^4.9.8": "4.9.9", 18 - "jsr:@hono/hono@^4.9.9": "4.9.9", 19 - "jsr:@logtape/file@^1.2.0-dev.344+834f24a9": "1.2.0-dev.344+834f24a9", 20 - "jsr:@logtape/logtape@^1.2.0-dev.344+834f24a9": "1.2.0-dev.344+834f24a9", 21 - "jsr:@logtape/pretty@^1.2.0-dev.344+834f24a9": "1.2.0-dev.344+834f24a9", 17 + "jsr:@hono/hono@^4.9.8": "4.10.4", 18 + "jsr:@hono/hono@^4.9.9": "4.10.4", 19 + "jsr:@logtape/file@^1.2.0-dev.344+834f24a9": "1.2.0-dev.367+d4fd9984", 20 + "jsr:@logtape/logtape@^1.2.0-dev.344+834f24a9": "1.2.0-dev.367+d4fd9984", 21 + "jsr:@logtape/logtape@^1.2.0-dev.367+d4fd9984": "1.2.0-dev.367+d4fd9984", 22 + "jsr:@logtape/pretty@^1.2.0-dev.344+834f24a9": "1.2.0-dev.367+d4fd9984", 22 23 "jsr:@noble/curves@^2.0.1": "2.0.1", 23 24 "jsr:@noble/hashes@2": "2.0.1", 24 25 "jsr:@noble/hashes@^2.0.1": "2.0.1", 25 - "jsr:@std/assert@^1.0.14": "1.0.14", 26 + "jsr:@std/assert@^1.0.14": "1.0.15", 26 27 "jsr:@std/bytes@^1.0.5": "1.0.6", 27 28 "jsr:@std/cbor@~0.1.8": "0.1.8", 28 29 "jsr:@std/crypto@^1.0.5": "1.0.5", 29 30 "jsr:@std/encoding@^1.0.10": "1.0.10", 30 31 "jsr:@std/fs@^1.0.19": "1.0.19", 31 - "jsr:@std/internal@^1.0.10": "1.0.10", 32 - "jsr:@std/streams@^1.0.9": "1.0.12", 33 - "jsr:@zod/zod@^4.1.11": "4.1.11", 32 + "jsr:@std/internal@^1.0.12": "1.0.12", 33 + "jsr:@std/streams@^1.0.9": "1.0.13", 34 + "jsr:@zod/zod@^4.1.11": "4.1.12", 34 35 "npm:@atproto/api@~0.16.11": "0.16.11", 35 36 "npm:@atproto/identity@~0.4.9": "0.4.9", 36 37 "npm:@atproto/repo@~0.8.10": "0.8.10", ··· 45 46 "npm:multiformats@^13.4.1": "13.4.1", 46 47 "npm:p-queue@^8.1.1": "8.1.1", 47 48 "npm:rate-limiter-flexible@^2.4.2": "2.4.2", 48 - "npm:zod@^4.1.11": "4.1.11" 49 + "npm:zod@^4.1.11": "4.1.12" 49 50 }, 50 51 "jsr": { 51 52 "@atp/bytes@0.1.0-alpha.1": { ··· 59 60 "dependencies": [ 60 61 "jsr:@atp/bytes", 61 62 "jsr:@logtape/file", 62 - "jsr:@logtape/logtape", 63 + "jsr:@logtape/logtape@^1.2.0-dev.344+834f24a9", 63 64 "jsr:@std/cbor", 64 65 "jsr:@std/crypto", 65 66 "jsr:@std/encoding", ··· 106 107 "npm:zod" 107 108 ] 108 109 }, 109 - "@atp/sync@0.1.0-alpha.3": { 110 - "integrity": "c5d8dbed0d7e2a15013428e5eeb16c2624017756b82eb2c809a9e59623334e13", 110 + "@atp/sync@0.1.0-alpha.4": { 111 + "integrity": "9b6aa6ccc9447270843272e40bfcb26520eddaf37f98202bcbab6c0bee0a602b", 111 112 "dependencies": [ 112 113 "jsr:@atp/common@~0.1.0-alpha.4", 113 114 "jsr:@atp/identity", ··· 119 120 "npm:p-queue" 120 121 ] 121 122 }, 122 - "@atp/syntax@0.1.0-alpha.1": { 123 - "integrity": "9e2055cace77cf63a8c52a4a94c39492215e7135101db7bc2289ebad9bec1991" 123 + "@atp/syntax@0.1.0-alpha.2": { 124 + "integrity": "f7ab598b6b3c3b01dc446077b4c57acc1f1cb8a45f91bd3eb394997408a712a2" 124 125 }, 125 126 "@atp/xrpc@0.1.0-alpha.2": { 126 127 "integrity": "53a548b554430671eeef683ce48830599c12e48bb2f73ef9fa49d1cfe3aba1fb", ··· 146 147 "@hono/hono@4.9.8": { 147 148 "integrity": "908150f13e90181a051a3af3bf15203aff00190682afedfd38824d0cb9299a95" 148 149 }, 149 - "@hono/hono@4.9.9": { 150 - "integrity": "1d716e97b71e91b852c70beb85c9d3b236393282c59d5e268b07cfd224a77318" 150 + "@hono/hono@4.10.4": { 151 + "integrity": "e54d00c4cf994e7ae297d7321793cf940656b9c5e934564c03ffc15499041b9e" 151 152 }, 152 - "@logtape/file@1.2.0-dev.344+834f24a9": { 153 - "integrity": "4d674c368f8130dc1403c5c93a316726a65d6b17e36b094780f1b2ed301f5e1b", 153 + "@logtape/file@1.2.0-dev.367+d4fd9984": { 154 + "integrity": "c00ef17a3ba5d7f949aee3c56a80700a70d89ce571736db4af3c04448bd9d62f", 154 155 "dependencies": [ 155 - "jsr:@logtape/logtape" 156 + "jsr:@logtape/logtape@^1.2.0-dev.367+d4fd9984" 156 157 ] 157 158 }, 158 - "@logtape/logtape@1.2.0-dev.344+834f24a9": { 159 - "integrity": "204222be0f94cd1b64a500e2dcdea22a1618a086fc531b054351e3cfa079c435" 159 + "@logtape/logtape@1.2.0-dev.367+d4fd9984": { 160 + "integrity": "56a8ee17f80759773769a701e4b110341fdca8e6f3da7e733dbc47cc1d340c62" 160 161 }, 161 - "@logtape/pretty@1.2.0-dev.344+834f24a9": { 162 - "integrity": "19aa16ee6409d7112b7cbcaadf10f0e379df03583426eda7efe2740e30a1c065", 162 + "@logtape/pretty@1.2.0-dev.367+d4fd9984": { 163 + "integrity": "f1b14a1f9ca190f87067fbf82d34717d2fdcad22bdd0ce733ef74ed5de16a157", 163 164 "dependencies": [ 164 - "jsr:@logtape/logtape", 165 + "jsr:@logtape/logtape@^1.2.0-dev.367+d4fd9984", 165 166 "npm:@types/node" 166 167 ] 167 168 }, ··· 174 175 "@noble/hashes@2.0.1": { 175 176 "integrity": "e0e908292a0bf91099cf8ba0720a1647cef82ab38b588815b5e9535b4ff4d7bb" 176 177 }, 177 - "@std/assert@1.0.14": { 178 - "integrity": "68d0d4a43b365abc927f45a9b85c639ea18a9fab96ad92281e493e4ed84abaa4", 178 + "@std/assert@1.0.15": { 179 + "integrity": "d64018e951dbdfab9777335ecdb000c0b4e3df036984083be219ce5941e4703b", 179 180 "dependencies": [ 180 181 "jsr:@std/internal" 181 182 ] ··· 199 200 "@std/fs@1.0.19": { 200 201 "integrity": "051968c2b1eae4d2ea9f79a08a3845740ef6af10356aff43d3e2ef11ed09fb06" 201 202 }, 202 - "@std/internal@1.0.10": { 203 - "integrity": "e3be62ce42cab0e177c27698e5d9800122f67b766a0bea6ca4867886cbde8cf7" 203 + "@std/internal@1.0.12": { 204 + "integrity": "972a634fd5bc34b242024402972cd5143eac68d8dffaca5eaa4dba30ce17b027" 204 205 }, 205 - "@std/streams@1.0.12": { 206 - "integrity": "ae925fa1dc459b1abf5cbaa28cc5c7b0485853af3b2a384b0dc22d86e59dfbf4" 206 + "@std/streams@1.0.13": { 207 + "integrity": "772d208cd0d3e5dac7c1d9e6cdb25842846d136eea4a41a62e44ed4ab0c8dd9e" 207 208 }, 208 - "@zod/zod@4.1.11": { 209 - "integrity": "0d48947455491addca672d8ef766d86bc7bc3add07e78d049b8ffd643bb33a7a" 209 + "@zod/zod@4.1.12": { 210 + "integrity": "5876ed4c6d44673faf5120f0a461a2ada2eb6c735329d3ebaf5ba1fc08387695" 210 211 } 211 212 }, 212 213 "npm": { ··· 366 367 "@ipld/dag-cbor@9.2.5": { 367 368 "integrity": "sha512-84wSr4jv30biui7endhobYhXBQzQE4c/wdoWlFrKcfiwH+ofaPg8fwsM8okX9cOzkkrsAsNdDyH3ou+kiLquwQ==", 368 369 "dependencies": [ 369 - "cborg@4.2.15", 370 + "cborg@4.2.18", 370 371 "multiformats@13.4.1" 371 372 ] 372 373 }, ··· 495 496 "integrity": "sha512-b3tFPA9pUr2zCUiCfRd2+wok2/LBSNUMKOuRRok+WlvvAgEt/PlbgPTsZUcwCOs53IJvLgTp0eotwtosE6njug==", 496 497 "bin": true 497 498 }, 498 - "cborg@4.2.15": { 499 - "integrity": "sha512-T+YVPemWyXcBVQdp0k61lQp2hJniRNmul0lAwTj2DTS/6dI4eCq/MRMucGqqvFqMBfmnD8tJ9aFtPu5dEGAbgw==", 499 + "cborg@4.2.18": { 500 + "integrity": "sha512-uzhkd5HOaLccokqeZa5B0Qz7/aa9C12pmUq5yU3vcy6I6OhTKdPHSzOuBPZfcoQHdcx8Emz/dWZbPNNfF/puvg==", 500 501 "bin": true 501 502 }, 502 503 "content-disposition@0.5.4": { ··· 1094 1095 "zod@3.25.76": { 1095 1096 "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==" 1096 1097 }, 1097 - "zod@4.1.11": { 1098 - "integrity": "sha512-WPsqwxITS2tzx1bzhIKsEs19ABD5vmCVa4xBo2tq/SrV4RNZtfws1EnCWQXM6yh8bD08a1idvkB5MZSBiZsjwg==" 1098 + "zod@4.1.12": { 1099 + "integrity": "sha512-JInaHOamG8pt5+Ey8kGmdcAcg3OL9reK8ltczgHTAwNhMys/6ThXHityHxVV2p3fkw/c+MAvBHFVYHFZDmjMCQ==" 1099 1100 } 1100 1101 }, 1101 1102 "workspace": {
+28
hydration/index.ts
··· 34 34 ThreadRef, 35 35 VideoMappings, 36 36 } from "./feed.ts"; 37 + import { Stories, StoryHydrator } from "./story.ts"; 37 38 38 39 import { 39 40 BlockEntry, ··· 88 89 threadContexts?: ThreadContexts; 89 90 sounds?: Sounds; 90 91 soundAggs?: SoundAggs; 92 + stories?: Stories; 91 93 92 94 postBlocks?: PostBlocks; 93 95 reposts?: Reposts; ··· 126 128 actor: ActorHydrator; 127 129 feed: FeedHydrator; 128 130 graph: GraphHydrator; 131 + story: StoryHydrator; 129 132 130 133 constructor( 131 134 public dataplane: DataPlane, ··· 133 136 this.actor = new ActorHydrator(dataplane); 134 137 this.feed = new FeedHydrator(dataplane); 135 138 this.graph = new GraphHydrator(dataplane); 139 + this.story = new StoryHydrator(dataplane); 136 140 } 137 141 138 142 // so.sprk.actor.defs#profileView ··· 727 731 sortedAt: actor.sortedAt ?? new Date(0), 728 732 indexedAt: actor.indexedAt ?? new Date(0), 729 733 takedownRef: actor.profileTakedownRef, 734 + }; 735 + 736 + return recordInfo; 737 + } else if (collection === ids.SoSprkStoryPost) { 738 + // Get story records through dataplane 739 + const res = await this.dataplane.records.getStoryRecords([uri]); 740 + const storyRecord = res.records[0]; 741 + 742 + if (!storyRecord || !storyRecord.cid) return undefined; 743 + 744 + // Parse the record JSON 745 + const record = JSON.parse(storyRecord.record); 746 + if (!record) return undefined; 747 + 748 + const recordInfo: RecordInfo<typeof record> = { 749 + record, 750 + cid: storyRecord.cid, 751 + sortedAt: storyRecord.sortedAt 752 + ? new Date(storyRecord.sortedAt) 753 + : new Date(storyRecord.createdAt || storyRecord.indexedAt || 0), 754 + indexedAt: storyRecord.indexedAt 755 + ? new Date(storyRecord.indexedAt) 756 + : new Date(0), 757 + takedownRef: storyRecord.takedownRef, 730 758 }; 731 759 732 760 return recordInfo;
+32
hydration/story.ts
··· 1 + import { Record as StoryRecord } from "../lex/types/so/sprk/story/post.ts"; 2 + import { HydrationMap, parseRecord, RecordInfo, split } from "./util.ts"; 3 + import { DataPlane } from "../data-plane/index.ts"; 4 + 5 + export type Story = RecordInfo<StoryRecord>; 6 + export type Stories = HydrationMap<Story>; 7 + 8 + export class StoryHydrator { 9 + constructor(public dataplane: DataPlane) {} 10 + 11 + async getStories( 12 + uris: string[], 13 + includeTakedowns = false, 14 + given = new HydrationMap<Story>(), 15 + ): Promise<Stories> { 16 + const [have, need] = split(uris, (uri) => given.has(uri)); 17 + const base = have.reduce( 18 + (acc, uri) => acc.set(uri, given.get(uri) ?? null), 19 + new HydrationMap<Story>(), 20 + ); 21 + if (!need.length) return base; 22 + const res = await this.dataplane.records.getStoryRecords(need); 23 + 24 + return need.reduce((acc, uri, i) => { 25 + const record = parseRecord<StoryRecord>(res.records[i], includeTakedowns); 26 + return acc.set( 27 + uri, 28 + record ?? null, 29 + ); 30 + }, base); 31 + } 32 + }
+1 -1
hydration/util.ts
··· 65 65 if (!includeTakedowns && entry.takenDown) { 66 66 return undefined; 67 67 } 68 - const record = jsonStringToLex(entry.record); 68 + const record = JSON.parse(entry.record); 69 69 const cid = entry.cid; 70 70 const sortedAt = new Date(entry.sortedAt ?? 0); 71 71 const indexedAt = new Date(entry.indexedAt ?? 0);
+111 -1
views/index.ts
··· 12 12 ThreadContext, 13 13 ThreadViewPost, 14 14 } from "../lex/types/so/sprk/feed/defs.ts"; 15 + import { StoriesByAuthor, StoryView } from "../lex/types/so/sprk/story/defs.ts"; 15 16 import { 16 17 isRecord as isReplyRecord, 17 18 Record as ReplyRecord, ··· 38 39 VideoMediaView, 39 40 } from "./types.ts"; 40 41 import { 42 + isMain as isImageMedia, 41 43 Main as ImageMedia, 42 44 View as ImageView, 43 45 } from "../lex/types/so/sprk/media/image.ts"; 46 + import { 47 + isMain as isVideoMediaMain, 48 + } from "../lex/types/so/sprk/media/video.ts"; 49 + import type { Main as VideoMediaMainType } from "../lex/types/so/sprk/media/video.ts"; 44 50 import { AudioView } from "../lex/types/so/sprk/sound/defs.ts"; 45 51 import { INVALID_HANDLE } from "@atp/syntax"; 46 52 import { cidFromBlobJson } from "./util.ts"; ··· 269 275 }; 270 276 } 271 277 278 + story( 279 + uri: string, 280 + state: HydrationState, 281 + ): Un$Typed<StoryView> | undefined { 282 + const storyInfo = state.stories?.get(uri); 283 + if (!storyInfo) return; 284 + 285 + const parsedUri = new AtUri(uri); 286 + const authorDid = parsedUri.hostname; 287 + const author = this.profileBasic(authorDid, state); 288 + if (!author) return; 289 + 290 + const mediaRecord = storyInfo.record.media; 291 + 292 + return { 293 + uri, 294 + cid: storyInfo.cid, 295 + author, 296 + record: storyInfo.record, 297 + media: mediaRecord ? this.storyMedia(uri, mediaRecord, state) : undefined, 298 + indexedAt: this.indexedAt(storyInfo)?.toISOString() ?? 299 + new Date().toISOString(), 300 + }; 301 + } 302 + 303 + storiesByAuthor( 304 + stories: StoryView[], 305 + ): StoriesByAuthor[] { 306 + if (stories.length === 0) { 307 + return []; 308 + } 309 + 310 + // Group stories by author 311 + const storiesGroupedByAuthor = new Map<string, { 312 + author: ProfileViewBasic; 313 + stories: StoryView[]; 314 + }>(); 315 + 316 + for (const story of stories) { 317 + const authorDid = story.author.did; 318 + 319 + if (!storiesGroupedByAuthor.has(authorDid)) { 320 + storiesGroupedByAuthor.set(authorDid, { 321 + author: story.author, 322 + stories: [], 323 + }); 324 + } 325 + 326 + storiesGroupedByAuthor.get(authorDid)!.stories.push(story); 327 + } 328 + 329 + // Convert to array and sort stories within each group 330 + const storiesByAuthor = Array.from(storiesGroupedByAuthor.values()).map( 331 + (group) => ({ 332 + author: group.author, 333 + stories: group.stories.sort( 334 + (a, b) => 335 + new Date(a.indexedAt).getTime() - new Date(b.indexedAt).getTime(), 336 + ), 337 + }), 338 + ); 339 + 340 + // Sort author groups by the latest story from each author (newest first) 341 + storiesByAuthor.sort((a, b) => { 342 + const latestA = Math.max( 343 + ...a.stories.map((s) => new Date(s.indexedAt).getTime()), 344 + ); 345 + const latestB = Math.max( 346 + ...b.stories.map((s) => new Date(s.indexedAt).getTime()), 347 + ); 348 + return latestB - latestA; 349 + }); 350 + 351 + return storiesByAuthor; 352 + } 353 + 354 + storyMedia( 355 + storyUri: string, 356 + media: $Typed<ImageMedia> | $Typed<VideoMedia> | { $type: string }, 357 + state?: HydrationState, 358 + ): (ImageView | VideoMediaView) & { $type: string } | undefined { 359 + const authorDid = uriToDid(storyUri); 360 + 361 + // Check if it's an image media 362 + if (isImageMedia(media)) { 363 + return this.imageMedia(authorDid, media); 364 + } 365 + 366 + // Check if it's a video media 367 + if (isVideoMediaMain(media)) { 368 + const videoMedia = media as VideoMediaMainType; 369 + const videoCid = videoMedia.video 370 + ? cidFromBlobJson(videoMedia.video) 371 + : ""; 372 + const videoMappingKey = `${authorDid}-${videoCid}`; 373 + const videoMapping = state?.videoMappings?.get(videoMappingKey) || null; 374 + return this.videoMedia(authorDid, videoMedia, videoMapping); 375 + } 376 + 377 + return undefined; 378 + } 379 + 272 380 feedViewPost( 273 381 item: FeedItem, 274 382 state: HydrationState, ··· 491 599 handle: actor.handle ?? INVALID_HANDLE, 492 600 displayName: actor.profile?.displayName, 493 601 avatar: actor.profile?.avatar 494 - ? `${this.mediaCdn}/avatar/medium/${did}/${actor.profile.avatar.ref}/webp` 602 + ? `${this.mediaCdn}/avatar/medium/${did}/${ 603 + cidFromBlobJson(actor.profile.avatar) 604 + }/webp` 495 605 : undefined, 496 606 viewer: this.profileViewer(did, state), 497 607 createdAt: actor.createdAt?.toISOString(),