[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

Revert "actor and indexing"

This reverts commit 961192e077ce3c778628ced4dde5ae337e562f11.

+211 -933
-50
services/appview/src/db.ts
··· 404 404 // Ensure compound index on did + cid for blob takedowns 405 405 blobTakedownSchema.index({ did: 1, cid: 1 }, { unique: true }) 406 406 407 - export interface ActorDocument extends Document { 408 - uri: string 409 - did: string 410 - handle?: string 411 - profile?: ProfileDocument 412 - profileCid?: string 413 - profileTakedownRef?: string 414 - followersCount: number 415 - followingCount: number 416 - postsCount: number 417 - sortedAt?: Date 418 - indexedAt: string 419 - takedownRef?: string 420 - isLabeler: boolean 421 - allowIncomingChatsFrom?: string 422 - upstreamStatus?: string 423 - createdAt?: Date 424 - priorityNotifications: boolean 425 - trustedVerifier?: boolean 426 - labelsDeclaration?: Record<string, any> 427 - } 428 - 429 - export const actorSchema = new Schema<ActorDocument>({ 430 - uri: { type: String, required: true, unique: true, index: true }, 431 - did: { type: String, required: true, index: true }, 432 - handle: { type: String, required: false, index: true }, 433 - profile: { type: Schema.Types.ObjectId, ref: 'Profile', required: false }, 434 - profileCid: { type: String, required: false }, 435 - profileTakedownRef: { type: String, required: false }, 436 - followersCount: { type: Number, required: true, default: 0 }, 437 - followingCount: { type: Number, required: true, default: 0 }, 438 - postsCount: { type: Number, required: true, default: 0 }, 439 - sortedAt: { type: Date, required: false }, 440 - indexedAt: { type: String, required: true }, 441 - takedownRef: { type: String, required: false }, 442 - isLabeler: { type: Boolean, required: true, default: false }, 443 - allowIncomingChatsFrom: { type: String, required: false }, 444 - upstreamStatus: { type: String, required: false }, 445 - createdAt: { type: Date, required: false }, 446 - priorityNotifications: { type: Boolean, required: true, default: false }, 447 - trustedVerifier: { type: Boolean, required: false }, 448 - labelsDeclaration: { type: Object, required: false }, 449 - }) 450 - 451 - // Add compound indexes for Actor 452 - actorSchema.index({ handle: 'text' }) 453 - actorSchema.index({ did: 1 }, { unique: true }) 454 - 455 407 export interface DatabaseModels { 456 408 Like: Model<LikeDocument> 457 409 Post: Model<PostDocument> ··· 466 418 Takedown: Model<TakedownDocument> 467 419 RepoTakedown: Model<RepoTakedownDocument> 468 420 BlobTakedown: Model<BlobTakedownDocument> 469 - Actor: Model<ActorDocument> 470 421 } 471 422 472 423 export class Database { ··· 502 453 'BlobTakedown', 503 454 blobTakedownSchema, 504 455 ), 505 - Actor: this.connection.model<ActorDocument>('Actor', actorSchema), 506 456 } 507 457 } 508 458
+1 -16
services/appview/src/index.ts
··· 25 25 import { createGetRecordRouter } from './routes/com/atproto/repo/getRecord.js' 26 26 import wellKnownRouter from './well-known.js' 27 27 import { TakedownService } from './services/takedown.js' 28 - import { IndexingService } from './services/indexing.js' 29 - 30 - // Extend Hono's context variable map to include our services 31 - declare module 'hono' { 32 - interface ContextVariableMap { 33 - serviceDid: string 34 - didResolver: DidResolver 35 - takedownService: TakedownService 36 - indexingService: IndexingService 37 - } 38 - } 39 28 40 29 export type AppContext = { 41 30 db: Database ··· 44 33 serviceDid: string 45 34 didResolver: DidResolver 46 35 takedownService: TakedownService 47 - indexingService: IndexingService 48 36 } 49 37 50 38 export class Server { ··· 65 53 // Get service DID from environment 66 54 const serviceDid = env.SERVICE_DID 67 55 68 - // Create services 56 + // Create takedown service 69 57 const takedownService = new TakedownService(db) 70 - const indexingService = new IndexingService(db, resolver) 71 58 72 59 const ctx = { 73 60 db, ··· 76 63 serviceDid, 77 64 didResolver: baseIdResolver.did, 78 65 takedownService, 79 - indexingService, 80 66 } 81 67 82 68 const app = new Hono() ··· 87 73 c.set('serviceDid', serviceDid) 88 74 c.set('didResolver', baseIdResolver.did) 89 75 c.set('takedownService', takedownService) 90 - c.set('indexingService', indexingService) 91 76 await next() 92 77 }) 93 78
-29
services/appview/src/routes/com/atproto/admin/getAccountInfos.ts
··· 1 - import { Hono } from 'hono' 2 - import { zValidator } from '@hono/zod-validator' 3 - import { z } from 'zod' 4 - import { HTTPException } from 'hono/http-exception' 5 - import { TakedownService } from '../../../../services/takedown.js' 6 - import { authMiddleware } from '../../../../auth/middleware.js' 7 - import { AppContext } from '../../../../index.js' 8 - import type * as ComAtprotoAdminUpdateSubjectStatus from '../../../../lexicon/types/com/atproto/admin/updateSubjectStatus.js' 9 - import type * as ComAtprotoAdminDefs from '../../../../lexicon/types/com/atproto/admin/defs.js' 10 - import type * as ComAtprotoRepoStrongRef from '../../../../lexicon/types/com/atproto/repo/strongRef.js' 11 - 12 - export const createGetAccountInfosRouter = (ctx: AppContext) => { 13 - const router = new Hono() 14 - 15 - // XRPC endpoint for Ozone integration: com.atproto.admin.getAccountInfos 16 - router.get( 17 - '/xrpc/com.atproto.admin.getAccountInfos', 18 - (c, next) => authMiddleware(c, next, true), 19 - zValidator( 20 - 'json', 21 - z.object({ 22 - dids: z.array(z.string()), 23 - }), 24 - ), 25 - async (c) => { 26 - 27 - } 28 - ) 29 - }
+181 -159
services/appview/src/routes/so/sprk/actor/getProfile.ts
··· 15 15 '/xrpc/so.sprk.actor.getProfile', 16 16 optionalAuthMiddleware, 17 17 async (c) => { 18 - const actorIdentifier = c.req.query('actor') 18 + const actor = c.req.query('actor') 19 19 const viewerDid = c.get('did') as string | undefined 20 - const now = new Date().toISOString() 21 20 22 - if (!actorIdentifier) { 21 + if (!actor) { 23 22 return c.json({ error: 'Actor not provided' }, 400) 24 23 } 25 24 26 - // Resolve actor DID from handle or DID 27 - let actorDid: string 28 - try { 29 - if (isValidHandle(actorIdentifier)) { 30 - const didDoc = await ctx.resolver.resolveHandleToDidDoc(actorIdentifier) 31 - actorDid = didDoc.did 32 - } else { 33 - ensureValidDid(actorIdentifier) 34 - actorDid = actorIdentifier 25 + let actorDidDoc 26 + if (isValidHandle(actor)) { 27 + actorDidDoc = await ctx.resolver.resolveHandleToDidDoc(actor) 28 + } else { 29 + try { 30 + ensureValidDid(actor) 31 + actorDidDoc = await ctx.resolver.resolveDidToDidDoc(actor) 32 + } catch (err) { 33 + return c.json({ error: 'Invalid actor' }, 400) 35 34 } 36 - } catch (err) { 37 - return c.json({ error: 'Invalid actor' }, 400) 38 35 } 39 36 40 - // Use the indexing service - it will handle recency checks internally 41 - try { 42 - await ctx.indexingService.indexHandle(actorDid, now) 43 - } catch (error) { 44 - ctx.logger.warn({ error, did: actorDid }, 'Failed to index handle') 45 - // Continue anyway - we might still have data 46 - } 37 + const actorDid = actorDidDoc.did 47 38 48 - // Find the actor with populated profile 49 - const actor = await ctx.db.models.Actor.findOne({ did: actorDid }) 50 - .populate('profile') 51 - .lean() 39 + // Get profile data 40 + const profile = await ctx.db.models.Profile.findOne({ 41 + authorDid: actorDid, 42 + }).lean() 52 43 53 - if (!actor) { 54 - return c.json({ error: 'Actor not found' }, 404) 44 + if (!profile) { 45 + return c.json({ error: 'Profile not found' }, 404) 55 46 } 56 - 57 - return generateProfileResponse(c, ctx, actor, viewerDid, actorDid) 58 - }, 59 - ) 60 47 61 - return router 62 - } 48 + const profileHandle = await ctx.resolver.resolveDidToHandle( 49 + profile.authorDid, 50 + ) 63 51 64 - // Helper function to handle the rest of the profile rendering logic 65 - async function generateProfileResponse( 66 - c: any, 67 - ctx: AppContext, 68 - actor: any, 69 - viewerDid: string | undefined, 70 - actorDid: string 71 - ) { 72 - const profile = actor.profile as any 73 - 74 - // Build viewer state if a user is authenticated 75 - const viewer: SoSprkActorDefs.ViewerState = {} 52 + // Get follower count 53 + const followersCount = await ctx.db.models.Follow.countDocuments({ 54 + subject: actorDid, 55 + }) 76 56 77 - if (viewerDid) { 78 - // Check if viewer follows this profile 79 - const follow = await ctx.db.models.Follow.findOne({ 80 - subject: actorDid, 81 - authorDid: viewerDid, 82 - }) 83 - if (follow) { 84 - viewer.following = follow.uri 85 - } 57 + // Get follows count 58 + const followsCount = await ctx.db.models.Follow.countDocuments({ 59 + authorDid: actorDid, 60 + }) 86 61 87 - // Check if this profile follows the viewer 88 - const followedBy = await ctx.db.models.Follow.findOne({ 89 - subject: viewerDid, 90 - authorDid: actorDid, 91 - }) 92 - if (followedBy) { 93 - viewer.followedBy = followedBy.uri 94 - } 62 + // Get posts count 63 + const postsCount = await ctx.db.models.Post.countDocuments({ 64 + authorDid: actorDid, 65 + }) 95 66 96 - // Check if viewer has blocked this profile 97 - const block = await ctx.db.models.Block.findOne({ 98 - subject: actorDid, 99 - authorDid: viewerDid, 100 - }) 101 - if (block) { 102 - viewer.blocking = block.uri 103 - } 67 + // Build viewer state if a user is authenticated 68 + const viewer: SoSprkActorDefs.ViewerState = {} 104 69 105 - // Check if this profile has blocked the viewer 106 - const blockedBy = await ctx.db.models.Block.findOne({ 107 - subject: viewerDid, 108 - authorDid: actorDid, 109 - }) 110 - if (blockedBy) { 111 - viewer.blockedBy = true 112 - } 70 + if (viewerDid) { 71 + // Check if viewer follows this profile 72 + const follow = await ctx.db.models.Follow.findOne({ 73 + subject: actorDid, 74 + authorDid: viewerDid, 75 + }) 76 + if (follow) { 77 + viewer.following = follow.uri 78 + } 113 79 114 - // Get known followers only if profile exists 115 - if (profile && actor.followersCount > 0) { 116 - // Get the followers of this profile 117 - const followers = await ctx.db.models.Follow.find({ 118 - subject: actorDid, 119 - }).lean() 80 + // Check if this profile follows the viewer 81 + const followedBy = await ctx.db.models.Follow.findOne({ 82 + subject: viewerDid, 83 + authorDid: actorDid, 84 + }) 85 + if (followedBy) { 86 + viewer.followedBy = followedBy.uri 87 + } 120 88 121 - const followerDids = followers.map((f) => f.authorDid) 89 + // Check if viewer has blocked this profile 90 + const block = await ctx.db.models.Block.findOne({ 91 + subject: actorDid, 92 + authorDid: viewerDid, 93 + }) 94 + if (block) { 95 + viewer.blocking = block.uri 96 + } 122 97 123 - // Check which of these followers the viewer follows 124 - const knownFollowsQuery = await ctx.db.models.Follow.find({ 125 - subject: { $in: followerDids }, 126 - authorDid: viewerDid, 127 - }).lean() 98 + // Check if this profile has blocked the viewer 99 + const blockedBy = await ctx.db.models.Block.findOne({ 100 + subject: viewerDid, 101 + authorDid: actorDid, 102 + }) 103 + if (blockedBy) { 104 + viewer.blockedBy = true 105 + } 128 106 129 - if (knownFollowsQuery.length > 0) { 130 - const knownFollowerDids = knownFollowsQuery.map((f) => f.subject) 107 + // Get known followers (followers of the profile that the viewer also follows) 108 + if (followersCount > 0) { 109 + // Get the followers of this profile 110 + const followers = await ctx.db.models.Follow.find({ 111 + subject: actorDid, 112 + }).lean() 131 113 132 - // Get actors for known followers 133 - const knownFollowerActors = await ctx.db.models.Actor.find({ 134 - did: { $in: knownFollowerDids }, 135 - }) 136 - .populate('profile') 137 - .limit(3) 138 - .lean() 114 + const followerDids = followers.map((f) => f.authorDid) 115 + 116 + // Check which of these followers the viewer follows 117 + const knownFollowsQuery = await ctx.db.models.Follow.find({ 118 + subject: { $in: followerDids }, 119 + authorDid: viewerDid, 120 + }).lean() 139 121 140 - const knownFollowersBasic = knownFollowerActors 141 - .filter(a => a.profile) 142 - .map((a) => { 143 - const p = a.profile as any 144 - return { 145 - did: a.did, 146 - handle: a.handle || a.did, 147 - displayName: p?.displayName, 148 - avatar: p?.avatar 149 - ? `https://media.sprk.so/avatar/tiny/${a.did}/${p.avatar.ref.$link}/webp` 150 - : undefined, 151 - } as SoSprkActorDefs.ProfileViewBasic 152 - }) 122 + if (knownFollowsQuery.length > 0) { 123 + const knownFollowerDids = knownFollowsQuery.map((f) => f.subject) 124 + 125 + // Get profiles for known followers 126 + const knownFollowerProfiles = await ctx.db.models.Profile.find({ 127 + authorDid: { $in: knownFollowerDids }, 128 + }) 129 + .limit(3) 130 + .lean() 131 + 132 + const knownFollowersBasic = await Promise.all( 133 + knownFollowerProfiles.map(async (p) => { 134 + const handle = await ctx.resolver.resolveDidToHandle( 135 + p.authorDid, 136 + ) 137 + return { 138 + did: p.authorDid, 139 + handle, 140 + displayName: p.displayName, 141 + avatar: p.avatar 142 + ? `https://media.sprk.so/avatar/tiny/${p.authorDid}/${p.avatar.ref.$link}/webp` 143 + : undefined, 144 + } as SoSprkActorDefs.ProfileViewBasic 145 + }), 146 + ) 153 147 154 - viewer.knownFollowers = { 155 - count: knownFollowsQuery.length, 156 - followers: knownFollowersBasic, 148 + viewer.knownFollowers = { 149 + count: knownFollowsQuery.length, 150 + followers: knownFollowersBasic, 151 + } 152 + } 157 153 } 158 154 } 159 - } 160 - } 161 155 162 - // Build the ProfileViewDetailed response with required fields 163 - const profileView: SoSprkActorDefs.ProfileViewDetailed = { 164 - did: actorDid, 165 - handle: actor.handle || actorDid, 166 - viewer: viewerDid ? viewer : undefined, 167 - } 156 + // Check for associated services 157 + const associated: SoSprkActorDefs.ProfileAssociated = {} 168 158 169 - // Only add optional fields if profile exists 170 - if (profile) { 171 - const associated: SoSprkActorDefs.ProfileAssociated = {} 172 - 173 - // Check for feed generators 174 - let feedgensCount = 0 175 - try { 176 - if (ctx.db.models.Generator) { 177 - feedgensCount = await ctx.db.models.Generator.countDocuments({ 178 - authorDid: actorDid, 179 - }) 159 + // Check for feed generators 160 + let feedgensCount = 0 161 + try { 162 + if (ctx.db.models.Generator) { 163 + feedgensCount = await ctx.db.models.Generator.countDocuments({ 164 + authorDid: actorDid, 165 + }) 166 + } 167 + } catch (error) { 168 + // Ignore if model doesn't exist 180 169 } 181 - } catch (error) { 182 - // Ignore if model doesn't exist 183 - } 184 170 185 - if (feedgensCount > 0) { 186 - associated.feedgens = feedgensCount 187 - } 171 + if (feedgensCount > 0) { 172 + associated.feedgens = feedgensCount 173 + } 188 174 189 - Object.assign(profileView, { 190 - displayName: profile.displayName, 191 - description: profile.description, 192 - avatar: profile.avatar 175 + // Get avatar and banner URLs 176 + const avatar = profile.avatar 193 177 ? `https://media.sprk.so/avatar/tiny/${actorDid}/${profile.avatar.ref.$link}/webp` 194 - : undefined, 195 - banner: profile.banner 178 + : undefined 179 + const banner = profile.banner 196 180 ? `https://media.sprk.so/img/tiny/${actorDid}/${profile.banner.ref.$link}/webp` 197 - : undefined, 198 - followersCount: actor.followersCount, 199 - followsCount: actor.followingCount, 200 - postsCount: actor.postsCount, 201 - associated: Object.keys(associated).length > 0 ? associated : undefined, 202 - joinedViaStarterPack: profile.joinedViaStarterPack as unknown as SoSprkGraphDefs.StarterPackViewBasic, 203 - indexedAt: profile.indexedAt, 204 - createdAt: profile.createdAt, 205 - labels: Array.isArray(profile.labels) ? profile.labels as Label[] : undefined, 206 - pinnedPost: profile.pinnedPost as unknown as ComAtprotoRepoStrongRef.Main, 207 - }) 208 - } 181 + : undefined 182 + 183 + // Convert joinedViaStarterPack to the correct type if it exists 184 + let joinedViaStarterPack: 185 + | SoSprkGraphDefs.StarterPackViewBasic 186 + | undefined = undefined 187 + if (profile.joinedViaStarterPack) { 188 + // Type assertion assuming the structure fits the requirements 189 + joinedViaStarterPack = 190 + profile.joinedViaStarterPack as unknown as SoSprkGraphDefs.StarterPackViewBasic 191 + } 192 + 193 + // Convert labels to the correct type if it exists 194 + let labels: Label[] | undefined = undefined 195 + if (profile.labels) { 196 + labels = Array.isArray(profile.labels) 197 + ? (profile.labels as Label[]) 198 + : undefined 199 + } 200 + 201 + // Convert pinnedPost to the correct type if it exists 202 + let pinnedPost: ComAtprotoRepoStrongRef.Main | undefined = undefined 203 + if (profile.pinnedPost) { 204 + pinnedPost = 205 + profile.pinnedPost as unknown as ComAtprotoRepoStrongRef.Main 206 + } 209 207 210 - return c.json(profileView) 208 + // Build the ProfileViewDetailed response 209 + const profileView: SoSprkActorDefs.ProfileViewDetailed = { 210 + did: actorDid, 211 + handle: profileHandle, 212 + displayName: profile.displayName, 213 + description: profile.description, 214 + avatar, 215 + banner, 216 + followersCount, 217 + followsCount, 218 + postsCount, 219 + associated: Object.keys(associated).length > 0 ? associated : undefined, 220 + joinedViaStarterPack, 221 + indexedAt: profile.indexedAt, 222 + createdAt: profile.createdAt, 223 + viewer: Object.keys(viewer).length > 0 ? viewer : undefined, 224 + labels, 225 + pinnedPost, 226 + } 227 + 228 + return c.json(profileView) 229 + }, 230 + ) 231 + 232 + return router 211 233 }
+25 -55
services/appview/src/routes/so/sprk/actor/searchActor.ts
··· 29 29 } 30 30 } 31 31 32 - // Build the filter for actors instead of directly searching profiles 33 - const actorFilter: any = {} 32 + const filter: any = {} 34 33 const sort: any = {} 35 34 36 - // Only search for actors that already have profiles 37 - actorFilter.profile = { $exists: true, $ne: null } 38 - 39 35 if (q) { 40 36 const escaped = escapeRegExp(q) 41 37 const regex = new RegExp(escaped, 'i') 42 - 43 - // Search by handle directly on actor model 44 - actorFilter.$or = [ 45 - { handle: regex } 38 + filter.$or = [ 39 + { displayName: regex }, 40 + { description: regex }, 41 + { handle: regex }, 46 42 ] 47 - 48 - // For queries matching profile fields, we need to find actors by their profiles 49 - const profileIds = await ctx.db.models.Profile.find({ 50 - $or: [ 51 - { displayName: regex }, 52 - { description: regex } 53 - ] 54 - }) 55 - .select('_id authorDid') 56 - .lean() 57 - 58 - // Add actor DIDs from matching profiles 59 - if (profileIds.length > 0) { 60 - const profileDids = profileIds.map(p => p.authorDid) 61 - // Add to $or condition 62 - actorFilter.$or.push({ did: { $in: profileDids } }) 63 - } 64 - 65 - // Sort by recency and relevance 66 - sort.indexedAt = -1 43 + // fall back to sorting by createdAt 44 + sort.createdAt = -1 67 45 } else { 68 - // Default sort for discovery - prioritize recently indexed actors 69 - sort.indexedAt = -1 46 + sort.createdAt = -1 70 47 } 71 48 72 - // Find actors with populated profiles - no need to index them 73 - const actorsWithProfiles = await ctx.db.models.Actor.find(actorFilter) 74 - .populate('profile') 49 + const profiles = await ctx.db.models.Profile.find(filter) 75 50 .sort(sort) 76 51 .skip(skip) 77 52 .limit(limit) 78 53 .lean() 79 54 80 - // Filter out any invalid profiles and transform to profile views 81 - const actors: SoSprkActorDefs.ProfileView[] = actorsWithProfiles 82 - .filter(actor => actor.profile) 83 - .map(actor => { 84 - const profile = actor.profile as any 85 - 86 - const avatar = profile?.avatar 87 - ? `https://media.sprk.so/avatar/tiny/${actor.did}/${profile.avatar.ref.$link}/webp` 55 + const actors: SoSprkActorDefs.ProfileView[] = await Promise.all( 56 + profiles.map(async (p) => { 57 + const avatar = p.avatar 58 + ? `https://media.sprk.so/avatar/tiny/${p.authorDid}/${(p.avatar as any).ref.$link}/webp` 88 59 : undefined 89 - 90 - const labels = profile?.labels && Array.isArray(profile.labels) 91 - ? (profile.labels as Label[]) 60 + const labels = Array.isArray(p.labels) 61 + ? (p.labels as Label[]) 92 62 : undefined 93 - 63 + const handle = await ctx.resolver.resolveDidToHandle(p.authorDid) 94 64 return { 95 65 $type: 'so.sprk.actor.defs#profileView', 96 - did: actor.did, 97 - handle: actor.handle || actor.did, 98 - displayName: profile?.displayName, 99 - description: profile?.description, 66 + did: p.authorDid, 67 + handle: handle, 68 + displayName: p.displayName, 69 + description: p.description, 100 70 avatar, 101 - indexedAt: actor.indexedAt, 102 - createdAt: actor.createdAt ? new Date(actor.createdAt).toISOString() : undefined, 71 + indexedAt: p.indexedAt, 72 + createdAt: p.createdAt, 103 73 labels, 104 74 } satisfies SoSprkActorDefs.ProfileView 105 - }) 75 + }), 76 + ) 106 77 107 - // Calculate cursor for pagination 108 78 const nextCursor = 109 - actorsWithProfiles.length === limit ? String(skip + limit) : undefined 79 + profiles.length === limit ? String(skip + limit) : undefined 110 80 const result: SoSprkActorSearch.OutputSchema = { actors } 111 81 if (nextCursor) { 112 82 result.cursor = nextCursor
-209
services/appview/src/services/indexing.ts
··· 1 - import { AtUri } from '@atproto/syntax' 2 - import { CID } from 'multiformats/cid' 3 - import { Document } from 'mongoose' 4 - import { BidirectionalResolver } from '../id-resolver.js' 5 - import { Database } from '../db.js' 6 - import { pino } from 'pino' 7 - import * as Post from './plugins/post.js' 8 - 9 - const logger = pino({ name: 'indexing-service' }) 10 - 11 - // Generic type for model processors 12 - type RecordProcessor<T extends Document> = { 13 - collection: string 14 - insertRecord: ( 15 - uri: AtUri, 16 - cid: CID, 17 - record: unknown, 18 - timestamp: string, 19 - opts?: { disableNotifs?: boolean }, 20 - ) => Promise<T | null> 21 - updateRecord: ( 22 - uri: AtUri, 23 - cid: CID, 24 - record: unknown, 25 - timestamp: string, 26 - ) => Promise<T | null> 27 - deleteRecord: (uri: AtUri, cascading?: boolean) => Promise<void> 28 - } 29 - 30 - /** 31 - * Service to handle indexing of records from the Atproto network 32 - */ 33 - export class IndexingService { 34 - private records: Record<string, RecordProcessor<any>> = {} 35 - private logger = pino({ name: 'indexing-service' }) 36 - 37 - constructor( 38 - private db: Database, 39 - private resolver: BidirectionalResolver, 40 - ) { 41 - // Register record processors 42 - this.records.post = Post.makePlugin(db) 43 - 44 - // Additional plugins would be registered here 45 - // Example: 46 - // this.records.like = Like.makePlugin(db) 47 - // this.records.follow = Follow.makePlugin(db) 48 - // etc. 49 - } 50 - 51 - /** 52 - * Index a record in the database 53 - * 54 - * @param uri The URI of the record 55 - * @param cid The CID of the record 56 - * @param obj The record data 57 - * @param action The action type (create/update) 58 - * @param timestamp The timestamp of the operation 59 - * @param opts Optional parameters 60 - */ 61 - async indexRecord( 62 - uri: AtUri, 63 - cid: CID, 64 - obj: unknown, 65 - action: 'create' | 'update', 66 - timestamp: string, 67 - opts?: { disableNotifs?: boolean }, 68 - ): Promise<void> { 69 - try { 70 - const indexer = this.findIndexerForCollection(uri.collection) 71 - if (!indexer) { 72 - this.logger.debug({ collection: uri.collection }, 'No indexer found for collection') 73 - return 74 - } 75 - 76 - if (action === 'create') { 77 - await indexer.insertRecord(uri, cid, obj, timestamp, opts) 78 - } else { 79 - await indexer.updateRecord(uri, cid, obj, timestamp) 80 - } 81 - } catch (error) { 82 - this.logger.error( 83 - { error, uri: uri.toString(), cid: cid.toString(), action }, 84 - 'Error indexing record', 85 - ) 86 - } 87 - } 88 - 89 - /** 90 - * Delete a record from the database 91 - * 92 - * @param uri The URI of the record to delete 93 - * @param cascading Whether to cascade the deletion to related records 94 - */ 95 - async deleteRecord(uri: AtUri, cascading = false): Promise<void> { 96 - try { 97 - const indexer = this.findIndexerForCollection(uri.collection) 98 - if (!indexer) { 99 - this.logger.debug({ collection: uri.collection }, 'No indexer found for collection') 100 - return 101 - } 102 - 103 - await indexer.deleteRecord(uri, cascading) 104 - } catch (error) { 105 - this.logger.error( 106 - { error, uri: uri.toString() }, 107 - 'Error deleting record', 108 - ) 109 - } 110 - } 111 - 112 - /** 113 - * Index or update actor handle information 114 - * 115 - * @param did The DID of the actor 116 - * @param timestamp The timestamp of the operation 117 - * @param force Force reindexing even if recently indexed 118 - */ 119 - async indexHandle(did: string, timestamp: string, force = false): Promise<void> { 120 - try { 121 - // Find existing actor 122 - const actor = await this.db.models.Actor.findOne({ did }) 123 - 124 - // Skip if recently indexed and not forced 125 - if (!force && actor && this.isHandleRecentlyIndexed(actor, timestamp)) { 126 - return 127 - } 128 - 129 - // Resolve DID to handle 130 - const didDoc = await this.resolver.resolveDidToDidDoc(did) 131 - 132 - // Verify handle ownership 133 - let handle: string | undefined = undefined 134 - if (didDoc.handle) { 135 - const handleDidDoc = await this.resolver.resolveHandleToDidDoc(didDoc.handle) 136 - handle = did === handleDidDoc.did ? didDoc.handle.toLowerCase() : undefined 137 - } 138 - 139 - // Handle conflict resolution - if another actor has this handle 140 - if (handle) { 141 - const actorWithHandle = await this.db.models.Actor.findOne({ handle }) 142 - if (actorWithHandle && actorWithHandle.did !== did) { 143 - // Clear handle from the other actor 144 - await this.db.models.Actor.updateOne( 145 - { did: actorWithHandle.did }, 146 - { $set: { handle: null } } 147 - ) 148 - } 149 - } 150 - 151 - // Update or create actor 152 - await this.db.models.Actor.updateOne( 153 - { did }, 154 - { 155 - $set: { 156 - handle, 157 - indexedAt: timestamp 158 - }, 159 - $setOnInsert: { 160 - uri: `at://${did}/app.bsky.actor.profile`, 161 - followersCount: 0, 162 - followingCount: 0, 163 - postsCount: 0, 164 - isLabeler: false, 165 - priorityNotifications: false, 166 - } 167 - }, 168 - { upsert: true } 169 - ) 170 - } catch (error) { 171 - this.logger.error({ error, did }, 'Error indexing handle') 172 - } 173 - } 174 - 175 - /** 176 - * Find the indexer responsible for a collection 177 - * 178 - * @param collection The collection to find an indexer for 179 - * @returns The indexer or undefined if not found 180 - */ 181 - findIndexerForCollection(collection: string): RecordProcessor<any> | undefined { 182 - return Object.values(this.records).find( 183 - (indexer) => indexer.collection === collection 184 - ) 185 - } 186 - 187 - /** 188 - * Check if an actor's handle was recently indexed 189 - * 190 - * @param actor The actor document 191 - * @param timestamp Current timestamp 192 - * @returns Whether the actor was recently indexed 193 - */ 194 - private isHandleRecentlyIndexed(actor: any, timestamp: string): boolean { 195 - if (!actor.indexedAt) return false 196 - 197 - const timeDiff = new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime() 198 - const ONE_DAY = 24 * 60 * 60 * 1000 199 - const ONE_HOUR = 60 * 60 * 1000 200 - 201 - // Reindex daily for all actors 202 - if (timeDiff > ONE_DAY) return false 203 - 204 - // Reindex more frequently for actors without handles 205 - if (actor.handle === null && timeDiff > ONE_HOUR) return false 206 - 207 - return true 208 - } 209 - }
-178
services/appview/src/services/plugins/post.ts
··· 1 - import { AtUri } from '@atproto/syntax' 2 - import { CID } from 'multiformats/cid' 3 - import { pino } from 'pino' 4 - import { Database, PostDocument } from '../../db.js' 5 - 6 - const logger = pino({ name: 'post-processor' }) 7 - 8 - export type PostRecord = { 9 - text: string 10 - createdAt: string 11 - facets?: Array<unknown> 12 - reply?: { 13 - root: { uri: string; cid: string } 14 - parent: { uri: string; cid: string } 15 - } 16 - embed?: unknown 17 - langs?: string[] 18 - labels?: unknown 19 - tags?: string[] 20 - } 21 - 22 - export type PostPluginType = { 23 - collection: string 24 - insertRecord: ( 25 - uri: AtUri, 26 - cid: CID, 27 - record: unknown, 28 - timestamp: string, 29 - opts?: { disableNotifs?: boolean }, 30 - ) => Promise<PostDocument | null> 31 - updateRecord: ( 32 - uri: AtUri, 33 - cid: CID, 34 - record: unknown, 35 - timestamp: string, 36 - ) => Promise<PostDocument | null> 37 - deleteRecord: (uri: AtUri, cascading?: boolean) => Promise<void> 38 - } 39 - 40 - /** 41 - * Create a post processor plugin for the indexing service 42 - */ 43 - export function makePlugin(db: Database): PostPluginType { 44 - return { 45 - collection: 'so.sprk.feed.post', 46 - 47 - async insertRecord( 48 - uri: AtUri, 49 - cid: CID, 50 - recordObj: unknown, 51 - timestamp: string, 52 - opts = {}, 53 - ): Promise<PostDocument | null> { 54 - const record = recordObj as PostRecord 55 - if (!record) return null 56 - 57 - try { 58 - // Find author information 59 - const actor = await db.models.Actor.findOne({ did: uri.hostname }) 60 - if (!actor) { 61 - logger.warn({ did: uri.hostname }, 'Actor not found when indexing post') 62 - return null 63 - } 64 - 65 - // Extract post data 66 - const postData = { 67 - uri: uri.toString(), 68 - cid: cid.toString(), 69 - text: record.text, 70 - facets: record.facets || [], 71 - reply: record.reply || null, 72 - embed: record.embed || null, 73 - langs: record.langs || [], 74 - labels: record.labels || null, 75 - tags: record.tags || [], 76 - authorDid: uri.hostname, 77 - authorHandle: actor.handle || uri.hostname, 78 - createdAt: record.createdAt, 79 - indexedAt: timestamp, 80 - } 81 - 82 - // Create the post 83 - const post = await db.models.Post.findOneAndUpdate( 84 - { uri: uri.toString() }, 85 - { $set: postData }, 86 - { upsert: true, new: true } 87 - ) 88 - 89 - // Increment post count for actor 90 - await db.models.Actor.updateOne( 91 - { did: uri.hostname }, 92 - { $inc: { postsCount: 1 } } 93 - ) 94 - 95 - return post 96 - } catch (error) { 97 - logger.error( 98 - { error, uri: uri.toString(), cid: cid.toString() }, 99 - 'Error inserting post record' 100 - ) 101 - return null 102 - } 103 - }, 104 - 105 - async updateRecord( 106 - uri: AtUri, 107 - cid: CID, 108 - recordObj: unknown, 109 - timestamp: string, 110 - ): Promise<PostDocument | null> { 111 - const record = recordObj as PostRecord 112 - if (!record) return null 113 - 114 - try { 115 - // Find author information 116 - const actor = await db.models.Actor.findOne({ did: uri.hostname }) 117 - if (!actor) { 118 - logger.warn({ did: uri.hostname }, 'Actor not found when updating post') 119 - return null 120 - } 121 - 122 - // Update post data 123 - const postData = { 124 - cid: cid.toString(), 125 - text: record.text, 126 - facets: record.facets || [], 127 - reply: record.reply || null, 128 - embed: record.embed || null, 129 - langs: record.langs || [], 130 - labels: record.labels || null, 131 - tags: record.tags || [], 132 - authorHandle: actor.handle || uri.hostname, 133 - indexedAt: timestamp, 134 - } 135 - 136 - // Update the post 137 - const post = await db.models.Post.findOneAndUpdate( 138 - { uri: uri.toString() }, 139 - { $set: postData }, 140 - { new: true } 141 - ) 142 - 143 - return post 144 - } catch (error) { 145 - logger.error( 146 - { error, uri: uri.toString(), cid: cid.toString() }, 147 - 'Error updating post record' 148 - ) 149 - return null 150 - } 151 - }, 152 - 153 - async deleteRecord(uri: AtUri): Promise<void> { 154 - try { 155 - const post = await db.models.Post.findOne({ uri: uri.toString() }) 156 - 157 - if (post) { 158 - // Delete the post 159 - await db.models.Post.deleteOne({ uri: uri.toString() }) 160 - 161 - // Decrement post count for actor 162 - await db.models.Actor.updateOne( 163 - { did: uri.hostname }, 164 - { $inc: { postsCount: -1 } } 165 - ) 166 - 167 - // Delete any associated likes, reposts, etc. as needed 168 - // This would be the cascading deletion logic 169 - } 170 - } catch (error) { 171 - logger.error( 172 - { error, uri: uri.toString() }, 173 - 'Error deleting post record' 174 - ) 175 - } 176 - }, 177 - } 178 - }
-2
services/ingester/src/db/connection.ts
··· 11 11 musicSchema, 12 12 lookSchema, 13 13 generatorSchema, 14 - actorSchema, 15 14 } from './models.js' 16 15 import { env } from '../utils/env.js' 17 16 import { pino } from 'pino' ··· 34 33 Music: this.connection.model('Music', musicSchema), 35 34 Look: this.connection.model('Look', lookSchema), 36 35 Generator: this.connection.model('Generator', generatorSchema), 37 - Actor: this.connection.model('Actor', actorSchema), 38 36 } 39 37 } 40 38
-49
services/ingester/src/db/models.ts
··· 342 342 generatorSchema.index({ authorDid: 1, createdAt: -1 }) 343 343 generatorSchema.index({ did: 1, createdAt: -1 }) 344 344 345 - export interface ActorDocument extends Document { 346 - uri: string 347 - did: string 348 - handle?: string 349 - profile?: ProfileDocument 350 - profileCid?: string 351 - profileTakedownRef?: string 352 - followersCount: number 353 - followingCount: number 354 - postsCount: number 355 - sortedAt?: Date 356 - indexedAt: string 357 - takedownRef?: string 358 - isLabeler: boolean 359 - allowIncomingChatsFrom?: string 360 - upstreamStatus?: string 361 - createdAt?: Date 362 - priorityNotifications: boolean 363 - trustedVerifier?: boolean 364 - labelsDeclaration?: Record<string, any> 365 - } 366 - 367 - export const actorSchema = new Schema<ActorDocument>({ 368 - uri: { type: String, required: true, unique: true, index: true }, 369 - did: { type: String, required: true, index: true }, 370 - handle: { type: String, required: false, index: true }, 371 - profile: { type: Schema.Types.ObjectId, ref: 'Profile', required: false }, 372 - profileCid: { type: String, required: false }, 373 - profileTakedownRef: { type: String, required: false }, 374 - followersCount: { type: Number, required: true, default: 0 }, 375 - followingCount: { type: Number, required: true, default: 0 }, 376 - postsCount: { type: Number, required: true, default: 0 }, 377 - sortedAt: { type: Date, required: false }, 378 - indexedAt: { type: String, required: true }, 379 - takedownRef: { type: String, required: false }, 380 - isLabeler: { type: Boolean, required: true, default: false }, 381 - allowIncomingChatsFrom: { type: String, required: false }, 382 - upstreamStatus: { type: String, required: false }, 383 - createdAt: { type: Date, required: false }, 384 - priorityNotifications: { type: Boolean, required: true, default: false }, 385 - trustedVerifier: { type: Boolean, required: false }, 386 - labelsDeclaration: { type: Object, required: false }, 387 - }) 388 - 389 - // Add compound indexes for Actor 390 - actorSchema.index({ handle: 'text' }) 391 - actorSchema.index({ did: 1 }, { unique: true }) 392 - 393 345 export interface DatabaseModels { 394 346 Like: Model<LikeDocument> 395 347 Post: Model<PostDocument> ··· 401 353 Music: Model<MusicDocument> 402 354 Look: Model<LookDocument> 403 355 Generator: Model<GeneratorDocument> 404 - Actor: Model<ActorDocument> 405 356 }
-76
services/ingester/src/handlers/actor-handler.ts
··· 1 - import { pino } from 'pino' 2 - import { Database } from '../db/connection.js' 3 - import type { NormalizedEvent } from '../types/events.js' 4 - import { ensureActor } from '../utils/actor-utils.js' 5 - 6 - const logger = pino({ name: 'actor-handler' }) 7 - 8 - /** 9 - * This handler is called by all other handlers to ensure that 10 - * any DID referenced in an event has a corresponding actor entry. 11 - * 12 - * @param evt The normalized event to process 13 - * @param db Database connection 14 - */ 15 - export async function handleActorReferences(evt: NormalizedEvent, db: Database): Promise<void> { 16 - try { 17 - // Always ensure the author DID has an actor 18 - if (evt.did) { 19 - await ensureActor(evt.did, evt.handle || undefined, db) 20 - } 21 - 22 - // Handle subject DIDs for follow, block, like events 23 - if (['follow', 'block', 'like'].includes(evt.event) && evt.record?.subject) { 24 - // Subject is usually a DID in format did:plc:12345 25 - const subjectDid = evt.record.subject as string 26 - if (subjectDid && subjectDid.startsWith('did:')) { 27 - await ensureActor(subjectDid, undefined, db) 28 - } 29 - } 30 - 31 - // Handle reply references for posts 32 - if (evt.collection === 'so.sprk.feed.post' && evt.record?.reply) { 33 - const reply = evt.record.reply as { root?: { uri?: string }, parent?: { uri?: string } } 34 - 35 - // Extract DIDs from reply URIs (format: at://did:plc:12345/...) 36 - if (reply.root?.uri) { 37 - const rootDid = extractDidFromUri(reply.root.uri) 38 - if (rootDid) { 39 - await ensureActor(rootDid, undefined, db) 40 - } 41 - } 42 - 43 - if (reply.parent?.uri) { 44 - const parentDid = extractDidFromUri(reply.parent.uri) 45 - if (parentDid && parentDid !== extractDidFromUri(reply.root?.uri || '')) { 46 - await ensureActor(parentDid, undefined, db) 47 - } 48 - } 49 - } 50 - 51 - // Handle repost subjects 52 - if (evt.collection === 'so.sprk.feed.repost' && evt.record?.subject?.uri) { 53 - const subjectUri = evt.record.subject.uri as string 54 - const subjectDid = extractDidFromUri(subjectUri) 55 - if (subjectDid) { 56 - await ensureActor(subjectDid, undefined, db) 57 - } 58 - } 59 - } catch (error) { 60 - logger.error({ error, uri: evt.uri }, 'Error while handling actor references') 61 - } 62 - } 63 - 64 - /** 65 - * Extracts a DID from an AT URI (at://did:plc:12345/...) 66 - * 67 - * @param uri The URI to extract the DID from 68 - * @returns The extracted DID or undefined 69 - */ 70 - function extractDidFromUri(uri: string): string | undefined { 71 - if (!uri) return undefined 72 - 73 - // Match a DID in an AT URI format 74 - const match = uri.match(/at:\/\/(did:[a-zA-Z0-9:]+)\//) 75 - return match ? match[1] : undefined 76 - }
+1 -5
services/ingester/src/handlers/index.ts
··· 11 11 import { handleMusicEvent } from './music-handler.js' 12 12 import { handleLookEvent } from './look-handler.js' 13 13 import { handleGeneratorEvent } from './generator-handler.js' 14 - import { handleActorReferences } from './actor-handler.js' 15 14 16 15 const logger = pino({ name: 'event-handler' }) 17 16 18 17 export async function handleEvent(evt: NormalizedEvent, db: Database): Promise<void> { 19 18 try { 20 - // First, ensure all actor references are handled properly 21 - await handleActorReferences(evt, db) 22 - 23 - // Then handle different events based on collection 19 + // Handle different events based on collection 24 20 if (evt.collection === 'so.sprk.feed.like') { 25 21 await handleLikeEvent(evt, db) 26 22 return
+3 -14
services/ingester/src/handlers/profile-handler.ts
··· 1 1 import { pino } from 'pino' 2 2 import { Database } from '../db/connection.js' 3 3 import type { NormalizedEvent } from '../types/events.js' 4 - import { ensureActor, linkProfileToActor } from '../utils/actor-utils.js' 5 - import type { ProfileDocument } from '../db/models.js' 6 4 7 5 const logger = pino({ name: 'profile-handler' }) 8 6 ··· 39 37 }, 'Processing profile event') 40 38 41 39 try { 42 - // First, ensure we have an actor for this DID 43 - await ensureActor(evt.did, evt.handle || undefined, db) 44 - 45 40 const profileData = { 46 41 uri: evt.uri, 47 42 displayName: record.displayName, ··· 58 53 cid: evt.commit.cid 59 54 } 60 55 61 - // Save the profile 62 - const profile = await db.models.Profile.findOneAndUpdate( 56 + await db.models.Profile.findOneAndUpdate( 63 57 { uri: evt.uri }, 64 58 profileData, 65 59 { upsert: true, new: true } 66 - ) as ProfileDocument 67 - 68 - if (profile && profile._id) { 69 - // Link the profile to the actor 70 - await linkProfileToActor(evt.did, profile._id.toString(), evt.commit.cid, db) 71 - } 60 + ) 72 61 73 62 logger.info( 74 63 { uri: evt.uri }, 75 - 'Successfully saved profile to database and linked to actor' 64 + 'Successfully saved profile to database' 76 65 ) 77 66 } catch (error) { 78 67 logger.error(
-91
services/ingester/src/utils/actor-utils.ts
··· 1 - import { pino } from 'pino' 2 - import { Database } from '../db/connection.js' 3 - 4 - const logger = pino({ name: 'actor-utils' }) 5 - 6 - /** 7 - * Ensures that an actor exists for the given DID. 8 - * If the actor doesn't exist, it creates a new one. 9 - * 10 - * @param did The DID to ensure has an actor 11 - * @param handle Optional handle associated with the DID 12 - * @param db Database connection 13 - * @returns The actor document, either existing or newly created 14 - */ 15 - export async function ensureActor( 16 - did: string, 17 - handle?: string, 18 - db?: Database 19 - ): Promise<any> { 20 - if (!db) { 21 - logger.warn({ did }, 'No database connection provided to ensureActor') 22 - return null 23 - } 24 - 25 - try { 26 - // Try to find existing actor 27 - const existingActor = await db.models.Actor.findOne({ did }) 28 - 29 - if (existingActor) { 30 - // If handle is provided and different from existing, update it 31 - if (handle && existingActor.handle !== handle) { 32 - existingActor.handle = handle 33 - await existingActor.save() 34 - logger.info({ did, handle }, 'Updated actor handle') 35 - } 36 - return existingActor 37 - } 38 - 39 - // Create new actor if none exists 40 - const now = new Date() 41 - const uri = `at://${did}/app.bsky.actor.profile` 42 - 43 - const newActor = await db.models.Actor.create({ 44 - uri, 45 - did, 46 - handle: handle || undefined, 47 - followersCount: 0, 48 - followingCount: 0, 49 - postsCount: 0, 50 - indexedAt: now.toISOString(), 51 - isLabeler: false, 52 - priorityNotifications: false 53 - }) 54 - 55 - logger.info({ did, handle }, 'Created new actor') 56 - return newActor 57 - } catch (error) { 58 - logger.error({ error, did, handle }, 'Failed to ensure actor exists') 59 - return null 60 - } 61 - } 62 - 63 - /** 64 - * Links a profile to an actor 65 - * 66 - * @param did The DID of the actor 67 - * @param profileId The MongoDB ID of the profile 68 - * @param profileCid The CID of the profile 69 - * @param db Database connection 70 - */ 71 - export async function linkProfileToActor( 72 - did: string, 73 - profileId: string, 74 - profileCid: string, 75 - db: Database 76 - ): Promise<void> { 77 - try { 78 - await db.models.Actor.findOneAndUpdate( 79 - { did }, 80 - { 81 - profile: profileId, 82 - profileCid 83 - }, 84 - { new: true } 85 - ) 86 - 87 - logger.info({ did, profileId }, 'Linked profile to actor') 88 - } catch (error) { 89 - logger.error({ error, did, profileId }, 'Failed to link profile to actor') 90 - } 91 - }