···11+import { pino } from 'pino'
22+import { Database } from '../db/connection.js'
33+import type { NormalizedEvent } from '../types/events.js'
44+import { IndexingService } from '../services/indexing.js'
55+import { BidirectionalResolver } from '../id-resolver.js'
66+77+const logger = pino({ name: 'actor-handler' })
88+99+/**
1010+ * This handler is called by all other handlers to ensure that
1111+ * any DID referenced in an event has a corresponding actor entry.
1212+ *
1313+ * @param evt The normalized event to process
1414+ * @param db Database connection
1515+ */
1616+export async function handleActorReferences(evt: NormalizedEvent, db: Database): Promise<void> {
1717+ try {
1818+ const now = new Date().toISOString()
1919+ const resolver = new BidirectionalResolver()
2020+ const indexingService = new IndexingService(db, resolver)
2121+2222+ // Always ensure the author DID has an actor
2323+ if (evt.did) {
2424+ await indexingService.indexHandle(evt.did, now)
2525+ }
2626+2727+ // Handle subject DIDs for follow, block, like events
2828+ if (['follow', 'block', 'like'].includes(evt.event) && evt.record?.subject) {
2929+ // Subject is usually a DID in format did:plc:12345
3030+ const subjectDid = evt.record.subject as string
3131+ if (subjectDid && subjectDid.startsWith('did:')) {
3232+ await indexingService.indexHandle(subjectDid, now)
3333+ }
3434+ }
3535+3636+ // Handle reply references for posts
3737+ if (evt.collection === 'so.sprk.feed.post' && evt.record?.reply) {
3838+ const reply = evt.record.reply as { root?: { uri?: string }, parent?: { uri?: string } }
3939+4040+ // Extract DIDs from reply URIs (format: at://did:plc:12345/...)
4141+ if (reply.root?.uri) {
4242+ const rootDid = extractDidFromUri(reply.root.uri)
4343+ if (rootDid) {
4444+ await indexingService.indexHandle(rootDid, now)
4545+ }
4646+ }
4747+4848+ if (reply.parent?.uri) {
4949+ const parentDid = extractDidFromUri(reply.parent.uri)
5050+ if (parentDid && parentDid !== extractDidFromUri(reply.root?.uri || '')) {
5151+ await indexingService.indexHandle(parentDid, now)
5252+ }
5353+ }
5454+ }
5555+5656+ // Handle repost subjects
5757+ if (evt.collection === 'so.sprk.feed.repost' && evt.record?.subject?.uri) {
5858+ const subjectUri = evt.record.subject.uri as string
5959+ const subjectDid = extractDidFromUri(subjectUri)
6060+ if (subjectDid) {
6161+ await indexingService.indexHandle(subjectDid, now)
6262+ }
6363+ }
6464+ } catch (error) {
6565+ logger.error({ error, uri: evt.uri }, 'Error while handling actor references')
6666+ }
6767+}
6868+6969+/**
7070+ * Helper function to extract DID from a URI
7171+ */
7272+function extractDidFromUri(uri: string): string | null {
7373+ const match = uri.match(/at:\/\/(did:[^/]+)/)
7474+ return match ? match[1] : null
7575+}
+5-1
services/ingester/src/handlers/index.ts
···1111import { handleMusicEvent } from './music-handler.js'
1212import { handleLookEvent } from './look-handler.js'
1313import { handleGeneratorEvent } from './generator-handler.js'
1414+import { handleActorReferences } from './actor-handler.js'
14151516const logger = pino({ name: 'event-handler' })
16171718export async function handleEvent(evt: NormalizedEvent, db: Database): Promise<void> {
1819 try {
1919- // Handle different events based on collection
2020+ // First, ensure all actor references are handled properly
2121+ await handleActorReferences(evt, db)
2222+2323+ // Then handle different events based on collection
2024 if (evt.collection === 'so.sprk.feed.like') {
2125 await handleLikeEvent(evt, db)
2226 return
+14-3
services/ingester/src/handlers/profile-handler.ts
···11import { pino } from 'pino'
22import { Database } from '../db/connection.js'
33import type { NormalizedEvent } from '../types/events.js'
44+import { ensureActor, linkProfileToActor } from '../utils/actor-utils.js'
55+import type { ProfileDocument } from '../db/models.js'
4657const logger = pino({ name: 'profile-handler' })
68···3739 }, 'Processing profile event')
38403941 try {
4242+ // First, ensure we have an actor for this DID
4343+ await ensureActor(evt.did, evt.handle || undefined, db)
4444+4045 const profileData = {
4146 uri: evt.uri,
4247 displayName: record.displayName,
···5358 cid: evt.commit.cid
5459 }
55605656- await db.models.Profile.findOneAndUpdate(
6161+ // Save the profile
6262+ const profile = await db.models.Profile.findOneAndUpdate(
5763 { uri: evt.uri },
5864 profileData,
5965 { upsert: true, new: true }
6060- )
6666+ ) as ProfileDocument
6767+6868+ if (profile && profile._id) {
6969+ // Link the profile to the actor
7070+ await linkProfileToActor(evt.did, profile._id.toString(), evt.commit.cid, db)
7171+ }
61726273 logger.info(
6374 { uri: evt.uri },
6464- 'Successfully saved profile to database'
7575+ 'Successfully saved profile to database and linked to actor'
6576 )
6677 } catch (error) {
6778 logger.error(
+61
services/ingester/src/id-resolver.ts
···11+import { pino } from 'pino'
22+33+const logger = pino({ name: 'id-resolver' })
44+55+export interface DidDocument {
66+ did: string
77+ handle?: string
88+}
99+1010+/**
1111+ * Service to handle resolving DIDs to handles and vice versa
1212+ */
1313+export class BidirectionalResolver {
1414+ private logger = pino({ name: 'id-resolver' })
1515+1616+ /**
1717+ * Resolve a DID to its DID document
1818+ */
1919+ async resolveDidToDidDoc(did: string): Promise<DidDocument> {
2020+ try {
2121+ // TODO: Implement actual DID resolution
2222+ // For now, return basic document
2323+ return {
2424+ did,
2525+ }
2626+ } catch (error) {
2727+ this.logger.error({ error, did }, 'Failed to resolve DID to DID document')
2828+ throw error
2929+ }
3030+ }
3131+3232+ /**
3333+ * Resolve a handle to its DID document
3434+ */
3535+ async resolveHandleToDidDoc(handle: string): Promise<DidDocument> {
3636+ try {
3737+ // TODO: Implement actual handle resolution
3838+ // For now, return basic document
3939+ return {
4040+ did: `did:plc:${handle.toLowerCase()}`,
4141+ handle,
4242+ }
4343+ } catch (error) {
4444+ this.logger.error({ error, handle }, 'Failed to resolve handle to DID document')
4545+ throw error
4646+ }
4747+ }
4848+4949+ /**
5050+ * Resolve a DID to its handle
5151+ */
5252+ async resolveDidToHandle(did: string): Promise<string> {
5353+ try {
5454+ const doc = await this.resolveDidToDidDoc(did)
5555+ return doc.handle || did
5656+ } catch (error) {
5757+ this.logger.error({ error, did }, 'Failed to resolve DID to handle')
5858+ throw error
5959+ }
6060+ }
6161+}
+109
services/ingester/src/services/indexing.ts
···11+import { pino } from 'pino'
22+import { Database } from '../db/connection.js'
33+import { BidirectionalResolver } from '../id-resolver.js'
44+55+const logger = pino({ name: 'indexing-service' })
66+77+/**
88+ * Service to handle indexing of actors and their handles
99+ */
1010+export class IndexingService {
1111+ private logger = pino({ name: 'indexing-service' })
1212+1313+ constructor(
1414+ private db: Database,
1515+ private resolver: BidirectionalResolver,
1616+ ) {}
1717+1818+ /**
1919+ * Index or update actor handle information
2020+ *
2121+ * @param did The DID of the actor
2222+ * @param timestamp The timestamp of the operation
2323+ * @param force Force reindexing even if recently indexed
2424+ */
2525+ async indexHandle(did: string, timestamp: string, force = false): Promise<void> {
2626+ try {
2727+ // Find existing actor
2828+ const actor = await this.db.models.Actor.findOne({ did })
2929+3030+ // Skip if recently indexed and not forced
3131+ if (!force && actor && this.isHandleRecentlyIndexed(actor, timestamp)) {
3232+ return
3333+ }
3434+3535+ // Resolve DID to handle
3636+ const didDoc = await this.resolver.resolveDidToDidDoc(did)
3737+3838+ // Verify handle ownership
3939+ let handle: string | undefined = undefined
4040+ if (didDoc.handle) {
4141+ const handleDidDoc = await this.resolver.resolveHandleToDidDoc(didDoc.handle)
4242+ handle = did === handleDidDoc.did ? didDoc.handle.toLowerCase() : undefined
4343+ }
4444+4545+ // Handle conflict resolution - if another actor has this handle
4646+ if (handle) {
4747+ const actorWithHandle = await this.db.models.Actor.findOne({ handle })
4848+ if (actorWithHandle && actorWithHandle.did !== did) {
4949+ // Clear handle from the other actor
5050+ await this.db.models.Actor.updateOne(
5151+ { did: actorWithHandle.did },
5252+ { $set: { handle: null } }
5353+ )
5454+ }
5555+ }
5656+5757+ const existingProfile = await this.db.models.Profile.findOne({ authorDid: did })
5858+5959+ // Update or create actor
6060+ await this.db.models.Actor.updateOne(
6161+ { did },
6262+ {
6363+ $set: {
6464+ handle,
6565+ indexedAt: timestamp,
6666+ ...(existingProfile && existingProfile._id ? {
6767+ profile: existingProfile._id,
6868+ profileCid: existingProfile.cid
6969+ } : {})
7070+ },
7171+ $setOnInsert: {
7272+ uri: `at://${did}/so.sprk.actor.profile`,
7373+ followersCount: 0,
7474+ followingCount: 0,
7575+ postsCount: 0,
7676+ isLabeler: false,
7777+ priorityNotifications: false,
7878+ }
7979+ },
8080+ { upsert: true }
8181+ )
8282+8383+ if (existingProfile) {
8484+ this.logger.info({ did, profileId: existingProfile._id }, 'Linked existing profile to actor during indexing')
8585+ }
8686+ } catch (error) {
8787+ this.logger.error({ error, did }, 'Error indexing handle')
8888+ }
8989+ }
9090+9191+ /**
9292+ * Check if an actor's handle was recently indexed
9393+ */
9494+ private isHandleRecentlyIndexed(actor: any, timestamp: string): boolean {
9595+ if (!actor.indexedAt) return false
9696+9797+ const timeDiff = new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime()
9898+ const ONE_DAY = 24 * 60 * 60 * 1000
9999+ const ONE_HOUR = 60 * 60 * 1000
100100+101101+ // Reindex daily for all actors
102102+ if (timeDiff > ONE_DAY) return false
103103+104104+ // Reindex more frequently for actors without handles
105105+ if (actor.handle === null && timeDiff > ONE_HOUR) return false
106106+107107+ return true
108108+ }
109109+}
+91
services/ingester/src/utils/actor-utils.ts
···11+import { pino } from 'pino'
22+import { Database } from '../db/connection.js'
33+44+const logger = pino({ name: 'actor-utils' })
55+66+/**
77+ * Ensures that an actor exists for the given DID.
88+ * If the actor doesn't exist, it creates a new one.
99+ *
1010+ * @param did The DID to ensure has an actor
1111+ * @param handle Optional handle associated with the DID
1212+ * @param db Database connection
1313+ * @returns The actor document, either existing or newly created
1414+ */
1515+export async function ensureActor(
1616+ did: string,
1717+ handle?: string,
1818+ db?: Database
1919+): Promise<any> {
2020+ if (!db) {
2121+ logger.warn({ did }, 'No database connection provided to ensureActor')
2222+ return null
2323+ }
2424+2525+ try {
2626+ // Try to find existing actor
2727+ const existingActor = await db.models.Actor.findOne({ did })
2828+2929+ if (existingActor) {
3030+ // If handle is provided and different from existing, update it
3131+ if (handle && existingActor.handle !== handle) {
3232+ existingActor.handle = handle
3333+ await existingActor.save()
3434+ logger.info({ did, handle }, 'Updated actor handle')
3535+ }
3636+ return existingActor
3737+ }
3838+3939+ // Create new actor if none exists
4040+ const now = new Date()
4141+ const uri = `at://${did}/so.sprk.actor.profile`
4242+4343+ const newActor = await db.models.Actor.create({
4444+ uri,
4545+ did,
4646+ handle: handle || undefined,
4747+ followersCount: 0,
4848+ followingCount: 0,
4949+ postsCount: 0,
5050+ indexedAt: now.toISOString(),
5151+ isLabeler: false,
5252+ priorityNotifications: false
5353+ })
5454+5555+ logger.info({ did, handle }, 'Created new actor')
5656+ return newActor
5757+ } catch (error) {
5858+ logger.error({ error, did, handle }, 'Failed to ensure actor exists')
5959+ return null
6060+ }
6161+}
6262+6363+/**
6464+ * Links a profile to an actor
6565+ *
6666+ * @param did The DID of the actor
6767+ * @param profileId The MongoDB ID of the profile
6868+ * @param profileCid The CID of the profile
6969+ * @param db Database connection
7070+ */
7171+export async function linkProfileToActor(
7272+ did: string,
7373+ profileId: string,
7474+ profileCid: string,
7575+ db: Database
7676+): Promise<void> {
7777+ try {
7878+ await db.models.Actor.findOneAndUpdate(
7979+ { did },
8080+ {
8181+ profile: profileId,
8282+ profileCid
8383+ },
8484+ { new: true }
8585+ )
8686+8787+ logger.info({ did, profileId }, 'Linked profile to actor')
8888+ } catch (error) {
8989+ logger.error({ error, did, profileId }, 'Failed to link profile to actor')
9090+ }
9191+}