[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

actor and indexing

+933 -211
+50
services/appview/src/db.ts
··· 404 404 // Ensure compound index on did + cid for blob takedowns 405 405 blobTakedownSchema.index({ did: 1, cid: 1 }, { unique: true }) 406 406 407 + export interface ActorDocument extends Document { 408 + uri: string 409 + did: string 410 + handle?: string 411 + profile?: ProfileDocument 412 + profileCid?: string 413 + profileTakedownRef?: string 414 + followersCount: number 415 + followingCount: number 416 + postsCount: number 417 + sortedAt?: Date 418 + indexedAt: string 419 + takedownRef?: string 420 + isLabeler: boolean 421 + allowIncomingChatsFrom?: string 422 + upstreamStatus?: string 423 + createdAt?: Date 424 + priorityNotifications: boolean 425 + trustedVerifier?: boolean 426 + labelsDeclaration?: Record<string, any> 427 + } 428 + 429 + export const actorSchema = new Schema<ActorDocument>({ 430 + uri: { type: String, required: true, unique: true, index: true }, 431 + did: { type: String, required: true, index: true }, 432 + handle: { type: String, required: false, index: true }, 433 + profile: { type: Schema.Types.ObjectId, ref: 'Profile', required: false }, 434 + profileCid: { type: String, required: false }, 435 + profileTakedownRef: { type: String, required: false }, 436 + followersCount: { type: Number, required: true, default: 0 }, 437 + followingCount: { type: Number, required: true, default: 0 }, 438 + postsCount: { type: Number, required: true, default: 0 }, 439 + sortedAt: { type: Date, required: false }, 440 + indexedAt: { type: String, required: true }, 441 + takedownRef: { type: String, required: false }, 442 + isLabeler: { type: Boolean, required: true, default: false }, 443 + allowIncomingChatsFrom: { type: String, required: false }, 444 + upstreamStatus: { type: String, required: false }, 445 + createdAt: { type: Date, required: false }, 446 + priorityNotifications: { type: Boolean, required: true, default: false }, 447 + trustedVerifier: { type: Boolean, required: false }, 448 + labelsDeclaration: { type: Object, required: false }, 449 + }) 450 + 451 + // Add compound indexes for Actor 452 + actorSchema.index({ handle: 'text' }) 453 + actorSchema.index({ did: 1 }, { unique: true }) 454 + 407 455 export interface DatabaseModels { 408 456 Like: Model<LikeDocument> 409 457 Post: Model<PostDocument> ··· 418 466 Takedown: Model<TakedownDocument> 419 467 RepoTakedown: Model<RepoTakedownDocument> 420 468 BlobTakedown: Model<BlobTakedownDocument> 469 + Actor: Model<ActorDocument> 421 470 } 422 471 423 472 export class Database { ··· 453 502 'BlobTakedown', 454 503 blobTakedownSchema, 455 504 ), 505 + Actor: this.connection.model<ActorDocument>('Actor', actorSchema), 456 506 } 457 507 } 458 508
+16 -1
services/appview/src/index.ts
··· 25 25 import { createGetRecordRouter } from './routes/com/atproto/repo/getRecord.js' 26 26 import wellKnownRouter from './well-known.js' 27 27 import { TakedownService } from './services/takedown.js' 28 + import { IndexingService } from './services/indexing.js' 29 + 30 + // Extend Hono's context variable map to include our services 31 + declare module 'hono' { 32 + interface ContextVariableMap { 33 + serviceDid: string 34 + didResolver: DidResolver 35 + takedownService: TakedownService 36 + indexingService: IndexingService 37 + } 38 + } 28 39 29 40 export type AppContext = { 30 41 db: Database ··· 33 44 serviceDid: string 34 45 didResolver: DidResolver 35 46 takedownService: TakedownService 47 + indexingService: IndexingService 36 48 } 37 49 38 50 export class Server { ··· 53 65 // Get service DID from environment 54 66 const serviceDid = env.SERVICE_DID 55 67 56 - // Create takedown service 68 + // Create services 57 69 const takedownService = new TakedownService(db) 70 + const indexingService = new IndexingService(db, resolver) 58 71 59 72 const ctx = { 60 73 db, ··· 63 76 serviceDid, 64 77 didResolver: baseIdResolver.did, 65 78 takedownService, 79 + indexingService, 66 80 } 67 81 68 82 const app = new Hono() ··· 73 87 c.set('serviceDid', serviceDid) 74 88 c.set('didResolver', baseIdResolver.did) 75 89 c.set('takedownService', takedownService) 90 + c.set('indexingService', indexingService) 76 91 await next() 77 92 }) 78 93
+29
services/appview/src/routes/com/atproto/admin/getAccountInfos.ts
··· 1 + import { Hono } from 'hono' 2 + import { zValidator } from '@hono/zod-validator' 3 + import { z } from 'zod' 4 + import { HTTPException } from 'hono/http-exception' 5 + import { TakedownService } from '../../../../services/takedown.js' 6 + import { authMiddleware } from '../../../../auth/middleware.js' 7 + import { AppContext } from '../../../../index.js' 8 + import type * as ComAtprotoAdminUpdateSubjectStatus from '../../../../lexicon/types/com/atproto/admin/updateSubjectStatus.js' 9 + import type * as ComAtprotoAdminDefs from '../../../../lexicon/types/com/atproto/admin/defs.js' 10 + import type * as ComAtprotoRepoStrongRef from '../../../../lexicon/types/com/atproto/repo/strongRef.js' 11 + 12 + export const createGetAccountInfosRouter = (ctx: AppContext) => { 13 + const router = new Hono() 14 + 15 + // XRPC endpoint for Ozone integration: com.atproto.admin.getAccountInfos 16 + router.get( 17 + '/xrpc/com.atproto.admin.getAccountInfos', 18 + (c, next) => authMiddleware(c, next, true), 19 + zValidator( 20 + 'json', 21 + z.object({ 22 + dids: z.array(z.string()), 23 + }), 24 + ), 25 + async (c) => { 26 + 27 + } 28 + ) 29 + }
+159 -181
services/appview/src/routes/so/sprk/actor/getProfile.ts
··· 15 15 '/xrpc/so.sprk.actor.getProfile', 16 16 optionalAuthMiddleware, 17 17 async (c) => { 18 - const actor = c.req.query('actor') 18 + const actorIdentifier = c.req.query('actor') 19 19 const viewerDid = c.get('did') as string | undefined 20 + const now = new Date().toISOString() 20 21 21 - if (!actor) { 22 + if (!actorIdentifier) { 22 23 return c.json({ error: 'Actor not provided' }, 400) 23 24 } 24 25 25 - let actorDidDoc 26 - if (isValidHandle(actor)) { 27 - actorDidDoc = await ctx.resolver.resolveHandleToDidDoc(actor) 28 - } else { 29 - try { 30 - ensureValidDid(actor) 31 - actorDidDoc = await ctx.resolver.resolveDidToDidDoc(actor) 32 - } catch (err) { 33 - return c.json({ error: 'Invalid actor' }, 400) 26 + // Resolve actor DID from handle or DID 27 + let actorDid: string 28 + try { 29 + if (isValidHandle(actorIdentifier)) { 30 + const didDoc = await ctx.resolver.resolveHandleToDidDoc(actorIdentifier) 31 + actorDid = didDoc.did 32 + } else { 33 + ensureValidDid(actorIdentifier) 34 + actorDid = actorIdentifier 34 35 } 36 + } catch (err) { 37 + return c.json({ error: 'Invalid actor' }, 400) 35 38 } 36 39 37 - const actorDid = actorDidDoc.did 40 + // Use the indexing service - it will handle recency checks internally 41 + try { 42 + await ctx.indexingService.indexHandle(actorDid, now) 43 + } catch (error) { 44 + ctx.logger.warn({ error, did: actorDid }, 'Failed to index handle') 45 + // Continue anyway - we might still have data 46 + } 38 47 39 - // Get profile data 40 - const profile = await ctx.db.models.Profile.findOne({ 41 - authorDid: actorDid, 42 - }).lean() 48 + // Find the actor with populated profile 49 + const actor = await ctx.db.models.Actor.findOne({ did: actorDid }) 50 + .populate('profile') 51 + .lean() 43 52 44 - if (!profile) { 45 - return c.json({ error: 'Profile not found' }, 404) 53 + if (!actor) { 54 + return c.json({ error: 'Actor not found' }, 404) 46 55 } 56 + 57 + return generateProfileResponse(c, ctx, actor, viewerDid, actorDid) 58 + }, 59 + ) 47 60 48 - const profileHandle = await ctx.resolver.resolveDidToHandle( 49 - profile.authorDid, 50 - ) 51 - 52 - // Get follower count 53 - const followersCount = await ctx.db.models.Follow.countDocuments({ 54 - subject: actorDid, 55 - }) 56 - 57 - // Get follows count 58 - const followsCount = await ctx.db.models.Follow.countDocuments({ 59 - authorDid: actorDid, 60 - }) 61 - 62 - // Get posts count 63 - const postsCount = await ctx.db.models.Post.countDocuments({ 64 - authorDid: actorDid, 65 - }) 61 + return router 62 + } 66 63 67 - // Build viewer state if a user is authenticated 68 - const viewer: SoSprkActorDefs.ViewerState = {} 64 + // Helper function to handle the rest of the profile rendering logic 65 + async function generateProfileResponse( 66 + c: any, 67 + ctx: AppContext, 68 + actor: any, 69 + viewerDid: string | undefined, 70 + actorDid: string 71 + ) { 72 + const profile = actor.profile as any 73 + 74 + // Build viewer state if a user is authenticated 75 + const viewer: SoSprkActorDefs.ViewerState = {} 69 76 70 - if (viewerDid) { 71 - // Check if viewer follows this profile 72 - const follow = await ctx.db.models.Follow.findOne({ 73 - subject: actorDid, 74 - authorDid: viewerDid, 75 - }) 76 - if (follow) { 77 - viewer.following = follow.uri 78 - } 77 + if (viewerDid) { 78 + // Check if viewer follows this profile 79 + const follow = await ctx.db.models.Follow.findOne({ 80 + subject: actorDid, 81 + authorDid: viewerDid, 82 + }) 83 + if (follow) { 84 + viewer.following = follow.uri 85 + } 79 86 80 - // Check if this profile follows the viewer 81 - const followedBy = await ctx.db.models.Follow.findOne({ 82 - subject: viewerDid, 83 - authorDid: actorDid, 84 - }) 85 - if (followedBy) { 86 - viewer.followedBy = followedBy.uri 87 - } 87 + // Check if this profile follows the viewer 88 + const followedBy = await ctx.db.models.Follow.findOne({ 89 + subject: viewerDid, 90 + authorDid: actorDid, 91 + }) 92 + if (followedBy) { 93 + viewer.followedBy = followedBy.uri 94 + } 88 95 89 - // Check if viewer has blocked this profile 90 - const block = await ctx.db.models.Block.findOne({ 91 - subject: actorDid, 92 - authorDid: viewerDid, 93 - }) 94 - if (block) { 95 - viewer.blocking = block.uri 96 - } 96 + // Check if viewer has blocked this profile 97 + const block = await ctx.db.models.Block.findOne({ 98 + subject: actorDid, 99 + authorDid: viewerDid, 100 + }) 101 + if (block) { 102 + viewer.blocking = block.uri 103 + } 97 104 98 - // Check if this profile has blocked the viewer 99 - const blockedBy = await ctx.db.models.Block.findOne({ 100 - subject: viewerDid, 101 - authorDid: actorDid, 102 - }) 103 - if (blockedBy) { 104 - viewer.blockedBy = true 105 - } 105 + // Check if this profile has blocked the viewer 106 + const blockedBy = await ctx.db.models.Block.findOne({ 107 + subject: viewerDid, 108 + authorDid: actorDid, 109 + }) 110 + if (blockedBy) { 111 + viewer.blockedBy = true 112 + } 106 113 107 - // Get known followers (followers of the profile that the viewer also follows) 108 - if (followersCount > 0) { 109 - // Get the followers of this profile 110 - const followers = await ctx.db.models.Follow.find({ 111 - subject: actorDid, 112 - }).lean() 114 + // Get known followers only if profile exists 115 + if (profile && actor.followersCount > 0) { 116 + // Get the followers of this profile 117 + const followers = await ctx.db.models.Follow.find({ 118 + subject: actorDid, 119 + }).lean() 113 120 114 - const followerDids = followers.map((f) => f.authorDid) 121 + const followerDids = followers.map((f) => f.authorDid) 115 122 116 - // Check which of these followers the viewer follows 117 - const knownFollowsQuery = await ctx.db.models.Follow.find({ 118 - subject: { $in: followerDids }, 119 - authorDid: viewerDid, 120 - }).lean() 123 + // Check which of these followers the viewer follows 124 + const knownFollowsQuery = await ctx.db.models.Follow.find({ 125 + subject: { $in: followerDids }, 126 + authorDid: viewerDid, 127 + }).lean() 121 128 122 - if (knownFollowsQuery.length > 0) { 123 - const knownFollowerDids = knownFollowsQuery.map((f) => f.subject) 129 + if (knownFollowsQuery.length > 0) { 130 + const knownFollowerDids = knownFollowsQuery.map((f) => f.subject) 124 131 125 - // Get profiles for known followers 126 - const knownFollowerProfiles = await ctx.db.models.Profile.find({ 127 - authorDid: { $in: knownFollowerDids }, 128 - }) 129 - .limit(3) 130 - .lean() 132 + // Get actors for known followers 133 + const knownFollowerActors = await ctx.db.models.Actor.find({ 134 + did: { $in: knownFollowerDids }, 135 + }) 136 + .populate('profile') 137 + .limit(3) 138 + .lean() 131 139 132 - const knownFollowersBasic = await Promise.all( 133 - knownFollowerProfiles.map(async (p) => { 134 - const handle = await ctx.resolver.resolveDidToHandle( 135 - p.authorDid, 136 - ) 137 - return { 138 - did: p.authorDid, 139 - handle, 140 - displayName: p.displayName, 141 - avatar: p.avatar 142 - ? `https://media.sprk.so/avatar/tiny/${p.authorDid}/${p.avatar.ref.$link}/webp` 143 - : undefined, 144 - } as SoSprkActorDefs.ProfileViewBasic 145 - }), 146 - ) 140 + const knownFollowersBasic = knownFollowerActors 141 + .filter(a => a.profile) 142 + .map((a) => { 143 + const p = a.profile as any 144 + return { 145 + did: a.did, 146 + handle: a.handle || a.did, 147 + displayName: p?.displayName, 148 + avatar: p?.avatar 149 + ? `https://media.sprk.so/avatar/tiny/${a.did}/${p.avatar.ref.$link}/webp` 150 + : undefined, 151 + } as SoSprkActorDefs.ProfileViewBasic 152 + }) 147 153 148 - viewer.knownFollowers = { 149 - count: knownFollowsQuery.length, 150 - followers: knownFollowersBasic, 151 - } 152 - } 154 + viewer.knownFollowers = { 155 + count: knownFollowsQuery.length, 156 + followers: knownFollowersBasic, 153 157 } 154 158 } 159 + } 160 + } 155 161 156 - // Check for associated services 157 - const associated: SoSprkActorDefs.ProfileAssociated = {} 162 + // Build the ProfileViewDetailed response with required fields 163 + const profileView: SoSprkActorDefs.ProfileViewDetailed = { 164 + did: actorDid, 165 + handle: actor.handle || actorDid, 166 + viewer: viewerDid ? viewer : undefined, 167 + } 158 168 159 - // Check for feed generators 160 - let feedgensCount = 0 161 - try { 162 - if (ctx.db.models.Generator) { 163 - feedgensCount = await ctx.db.models.Generator.countDocuments({ 164 - authorDid: actorDid, 165 - }) 166 - } 167 - } catch (error) { 168 - // Ignore if model doesn't exist 169 - } 169 + // Only add optional fields if profile exists 170 + if (profile) { 171 + const associated: SoSprkActorDefs.ProfileAssociated = {} 170 172 171 - if (feedgensCount > 0) { 172 - associated.feedgens = feedgensCount 173 + // Check for feed generators 174 + let feedgensCount = 0 175 + try { 176 + if (ctx.db.models.Generator) { 177 + feedgensCount = await ctx.db.models.Generator.countDocuments({ 178 + authorDid: actorDid, 179 + }) 173 180 } 181 + } catch (error) { 182 + // Ignore if model doesn't exist 183 + } 174 184 175 - // Get avatar and banner URLs 176 - const avatar = profile.avatar 185 + if (feedgensCount > 0) { 186 + associated.feedgens = feedgensCount 187 + } 188 + 189 + Object.assign(profileView, { 190 + displayName: profile.displayName, 191 + description: profile.description, 192 + avatar: profile.avatar 177 193 ? `https://media.sprk.so/avatar/tiny/${actorDid}/${profile.avatar.ref.$link}/webp` 178 - : undefined 179 - const banner = profile.banner 194 + : undefined, 195 + banner: profile.banner 180 196 ? `https://media.sprk.so/img/tiny/${actorDid}/${profile.banner.ref.$link}/webp` 181 - : undefined 197 + : undefined, 198 + followersCount: actor.followersCount, 199 + followsCount: actor.followingCount, 200 + postsCount: actor.postsCount, 201 + associated: Object.keys(associated).length > 0 ? associated : undefined, 202 + joinedViaStarterPack: profile.joinedViaStarterPack as unknown as SoSprkGraphDefs.StarterPackViewBasic, 203 + indexedAt: profile.indexedAt, 204 + createdAt: profile.createdAt, 205 + labels: Array.isArray(profile.labels) ? profile.labels as Label[] : undefined, 206 + pinnedPost: profile.pinnedPost as unknown as ComAtprotoRepoStrongRef.Main, 207 + }) 208 + } 182 209 183 - // Convert joinedViaStarterPack to the correct type if it exists 184 - let joinedViaStarterPack: 185 - | SoSprkGraphDefs.StarterPackViewBasic 186 - | undefined = undefined 187 - if (profile.joinedViaStarterPack) { 188 - // Type assertion assuming the structure fits the requirements 189 - joinedViaStarterPack = 190 - profile.joinedViaStarterPack as unknown as SoSprkGraphDefs.StarterPackViewBasic 191 - } 192 - 193 - // Convert labels to the correct type if it exists 194 - let labels: Label[] | undefined = undefined 195 - if (profile.labels) { 196 - labels = Array.isArray(profile.labels) 197 - ? (profile.labels as Label[]) 198 - : undefined 199 - } 200 - 201 - // Convert pinnedPost to the correct type if it exists 202 - let pinnedPost: ComAtprotoRepoStrongRef.Main | undefined = undefined 203 - if (profile.pinnedPost) { 204 - pinnedPost = 205 - profile.pinnedPost as unknown as ComAtprotoRepoStrongRef.Main 206 - } 207 - 208 - // Build the ProfileViewDetailed response 209 - const profileView: SoSprkActorDefs.ProfileViewDetailed = { 210 - did: actorDid, 211 - handle: profileHandle, 212 - displayName: profile.displayName, 213 - description: profile.description, 214 - avatar, 215 - banner, 216 - followersCount, 217 - followsCount, 218 - postsCount, 219 - associated: Object.keys(associated).length > 0 ? associated : undefined, 220 - joinedViaStarterPack, 221 - indexedAt: profile.indexedAt, 222 - createdAt: profile.createdAt, 223 - viewer: Object.keys(viewer).length > 0 ? viewer : undefined, 224 - labels, 225 - pinnedPost, 226 - } 227 - 228 - return c.json(profileView) 229 - }, 230 - ) 231 - 232 - return router 210 + return c.json(profileView) 233 211 }
+55 -25
services/appview/src/routes/so/sprk/actor/searchActor.ts
··· 29 29 } 30 30 } 31 31 32 - const filter: any = {} 32 + // Build the filter for actors instead of directly searching profiles 33 + const actorFilter: any = {} 33 34 const sort: any = {} 34 35 36 + // Only search for actors that already have profiles 37 + actorFilter.profile = { $exists: true, $ne: null } 38 + 35 39 if (q) { 36 40 const escaped = escapeRegExp(q) 37 41 const regex = new RegExp(escaped, 'i') 38 - filter.$or = [ 39 - { displayName: regex }, 40 - { description: regex }, 41 - { handle: regex }, 42 + 43 + // Search by handle directly on actor model 44 + actorFilter.$or = [ 45 + { handle: regex } 42 46 ] 43 - // fall back to sorting by createdAt 44 - sort.createdAt = -1 47 + 48 + // For queries matching profile fields, we need to find actors by their profiles 49 + const profileIds = await ctx.db.models.Profile.find({ 50 + $or: [ 51 + { displayName: regex }, 52 + { description: regex } 53 + ] 54 + }) 55 + .select('_id authorDid') 56 + .lean() 57 + 58 + // Add actor DIDs from matching profiles 59 + if (profileIds.length > 0) { 60 + const profileDids = profileIds.map(p => p.authorDid) 61 + // Add to $or condition 62 + actorFilter.$or.push({ did: { $in: profileDids } }) 63 + } 64 + 65 + // Sort by recency and relevance 66 + sort.indexedAt = -1 45 67 } else { 46 - sort.createdAt = -1 68 + // Default sort for discovery - prioritize recently indexed actors 69 + sort.indexedAt = -1 47 70 } 48 71 49 - const profiles = await ctx.db.models.Profile.find(filter) 72 + // Find actors with populated profiles - no need to index them 73 + const actorsWithProfiles = await ctx.db.models.Actor.find(actorFilter) 74 + .populate('profile') 50 75 .sort(sort) 51 76 .skip(skip) 52 77 .limit(limit) 53 78 .lean() 54 79 55 - const actors: SoSprkActorDefs.ProfileView[] = await Promise.all( 56 - profiles.map(async (p) => { 57 - const avatar = p.avatar 58 - ? `https://media.sprk.so/avatar/tiny/${p.authorDid}/${(p.avatar as any).ref.$link}/webp` 80 + // Filter out any invalid profiles and transform to profile views 81 + const actors: SoSprkActorDefs.ProfileView[] = actorsWithProfiles 82 + .filter(actor => actor.profile) 83 + .map(actor => { 84 + const profile = actor.profile as any 85 + 86 + const avatar = profile?.avatar 87 + ? `https://media.sprk.so/avatar/tiny/${actor.did}/${profile.avatar.ref.$link}/webp` 59 88 : undefined 60 - const labels = Array.isArray(p.labels) 61 - ? (p.labels as Label[]) 89 + 90 + const labels = profile?.labels && Array.isArray(profile.labels) 91 + ? (profile.labels as Label[]) 62 92 : undefined 63 - const handle = await ctx.resolver.resolveDidToHandle(p.authorDid) 93 + 64 94 return { 65 95 $type: 'so.sprk.actor.defs#profileView', 66 - did: p.authorDid, 67 - handle: handle, 68 - displayName: p.displayName, 69 - description: p.description, 96 + did: actor.did, 97 + handle: actor.handle || actor.did, 98 + displayName: profile?.displayName, 99 + description: profile?.description, 70 100 avatar, 71 - indexedAt: p.indexedAt, 72 - createdAt: p.createdAt, 101 + indexedAt: actor.indexedAt, 102 + createdAt: actor.createdAt ? new Date(actor.createdAt).toISOString() : undefined, 73 103 labels, 74 104 } satisfies SoSprkActorDefs.ProfileView 75 - }), 76 - ) 105 + }) 77 106 107 + // Calculate cursor for pagination 78 108 const nextCursor = 79 - profiles.length === limit ? String(skip + limit) : undefined 109 + actorsWithProfiles.length === limit ? String(skip + limit) : undefined 80 110 const result: SoSprkActorSearch.OutputSchema = { actors } 81 111 if (nextCursor) { 82 112 result.cursor = nextCursor
+209
services/appview/src/services/indexing.ts
··· 1 + import { AtUri } from '@atproto/syntax' 2 + import { CID } from 'multiformats/cid' 3 + import { Document } from 'mongoose' 4 + import { BidirectionalResolver } from '../id-resolver.js' 5 + import { Database } from '../db.js' 6 + import { pino } from 'pino' 7 + import * as Post from './plugins/post.js' 8 + 9 + const logger = pino({ name: 'indexing-service' }) 10 + 11 + // Generic type for model processors 12 + type RecordProcessor<T extends Document> = { 13 + collection: string 14 + insertRecord: ( 15 + uri: AtUri, 16 + cid: CID, 17 + record: unknown, 18 + timestamp: string, 19 + opts?: { disableNotifs?: boolean }, 20 + ) => Promise<T | null> 21 + updateRecord: ( 22 + uri: AtUri, 23 + cid: CID, 24 + record: unknown, 25 + timestamp: string, 26 + ) => Promise<T | null> 27 + deleteRecord: (uri: AtUri, cascading?: boolean) => Promise<void> 28 + } 29 + 30 + /** 31 + * Service to handle indexing of records from the Atproto network 32 + */ 33 + export class IndexingService { 34 + private records: Record<string, RecordProcessor<any>> = {} 35 + private logger = pino({ name: 'indexing-service' }) 36 + 37 + constructor( 38 + private db: Database, 39 + private resolver: BidirectionalResolver, 40 + ) { 41 + // Register record processors 42 + this.records.post = Post.makePlugin(db) 43 + 44 + // Additional plugins would be registered here 45 + // Example: 46 + // this.records.like = Like.makePlugin(db) 47 + // this.records.follow = Follow.makePlugin(db) 48 + // etc. 49 + } 50 + 51 + /** 52 + * Index a record in the database 53 + * 54 + * @param uri The URI of the record 55 + * @param cid The CID of the record 56 + * @param obj The record data 57 + * @param action The action type (create/update) 58 + * @param timestamp The timestamp of the operation 59 + * @param opts Optional parameters 60 + */ 61 + async indexRecord( 62 + uri: AtUri, 63 + cid: CID, 64 + obj: unknown, 65 + action: 'create' | 'update', 66 + timestamp: string, 67 + opts?: { disableNotifs?: boolean }, 68 + ): Promise<void> { 69 + try { 70 + const indexer = this.findIndexerForCollection(uri.collection) 71 + if (!indexer) { 72 + this.logger.debug({ collection: uri.collection }, 'No indexer found for collection') 73 + return 74 + } 75 + 76 + if (action === 'create') { 77 + await indexer.insertRecord(uri, cid, obj, timestamp, opts) 78 + } else { 79 + await indexer.updateRecord(uri, cid, obj, timestamp) 80 + } 81 + } catch (error) { 82 + this.logger.error( 83 + { error, uri: uri.toString(), cid: cid.toString(), action }, 84 + 'Error indexing record', 85 + ) 86 + } 87 + } 88 + 89 + /** 90 + * Delete a record from the database 91 + * 92 + * @param uri The URI of the record to delete 93 + * @param cascading Whether to cascade the deletion to related records 94 + */ 95 + async deleteRecord(uri: AtUri, cascading = false): Promise<void> { 96 + try { 97 + const indexer = this.findIndexerForCollection(uri.collection) 98 + if (!indexer) { 99 + this.logger.debug({ collection: uri.collection }, 'No indexer found for collection') 100 + return 101 + } 102 + 103 + await indexer.deleteRecord(uri, cascading) 104 + } catch (error) { 105 + this.logger.error( 106 + { error, uri: uri.toString() }, 107 + 'Error deleting record', 108 + ) 109 + } 110 + } 111 + 112 + /** 113 + * Index or update actor handle information 114 + * 115 + * @param did The DID of the actor 116 + * @param timestamp The timestamp of the operation 117 + * @param force Force reindexing even if recently indexed 118 + */ 119 + async indexHandle(did: string, timestamp: string, force = false): Promise<void> { 120 + try { 121 + // Find existing actor 122 + const actor = await this.db.models.Actor.findOne({ did }) 123 + 124 + // Skip if recently indexed and not forced 125 + if (!force && actor && this.isHandleRecentlyIndexed(actor, timestamp)) { 126 + return 127 + } 128 + 129 + // Resolve DID to handle 130 + const didDoc = await this.resolver.resolveDidToDidDoc(did) 131 + 132 + // Verify handle ownership 133 + let handle: string | undefined = undefined 134 + if (didDoc.handle) { 135 + const handleDidDoc = await this.resolver.resolveHandleToDidDoc(didDoc.handle) 136 + handle = did === handleDidDoc.did ? didDoc.handle.toLowerCase() : undefined 137 + } 138 + 139 + // Handle conflict resolution - if another actor has this handle 140 + if (handle) { 141 + const actorWithHandle = await this.db.models.Actor.findOne({ handle }) 142 + if (actorWithHandle && actorWithHandle.did !== did) { 143 + // Clear handle from the other actor 144 + await this.db.models.Actor.updateOne( 145 + { did: actorWithHandle.did }, 146 + { $set: { handle: null } } 147 + ) 148 + } 149 + } 150 + 151 + // Update or create actor 152 + await this.db.models.Actor.updateOne( 153 + { did }, 154 + { 155 + $set: { 156 + handle, 157 + indexedAt: timestamp 158 + }, 159 + $setOnInsert: { 160 + uri: `at://${did}/app.bsky.actor.profile`, 161 + followersCount: 0, 162 + followingCount: 0, 163 + postsCount: 0, 164 + isLabeler: false, 165 + priorityNotifications: false, 166 + } 167 + }, 168 + { upsert: true } 169 + ) 170 + } catch (error) { 171 + this.logger.error({ error, did }, 'Error indexing handle') 172 + } 173 + } 174 + 175 + /** 176 + * Find the indexer responsible for a collection 177 + * 178 + * @param collection The collection to find an indexer for 179 + * @returns The indexer or undefined if not found 180 + */ 181 + findIndexerForCollection(collection: string): RecordProcessor<any> | undefined { 182 + return Object.values(this.records).find( 183 + (indexer) => indexer.collection === collection 184 + ) 185 + } 186 + 187 + /** 188 + * Check if an actor's handle was recently indexed 189 + * 190 + * @param actor The actor document 191 + * @param timestamp Current timestamp 192 + * @returns Whether the actor was recently indexed 193 + */ 194 + private isHandleRecentlyIndexed(actor: any, timestamp: string): boolean { 195 + if (!actor.indexedAt) return false 196 + 197 + const timeDiff = new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime() 198 + const ONE_DAY = 24 * 60 * 60 * 1000 199 + const ONE_HOUR = 60 * 60 * 1000 200 + 201 + // Reindex daily for all actors 202 + if (timeDiff > ONE_DAY) return false 203 + 204 + // Reindex more frequently for actors without handles 205 + if (actor.handle === null && timeDiff > ONE_HOUR) return false 206 + 207 + return true 208 + } 209 + }
+178
services/appview/src/services/plugins/post.ts
··· 1 + import { AtUri } from '@atproto/syntax' 2 + import { CID } from 'multiformats/cid' 3 + import { pino } from 'pino' 4 + import { Database, PostDocument } from '../../db.js' 5 + 6 + const logger = pino({ name: 'post-processor' }) 7 + 8 + export type PostRecord = { 9 + text: string 10 + createdAt: string 11 + facets?: Array<unknown> 12 + reply?: { 13 + root: { uri: string; cid: string } 14 + parent: { uri: string; cid: string } 15 + } 16 + embed?: unknown 17 + langs?: string[] 18 + labels?: unknown 19 + tags?: string[] 20 + } 21 + 22 + export type PostPluginType = { 23 + collection: string 24 + insertRecord: ( 25 + uri: AtUri, 26 + cid: CID, 27 + record: unknown, 28 + timestamp: string, 29 + opts?: { disableNotifs?: boolean }, 30 + ) => Promise<PostDocument | null> 31 + updateRecord: ( 32 + uri: AtUri, 33 + cid: CID, 34 + record: unknown, 35 + timestamp: string, 36 + ) => Promise<PostDocument | null> 37 + deleteRecord: (uri: AtUri, cascading?: boolean) => Promise<void> 38 + } 39 + 40 + /** 41 + * Create a post processor plugin for the indexing service 42 + */ 43 + export function makePlugin(db: Database): PostPluginType { 44 + return { 45 + collection: 'so.sprk.feed.post', 46 + 47 + async insertRecord( 48 + uri: AtUri, 49 + cid: CID, 50 + recordObj: unknown, 51 + timestamp: string, 52 + opts = {}, 53 + ): Promise<PostDocument | null> { 54 + const record = recordObj as PostRecord 55 + if (!record) return null 56 + 57 + try { 58 + // Find author information 59 + const actor = await db.models.Actor.findOne({ did: uri.hostname }) 60 + if (!actor) { 61 + logger.warn({ did: uri.hostname }, 'Actor not found when indexing post') 62 + return null 63 + } 64 + 65 + // Extract post data 66 + const postData = { 67 + uri: uri.toString(), 68 + cid: cid.toString(), 69 + text: record.text, 70 + facets: record.facets || [], 71 + reply: record.reply || null, 72 + embed: record.embed || null, 73 + langs: record.langs || [], 74 + labels: record.labels || null, 75 + tags: record.tags || [], 76 + authorDid: uri.hostname, 77 + authorHandle: actor.handle || uri.hostname, 78 + createdAt: record.createdAt, 79 + indexedAt: timestamp, 80 + } 81 + 82 + // Create the post 83 + const post = await db.models.Post.findOneAndUpdate( 84 + { uri: uri.toString() }, 85 + { $set: postData }, 86 + { upsert: true, new: true } 87 + ) 88 + 89 + // Increment post count for actor 90 + await db.models.Actor.updateOne( 91 + { did: uri.hostname }, 92 + { $inc: { postsCount: 1 } } 93 + ) 94 + 95 + return post 96 + } catch (error) { 97 + logger.error( 98 + { error, uri: uri.toString(), cid: cid.toString() }, 99 + 'Error inserting post record' 100 + ) 101 + return null 102 + } 103 + }, 104 + 105 + async updateRecord( 106 + uri: AtUri, 107 + cid: CID, 108 + recordObj: unknown, 109 + timestamp: string, 110 + ): Promise<PostDocument | null> { 111 + const record = recordObj as PostRecord 112 + if (!record) return null 113 + 114 + try { 115 + // Find author information 116 + const actor = await db.models.Actor.findOne({ did: uri.hostname }) 117 + if (!actor) { 118 + logger.warn({ did: uri.hostname }, 'Actor not found when updating post') 119 + return null 120 + } 121 + 122 + // Update post data 123 + const postData = { 124 + cid: cid.toString(), 125 + text: record.text, 126 + facets: record.facets || [], 127 + reply: record.reply || null, 128 + embed: record.embed || null, 129 + langs: record.langs || [], 130 + labels: record.labels || null, 131 + tags: record.tags || [], 132 + authorHandle: actor.handle || uri.hostname, 133 + indexedAt: timestamp, 134 + } 135 + 136 + // Update the post 137 + const post = await db.models.Post.findOneAndUpdate( 138 + { uri: uri.toString() }, 139 + { $set: postData }, 140 + { new: true } 141 + ) 142 + 143 + return post 144 + } catch (error) { 145 + logger.error( 146 + { error, uri: uri.toString(), cid: cid.toString() }, 147 + 'Error updating post record' 148 + ) 149 + return null 150 + } 151 + }, 152 + 153 + async deleteRecord(uri: AtUri): Promise<void> { 154 + try { 155 + const post = await db.models.Post.findOne({ uri: uri.toString() }) 156 + 157 + if (post) { 158 + // Delete the post 159 + await db.models.Post.deleteOne({ uri: uri.toString() }) 160 + 161 + // Decrement post count for actor 162 + await db.models.Actor.updateOne( 163 + { did: uri.hostname }, 164 + { $inc: { postsCount: -1 } } 165 + ) 166 + 167 + // Delete any associated likes, reposts, etc. as needed 168 + // This would be the cascading deletion logic 169 + } 170 + } catch (error) { 171 + logger.error( 172 + { error, uri: uri.toString() }, 173 + 'Error deleting post record' 174 + ) 175 + } 176 + }, 177 + } 178 + }
+2
services/ingester/src/db/connection.ts
··· 11 11 musicSchema, 12 12 lookSchema, 13 13 generatorSchema, 14 + actorSchema, 14 15 } from './models.js' 15 16 import { env } from '../utils/env.js' 16 17 import { pino } from 'pino' ··· 33 34 Music: this.connection.model('Music', musicSchema), 34 35 Look: this.connection.model('Look', lookSchema), 35 36 Generator: this.connection.model('Generator', generatorSchema), 37 + Actor: this.connection.model('Actor', actorSchema), 36 38 } 37 39 } 38 40
+49
services/ingester/src/db/models.ts
··· 342 342 generatorSchema.index({ authorDid: 1, createdAt: -1 }) 343 343 generatorSchema.index({ did: 1, createdAt: -1 }) 344 344 345 + export interface ActorDocument extends Document { 346 + uri: string 347 + did: string 348 + handle?: string 349 + profile?: ProfileDocument 350 + profileCid?: string 351 + profileTakedownRef?: string 352 + followersCount: number 353 + followingCount: number 354 + postsCount: number 355 + sortedAt?: Date 356 + indexedAt: string 357 + takedownRef?: string 358 + isLabeler: boolean 359 + allowIncomingChatsFrom?: string 360 + upstreamStatus?: string 361 + createdAt?: Date 362 + priorityNotifications: boolean 363 + trustedVerifier?: boolean 364 + labelsDeclaration?: Record<string, any> 365 + } 366 + 367 + export const actorSchema = new Schema<ActorDocument>({ 368 + uri: { type: String, required: true, unique: true, index: true }, 369 + did: { type: String, required: true, index: true }, 370 + handle: { type: String, required: false, index: true }, 371 + profile: { type: Schema.Types.ObjectId, ref: 'Profile', required: false }, 372 + profileCid: { type: String, required: false }, 373 + profileTakedownRef: { type: String, required: false }, 374 + followersCount: { type: Number, required: true, default: 0 }, 375 + followingCount: { type: Number, required: true, default: 0 }, 376 + postsCount: { type: Number, required: true, default: 0 }, 377 + sortedAt: { type: Date, required: false }, 378 + indexedAt: { type: String, required: true }, 379 + takedownRef: { type: String, required: false }, 380 + isLabeler: { type: Boolean, required: true, default: false }, 381 + allowIncomingChatsFrom: { type: String, required: false }, 382 + upstreamStatus: { type: String, required: false }, 383 + createdAt: { type: Date, required: false }, 384 + priorityNotifications: { type: Boolean, required: true, default: false }, 385 + trustedVerifier: { type: Boolean, required: false }, 386 + labelsDeclaration: { type: Object, required: false }, 387 + }) 388 + 389 + // Add compound indexes for Actor 390 + actorSchema.index({ handle: 'text' }) 391 + actorSchema.index({ did: 1 }, { unique: true }) 392 + 345 393 export interface DatabaseModels { 346 394 Like: Model<LikeDocument> 347 395 Post: Model<PostDocument> ··· 353 401 Music: Model<MusicDocument> 354 402 Look: Model<LookDocument> 355 403 Generator: Model<GeneratorDocument> 404 + Actor: Model<ActorDocument> 356 405 }
+76
services/ingester/src/handlers/actor-handler.ts
··· 1 + import { pino } from 'pino' 2 + import { Database } from '../db/connection.js' 3 + import type { NormalizedEvent } from '../types/events.js' 4 + import { ensureActor } from '../utils/actor-utils.js' 5 + 6 + const logger = pino({ name: 'actor-handler' }) 7 + 8 + /** 9 + * This handler is called by all other handlers to ensure that 10 + * any DID referenced in an event has a corresponding actor entry. 11 + * 12 + * @param evt The normalized event to process 13 + * @param db Database connection 14 + */ 15 + export async function handleActorReferences(evt: NormalizedEvent, db: Database): Promise<void> { 16 + try { 17 + // Always ensure the author DID has an actor 18 + if (evt.did) { 19 + await ensureActor(evt.did, evt.handle || undefined, db) 20 + } 21 + 22 + // Handle subject DIDs for follow, block, like events 23 + if (['follow', 'block', 'like'].includes(evt.event) && evt.record?.subject) { 24 + // Subject is usually a DID in format did:plc:12345 25 + const subjectDid = evt.record.subject as string 26 + if (subjectDid && subjectDid.startsWith('did:')) { 27 + await ensureActor(subjectDid, undefined, db) 28 + } 29 + } 30 + 31 + // Handle reply references for posts 32 + if (evt.collection === 'so.sprk.feed.post' && evt.record?.reply) { 33 + const reply = evt.record.reply as { root?: { uri?: string }, parent?: { uri?: string } } 34 + 35 + // Extract DIDs from reply URIs (format: at://did:plc:12345/...) 36 + if (reply.root?.uri) { 37 + const rootDid = extractDidFromUri(reply.root.uri) 38 + if (rootDid) { 39 + await ensureActor(rootDid, undefined, db) 40 + } 41 + } 42 + 43 + if (reply.parent?.uri) { 44 + const parentDid = extractDidFromUri(reply.parent.uri) 45 + if (parentDid && parentDid !== extractDidFromUri(reply.root?.uri || '')) { 46 + await ensureActor(parentDid, undefined, db) 47 + } 48 + } 49 + } 50 + 51 + // Handle repost subjects 52 + if (evt.collection === 'so.sprk.feed.repost' && evt.record?.subject?.uri) { 53 + const subjectUri = evt.record.subject.uri as string 54 + const subjectDid = extractDidFromUri(subjectUri) 55 + if (subjectDid) { 56 + await ensureActor(subjectDid, undefined, db) 57 + } 58 + } 59 + } catch (error) { 60 + logger.error({ error, uri: evt.uri }, 'Error while handling actor references') 61 + } 62 + } 63 + 64 + /** 65 + * Extracts a DID from an AT URI (at://did:plc:12345/...) 66 + * 67 + * @param uri The URI to extract the DID from 68 + * @returns The extracted DID or undefined 69 + */ 70 + function extractDidFromUri(uri: string): string | undefined { 71 + if (!uri) return undefined 72 + 73 + // Match a DID in an AT URI format 74 + const match = uri.match(/at:\/\/(did:[a-zA-Z0-9:]+)\//) 75 + return match ? match[1] : undefined 76 + }
+5 -1
services/ingester/src/handlers/index.ts
··· 11 11 import { handleMusicEvent } from './music-handler.js' 12 12 import { handleLookEvent } from './look-handler.js' 13 13 import { handleGeneratorEvent } from './generator-handler.js' 14 + import { handleActorReferences } from './actor-handler.js' 14 15 15 16 const logger = pino({ name: 'event-handler' }) 16 17 17 18 export async function handleEvent(evt: NormalizedEvent, db: Database): Promise<void> { 18 19 try { 19 - // Handle different events based on collection 20 + // First, ensure all actor references are handled properly 21 + await handleActorReferences(evt, db) 22 + 23 + // Then handle different events based on collection 20 24 if (evt.collection === 'so.sprk.feed.like') { 21 25 await handleLikeEvent(evt, db) 22 26 return
+14 -3
services/ingester/src/handlers/profile-handler.ts
··· 1 1 import { pino } from 'pino' 2 2 import { Database } from '../db/connection.js' 3 3 import type { NormalizedEvent } from '../types/events.js' 4 + import { ensureActor, linkProfileToActor } from '../utils/actor-utils.js' 5 + import type { ProfileDocument } from '../db/models.js' 4 6 5 7 const logger = pino({ name: 'profile-handler' }) 6 8 ··· 37 39 }, 'Processing profile event') 38 40 39 41 try { 42 + // First, ensure we have an actor for this DID 43 + await ensureActor(evt.did, evt.handle || undefined, db) 44 + 40 45 const profileData = { 41 46 uri: evt.uri, 42 47 displayName: record.displayName, ··· 53 58 cid: evt.commit.cid 54 59 } 55 60 56 - await db.models.Profile.findOneAndUpdate( 61 + // Save the profile 62 + const profile = await db.models.Profile.findOneAndUpdate( 57 63 { uri: evt.uri }, 58 64 profileData, 59 65 { upsert: true, new: true } 60 - ) 66 + ) as ProfileDocument 67 + 68 + if (profile && profile._id) { 69 + // Link the profile to the actor 70 + await linkProfileToActor(evt.did, profile._id.toString(), evt.commit.cid, db) 71 + } 61 72 62 73 logger.info( 63 74 { uri: evt.uri }, 64 - 'Successfully saved profile to database' 75 + 'Successfully saved profile to database and linked to actor' 65 76 ) 66 77 } catch (error) { 67 78 logger.error(
+91
services/ingester/src/utils/actor-utils.ts
··· 1 + import { pino } from 'pino' 2 + import { Database } from '../db/connection.js' 3 + 4 + const logger = pino({ name: 'actor-utils' }) 5 + 6 + /** 7 + * Ensures that an actor exists for the given DID. 8 + * If the actor doesn't exist, it creates a new one. 9 + * 10 + * @param did The DID to ensure has an actor 11 + * @param handle Optional handle associated with the DID 12 + * @param db Database connection 13 + * @returns The actor document, either existing or newly created 14 + */ 15 + export async function ensureActor( 16 + did: string, 17 + handle?: string, 18 + db?: Database 19 + ): Promise<any> { 20 + if (!db) { 21 + logger.warn({ did }, 'No database connection provided to ensureActor') 22 + return null 23 + } 24 + 25 + try { 26 + // Try to find existing actor 27 + const existingActor = await db.models.Actor.findOne({ did }) 28 + 29 + if (existingActor) { 30 + // If handle is provided and different from existing, update it 31 + if (handle && existingActor.handle !== handle) { 32 + existingActor.handle = handle 33 + await existingActor.save() 34 + logger.info({ did, handle }, 'Updated actor handle') 35 + } 36 + return existingActor 37 + } 38 + 39 + // Create new actor if none exists 40 + const now = new Date() 41 + const uri = `at://${did}/app.bsky.actor.profile` 42 + 43 + const newActor = await db.models.Actor.create({ 44 + uri, 45 + did, 46 + handle: handle || undefined, 47 + followersCount: 0, 48 + followingCount: 0, 49 + postsCount: 0, 50 + indexedAt: now.toISOString(), 51 + isLabeler: false, 52 + priorityNotifications: false 53 + }) 54 + 55 + logger.info({ did, handle }, 'Created new actor') 56 + return newActor 57 + } catch (error) { 58 + logger.error({ error, did, handle }, 'Failed to ensure actor exists') 59 + return null 60 + } 61 + } 62 + 63 + /** 64 + * Links a profile to an actor 65 + * 66 + * @param did The DID of the actor 67 + * @param profileId The MongoDB ID of the profile 68 + * @param profileCid The CID of the profile 69 + * @param db Database connection 70 + */ 71 + export async function linkProfileToActor( 72 + did: string, 73 + profileId: string, 74 + profileCid: string, 75 + db: Database 76 + ): Promise<void> { 77 + try { 78 + await db.models.Actor.findOneAndUpdate( 79 + { did }, 80 + { 81 + profile: profileId, 82 + profileCid 83 + }, 84 + { new: true } 85 + ) 86 + 87 + logger.info({ did, profileId }, 'Linked profile to actor') 88 + } catch (error) { 89 + logger.error({ error, did, profileId }, 'Failed to link profile to actor') 90 + } 91 + }