[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

Bluesky follow things (#12)

authored by

Davi Rodrigues and committed by
GitHub
dc602c5b 61008ff1

+859 -243
+3 -4
lexicons/so/sprk/actor/getPreferences.json
··· 13 13 "encoding": "application/json", 14 14 "schema": { 15 15 "type": "object", 16 - "required": ["preferences"], 17 16 "properties": { 18 - "preferences": { 19 - "type": "ref", 20 - "ref": "so.sprk.actor.defs#preferences" 17 + "followMode": { 18 + "type": "string", 19 + "knownValues": ["bsky", "sprk"] 21 20 } 22 21 } 23 22 }
+3 -4
lexicons/so/sprk/actor/putPreferences.json
··· 9 9 "encoding": "application/json", 10 10 "schema": { 11 11 "type": "object", 12 - "required": ["preferences"], 13 12 "properties": { 14 - "preferences": { 15 - "type": "ref", 16 - "ref": "so.sprk.actor.defs#preferences" 13 + "followMode": { 14 + "type": "string", 15 + "knownValues": ["bsky", "sprk"] 17 16 } 18 17 } 19 18 }
+31
services/appview/src/api/so/sprk/actor/getPreferences.ts
import { Hono } from 'hono'
import { AppContext } from '../../../../index.js'
import { authMiddleware } from '../../../../auth/middleware.js'

/**
 * Builds the Hono router that serves `so.sprk.actor.getPreferences`.
 *
 * The route runs behind `authMiddleware` and reads the caller's DID from the
 * request context (`c.get('did')`). It looks up that user's `UserPreference`
 * document and answers with `{ followMode }`, falling back to `'sprk'` when
 * no document exists or the stored value is falsy.
 *
 * @param ctx Application context; `ctx.db` and `ctx.logger` are used here.
 * @returns A router exposing `GET /xrpc/so.sprk.actor.getPreferences`.
 */
export const createGetPreferencesRouter = (ctx: AppContext) => {
  const router = new Hono()

  router.get(
    '/xrpc/so.sprk.actor.getPreferences',
    authMiddleware,
    async (c) => {
      // NOTE(review): presumably populated by authMiddleware — confirm.
      const userDid = c.get('did') as string

      try {
        const preference = await ctx.db.models.UserPreference.findOne({
          userDid,
        })
        const storedMode = preference?.followMode
        const followMode = storedMode || 'sprk'

        return c.json({ followMode }, 200)
      } catch (error) {
        // Any lookup failure is logged with the DID and reported as a 500.
        ctx.logger.error({ error, userDid }, 'Failed to get preferences')
        return c.json({ error: 'Failed to get preferences' }, 500)
      }
    },
  )

  return router
}
+39 -25
services/appview/src/api/so/sprk/actor/getProfile.ts
··· 6 6 import type { Label } from '../../../../lexicon/types/com/atproto/label/defs.js' 7 7 import type * as ComAtprotoRepoStrongRef from '../../../../lexicon/types/com/atproto/repo/strongRef.js' 8 8 import type * as SoSprkActorDefs from '../../../../lexicon/types/so/sprk/actor/defs.js' 9 - import type * as SoSprkGraphDefs from '../../../../lexicon/types/so/sprk/graph/defs.js' 10 9 11 10 export const createGetProfileRouter = (ctx: AppContext) => { 12 11 const router = new Hono() ··· 39 38 const now = new Date().toISOString() 40 39 41 40 await ctx.indexingService.indexHandle(actorDid, now) 42 - 41 + 43 42 // First check if actor exists and has profile 44 43 let actorDoc = await ctx.db.models.Actor.findOne({ 45 44 did: actorDid, ··· 51 50 52 51 if (!actorDoc) { 53 52 try { 54 - ctx.logger.info({ did: actorDid }, 'No profile found, attempting to index') 53 + ctx.logger.info( 54 + { did: actorDid }, 55 + 'No profile found, attempting to index', 56 + ) 55 57 await ctx.indexingService.indexHandle(actorDid, now, true) 56 - 58 + 57 59 // Refetch after indexing 58 60 actorDoc = await ctx.db.models.Actor.findOne({ 59 61 did: actorDid, ··· 72 74 } 73 75 74 76 // Use actor's handle if available, otherwise resolve from DID 75 - const handle = actorDoc.handle || await ctx.resolver.resolveDidToHandle(actorDid) 77 + const handle = 78 + actorDoc.handle || (await ctx.resolver.resolveDidToHandle(actorDid)) 79 + 80 + // Get actor's preference for follow mode (used for both viewer state and counting) 81 + const actorPref = await ctx.db.models.UserPreference.findOne({ 82 + userDid: actorDid, 83 + }) 84 + const actorFollowMode = actorPref?.followMode || 'sprk' 76 85 77 86 // Build viewer state if a user is authenticated 78 87 const viewer: SoSprkActorDefs.ViewerState = {} 79 88 80 89 if (viewerDid) { 81 - // Check if viewer follows this profile 90 + // Determine follow mode from viewer's preference for checking if viewer follows profile 91 + const viewerPref = await 
ctx.db.models.UserPreference.findOne({ 92 + userDid: viewerDid, 93 + }) 94 + const viewerFollowMode = viewerPref?.followMode || 'sprk' 95 + 96 + // Check if viewer follows this profile (use viewer's follow mode) 82 97 const follow = await ctx.db.models.Follow.findOne({ 83 98 subject: actorDid, 84 99 authorDid: viewerDid, 100 + type: viewerFollowMode, 85 101 }) 86 - if (follow) { 87 - viewer.following = follow.uri 88 - } 102 + if (follow) viewer.following = follow.uri 89 103 90 - // Check if this profile follows the viewer 104 + // Check if this profile follows the viewer (use profile owner's follow mode) 91 105 const followedBy = await ctx.db.models.Follow.findOne({ 92 106 subject: viewerDid, 93 107 authorDid: actorDid, 108 + type: actorFollowMode, 94 109 }) 95 - if (followedBy) { 96 - viewer.followedBy = followedBy.uri 97 - } 110 + if (followedBy) viewer.followedBy = followedBy.uri 98 111 99 - // Check if viewer has blocked this profile 112 + // Check block relationships 100 113 const block = await ctx.db.models.Block.findOne({ 101 114 subject: actorDid, 102 115 authorDid: viewerDid, 103 116 }) 104 - if (block) { 105 - viewer.blocking = block.uri 106 - } 117 + if (block) viewer.blocking = block.uri 107 118 108 - // Check if this profile has blocked the viewer 109 119 const blockedBy = await ctx.db.models.Block.findOne({ 110 120 subject: viewerDid, 111 121 authorDid: actorDid, 112 122 }) 113 - if (blockedBy) { 114 - viewer.blockedBy = true 115 - } 123 + if (blockedBy) viewer.blockedBy = true 116 124 } 117 125 118 126 // Check for associated services ··· 157 165 profile.pinnedPost as unknown as ComAtprotoRepoStrongRef.Main 158 166 } 159 167 160 - const followersCount = await ctx.db.models.Follow.countDocuments({ 161 - subject: actorDid, 162 - }) 168 + // Count unique followers across both Sprk and Bsky follow types 169 + const followersCount = await ctx.db.models.Follow.aggregate([ 170 + { $match: { subject: actorDid } }, 171 + { $group: { _id: '$authorDid' } }, 172 
+ { $count: 'total' }, 173 + ]).then((result) => result[0]?.total || 0) 174 + 175 + // Count follows based on actor's follow mode preference 163 176 const followsCount = await ctx.db.models.Follow.countDocuments({ 164 177 authorDid: actorDid, 178 + type: actorFollowMode, 165 179 }) 166 180 const postsCount = await ctx.db.models.Post.countDocuments({ 167 181 authorDid: actorDid, ··· 191 205 ) 192 206 193 207 return router 194 - } 208 + }
+62
services/appview/src/api/so/sprk/actor/putPreferences.ts
import { Hono } from 'hono'
import { AppContext } from '../../../../index.js'
import { authMiddleware } from '../../../../auth/middleware.js'
import * as SoSprkActorPutPreferences from '../../../../lexicon/types/so/sprk/actor/putPreferences.js'

/**
 * Builds the Hono router that serves `so.sprk.actor.putPreferences`.
 *
 * The route runs behind `authMiddleware` and reads the caller's DID from the
 * request context. It creates the caller's `UserPreference` document when
 * none exists (defaulting `followMode` to `'sprk'`), otherwise updates it,
 * and responds with the resulting `{ followMode }`.
 *
 * When the request sets `followMode` to `'bsky'` and the previously stored
 * mode was not `'bsky'`, indexing of the user's Bluesky follows is started in
 * the background; the response does not wait for it.
 *
 * Responses:
 * - 200 `{ followMode }` on success
 * - 400 when the body is not a JSON object or `followMode` is not
 *   `'bsky'`/`'sprk'`
 * - 500 when the database operation fails
 *
 * @param ctx Application context; `ctx.db` and `ctx.logger` are used here.
 * @returns A router exposing `POST /xrpc/so.sprk.actor.putPreferences`.
 */
export const createPutPreferencesRouter = (ctx: AppContext) => {
  const router = new Hono()

  router.post(
    '/xrpc/so.sprk.actor.putPreferences',
    authMiddleware,
    async (c) => {
      const userDid = c.get('did') as string

      // Parse defensively: a malformed or empty body makes `c.req.json()`
      // reject, which is a client error (400), not a server error.
      let parsedBody: unknown
      try {
        parsedBody = await c.req.json()
      } catch {
        return c.json({ error: 'Request body must be valid JSON' }, 400)
      }

      // `null`, primitives and arrays are valid JSON but not a valid input
      // object; reject them before any property access.
      if (
        typeof parsedBody !== 'object' ||
        parsedBody === null ||
        Array.isArray(parsedBody)
      ) {
        return c.json({ error: 'Request body must be a JSON object' }, 400)
      }
      const body = parsedBody as SoSprkActorPutPreferences.InputSchema

      if (body.followMode && !['bsky', 'sprk'].includes(body.followMode)) {
        return c.json(
          { error: 'Invalid followMode parameter. Must be "bsky" or "sprk"' },
          400,
        )
      }

      try {
        const now = new Date().toISOString()
        let userPref = await ctx.db.models.UserPreference.findOne({ userDid })
        // Remember the previous mode so a switch *to* bsky can be detected.
        const oldMode = userPref?.followMode

        if (!userPref) {
          userPref = await ctx.db.models.UserPreference.create({
            userDid,
            createdAt: now,
            updatedAt: now,
            followMode: body.followMode || 'sprk', // Default if not provided
          })
        } else {
          if (body.followMode) {
            userPref.followMode = body.followMode
          }
          userPref.updatedAt = now
          await userPref.save()
        }

        // Queue indexing of Bsky follows if switched to bsky mode.
        // Fire-and-forget: failures are logged, never surfaced to the client.
        const indexingService = c.get('indexingService')
        if (body.followMode === 'bsky' && oldMode !== 'bsky') {
          void indexingService.indexBSkyFollows(userDid).catch((error) =>
            ctx.logger.error({ error, userDid }, 'Failed to index bsky follows'),
          )
        }

        // Respond with all current preferences, including the updated followMode
        return c.json({ followMode: userPref.followMode }, 200)
      } catch (error) {
        ctx.logger.error({ error, userDid }, 'Failed to put preferences')
        return c.json({ error: 'Failed to put preferences' }, 500)
      }
    },
  )

  return router
}
+50 -12
services/appview/src/api/so/sprk/graph/getFollows.ts
··· 11 11 const actor = c.req.query('actor') 12 12 const limit = parseInt(c.req.query('limit') ?? '50') 13 13 const cursor = c.req.query('cursor') 14 + const viewerDid = c.get('did') as string | undefined 14 15 15 16 if (!actor) { 16 17 return c.json({ error: 'Actor is required' }, 400) ··· 21 22 return c.json({ error: 'Limit must be between 1 and 100' }, 400) 22 23 } 23 24 24 - // Build query 25 - const query: any = { authorDid: actor } 26 - if (cursor) { 27 - query._id = { $gt: cursor } 25 + let follows = [] 26 + 27 + // If user is authenticated, respect their follow preferences 28 + if (viewerDid) { 29 + const viewerPref = await ctx.db.models.UserPreference.findOne({ 30 + userDid: viewerDid 31 + }) 32 + const followType = viewerPref?.followMode || 'sprk' 33 + 34 + // Build query with the user's preferred follow type 35 + const query: any = { 36 + authorDid: actor, 37 + type: followType 38 + } 39 + 40 + if (cursor) { 41 + query._id = { $gt: cursor } 42 + } 43 + 44 + follows = await ctx.db.models.Follow.find(query) 45 + .sort({ _id: 1 }) 46 + .limit(limit) 47 + .lean() 48 + } else { 49 + // For unauthenticated users, get all follow types without duplicates 50 + // We use aggregation to get distinct follows by subject 51 + const pipelineStages: any[] = [ 52 + { $match: { authorDid: actor } } 53 + ] 54 + 55 + if (cursor) { 56 + pipelineStages.push({ $match: { _id: { $gt: cursor } } }) 57 + } 58 + 59 + // Group by subject to avoid duplicates 60 + pipelineStages.push( 61 + { $sort: { _id: 1 } }, 62 + { $group: { 63 + _id: '$subject', 64 + doc: { $first: '$$ROOT' } 65 + }}, 66 + { $replaceRoot: { newRoot: '$doc' } }, 67 + { $sort: { _id: 1 } }, 68 + { $limit: limit } 69 + ) 70 + 71 + follows = await ctx.db.models.Follow.aggregate(pipelineStages) 28 72 } 29 73 30 - // Get follows with pagination 31 - const follows = await ctx.db.models.Follow.find(query) 32 - .sort({ _id: 1 }) 33 - .limit(limit) 34 - .lean() 74 + // Get next cursor 75 + const nextCursor = follows.length 
=== limit ? follows[follows.length - 1]._id : undefined 35 76 36 77 // Get profile views for each follow 37 78 const profileViews = await Promise.all( ··· 64 105 return basicProfileView 65 106 }) 66 107 ) 67 - 68 - // Get next cursor 69 - const nextCursor = follows.length === limit ? follows[follows.length - 1]._id : undefined 70 108 71 109 // Get subject profile if it exists 72 110 const subjectProfile = await ctx.db.models.Profile.findOne({ authorDid: actor })
+30 -8
services/appview/src/data-plane/server/index.ts
··· 59 59 createdAt: string 60 60 indexedAt: string 61 61 cid: string 62 + type: 'sprk' | 'bsky' 62 63 } 63 64 64 65 export const followSchema = new Schema<FollowDocument>({ ··· 69 70 createdAt: { type: String, required: true }, 70 71 indexedAt: { type: String, required: true }, 71 72 cid: { type: String, required: true }, 73 + type: { type: String, required: true, enum: ['sprk', 'bsky'], index: true, default: 'sprk' }, 72 74 }) 73 75 74 76 export interface BlockDocument extends Document { ··· 296 298 cid: { type: String, required: true }, 297 299 }) 298 300 299 - // Add compound indexes for more efficient queries 301 + // Compound indexes for more efficient queries 300 302 postSchema.index({ authorDid: 1, createdAt: -1 }) 301 303 postSchema.index({ tags: 1, createdAt: -1 }) 302 304 303 - // Add compound indexes for new schemas 304 - followSchema.index({ authorDid: 1, subject: 1 }, { unique: true }) 305 + followSchema.index({ authorDid: 1, subject: 1, type: 1 }, { unique: true }) 305 306 followSchema.index({ subject: 1, createdAt: -1 }) 307 + followSchema.index({ type: 1, createdAt: -1 }) 306 308 307 309 blockSchema.index({ authorDid: 1, subject: 1 }, { unique: true }) 308 310 blockSchema.index({ subject: 1, createdAt: -1 }) ··· 424 426 } 425 427 426 428 export const actorSchema = new Schema<ActorDocument>({ 427 - did: { type: String, required: true, index: true }, 429 + did: { type: String, required: true }, 428 430 handle: { type: String, required: false, index: true }, 429 431 indexedAt: { type: String, required: true }, 430 432 takedownRef: { type: String, required: false }, ··· 435 437 actorSchema.index({ handle: 'text' }) 436 438 actorSchema.index({ did: 1 }, { unique: true }) 437 439 440 + export interface UserPreferenceDocument extends Document { 441 + userDid: string 442 + followMode: string 443 + createdAt: string 444 + updatedAt: string 445 + } 446 + 447 + export const userPreferenceSchema = new Schema<UserPreferenceDocument>({ 448 + userDid: { type: 
String, required: true, unique: true, index: true }, 449 + followMode: { type: String, required: true, enum: ['bsky', 'sprk'], default: 'sprk' }, 450 + createdAt: { type: String, required: true }, 451 + updatedAt: { type: String, required: true }, 452 + }) 453 + 438 454 export interface DatabaseModels { 439 455 Like: Model<LikeDocument> 440 456 Post: Model<PostDocument> ··· 450 466 RepoTakedown: Model<RepoTakedownDocument> 451 467 BlobTakedown: Model<BlobTakedownDocument> 452 468 Actor: Model<ActorDocument> 469 + UserPreference: Model<UserPreferenceDocument> 453 470 } 454 471 455 472 export class Database implements DataPlaneClient { ··· 465 482 } 466 483 467 484 async connect(): Promise<void> { 468 - const { DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME } = env 469 - const uri = `mongodb://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/?appName=appview` 485 + const { DB_URI, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME } = env 486 + 487 + const uri = DB_URI || `mongodb://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/?appName=appview` 488 + 470 489 this.logger.info( 471 - `Connecting to MongoDB at ${DB_HOST}:${DB_PORT}/?appName=appview`, 490 + DB_URI 491 + ? `Connecting to MongoDB using provided URI` 492 + : `Connecting to MongoDB at ${DB_HOST}:${DB_PORT}/?appName=appview`, 472 493 ) 473 494 474 495 try { 475 - this.connection = await mongoose.createConnection(uri, { 496 + this.connection = mongoose.createConnection(uri, { 476 497 autoIndex: true, 477 498 autoCreate: true, 478 499 dbName: DB_NAME, ··· 494 515 RepoTakedown: this.connection.model<RepoTakedownDocument>('RepoTakedown', repoTakedownSchema), 495 516 BlobTakedown: this.connection.model<BlobTakedownDocument>('BlobTakedown', blobTakedownSchema), 496 517 Actor: this.connection.model<ActorDocument>('Actor', actorSchema), 518 + UserPreference: this.connection.model<UserPreferenceDocument>('UserPreference', userPreferenceSchema), 497 519 } 498 520 499 521 this.logger.info('Connected to MongoDB')
+1
services/appview/src/env.ts
··· 13 13 MOD_SERVICE_DID: envStr('MOD_SERVICE_DID') ?? 'did:web:localhost', 14 14 ADMIN_PASSWORD: envStr('ADMIN_PASSWORD') ?? 'admin-token', 15 15 16 + DB_URI: envStr('DB_URI'), 16 17 DB_NAME: envStr('DB_NAME') ?? 'dev', 17 18 DB_HOST: envStr('DB_HOST') ?? 'localhost', 18 19 DB_PORT: envInt('DB_PORT') ?? 27017,
+6 -2
services/appview/src/index.ts
··· 30 30 import { TakedownService } from './services/takedown.js' 31 31 import { IndexingService } from './services/indexing.js' 32 32 import { expressToHono } from './utils/express-adapter.js' 33 + import { createPutPreferencesRouter } from './api/so/sprk/actor/putPreferences.js' 34 + import { createGetPreferencesRouter } from './api/so/sprk/actor/getPreferences.js' 33 35 34 36 // Extend Hono's context variable map to include our services 35 37 declare module 'hono' { ··· 120 122 const takedownRouter = createTakedownRouter(ctx) 121 123 const getRecordRouter = createGetRecordRouter(ctx) 122 124 const resolveHandleRouter = createResolveHandleRouter(ctx) 125 + const putPreferencesRouter = createPutPreferencesRouter(ctx) 126 + const getPreferencesRouter = createGetPreferencesRouter(ctx) 123 127 124 128 app.route('/', getPostsRouter) 125 129 app.route('/', getPostThreadRouter) ··· 131 135 app.route('/', takedownRouter) 132 136 app.route('/', getRecordRouter) 133 137 app.route('/', resolveHandleRouter) 138 + app.route('/', putPreferencesRouter) 139 + app.route('/', getPreferencesRouter) 134 140 app.route('/', wellKnownRouter()) 135 141 136 142 // Root route ··· 200 206 } 201 207 202 208 run() 203 - 204 -
+8 -9
services/appview/src/lexicon/lexicons.ts
··· 4758 4758 encoding: 'application/json', 4759 4759 schema: { 4760 4760 type: 'object', 4761 - required: ['preferences'], 4762 4761 properties: { 4763 - preferences: { 4764 - type: 'ref', 4765 - ref: 'lex:so.sprk.actor.defs#preferences', 4762 + followMode: { 4763 + type: 'string', 4764 + knownValues: ['bsky', 'sprk'], 4766 4765 }, 4767 4766 }, 4768 4767 }, ··· 4958 4957 encoding: 'application/json', 4959 4958 schema: { 4960 4959 type: 'object', 4961 - required: ['preferences'], 4962 4960 properties: { 4963 - preferences: { 4964 - type: 'ref', 4965 - ref: 'lex:so.sprk.actor.defs#preferences', 4961 + followMode: { 4962 + type: 'string', 4963 + knownValues: ['bsky', 'sprk'], 4966 4964 }, 4967 4965 }, 4968 4966 }, ··· 6886 6884 defs: { 6887 6885 main: { 6888 6886 type: 'record', 6889 - description: "Record declaring a 'like' of a piece of subject content.", 6887 + description: 6888 + "Record declaring a 'like' of a piece of subject content. Duplicate likes from the same author to the same subject will be ignored by the AppView.", 6890 6889 key: 'tid', 6891 6890 record: { 6892 6891 type: 'object',
+1 -2
services/appview/src/lexicon/types/so/sprk/actor/getPreferences.ts
··· 7 7 import { validate as _validate } from '../../../../lexicons' 8 8 import { $Typed, is$typed as _is$typed, OmitKey } from '../../../../util' 9 9 import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server' 10 - import type * as SoSprkActorDefs from './defs.js' 11 10 12 11 const is$typed = _is$typed, 13 12 validate = _validate ··· 18 17 export type InputSchema = undefined 19 18 20 19 export interface OutputSchema { 21 - preferences: SoSprkActorDefs.Preferences 20 + followMode?: 'bsky' | 'sprk' | (string & {}) 22 21 } 23 22 24 23 export type HandlerInput = undefined
+1 -2
services/appview/src/lexicon/types/so/sprk/actor/putPreferences.ts
··· 7 7 import { validate as _validate } from '../../../../lexicons' 8 8 import { $Typed, is$typed as _is$typed, OmitKey } from '../../../../util' 9 9 import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server' 10 - import type * as SoSprkActorDefs from './defs.js' 11 10 12 11 const is$typed = _is$typed, 13 12 validate = _validate ··· 16 15 export interface QueryParams {} 17 16 18 17 export interface InputSchema { 19 - preferences: SoSprkActorDefs.Preferences 18 + followMode?: 'bsky' | 'sprk' | (string & {}) 20 19 } 21 20 22 21 export interface HandlerInput {
+121 -35
services/appview/src/services/indexing.ts
··· 5 5 import { Database } from '../data-plane/server/index.js' 6 6 import { pino } from 'pino' 7 7 import * as Post from './plugins/post.js' 8 - 9 - const logger = pino({ name: 'indexing-service' }) 8 + import * as BskyFollow from './plugins/bskyFollow.js' 9 + import { Agent } from '@atproto/api' 10 10 11 11 // Generic type for model processors 12 12 type RecordProcessor<T extends Document> = { ··· 32 32 */ 33 33 export class IndexingService { 34 34 private records: Record<string, RecordProcessor<any>> = {} 35 - private logger = pino({ name: 'indexing-service' }) 35 + private logger = pino({ 36 + name: 'indexing-service', 37 + level: process.env.NODE_ENV === 'development' ? 'debug' : 'info', 38 + }) 36 39 37 40 constructor( 38 41 private db: Database, ··· 40 43 ) { 41 44 // Register record processors 42 45 this.records.post = Post.makePlugin(db) 43 - 46 + this.records.bskyFollow = BskyFollow.makePlugin(db) 47 + 44 48 // Additional plugins would be registered here 45 49 // Example: 46 50 // this.records.like = Like.makePlugin(db) ··· 50 54 51 55 /** 52 56 * Index a record in the database 53 - * 57 + * 54 58 * @param uri The URI of the record 55 59 * @param cid The CID of the record 56 60 * @param obj The record data ··· 69 73 try { 70 74 const indexer = this.findIndexerForCollection(uri.collection) 71 75 if (!indexer) { 72 - this.logger.debug({ collection: uri.collection }, 'No indexer found for collection') 76 + this.logger.debug( 77 + { collection: uri.collection }, 78 + 'No indexer found for collection', 79 + ) 73 80 return 74 81 } 75 82 ··· 88 95 89 96 /** 90 97 * Delete a record from the database 91 - * 98 + * 92 99 * @param uri The URI of the record to delete 93 100 * @param cascading Whether to cascade the deletion to related records 94 101 */ ··· 96 103 try { 97 104 const indexer = this.findIndexerForCollection(uri.collection) 98 105 if (!indexer) { 99 - this.logger.debug({ collection: uri.collection }, 'No indexer found for collection') 106 + 
this.logger.debug( 107 + { collection: uri.collection }, 108 + 'No indexer found for collection', 109 + ) 100 110 return 101 111 } 102 112 103 113 await indexer.deleteRecord(uri, cascading) 104 114 } catch (error) { 105 - this.logger.error( 106 - { error, uri: uri.toString() }, 107 - 'Error deleting record', 108 - ) 115 + this.logger.error({ error, uri: uri.toString() }, 'Error deleting record') 109 116 } 110 117 } 111 118 112 119 /** 113 120 * Index or update actor handle information 114 - * 121 + * 115 122 * @param did The DID of the actor 116 123 * @param timestamp The timestamp of the operation 117 124 * @param force Force reindexing even if recently indexed 118 125 */ 119 - async indexHandle(did: string, timestamp: string, force = false): Promise<void> { 126 + async indexHandle( 127 + did: string, 128 + timestamp: string, 129 + force = false, 130 + ): Promise<void> { 120 131 try { 121 132 // Find existing actor 122 133 const actor = await this.db.models.Actor.findOne({ did }) 123 - 134 + 124 135 // Skip if recently indexed and not forced 125 - if (!force && actor && this.isHandleRecentlyIndexed(actor, timestamp)) { 136 + if (!force && actor && this.isHandleRecentlyIndexed(actor, timestamp)) { 126 137 return 127 138 } 128 139 129 140 // Resolve DID to handle 130 141 const didDoc = await this.resolver.resolveDidToDidDoc(did) 131 - 142 + 132 143 // Verify handle ownership 133 144 let handle: string | undefined = undefined 134 145 if (didDoc.handle) { 135 - const handleDidDoc = await this.resolver.resolveHandleToDidDoc(didDoc.handle) 136 - handle = did === handleDidDoc.did ? didDoc.handle.toLowerCase() : undefined 146 + const handleDidDoc = await this.resolver.resolveHandleToDidDoc( 147 + didDoc.handle, 148 + ) 149 + handle = 150 + did === handleDidDoc.did ? 
didDoc.handle.toLowerCase() : undefined 137 151 } 138 152 139 153 // Handle conflict resolution - if another actor has this handle ··· 143 157 // Clear handle from the other actor 144 158 await this.db.models.Actor.updateOne( 145 159 { did: actorWithHandle.did }, 146 - { $set: { handle: null } } 160 + { $set: { handle: null } }, 147 161 ) 148 162 } 149 163 } 150 164 151 165 await this.db.models.Actor.updateOne( 152 166 { did }, 153 - { 154 - $set: { 167 + { 168 + $set: { 155 169 handle, 156 170 indexedAt: timestamp, 157 - } 171 + }, 158 172 }, 159 - { upsert: true } 173 + { upsert: true }, 160 174 ) 161 - 162 175 } catch (error) { 163 176 this.logger.error({ error, did }, 'Error indexing handle') 164 177 } 165 178 } 166 179 167 180 /** 181 + * Index all Bsky follows for a given user when they switch to bsky mode. 182 + * 183 + * @param did The DID of the user to index follows for 184 + */ 185 + async indexBSkyFollows(did: string): Promise<void> { 186 + try { 187 + const timestamp = new Date().toISOString() 188 + 189 + // Resolve the user's PDS endpoint from their DID document 190 + const didData = await this.resolver.resolveDidToDidDoc(did) 191 + 192 + // Validate that PDS endpoint exists and is a valid URL 193 + if (!didData.pds) { 194 + this.logger.warn({ did }, 'No PDS endpoint found in DID document') 195 + return 196 + } 197 + 198 + let pdsUrl: URL 199 + try { 200 + pdsUrl = new URL(didData.pds) 201 + } catch (urlError) { 202 + this.logger.error( 203 + { did, pds: didData.pds, error: urlError }, 204 + 'Invalid PDS URL in DID document' 205 + ) 206 + return 207 + } 208 + 209 + const agent = new Agent(pdsUrl) 210 + 211 + // Debug: starting follow indexing 212 + this.logger.debug({ did, pds: didData.pds }, 'Starting indexBSkyFollows') 213 + 214 + const collection = 'app.bsky.graph.follow' 215 + let cursor: string | undefined = undefined 216 + 217 + do { 218 + this.logger.debug({ cursor }, 'Listing bsky follow records') 219 + const res = await 
agent.com.atproto.repo.listRecords({ 220 + repo: did, 221 + collection, 222 + limit: 100, 223 + cursor, 224 + }) 225 + const { records, cursor: nextCursor } = res.data 226 + this.logger.debug( 227 + { count: records.length, nextCursor }, 228 + 'Fetched bsky follow records page', 229 + ) 230 + for (const rec of records) { 231 + this.logger.debug( 232 + { uri: rec.uri, cid: rec.cid }, 233 + 'Indexing bsky follow record', 234 + ) 235 + 236 + const uri = new AtUri(rec.uri) 237 + const cid = CID.parse(rec.cid) 238 + await this.indexRecord(uri, cid, rec.value, 'create', timestamp) 239 + } 240 + cursor = nextCursor 241 + } while (cursor) 242 + } catch (error) { 243 + this.logger.error( 244 + { error, did }, 245 + 'Error indexing BSky follows' 246 + ) 247 + } 248 + } 249 + 250 + /** 168 251 * Find the indexer responsible for a collection 169 - * 252 + * 170 253 * @param collection The collection to find an indexer for 171 254 * @returns The indexer or undefined if not found 172 255 */ 173 - findIndexerForCollection(collection: string): RecordProcessor<any> | undefined { 256 + findIndexerForCollection( 257 + collection: string, 258 + ): RecordProcessor<any> | undefined { 174 259 return Object.values(this.records).find( 175 - (indexer) => indexer.collection === collection 260 + (indexer) => indexer.collection === collection, 176 261 ) 177 262 } 178 263 179 264 /** 180 265 * Check if an actor's handle was recently indexed 181 - * 266 + * 182 267 * @param actor The actor document 183 268 * @param timestamp Current timestamp 184 269 * @returns Whether the actor was recently indexed 185 270 */ 186 271 private isHandleRecentlyIndexed(actor: any, timestamp: string): boolean { 187 272 if (!actor.indexedAt) return false 188 - 189 - const timeDiff = new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime() 273 + 274 + const timeDiff = 275 + new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime() 190 276 const ONE_DAY = 24 * 60 * 60 * 1000 191 277 const ONE_HOUR 
= 60 * 60 * 1000 192 - 278 + 193 279 // Reindex daily for all actors 194 280 if (timeDiff > ONE_DAY) return false 195 - 281 + 196 282 // Reindex more frequently for actors without handles 197 283 if (actor.handle === null && timeDiff > ONE_HOUR) return false 198 - 284 + 199 285 return true 200 286 } 201 - } 287 + }
+114
services/appview/src/services/plugins/bskyFollow.ts
import { AtUri } from '@atproto/syntax'
import { CID } from 'multiformats/cid'
import { pino } from 'pino'
import { Database, FollowDocument } from '../../data-plane/server/index.js'

const logger = pino({ name: 'bsky-follow-processor' })

/** Fields of an `app.bsky.graph.follow` record that this processor reads. */
export type BskyFollowRecord = {
  subject: string
  createdAt: string
}

/**
 * Runtime check that an untyped record carries the fields this processor
 * persists. Records come from outside this module as `unknown`, so they are
 * verified rather than trusted via a cast.
 */
function isBskyFollowRecord(value: unknown): value is BskyFollowRecord {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  return (
    typeof candidate.subject === 'string' &&
    candidate.subject.length > 0 &&
    typeof candidate.createdAt === 'string' &&
    candidate.createdAt.length > 0
  )
}

/**
 * Creates the record processor for the `app.bsky.graph.follow` collection.
 *
 * Follows handled here are stored in the shared `Follow` model with
 * `type: 'bsky'`. Every method catches its own errors, logs them and resolves
 * (to `null` where a document is expected) instead of throwing.
 *
 * @param db Database handle whose `Actor` and `Follow` models are used.
 * @returns An object with `collection`, `insertRecord`, `updateRecord` and
 *   `deleteRecord`.
 */
export function makePlugin(db: Database) {
  return {
    collection: 'app.bsky.graph.follow',

    /**
     * Upserts a follow keyed by its record URI.
     *
     * @param uri AT-URI of the follow record; its hostname is the author DID.
     * @param cid CID of the record.
     * @param recordObj Raw record value; must have string `subject` and
     *   `createdAt`, otherwise it is skipped.
     * @param timestamp Stored as `indexedAt`.
     * @returns The stored follow, or `null` when the record is invalid, the
     *   author has no `Actor` document, or the write fails.
     */
    async insertRecord(
      uri: AtUri,
      cid: CID,
      recordObj: unknown,
      timestamp: string,
    ): Promise<FollowDocument | null> {
      if (!isBskyFollowRecord(recordObj)) return null
      const record = recordObj

      try {
        const actor = await db.models.Actor.findOne({ did: uri.hostname })
        if (!actor) {
          logger.warn({ did: uri.hostname }, 'Actor not found when indexing bsky follow')
          return null
        }

        const followData = {
          uri: uri.toString(),
          subject: record.subject,
          authorDid: uri.hostname,
          // Fall back to the DID when the actor has no handle yet.
          authorHandle: actor.handle || uri.hostname,
          createdAt: record.createdAt,
          indexedAt: timestamp,
          cid: cid.toString(),
          type: 'bsky' as const,
        }

        // NOTE(review): the upsert is keyed by uri; a second record from the
        // same author for the same subject would presumably collide with a
        // unique (authorDid, subject, type) index on Follow and land in the
        // catch below — confirm that is the intended handling of duplicates.
        const follow = await db.models.Follow.findOneAndUpdate(
          { uri: uri.toString() },
          { $set: followData },
          { upsert: true, new: true },
        )

        return follow
      } catch (error) {
        logger.error(
          { error, uri: uri.toString(), cid: cid.toString() },
          'Error indexing bsky follow record',
        )
        return null
      }
    },

    /**
     * Refreshes `cid` and `indexedAt` on an already-indexed follow.
     *
     * NOTE(review): `subject` and `createdAt` are not rewritten on update —
     * confirm this is intended.
     *
     * @param uri AT-URI of the follow record; its hostname is the author DID.
     * @param cid New CID of the record.
     * @param recordObj Raw record value; skipped when invalid.
     * @param timestamp Stored as `indexedAt`.
     * @returns The updated follow, or `null` when the record is invalid, the
     *   author has no `Actor` document, no follow matches the URI, or the
     *   write fails.
     */
    async updateRecord(
      uri: AtUri,
      cid: CID,
      recordObj: unknown,
      timestamp: string,
    ): Promise<FollowDocument | null> {
      if (!isBskyFollowRecord(recordObj)) return null

      try {
        const actor = await db.models.Actor.findOne({ did: uri.hostname })
        if (!actor) {
          logger.warn({ did: uri.hostname }, 'Actor not found when updating bsky follow')
          return null
        }

        const updateData = {
          cid: cid.toString(),
          indexedAt: timestamp,
        }

        const follow = await db.models.Follow.findOneAndUpdate(
          { uri: uri.toString() },
          { $set: updateData },
          { new: true },
        )

        return follow
      } catch (error) {
        logger.error(
          { error, uri: uri.toString(), cid: cid.toString() },
          'Error updating bsky follow record',
        )
        return null
      }
    },

    /**
     * Deletes the follow stored under the given record URI.
     *
     * A single `deleteOne` is issued and its `deletedCount` inspected, so the
     * "not found" warning needs no separate lookup.
     *
     * @param uri AT-URI of the follow record to remove.
     */
    async deleteRecord(uri: AtUri): Promise<void> {
      try {
        const result = await db.models.Follow.deleteOne({ uri: uri.toString() })
        if (result.deletedCount === 0) {
          logger.warn({ uri: uri.toString() }, 'Follow not found for deletion')
        }
      } catch (error) {
        logger.error(
          { error, uri: uri.toString() },
          'Error deleting bsky follow record',
        )
      }
    },
  }
}
+8 -4
services/ingester/src/db/connection.ts
··· 16 16 } from './models.js' 17 17 import { env } from '../utils/env.js' 18 18 import { pino } from 'pino' 19 + import { customConfig } from '../utils/logger-config.js' 19 20 20 21 export class Database { 21 22 private connection: Connection 22 23 public models: DatabaseModels 23 - private logger = pino({ name: 'database' }) 24 + private logger = pino(customConfig('database')) 24 25 25 26 constructor() { 26 27 this.connection = mongoose.createConnection() ··· 41 42 } 42 43 43 44 async connect(): Promise<void> { 44 - const { DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME } = env 45 - const uri = `mongodb://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/?appName=ingester` 45 + const { DB_URI, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME } = env 46 + 47 + const uri = DB_URI || `mongodb://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/?appName=ingester` 46 48 47 49 this.logger.info( 48 - `Connecting to MongoDB at ${DB_HOST}:${DB_PORT}/?appName=ingester`, 50 + DB_URI 51 + ? `Connecting to MongoDB using provided URI` 52 + : `Connecting to MongoDB at ${DB_HOST}:${DB_PORT}/?appName=ingester`, 49 53 ) 50 54 51 55 try {
+4 -1
services/ingester/src/db/models.ts
··· 55 55 createdAt: string 56 56 indexedAt: string 57 57 cid: string 58 + type: 'sprk' | 'bsky' 58 59 } 59 60 60 61 export const followSchema = new Schema<FollowDocument>({ ··· 65 66 createdAt: { type: String, required: true }, 66 67 indexedAt: { type: String, required: true }, 67 68 cid: { type: String, required: true }, 69 + type: { type: String, required: true, enum: ['sprk', 'bsky'], index: true, default: 'sprk' }, 68 70 }) 69 71 70 72 export interface BlockDocument extends Document { ··· 292 294 postSchema.index({ tags: 1, createdAt: -1 }) 293 295 294 296 // Add compound indexes for new schemas 295 - followSchema.index({ authorDid: 1, subject: 1 }, { unique: true }) 297 + followSchema.index({ authorDid: 1, subject: 1, type: 1 }, { unique: true }) 296 298 followSchema.index({ subject: 1, createdAt: -1 }) 299 + followSchema.index({ type: 1, createdAt: -1 }) 297 300 298 301 blockSchema.index({ authorDid: 1, subject: 1 }, { unique: true }) 299 302 blockSchema.index({ subject: 1, createdAt: -1 })
+23 -9
services/ingester/src/handlers/actor-handler.ts
··· 2 2 import { Database } from '../db/connection.js' 3 3 import type { NormalizedEvent } from '../types/events.js' 4 4 import { IndexingService } from '../services/indexing.js' 5 - import { BidirectionalResolver } from '../id-resolver.js' 5 + import type { BidirectionalResolver } from '../utils/id-resolver.js' 6 + import { ensureActor } from '../utils/actor-utils.js' 7 + import { customConfig } from '../utils/logger-config.js' 6 8 7 - const logger = pino({ name: 'actor-handler' }) 9 + const logger = pino(customConfig('actor-handler')) 8 10 9 11 /** 10 12 * This handler is called by all other handlers to ensure that 11 13 * any DID referenced in an event has a corresponding actor entry. 12 - * 14 + * 13 15 * @param evt The normalized event to process 14 16 * @param db Database connection 17 + * @param resolver Bidirectional resolver 15 18 */ 16 - export async function handleActorReferences(evt: NormalizedEvent, db: Database): Promise<void> { 19 + export async function handleActorReferences(evt: NormalizedEvent, db: Database, resolver: BidirectionalResolver): Promise<void> { 17 20 try { 18 21 const now = new Date().toISOString() 19 - const resolver = new BidirectionalResolver() 20 22 const indexingService = new IndexingService(db, resolver) 21 23 22 24 // Always ensure the author DID has an actor 23 25 if (evt.did) { 24 26 await indexingService.indexHandle(evt.did, now) 27 + // Resolve and update the handle on the event object 28 + try { 29 + const didData = await resolver.resolveDidToDidDoc(evt.did) 30 + evt.handle = didData.handle ?? 
null 31 + } catch (error) { 32 + logger.warn( 33 + { did: evt.did, error: (error as Error).message }, 34 + 'Failed to resolve DID to handle after indexing for event update', 35 + ) 36 + // Ensure evt.handle is null if resolution fails 37 + evt.handle = null 38 + } 25 39 } 26 40 27 41 // Handle subject DIDs for follow, block, like events ··· 36 50 // Handle reply references for posts 37 51 if (evt.collection === 'so.sprk.feed.post' && evt.record?.reply) { 38 52 const reply = evt.record.reply as { root?: { uri?: string }, parent?: { uri?: string } } 39 - 53 + 40 54 // Extract DIDs from reply URIs (format: at://did:plc:12345/...) 41 55 if (reply.root?.uri) { 42 56 const rootDid = extractDidFromUri(reply.root.uri) ··· 44 58 await indexingService.indexHandle(rootDid, now) 45 59 } 46 60 } 47 - 61 + 48 62 if (reply.parent?.uri) { 49 63 const parentDid = extractDidFromUri(reply.parent.uri) 50 64 if (parentDid && parentDid !== extractDidFromUri(reply.root?.uri || '')) { ··· 52 66 } 53 67 } 54 68 } 55 - 69 + 56 70 // Handle repost subjects 57 71 if (evt.collection === 'so.sprk.feed.repost' && evt.record?.subject?.uri) { 58 72 const subjectUri = evt.record.subject.uri as string ··· 72 86 function extractDidFromUri(uri: string): string | null { 73 87 const match = uri.match(/at:\/\/(did:[^/]+)/) 74 88 return match ? match[1] : null 75 - } 89 + }
+2 -1
services/ingester/src/handlers/audio-handler.ts
··· 1 1 import { pino } from 'pino' 2 + import { customConfig } from '../utils/logger-config.js' 2 3 import { Database } from '../db/connection.js' 3 4 import type { NormalizedEvent } from '../types/events.js' 4 5 5 - const logger = pino({ name: 'audio-handler' }) 6 + const logger = pino(customConfig('audio-handler')) 6 7 7 8 export async function handleAudioEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 if (evt.collection !== 'so.sprk.feed.audio') {
+2 -1
services/ingester/src/handlers/block-handler.ts
··· 1 1 import { pino } from 'pino' 2 2 import { Database } from '../db/connection.js' 3 3 import type { NormalizedEvent } from '../types/events.js' 4 + import { customConfig } from '../utils/logger-config.js' 4 5 5 - const logger = pino({ name: 'block-handler' }) 6 + const logger = pino(customConfig('block-handler')) 6 7 7 8 export async function handleBlockEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 if (evt.collection !== 'so.sprk.graph.block') {
+98
services/ingester/src/handlers/bsky/follow-handler.ts
import { pino } from 'pino'
import { customConfig } from '../../utils/logger-config.js'
import { Database } from '../../db/connection.js'
import type { NormalizedEvent } from '../../types/events.js'
import { isActorInDatabase } from '../../utils/actor-cache.js'

const logger = pino(customConfig('bsky-follow-handler'))

/**
 * Ingests `app.bsky.graph.follow` events into the shared Follow collection.
 *
 * Events from any other collection are ignored. A follow is only ingested
 * when its author (`evt.did`) is reported as present by `isActorInDatabase`;
 * otherwise the event is dropped with a trace-level log entry. Create and
 * update events are upserted, delete events remove the stored follow, and any
 * other event kind is a no-op.
 *
 * @param evt The normalized event to process
 * @param db Database connection
 */
export async function handleAppBskyFollowEvent(
  evt: NormalizedEvent,
  db: Database,
): Promise<void> {
  if (evt.collection !== 'app.bsky.graph.follow') {
    return
  }

  // The author check applies to deletes as well as creates/updates.
  const authorIsKnown = await isActorInDatabase(evt.did, db)
  if (!authorIsKnown) {
    logger.trace(
      { did: evt.did, uri: evt.uri, collection: evt.collection },
      'Author of follow not found in Actor table. Skipping follow ingestion for app.bsky.graph.follow event.',
    )
    return
  }

  switch (evt.event) {
    case 'create':
    case 'update':
      await handleCreateOrUpdate(evt, db)
      return
    case 'delete':
      await handleDelete(evt, db)
      return
  }
}

/**
 * Upserts the follow described by `evt`, keyed by its URI and tagged with
 * `type: 'bsky'`.
 *
 * Events without a record are skipped with a warning. Failures while building
 * or saving the document are logged and not rethrown.
 */
async function handleCreateOrUpdate(
  evt: NormalizedEvent,
  db: Database,
): Promise<void> {
  const indexedAt = new Date()
  const { record } = evt

  if (!record) {
    logger.warn({ uri: evt.uri }, 'Follow event missing record data')
    return
  }

  logger.info(
    {
      did: evt.did,
      handle: evt.handle,
      collection: evt.collection,
      uri: evt.uri,
    },
    'Processing follow event',
  )

  try {
    // Built inside the try so that a malformed event (e.g. no commit) is
    // logged as a save failure instead of escaping as an exception.
    const doc = {
      uri: evt.uri,
      subject: record.subject,
      authorDid: evt.did,
      // Placeholder used when the event carries no resolved handle.
      authorHandle: evt.handle || 'unknown',
      createdAt: record.createdAt,
      indexedAt: indexedAt.toISOString(),
      cid: evt.commit.cid,
      type: 'bsky' as const,
    }
    const filter = { uri: evt.uri }
    const options = { upsert: true, new: true }

    await db.models.Follow.findOneAndUpdate(filter, doc, options)

    logger.info({ uri: evt.uri }, 'Successfully saved follow to database')
  } catch (error) {
    logger.error({ error, uri: evt.uri }, 'Failed to save follow to database')
  }
}

/**
 * Deletes the follow stored under `evt.uri`.
 *
 * Logs at info level when a document was removed and at warn level when none
 * matched. Database errors are logged and not rethrown.
 */
async function handleDelete(evt: NormalizedEvent, db: Database): Promise<void> {
  try {
    const { deletedCount } = await db.models.Follow.deleteOne({ uri: evt.uri })

    if (deletedCount > 0) {
      logger.info({ uri: evt.uri }, 'Successfully removed follow from database')
    } else {
      logger.warn({ uri: evt.uri }, 'Follow not found in database for deletion')
    }
  } catch (error) {
    logger.error(
      { error, uri: evt.uri },
      'Failed to delete follow from database',
    )
  }
}
+4 -2
services/ingester/src/handlers/follow-handler.ts
··· 1 1 import { pino } from 'pino' 2 + import { customConfig } from '../utils/logger-config.js' 2 3 import { Database } from '../db/connection.js' 3 4 import type { NormalizedEvent } from '../types/events.js' 4 5 5 - const logger = pino({ name: 'follow-handler' }) 6 + const logger = pino(customConfig('follow-handler')) 6 7 7 8 export async function handleFollowEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 if (evt.collection !== 'so.sprk.graph.follow') { ··· 44 45 authorHandle: evt.handle || 'unknown', 45 46 createdAt: record.createdAt, 46 47 indexedAt: now.toISOString(), 47 - cid: evt.commit.cid 48 + cid: evt.commit.cid, 49 + type: 'sprk' as const, 48 50 } 49 51 50 52 await db.models.Follow.findOneAndUpdate(
+2 -1
services/ingester/src/handlers/generator-handler.ts
··· 1 1 import { pino } from 'pino' 2 + import { customConfig } from '../utils/logger-config.js' 2 3 import { Database } from '../db/connection.js' 3 4 import type { NormalizedEvent } from '../types/events.js' 4 5 5 - const logger = pino({ name: 'generator-handler' }) 6 + const logger = pino(customConfig('generator-handler')) 6 7 7 8 export async function handleGeneratorEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 // Skip if not a generator event
+20 -5
services/ingester/src/handlers/index.ts
··· 12 12 import { handleLookEvent } from './look-handler.js' 13 13 import { handleGeneratorEvent } from './generator-handler.js' 14 14 import { handleActorReferences } from './actor-handler.js' 15 + import { handleAppBskyFollowEvent } from './bsky/follow-handler.js' 16 + import { customConfig } from '../utils/logger-config.js' 17 + import type { BidirectionalResolver } from '../utils/id-resolver.js' 15 18 16 - const logger = pino({ name: 'event-handler' }) 19 + const logger = pino(customConfig('event-handler')) 17 20 18 - export async function handleEvent(evt: NormalizedEvent, db: Database): Promise<void> { 21 + export async function handleEvent(evt: NormalizedEvent, db: Database, resolver: BidirectionalResolver): Promise<void> { 19 22 try { 20 - // First, ensure all actor references are handled properly 21 - await handleActorReferences(evt, db) 22 - 23 + // Skip actor reference handling for any app.bsky.* events 24 + if (evt.collection.startsWith('app.bsky.')) { 25 + logger.trace( 26 + { did: evt.did, collection: evt.collection }, 27 + 'Skipping actor reference handling for app.bsky event.' 28 + ); 29 + } else { 30 + await handleActorReferences(evt, db, resolver) 31 + } 32 + 23 33 // Then handle different events based on collection 24 34 if (evt.collection === 'so.sprk.feed.like') { 25 35 await handleLikeEvent(evt, db) ··· 33 43 34 44 if (evt.collection === 'so.sprk.graph.follow') { 35 45 await handleFollowEvent(evt, db) 46 + return 47 + } 48 + 49 + if (evt.collection === 'app.bsky.graph.follow') { 50 + await handleAppBskyFollowEvent(evt, db) 36 51 return 37 52 } 38 53
+2 -1
services/ingester/src/handlers/like-handler.ts
··· 1 1 import { pino } from 'pino' 2 + import { customConfig } from '../utils/logger-config.js' 2 3 import { Database } from '../db/connection.js' 3 4 import type { NormalizedEvent } from '../types/events.js' 4 5 5 - const logger = pino({ name: 'like-handler' }) 6 + const logger = pino(customConfig('like-handler')) 6 7 7 8 export async function handleLikeEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 // Skip if not a like event
+2 -1
services/ingester/src/handlers/look-handler.ts
··· 1 1 import { pino } from 'pino' 2 + import { customConfig } from '../utils/logger-config.js' 2 3 import { Database } from '../db/connection.js' 3 4 import type { NormalizedEvent } from '../types/events.js' 4 5 5 - const logger = pino({ name: 'look-handler' }) 6 + const logger = pino(customConfig('look-handler')) 6 7 7 8 export async function handleLookEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 if (evt.collection !== 'so.sprk.feed.look') {
+2 -1
services/ingester/src/handlers/music-handler.ts
··· 1 1 import { pino } from 'pino' 2 + import { customConfig } from '../utils/logger-config.js' 2 3 import { Database } from '../db/connection.js' 3 4 import type { NormalizedEvent } from '../types/events.js' 4 5 5 - const logger = pino({ name: 'music-handler' }) 6 + const logger = pino(customConfig('music-handler')) 6 7 7 8 export async function handleMusicEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 if (evt.collection !== 'so.sprk.feed.music') {
+2 -1
services/ingester/src/handlers/post-handler.ts
··· 1 1 import { pino } from 'pino' 2 2 import { Database } from '../db/connection.js' 3 3 import type { NormalizedEvent } from '../types/events.js' 4 + import { customConfig } from '../utils/logger-config.js' 4 5 5 - const logger = pino({ name: 'post-handler' }) 6 + const logger = pino(customConfig('post-handler')) 6 7 7 8 export async function handlePostEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 // Skip if not a post event
+2 -1
services/ingester/src/handlers/profile-handler.ts
··· 1 1 import { pino } from 'pino' 2 + import { customConfig } from '../utils/logger-config.js' 2 3 import { Database } from '../db/connection.js' 3 4 import type { NormalizedEvent } from '../types/events.js' 4 5 import { ensureActor, linkProfileToActor } from '../utils/actor-utils.js' 5 6 import type { ProfileDocument } from '../db/models.js' 6 7 7 - const logger = pino({ name: 'profile-handler' }) 8 + const logger = pino(customConfig('profile-handler')) 8 9 9 10 export async function handleProfileEvent(evt: NormalizedEvent, db: Database): Promise<void> { 10 11 if (evt.collection !== 'so.sprk.actor.profile') {
+2 -1
services/ingester/src/handlers/repost-handler.ts
··· 1 1 import { pino } from 'pino' 2 2 import { Database } from '../db/connection.js' 3 3 import type { NormalizedEvent } from '../types/events.js' 4 + import { customConfig } from '../utils/logger-config.js' 4 5 5 - const logger = pino({ name: 'repost-handler' }) 6 + const logger = pino(customConfig('repost-handler')) 6 7 7 8 export async function handleRepostEvent(evt: NormalizedEvent, db: Database): Promise<void> { 8 9 if (evt.collection !== 'so.sprk.feed.repost') {
-61
services/ingester/src/id-resolver.ts
··· 1 - import { pino } from 'pino' 2 - 3 - const logger = pino({ name: 'id-resolver' }) 4 - 5 - export interface DidDocument { 6 - did: string 7 - handle?: string 8 - } 9 - 10 - /** 11 - * Service to handle resolving DIDs to handles and vice versa 12 - */ 13 - export class BidirectionalResolver { 14 - private logger = pino({ name: 'id-resolver' }) 15 - 16 - /** 17 - * Resolve a DID to its DID document 18 - */ 19 - async resolveDidToDidDoc(did: string): Promise<DidDocument> { 20 - try { 21 - // TODO: Implement actual DID resolution 22 - // For now, return basic document 23 - return { 24 - did, 25 - } 26 - } catch (error) { 27 - this.logger.error({ error, did }, 'Failed to resolve DID to DID document') 28 - throw error 29 - } 30 - } 31 - 32 - /** 33 - * Resolve a handle to its DID document 34 - */ 35 - async resolveHandleToDidDoc(handle: string): Promise<DidDocument> { 36 - try { 37 - // TODO: Implement actual handle resolution 38 - // For now, return basic document 39 - return { 40 - did: `did:plc:${handle.toLowerCase()}`, 41 - handle, 42 - } 43 - } catch (error) { 44 - this.logger.error({ error, handle }, 'Failed to resolve handle to DID document') 45 - throw error 46 - } 47 - } 48 - 49 - /** 50 - * Resolve a DID to its handle 51 - */ 52 - async resolveDidToHandle(did: string): Promise<string> { 53 - try { 54 - const doc = await this.resolveDidToDidDoc(did) 55 - return doc.handle || did 56 - } catch (error) { 57 - this.logger.error({ error, did }, 'Failed to resolve DID to handle') 58 - throw error 59 - } 60 - } 61 - }
+3 -7
services/ingester/src/index.ts
··· 5 5 createBidirectionalResolver, 6 6 } from './utils/id-resolver.js' 7 7 import { createJetstreamClient } from './utils/jetstream-client.js' 8 + import { customConfig } from './utils/logger-config.js' 8 9 9 - const logger = pino({ 10 - name: 'ingester', 11 - transport: { 12 - target: 'pino-pretty', 13 - }, 14 - }) 10 + const logger = pino(customConfig('ingester')) 15 11 16 12 async function main() { 17 13 logger.info('Starting Jetstream ingester service') ··· 32 28 // Create and start Jetstream client 33 29 const jetstreamClient = await createJetstreamClient(db, bidirectionalResolver) 34 30 const connection = jetstreamClient.connect({ 35 - filterCollections: ['so.sprk.*'], 31 + filterCollections: ['so.sprk.*', 'app.bsky.graph.follow'], 36 32 }) 37 33 38 34 // Handle shutdown gracefully
+15 -14
services/ingester/src/services/indexing.ts
··· 1 1 import { pino } from 'pino' 2 2 import { Database } from '../db/connection.js' 3 - import { BidirectionalResolver } from '../id-resolver.js' 3 + import type { BidirectionalResolver } from '../utils/id-resolver.js' 4 + import { customConfig } from '../utils/logger-config.js' 4 5 5 - const logger = pino({ name: 'indexing-service' }) 6 + const logger = pino(customConfig('indexing-service')) 6 7 7 8 /** 8 9 * Service to handle indexing of actors and their handles 9 10 */ 10 11 export class IndexingService { 11 - private logger = pino({ name: 'indexing-service' }) 12 + private logger = pino(customConfig('indexing-service')) 12 13 13 14 constructor( 14 15 private db: Database, ··· 17 18 18 19 /** 19 20 * Index or update actor handle information 20 - * 21 + * 21 22 * @param did The DID of the actor 22 23 * @param timestamp The timestamp of the operation 23 24 * @param force Force reindexing even if recently indexed 24 25 */ 25 - 26 + 26 27 async indexHandle(did: string, timestamp: string, force = false): Promise<void> { 27 28 try { 28 29 // Find existing actor 29 30 const actor = await this.db.models.Actor.findOne({ did }) 30 - 31 + 31 32 // Skip if recently indexed and not forced 32 33 if (!force && actor && this.isHandleRecentlyIndexed(actor, timestamp)) { 33 34 return ··· 35 36 36 37 // Resolve DID to handle 37 38 const didDoc = await this.resolver.resolveDidToDidDoc(did) 38 - 39 + 39 40 // Verify handle ownership 40 41 let handle: string | undefined = undefined 41 42 if (didDoc.handle) { ··· 58 59 // Update or create actor 59 60 await this.db.models.Actor.updateOne( 60 61 { did }, 61 - { 62 - $set: { 62 + { 63 + $set: { 63 64 handle, 64 65 indexedAt: timestamp 65 66 }, ··· 76 77 */ 77 78 private isHandleRecentlyIndexed(actor: any, timestamp: string): boolean { 78 79 if (!actor.indexedAt) return false 79 - 80 + 80 81 const timeDiff = new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime() 81 82 const ONE_DAY = 24 * 60 * 60 * 1000 82 83 const 
ONE_HOUR = 60 * 60 * 1000 83 - 84 + 84 85 // Reindex daily for all actors 85 86 if (timeDiff > ONE_DAY) return false 86 - 87 + 87 88 // Reindex more frequently for actors without handles 88 89 if (actor.handle === null && timeDiff > ONE_HOUR) return false 89 - 90 + 90 91 return true 91 92 } 92 - } 93 + }
+72
services/ingester/src/utils/actor-cache.ts
import { pino } from 'pino'
import { customConfig } from './logger-config.js'
import { TtlCache } from './ttl-cache.js'
import { Database } from '../db/connection.js'

const logger = pino(customConfig('actor-cache'))

// A single cache entry holds the set of every known actor DID; it is rebuilt
// once the entry's 60-second TTL has lapsed.
const actorsCache = new TtlCache<string, Set<string>>({ defaultTtlMs: 60_000 })
const ACTORS_CACHE_KEY = 'all_actor_dids'

// Refresh lock: while a rebuild is running, later callers await the same
// promise instead of issuing their own full-collection query.
let cacheRefreshInProgress = false
let refreshPromise: Promise<Set<string>> | null = null

/**
 * Looks up a single actor by DID straight from the database, bypassing the
 * cache. Used as the fallback when a cache refresh fails.
 */
async function queryActorDirectly(did: string, db: Database): Promise<boolean> {
  const actor = await db.models.Actor.findOne({ did }).lean()
  return !!actor
}

/**
 * Reports whether an actor with the given DID exists, answering from the
 * cached DID set whenever one is available.
 *
 * When the set is missing or expired, the first caller rebuilds it and
 * concurrent callers wait on that same rebuild. If the rebuild fails, each
 * caller falls back to a direct single-DID query. While a cached set is
 * live, the answer reflects that snapshot rather than the current database
 * contents.
 *
 * @param did The DID to look for
 * @param db Database connection
 * @returns `true` when the DID is known, `false` otherwise
 */
export async function isActorInDatabase(did: string, db: Database): Promise<boolean> {
  const cachedDids = actorsCache.get(ACTORS_CACHE_KEY)
  if (cachedDids) {
    return cachedDids.has(did)
  }

  // Another caller is already rebuilding the set: piggyback on its promise.
  if (cacheRefreshInProgress && refreshPromise) {
    try {
      const sharedDids = await refreshPromise
      return sharedDids.has(did)
    } catch (error) {
      logger.warn({ error, did }, 'Shared cache refresh failed, falling back to direct query')
      return queryActorDirectly(did, db)
    }
  }

  // This caller becomes the one that performs the rebuild.
  cacheRefreshInProgress = true
  refreshPromise = refreshActorsCache(db)

  try {
    const freshDids = await refreshPromise
    return freshDids.has(did)
  } catch (error) {
    logger.warn({ error, did }, 'Cache refresh failed, falling back to direct query')
    return queryActorDirectly(did, db)
  } finally {
    // Release the lock whether or not the rebuild succeeded.
    cacheRefreshInProgress = false
    refreshPromise = null
  }
}

/**
 * Loads the DID of every actor, stores the resulting set in the cache under
 * `ACTORS_CACHE_KEY`, and returns it. Query failures are logged and rethrown
 * so that callers can fall back.
 */
async function refreshActorsCache(db: Database): Promise<Set<string>> {
  try {
    // Project only the `did` field to keep the result small.
    const rows = await db.models.Actor.find({}, { did: 1, _id: 0 }).lean()
    const dids = new Set(rows.map((row) => row.did))
    actorsCache.set(ACTORS_CACHE_KEY, dids)
    logger.info({ actorCount: dids.size }, 'Refreshed actors cache')
    return dids
  } catch (error) {
    logger.error({ error }, 'Failed to fetch actors for caching')
    throw error
  }
}
+11 -10
services/ingester/src/utils/actor-utils.ts
··· 1 1 import { pino } from 'pino' 2 2 import { Database } from '../db/connection.js' 3 + import { customConfig } from './logger-config.js' 3 4 4 - const logger = pino({ name: 'actor-utils' }) 5 + const logger = pino(customConfig('actor-utils')) 5 6 6 7 /** 7 8 * Ensures that an actor exists for the given DID. 8 9 * If the actor doesn't exist, it creates a new one. 9 - * 10 + * 10 11 * @param did The DID to ensure has an actor 11 12 * @param handle Optional handle associated with the DID 12 13 * @param db Database connection 13 14 * @returns The actor document, either existing or newly created 14 15 */ 15 16 export async function ensureActor( 16 - did: string, 17 - handle?: string, 17 + did: string, 18 + handle?: string, 18 19 db?: Database 19 20 ): Promise<any> { 20 21 if (!db) { ··· 25 26 try { 26 27 // Try to find existing actor 27 28 const existingActor = await db.models.Actor.findOne({ did }) 28 - 29 + 29 30 if (existingActor) { 30 31 // If handle is provided and different from existing, update it 31 32 if (handle && existingActor.handle !== handle) { ··· 39 40 // Create new actor if none exists 40 41 const now = new Date() 41 42 const uri = `at://${did}/so.sprk.actor.profile` 42 - 43 + 43 44 const newActor = await db.models.Actor.create({ 44 45 uri, 45 46 did, ··· 62 63 63 64 /** 64 65 * Links a profile to an actor 65 - * 66 + * 66 67 * @param did The DID of the actor 67 68 * @param profileId The MongoDB ID of the profile 68 69 * @param profileCid The CID of the profile ··· 77 78 try { 78 79 await db.models.Actor.findOneAndUpdate( 79 80 { did }, 80 - { 81 + { 81 82 profile: profileId, 82 83 profileCid 83 84 }, 84 85 { new: true } 85 86 ) 86 - 87 + 87 88 logger.info({ did, profileId }, 'Linked profile to actor') 88 89 } catch (error) { 89 90 logger.error({ error, did, profileId }, 'Failed to link profile to actor') 90 91 } 91 - } 92 + }
+3 -2
services/ingester/src/utils/env.ts
··· 1 1 import * as dotenv from 'dotenv' 2 - import { envBool, envInt, envList, envStr } from '@atproto/common' 2 + import { envInt, envStr } from '@atproto/common' 3 3 4 4 dotenv.config() 5 5 6 6 export const env = { 7 - NODE_ENV: envStr('NODE_ENV') ?? 'test', 7 + NODE_ENV: envStr('NODE_ENV') ?? 'development', 8 8 9 9 JETSTREAM_URL: 10 10 envStr('JETSTREAM_URL') ?? 11 11 'wss://jetstream2.us-east.bsky.network/subscribe', 12 12 13 + DB_URI: envStr('DB_URI'), 13 14 DB_NAME: envStr('DB_NAME') ?? 'dev', 14 15 DB_HOST: envStr('DB_HOST') ?? 'localhost', 15 16 DB_PORT: envInt('DB_PORT') ?? 27017,
+5 -16
services/ingester/src/utils/jetstream-client.ts
··· 5 5 import { Database } from '../db/connection.js' 6 6 import type { BidirectionalResolver } from './id-resolver.js' 7 7 import { handleEvent } from '../handlers/index.js' 8 + import { customConfig } from './logger-config.js' 8 9 9 - const logger = pino({ name: 'jetstream-client' }) 10 + const logger = pino(customConfig('jetstream-client')) 10 11 11 12 export interface JetstreamClientOptions { 12 13 filterCollections?: string[] ··· 165 166 const { did, time_us } = event 166 167 const { operation, collection, rkey, record } = event.commit 167 168 168 - logger.debug( 169 + logger.trace( 169 170 `Processing ${operation} operation for DID: ${did}, collection: ${collection}, rkey: ${rkey}`, 170 171 ) 171 172 172 - // Resolve DID to handle if needed 173 - let handle = null 174 - try { 175 - if (did) { 176 - const didData = await resolver.resolveDidToDidDoc(did) 177 - handle = didData.handle 178 - } 179 - } catch (error) { 180 - logger.warn( 181 - { did, error: (error as Error).message }, 182 - 'Failed to resolve DID to handle', 183 - ) 184 - } 173 + const handle = null 185 174 186 175 // Construct a normalized event object 187 176 const normalizedEvent: NormalizedEvent = { ··· 197 186 } 198 187 199 188 // Process the normalized event 200 - await handleEvent(normalizedEvent, db) 189 + await handleEvent(normalizedEvent, db, resolver) 201 190 } 202 191 203 192 return { connect }
+15
services/ingester/src/utils/logger-config.ts
import type { LoggerOptions } from 'pino'
import { env } from './env.js'

/**
 * Builds the pino options shared by the ingester's loggers.
 *
 * In development (`NODE_ENV === 'development'`) the level is `debug` and
 * output goes through the `pino-pretty` transport with colors enabled. In
 * every other environment the level is `info` and no transport is
 * configured. Neither level enables `trace` messages.
 *
 * @param name Logger name attached to the returned options
 * @returns Options suitable for passing to `pino(...)`
 */
export const customConfig = (name: string): LoggerOptions => {
  const isDevelopment = env.NODE_ENV === 'development'

  return {
    name,
    level: isDevelopment ? 'debug' : 'info',
    // Spreading `false` adds nothing, so `transport` only exists in development.
    ...(isDevelopment && {
      transport: {
        target: 'pino-pretty',
        options: { colorize: true },
      },
    }),
  }
}
+90
services/ingester/src/utils/ttl-cache.ts
type Timestamp = number;

interface CacheEntry<T> {
  value: T;
  expiresAt: Timestamp;
}

interface TtlCacheOptions {
  /** TTL in milliseconds used when `set` is called without `ttlMs`. Defaults to 60 000. */
  defaultTtlMs?: number;
  /** Clock returning the current time in milliseconds. Defaults to `Date.now`. */
  now?: () => Timestamp;
}

/**
 * In-memory key/value cache whose entries expire after a time-to-live.
 *
 * Expired entries are removed lazily: an entry is dropped the next time it is
 * looked up through `get` or `has`, or when `size` is computed. There is no
 * background timer.
 */
export class TtlCache<K, V> {
  private readonly store = new Map<K, CacheEntry<V>>();
  private readonly defaultTtlMs: number;
  private readonly now: () => Timestamp;

  /**
   * @param options Optional default TTL and clock override.
   * @throws RangeError if `options.defaultTtlMs` is NaN.
   */
  constructor(options?: TtlCacheOptions) {
    this.defaultTtlMs = TtlCache.checkTtl(options?.defaultTtlMs ?? 60_000); // default: 1 min
    this.now = options?.now ?? Date.now;
  }

  /**
   * Inserts a value into the cache, replacing any existing entry for the key.
   * @param key The item key.
   * @param value The value to be cached.
   * @param ttlMs Time to live in milliseconds (optional). A TTL of zero or
   *   less stores an entry that is already expired.
   * @throws RangeError if `ttlMs` is NaN.
   */
  set(key: K, value: V, ttlMs?: number): void {
    const ttl = ttlMs === undefined ? this.defaultTtlMs : TtlCache.checkTtl(ttlMs);
    this.store.set(key, { value, expiresAt: this.now() + ttl });
  }

  /**
   * Returns the cached value, or undefined if expired or non-existent.
   */
  get(key: K): V | undefined {
    return this.liveEntry(key)?.value;
  }

  /**
   * Checks if the key exists and is still valid.
   */
  has(key: K): boolean {
    return this.liveEntry(key) !== undefined;
  }

  /**
   * Manually removes an entry.
   */
  delete(key: K): void {
    this.store.delete(key);
  }

  /**
   * Clears the entire cache.
   */
  clear(): void {
    this.store.clear();
  }

  /**
   * Returns the number of valid items in the cache, purging expired ones.
   */
  size(): number {
    const now = this.now();
    let count = 0;
    for (const [key, entry] of this.store) {
      if (TtlCache.isExpired(entry, now)) {
        this.store.delete(key);
      } else {
        count++;
      }
    }
    return count;
  }

  /** Returns the entry for `key` if it is still valid; drops it if expired. */
  private liveEntry(key: K): CacheEntry<V> | undefined {
    const entry = this.store.get(key);
    if (entry === undefined) return undefined;

    if (TtlCache.isExpired(entry, this.now())) {
      this.store.delete(key);
      return undefined;
    }

    return entry;
  }

  /**
   * An entry is expired once the clock reaches its deadline. Using `>=`
   * (rather than `>`) means a TTL of 0 never produces a hit.
   */
  private static isExpired(entry: CacheEntry<unknown>, now: Timestamp): boolean {
    return now >= entry.expiresAt;
  }

  /**
   * Rejects NaN, which would otherwise make `expiresAt` NaN and the entry
   * immortal, since every comparison against NaN is false.
   */
  private static checkTtl(ttlMs: number): number {
    if (Number.isNaN(ttlMs)) {
      throw new RangeError('TTL must be a number of milliseconds, got NaN');
    }
    return ttlMs;
  }
}