[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon

fix threads and reply aggs (#46)

* fix threads and reply aggs

* quotes???

* rm feed gens

Authored by Roscoe Rubin-Rottenberg and committed by GitHub
0636329f 7b79eda0

+257 -84
+28
AGENTS.md
···
+ # Agent Guidelines for Spark AppView
+
+ ## Commands
+
+ - **Format**: `deno fmt`
+ - **Lint**: `deno lint`
+ - **Test all**: `deno test -P`
+ - **Test single**: `deno test -P tests/main_test.ts`
+ - **Dev**: `deno task dev` (requires MongoDB)
+ - **Codegen**: `deno task codegen` (generate types from lexicons)
+
+ ## Code Style
+
+ - **Runtime**: Deno with TypeScript, imports use JSR/npm prefixes
+ - **Imports**: Use absolute imports from root (e.g., `../../../lex/index.ts`),
+   group by external/internal
+ - **Types**: Explicit interface definitions, use TypeScript interfaces over
+   types. Avoid using `any` type.
+ - **Naming**: camelCase for variables/functions, PascalCase for
+   types/interfaces, UPPER_CASE for constants. Always use double quotes.
+ - **Error handling**: Use InvalidRequestError from `@atp/xrpc-server`, log
+   errors before throwing
+ - **Patterns**: Pipeline pattern (skeleton → hydration → presentation) for
+   endpoints in `api/`, plugin architecture for indexing in
+   `data-plane/indexing/`
+ - **Database**: Mongoose models with explicit schemas, use findOneAndUpdate with
+   upsert for idempotency. Only interact with database directly in the
+   `data-plane/` directory, otherwise use the `DataPlane` API.
+129 -18
README.md
···
  # Spark AppView

- This AppView provides a view of AT Protocol that encompasses all Spark lexicon
- and aims to interop with Bluesky lexicon.
+ An AT Protocol AppView implementation that provides a comprehensive view of the
+ Spark lexicon.
+
+ ## Features
+
+ - **Real-time sync**: Subscribes to AT Protocol relay for live data ingestion
+ - **Rich API**: XRPC endpoints for feeds, profiles, audio, stories, and social
+   graph
+ - **MongoDB storage**: Efficient document-based storage with Mongoose ODM
+ - **Pipeline architecture**: Clean separation between skeleton, hydration, and
+   presentation layers
+
+ ## Quick Start
+
+ ### Docker Compose (Recommended)

- ## Development
+ ```bash
+ deno task docker-dev
+ ```

- To run with Docker Compose (includes database and appview):
- `deno task docker-dev`. This will start both the database and appview services
- in Docker containers.
+ This starts MongoDB and the AppView in Docker containers with hot reloading at
+ `http://localhost:4000`.

- For development without Docker, set up the .env file by following the
- instructions down below, then start the development server: `deno task dev`
+ ### Local Development

- Both methods will start the server in development mode with hot reloading
- enabled available at `http://localhost:4000`.
+ 1. **Prerequisites**: Deno 2.x, MongoDB 8.x

- ## Environment Variables
+ 2. **Environment setup**: Create `.env` file (see Configuration below)

- .env setup:
+ 3. **Start services**:

+ ```bash
+ deno task dev
  ```
+
+ This runs three parallel services:
+
+ - MongoDB (`dev:db`)
+ - API server (`dev:api`) on port 4000
+ - Ingester (`dev:ingest`) for real-time sync
+
+ ## Architecture
+
+ ### Key Directories
+
+ - `api/` - XRPC endpoint handlers using pipeline pattern
+ - `data-plane/` - Database layer, indexing plugins, and subscription logic
+ - `hydration/` - Data enrichment layer (actors, feeds, graphs)
+ - `views/` - Presentation layer transforming hydrated data to API responses
+ - `lexicons/` - AT Protocol lexicon definitions (JSON)
+ - `lex/` - Generated TypeScript types from lexicons
+ - `utils/` - Shared utilities (transformers, logger, retry logic)
+
+ ### Data Flow
+
+ ```
+ AT Protocol Relay → Ingester → MongoDB ← Data Plane ← Pipeline ← API Endpoints
+    (Firehose)      (ingest.ts)          (Raw queries) (4 stages)    (XRPC)
+ ```
+
+ ### Request Pipeline (api/)
+
+ Every API endpoint follows a 4-stage pipeline pattern:
+
+ ```
+ Client Request
+
+
+ ┌─────────────────────────────────────────────────────────────────┐
+ │ 1. SKELETON                                                     │
+ │    • Query parameters → minimal data identifiers (URIs, DIDs)   │
+ │    • Fast database queries for structure only                   │
+ │    • Returns: { postUris: [...], authorDids: [...] }            │
+ └────────────────────────────────┬────────────────────────────────┘
+
+ ┌─────────────────────────────────────────────────────────────────┐
+ │ 2. HYDRATION (hydration/)                                       │
+ │    • Skeleton → Data Plane → rich data from MongoDB             │
+ │    • Batch fetches: actors, posts, likes, blocks, etc.          │
+ │    • Returns: HydrationState with all related records           │
+ └────────────────────────────────┬────────────────────────────────┘
+
+ ┌─────────────────────────────────────────────────────────────────┐
+ │ 3. RULES                                                        │
+ │    • Apply business logic (filtering, sorting, permissions)     │
+ │    • Modify skeleton based on hydrated data                     │
+ │    • Returns: Modified skeleton                                 │
+ └────────────────────────────────┬────────────────────────────────┘
+
+ ┌─────────────────────────────────────────────────────────────────┐
+ │ 4. PRESENTATION (views/)                                        │
+ │    • Skeleton + Hydration → API response format                 │
+ │    • Transform internal models to lexicon types                 │
+ │    • Apply CDN URLs, format dates, handle takedowns             │
+ │    • Returns: JSON response matching lexicon schema             │
+ └────────────────────────────────┬────────────────────────────────┘
+
+ Client Response
+ ```
+
+ ### Layer Responsibilities
+
+ **Data Plane** (`data-plane/`)
+
+ - Direct MongoDB access through Mongoose models
+ - Route handlers: `actors`, `feeds`, `follows`, `likes`, `blocks`, etc.
+ - No business logic, pure data operations
+ - Used only by Hydrator
+
+ **Hydrator** (`hydration/`)
+
+ - Orchestrates Data Plane queries
+ - Batches requests for efficiency
+ - Maintains viewer context (permissions, blocks)
+ - Returns `HydrationState` with all data needed for presentation
+
+ **Views** (`views/`)
+
+ - Pure transformation functions
+ - No database access
+ - Applies CDN URLs, formats responses
+ - Enforces lexicon schemas
+
+ ## Configuration
+
+ Create a `.env` file:
+
+ ```bash
  # Database
  SPRK_DB_URI=mongodb://mongo:mongo@localhost:27017
+ SPRK_DB_NAME=dev

- # Server
  NODE_ENV=development
  SPRK_PORT=4000
- SPRK_PUBLIC_URL=http://localhost:3000
- SPRK_SERVER_DID=did:web:localhost
+ SPRK_PUBLIC_URL=https://example.com
+ SPRK_SERVER_DID=did:web:example.com

- # Keys, generate these with openssl ecparam --name secp256k1 --genkey --noout --outform DER | tail --bytes=+8 | head --bytes=32 | xxd --plain --cols 32
- # On Mac: openssl ecparam -name secp256k1 -genkey -noout -outform DER | tail --bytes=+8 | head --bytes=32 | xxd --plain --cols 32
- SPRK_PRIVATE_KEY=keyhex
+ # openssl ecparam -name secp256k1 -genkey -noout -outform DER | tail -c +8 | head -c 32 | xxd -p -c 32
+ SPRK_PRIVATE_KEY=your_private_key_hex
  SPRK_ADMIN_PASSWORDS=password1,password2
+ SPRK_MOD_SERVICE_DID=did:web:mod.bsky.app
+
+ SPRK_VERSION=0.1.0
+ SPRK_INDEXED_AT_EPOCH=2025-01-01T00:00:00Z
  ```
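Reviewer sketch: the four-stage request pipeline described in the README hunk (skeleton, hydration, rules, presentation) could look roughly like the following. This is a minimal illustration under stated assumptions, not code from the repository: `runPipeline`, `Skeleton`, `PostRecord`, `PostView`, the shape of `HydrationState`, and the in-memory `store` standing in for the data plane are all hypothetical names.

```typescript
// Hypothetical sketch of skeleton → hydration → rules → presentation.
// All names here are illustrative; none are taken from the Spark codebase.

interface Skeleton {
  postUris: string[];
}

interface PostRecord {
  uri: string;
  text: string;
  authorDid: string;
}

interface HydrationState {
  posts: Map<string, PostRecord>;
  blockedDids: Set<string>;
}

interface PostView {
  uri: string;
  text: string;
}

// Stand-in for the data plane: an in-memory record store (assumption).
const store = new Map<string, PostRecord>([
  ["at://did:example:alice/so.sprk.feed.post/1", {
    uri: "at://did:example:alice/so.sprk.feed.post/1",
    text: "hello",
    authorDid: "did:example:alice",
  }],
  ["at://did:example:bob/so.sprk.feed.post/1", {
    uri: "at://did:example:bob/so.sprk.feed.post/1",
    text: "hi",
    authorDid: "did:example:bob",
  }],
]);

// Stage 1: resolve query parameters to identifiers only.
function skeleton(params: { limit: number }): Skeleton {
  return { postUris: Array.from(store.keys()).slice(0, params.limit) };
}

// Stage 2: batch-load the records the skeleton refers to.
function hydrate(skel: Skeleton, blockedDids: Set<string>): HydrationState {
  const posts = new Map<string, PostRecord>();
  for (const uri of skel.postUris) {
    const record = store.get(uri);
    if (record) posts.set(uri, record);
  }
  return { posts, blockedDids };
}

// Stage 3: drop URIs that are missing or authored by a blocked DID.
function applyRules(skel: Skeleton, state: HydrationState): Skeleton {
  const postUris = skel.postUris.filter((uri) => {
    const post = state.posts.get(uri);
    return post !== undefined && !state.blockedDids.has(post.authorDid);
  });
  return { postUris };
}

// Stage 4: map internal records to the response shape.
function present(skel: Skeleton, state: HydrationState): PostView[] {
  const views: PostView[] = [];
  for (const uri of skel.postUris) {
    const post = state.posts.get(uri);
    if (post) views.push({ uri: post.uri, text: post.text });
  }
  return views;
}

function runPipeline(limit: number, blockedDids: Set<string>): PostView[] {
  const skel = skeleton({ limit });
  const state = hydrate(skel, blockedDids);
  return present(applyRules(skel, state), state);
}
```

Calling `runPipeline(10, new Set(["did:example:bob"]))` on the sample store returns only the post by `did:example:alice`, because the rules stage filters the skeleton before presentation runs.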
+12 -6
data-plane/indexing/plugins/like.ts
···

  const updateAggregates = async (db: Database, like: IndexedLike) => {
    try {
-     // Update like count for the subject
      const likeCount = await db.models.Like.countDocuments({
        subject: like.subject,
      });

      const subjectUri = new AtUri(like.subject);

-     // Check if this is a feed generator
      if (subjectUri.collection === "so.sprk.feed.generator") {
        const existingGenerator = await db.models.Generator.findOne({
          uri: like.subject,
···
          );
        }
      } else {
-       // Handle posts and other content types
        const existingPost = await db.models.Post.findOne({
          uri: like.subject,
        });

        if (existingPost) {
-         // Only update existing posts
          await db.models.Post.findOneAndUpdate(
            { uri: like.subject },
            { $set: { likeCount } },
            { new: true },
          );
        }
-       // We don't create a post if it doesn't exist, as we might lack required fields
+
+       const existingReply = await db.models.Reply.findOne({
+         uri: like.subject,
+       });
+
+       if (existingReply) {
+         await db.models.Reply.findOneAndUpdate(
+           { uri: like.subject },
+           { $set: { likeCount } },
+           { new: true },
+         );
+       }
      }
    } catch (error) {
      console.error("Error updating like aggregates:", error);
-     // Don't throw - allow processing to continue even if aggregates update fails
    }
  };

+4 -5
data-plane/indexing/plugins/reply.ts
···
  };

  const updateAggregates = async (db: Database, replyIdx: IndexedReply) => {
-   // Update reply count for parent post
    if (replyIdx.reply.reply?.parent?.uri) {
      const parentPost = await db.models.Post.findOne({
        uri: replyIdx.reply.reply?.parent.uri,
···
      if (parentPost) {
        await db.models.Post.findOneAndUpdate(
          { uri: replyIdx.reply.reply?.parent.uri },
-         { replyCount },
-         { upsert: true, new: true },
+         { $set: { replyCount } },
+         { new: true },
        );
      } else if (parentReply) {
        await db.models.Reply.findOneAndUpdate(
          { uri: replyIdx.reply.reply?.parent.uri },
-         { replyCount },
-         { upsert: true, new: true },
+         { $set: { replyCount } },
+         { new: true },
        );
      }
    }
+2 -11
data-plane/indexing/plugins/repost.ts
···

  const updateAggregates = async (db: Database, repost: IndexedRepost) => {
    try {
-     // Update repost count for the subject
      const repostCount = await db.models.Repost.countDocuments({
        "subject.uri": repost.subject.uri,
      });

-     // First check if post exists to avoid creating one with missing fields
      const existingPost = await db.models.Post.findOne({
        uri: repost.subject.uri,
      });

      if (existingPost) {
-       // Only update existing posts
        await db.models.Post.findOneAndUpdate(
          { uri: repost.subject.uri },
-         { repostCount },
+         { $set: { repostCount } },
          { new: true },
        );
      }
-     // We don't create a post if it doesn't exist, as we might lack required fields

-     // Update repost count for the author (optional enhancement)
      const authorRepostCount = await db.models.Repost.countDocuments({
        authorDid: repost.authorDid,
      });

-     // First check if profile exists to avoid creating one with null URI
      const existingProfile = await db.models.Profile.findOne({
        authorDid: repost.authorDid,
      });

      if (existingProfile) {
-       // Only update existing profiles to avoid creating profiles with null URI
        await db.models.Profile.findOneAndUpdate(
          { authorDid: repost.authorDid },
-         { repostCount: authorRepostCount },
+         { $set: { repostCount: authorRepostCount } },
          { new: true },
        );
      }
-     // We don't create a profile if it doesn't exist, as we lack required URI field
    } catch (error) {
      console.error("Error updating repost aggregates:", error);
-     // Don't throw - allow processing to continue even if aggregates update fails
    }
  };

+23 -36
data-plane/routes/interactions.ts
···
        return { likes: [], replies: [], reposts: [], quotes: [] };
      }

-     // Get interaction counts for posts
-     const [likes, reposts] = await Promise.all([
-       // Count likes for each URI
-       this.db.models.Like.aggregate([
-         { $match: { "subject.uri": { $in: uris } } },
-         { $group: { _id: "$subject.uri", count: { $sum: 1 } } },
-       ]),
-       // Count reposts for each URI
-       this.db.models.Repost.aggregate([
-         { $match: { "subject.uri": { $in: uris } } },
-         { $group: { _id: "$subject.uri", count: { $sum: 1 } } },
-       ]),
+     // Get pre-computed counts from Post and Reply documents
+     const [posts, replies] = await Promise.all([
+       this.db.models.Post.find(
+         { uri: { $in: uris } },
+         { uri: 1, likeCount: 1, replyCount: 1, repostCount: 1 },
+       ),
+       this.db.models.Reply.find(
+         { uri: { $in: uris } },
+         { uri: 1, likeCount: 1, replyCount: 1, repostCount: 1 },
+       ),
      ]);

-     // Count replies by finding posts that have a reply.parent.uri matching our URIs
-     const replies = await this.db.models.Post.aggregate([
-       { $match: { "reply.parent.uri": { $in: uris } } },
-       { $group: { _id: "$reply.parent.uri", count: { $sum: 1 } } },
-     ]);
+     // Create lookup maps from pre-computed counts
+     const likesMap = new Map<string, number>();
+     const repliesMap = new Map<string, number>();
+     const repostsMap = new Map<string, number>();

-     // Count quotes by finding posts that have an embed.record.uri matching our URIs
-     const quotes = await this.db.models.Post.aggregate([
-       { $match: { "embed.record.uri": { $in: uris } } },
-       { $group: { _id: "$embed.record.uri", count: { $sum: 1 } } },
-     ]);
+     for (const post of posts) {
+       likesMap.set(post.uri, post.likeCount ?? 0);
+       repliesMap.set(post.uri, post.replyCount ?? 0);
+       repostsMap.set(post.uri, post.repostCount ?? 0);
+     }

-     // Create lookup maps
-     const likesMap = new Map(
-       likes.map((item: AggregationResult) => [item._id, item.count]),
-     );
-     const repliesMap = new Map(
-       replies.map((item: AggregationResult) => [item._id, item.count]),
-     );
-     const repostsMap = new Map(
-       reposts.map((item: AggregationResult) => [item._id, item.count]),
-     );
-     const quotesMap = new Map(
-       quotes.map((item: AggregationResult) => [item._id, item.count]),
-     );
+     for (const reply of replies) {
+       likesMap.set(reply.uri, reply.likeCount ?? 0);
+       repliesMap.set(reply.uri, reply.replyCount ?? 0);
+     }

      return {
        likes: uris.map((uri) => likesMap.get(uri) ?? 0),
        replies: uris.map((uri) => repliesMap.get(uri) ?? 0),
        reposts: uris.map((uri) => repostsMap.get(uri) ?? 0),
-       quotes: uris.map((uri) => quotesMap.get(uri) ?? 0),
      };
    }

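Reviewer sketch: the change above replaces per-request aggregation with lookups against counts already stored on Post and Reply documents. The same map-building step can be shown without a database. This is only an illustration, assuming plain arrays in place of the Mongoose query results; `CountedDoc`, `InteractionCounts`, and `countsFor` are hypothetical names, not identifiers from the repository.

```typescript
// Hypothetical, database-free version of the lookup-map step in the diff.
// `CountedDoc` approximates the projected fields; it is an assumption.

interface CountedDoc {
  uri: string;
  likeCount?: number;
  replyCount?: number;
  repostCount?: number;
}

interface InteractionCounts {
  likes: number[];
  replies: number[];
  reposts: number[];
}

function countsFor(
  uris: string[],
  posts: CountedDoc[],
  replies: CountedDoc[],
): InteractionCounts {
  const likesMap = new Map<string, number>();
  const repliesMap = new Map<string, number>();
  const repostsMap = new Map<string, number>();

  // Posts carry all three counters.
  for (const post of posts) {
    likesMap.set(post.uri, post.likeCount ?? 0);
    repliesMap.set(post.uri, post.replyCount ?? 0);
    repostsMap.set(post.uri, post.repostCount ?? 0);
  }

  // As in the diff, only like and reply counts are read from replies.
  for (const reply of replies) {
    likesMap.set(reply.uri, reply.likeCount ?? 0);
    repliesMap.set(reply.uri, reply.replyCount ?? 0);
  }

  // Results are positional: index i corresponds to uris[i].
  return {
    likes: uris.map((uri) => likesMap.get(uri) ?? 0),
    replies: uris.map((uri) => repliesMap.get(uri) ?? 0),
    reposts: uris.map((uri) => repostsMap.get(uri) ?? 0),
  };
}
```

A URI found in neither collection falls back to 0 in every array, so the output always has the same length and order as the input `uris`.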
+59 -7
data-plane/routes/threads.ts
···
      validateThreadParams(above, below);

      try {
-       // Verify the original post exists and is a root post (posts can't have ancestors)
+       // Check if it's a post or reply
        const originalPost = await this.db.models.Post.findOne({ uri: postUri });

-       if (!originalPost) {
+       if (originalPost) {
+         // Posts are always root - they don't have ancestors by design
+         // So we only get descendants (replies)
+         const descendants = await getDescendants(this.db, postUri, below);
+
+         // The thread is just the root post + all its descendant replies
+         const uris = [
+           postUri, // The original post (always root)
+           ...descendants,
+         ];
+
+         // Remove duplicates while preserving order
+         const uniqueUris = Array.from(new Set(uris));
+
+         return {
+           uris: uniqueUris,
+           meta: {
+             ancestorCount: 0, // Posts never have ancestors
+             descendantCount: descendants.length,
+             totalCount: uniqueUris.length,
+           },
+         };
+       }
+
+       // Check if it's a reply
+       const originalReply = await this.db.models.Reply.findOne({
+         uri: postUri,
+       });
+
+       if (!originalReply) {
          throw new DataPlaneError(Code.NotFound);
        }

-       // Posts are always root - they don't have ancestors by design
-       // So we only get descendants (replies)
+       // Get ancestors (walking up the reply chain)
+       const ancestors: string[] = [];
+       let currentUri = postUri;
+       const visited = new Set<string>([currentUri]);
+
+       for (let i = 0; i < above; i++) {
+         const current = await this.db.models.Reply.findOne({ uri: currentUri });
+
+         if (!current?.reply?.parent?.uri) {
+           break;
+         }
+
+         const parentUri = current.reply.parent.uri;
+
+         if (visited.has(parentUri)) {
+           break;
+         }
+
+         visited.add(parentUri);
+         ancestors.unshift(parentUri); // Add to beginning to maintain order
+         currentUri = parentUri;
+       }
+
+       // Get descendants (replies to this reply)
        const descendants = await getDescendants(this.db, postUri, below);

-       // The thread is just the root post + all its descendant replies
+       // Build the full thread: ancestors + anchor + descendants
        const uris = [
-         postUri, // The original post (always root)
+         ...ancestors,
+         postUri, // The anchor reply
          ...descendants,
        ];

···
        return {
          uris: uniqueUris,
          meta: {
-           ancestorCount: 0, // Posts never have ancestors
+           ancestorCount: ancestors.length,
            descendantCount: descendants.length,
            totalCount: uniqueUris.length,
          },
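Reviewer sketch: the ancestor walk added above climbs the reply chain one parent at a time, stops after `above` steps, and uses a visited set to guard against cycles. The loop can be exercised in isolation. This is a simplified illustration, assuming a `Map` from child URI to parent URI in place of the `Reply` collection lookups; `collectAncestors` and `parentOf` are hypothetical names.

```typescript
// Hypothetical in-memory version of the ancestor walk from the diff.
// `parentOf` maps a reply URI to its parent URI; roots have no entry.

function collectAncestors(
  startUri: string,
  parentOf: Map<string, string>,
  above: number,
): string[] {
  const ancestors: string[] = [];
  const visited = new Set<string>([startUri]);
  let currentUri = startUri;

  for (let i = 0; i < above; i++) {
    const parentUri = parentOf.get(currentUri);

    // Stop at the root (no parent) or when a cycle is detected.
    if (parentUri === undefined || visited.has(parentUri)) {
      break;
    }

    visited.add(parentUri);
    ancestors.unshift(parentUri); // Oldest ancestor ends up first.
    currentUri = parentUri;
  }

  return ancestors;
}

// Example chain: root "a" ← reply "b" ← reply "c".
const exampleParents = new Map<string, string>([
  ["c", "b"],
  ["b", "a"],
]);
```

With this chain, `collectAncestors("c", exampleParents, 10)` yields `["a", "b"]`, and lowering `above` to 1 yields `["b"]`. Each step in the real handler is a separate database round trip, so `above` also bounds the number of queries.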
-1
hydration/feed.ts
···
  export type Posts = HydrationMap<Post>;
  export type Reply = RecordInfo<ReplyRecord>;
  export type Replies = HydrationMap<Reply>;
-
  export type Sound = RecordInfo<AudioRecord>;
  export type Sounds = HydrationMap<Sound>;
