import { Cid } from "@atp/lex"; import { AtUri, normalizeDatetimeAlways } from "@atp/syntax"; import * as so from "../../../lex/so.ts"; import { BackgroundQueue } from "../../background.ts"; import { Database } from "../../db/index.ts"; import { FollowDocument } from "../../db/models.ts"; import { RecordProcessor } from "../processor.ts"; const schema = so.sprk.graph.follow.main; type FollowRecord = so.sprk.graph.follow.Main; type IndexedFollow = FollowDocument; const insertFn = async ( db: Database, uri: AtUri, cid: Cid, obj: FollowRecord, timestamp: string, ): Promise => { const follow = { uri: uri.toString(), cid: cid.toString(), authorDid: uri.host, subject: obj.subject, createdAt: normalizeDatetimeAlways(obj.createdAt), indexedAt: timestamp, }; const insertedFollow = await db.models.Follow.findOneAndUpdate( { uri: follow.uri }, { $set: follow }, { upsert: true, returnDocument: "after" }, ); return insertedFollow; }; const findDuplicate = async ( db: Database, uri: AtUri, obj: FollowRecord, ): Promise => { const found = await db.models.Follow.findOne({ authorDid: uri.host, subject: obj.subject, }).select("uri").lean(); return found ? new AtUri(found.uri) : null; }; const notifsForInsert = (obj: IndexedFollow) => { return [ { did: obj.subject, author: obj.authorDid, recordUri: obj.uri, recordCid: obj.cid, reason: "follow" as const, reasonSubject: undefined, sortAt: obj.createdAt, }, ]; }; const deleteFn = async ( db: Database, uri: AtUri, ): Promise => { const deleted = await db.models.Follow.findOneAndDelete({ uri: uri.toString(), }); return deleted; }; const notifsForDelete = ( deleted: IndexedFollow, replacedBy: IndexedFollow | null, ) => { const toDelete = replacedBy ? [] : [deleted.uri]; return { notifs: [], toDelete }; }; const updateAggregates = async (db: Database, follow: IndexedFollow) => { // Update followers count for the subject (count both types) const followersCount = await db.models.Follow.countDocuments({ subject: follow.subject, }); // First check if profile exists to avoid creating one with null URI const existingSubjectProfile = await db.models.Profile.findOne({ authorDid: follow.subject, }); if (existingSubjectProfile) { // Only update existing profiles await db.models.Profile.findOneAndUpdate( { authorDid: follow.subject }, { $set: { followersCount } }, { returnDocument: "after" }, ); } // Update follows count for the author (count both types) const followsCount = await db.models.Follow.countDocuments({ authorDid: follow.authorDid, }); // First check if profile exists to avoid creating one with null URI const existingAuthorProfile = await db.models.Profile.findOne({ authorDid: follow.authorDid, }); if (existingAuthorProfile) { // Only update existing profiles await db.models.Profile.findOneAndUpdate( { authorDid: follow.authorDid }, { $set: { followsCount } }, { returnDocument: "after" }, ); } }; export type PluginType = RecordProcessor; export const makePlugin = ( db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { schema, insertFn, findDuplicate, deleteFn, notifsForInsert, notifsForDelete, updateAggregates, }); }; export default makePlugin;