[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 138 lines 3.6 kB view raw
1import { Cid } from "@atp/lex"; 2import { AtUri, normalizeDatetimeAlways } from "@atp/syntax"; 3import * as so from "../../../lex/so.ts"; 4import { BackgroundQueue } from "../../background.ts"; 5import { Database } from "../../db/index.ts"; 6import { FollowDocument } from "../../db/models.ts"; 7import { RecordProcessor } from "../processor.ts"; 8 9const schema = so.sprk.graph.follow.main; 10type FollowRecord = so.sprk.graph.follow.Main; 11type IndexedFollow = FollowDocument; 12 13const insertFn = async ( 14 db: Database, 15 uri: AtUri, 16 cid: Cid, 17 obj: FollowRecord, 18 timestamp: string, 19): Promise<IndexedFollow | null> => { 20 const follow = { 21 uri: uri.toString(), 22 cid: cid.toString(), 23 authorDid: uri.host, 24 subject: obj.subject, 25 createdAt: normalizeDatetimeAlways(obj.createdAt), 26 indexedAt: timestamp, 27 }; 28 29 const insertedFollow = await db.models.Follow.findOneAndUpdate( 30 { uri: follow.uri }, 31 { $set: follow }, 32 { upsert: true, returnDocument: "after" }, 33 ); 34 return insertedFollow; 35}; 36 37const findDuplicate = async ( 38 db: Database, 39 uri: AtUri, 40 obj: FollowRecord, 41): Promise<AtUri | null> => { 42 const found = await db.models.Follow.findOne({ 43 authorDid: uri.host, 44 subject: obj.subject, 45 }).select("uri").lean(); 46 return found ? new AtUri(found.uri) : null; 47}; 48 49const notifsForInsert = (obj: IndexedFollow) => { 50 return [ 51 { 52 did: obj.subject, 53 author: obj.authorDid, 54 recordUri: obj.uri, 55 recordCid: obj.cid, 56 reason: "follow" as const, 57 reasonSubject: undefined, 58 sortAt: obj.createdAt, 59 }, 60 ]; 61}; 62 63const deleteFn = async ( 64 db: Database, 65 uri: AtUri, 66): Promise<IndexedFollow | null> => { 67 const deleted = await db.models.Follow.findOneAndDelete({ 68 uri: uri.toString(), 69 }); 70 return deleted; 71}; 72 73const notifsForDelete = ( 74 deleted: IndexedFollow, 75 replacedBy: IndexedFollow | null, 76) => { 77 const toDelete = replacedBy ? [] : [deleted.uri]; 78 return { notifs: [], toDelete }; 79}; 80 81const updateAggregates = async (db: Database, follow: IndexedFollow) => { 82 // Update followers count for the subject (count both types) 83 const followersCount = await db.models.Follow.countDocuments({ 84 subject: follow.subject, 85 }); 86 87 // First check if profile exists to avoid creating one with null URI 88 const existingSubjectProfile = await db.models.Profile.findOne({ 89 authorDid: follow.subject, 90 }); 91 92 if (existingSubjectProfile) { 93 // Only update existing profiles 94 await db.models.Profile.findOneAndUpdate( 95 { authorDid: follow.subject }, 96 { $set: { followersCount } }, 97 { returnDocument: "after" }, 98 ); 99 } 100 101 // Update follows count for the author (count both types) 102 const followsCount = await db.models.Follow.countDocuments({ 103 authorDid: follow.authorDid, 104 }); 105 106 // First check if profile exists to avoid creating one with null URI 107 const existingAuthorProfile = await db.models.Profile.findOne({ 108 authorDid: follow.authorDid, 109 }); 110 111 if (existingAuthorProfile) { 112 // Only update existing profiles 113 await db.models.Profile.findOneAndUpdate( 114 { authorDid: follow.authorDid }, 115 { $set: { followsCount } }, 116 { returnDocument: "after" }, 117 ); 118 } 119}; 120 121export type PluginType = RecordProcessor<typeof schema, IndexedFollow>; 122 123export const makePlugin = ( 124 db: Database, 125 background: BackgroundQueue, 126): PluginType => { 127 return new RecordProcessor(db, background, { 128 schema, 129 insertFn, 130 findDuplicate, 131 deleteFn, 132 notifsForInsert, 133 notifsForDelete, 134 updateAggregates, 135 }); 136}; 137 138export default makePlugin;