[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 178 lines 4.5 kB view raw
import { Cid } from "@atp/lex";
import { AtUri, normalizeDatetimeAlways } from "@atp/syntax";
import * as so from "../../../lex/so.ts";
import { BackgroundQueue } from "../../background.ts";
import { Database } from "../../db/index.ts";
import { RepostDocument } from "../../db/models.ts";
import { RecordProcessor } from "../processor.ts";

const schema = so.sprk.feed.repost.main;
type RepostRecord = so.sprk.feed.repost.Main;
type IndexedRepost = RepostDocument;

/**
 * Writes a repost record into the `Repost` collection.
 *
 * @param db - Database handle exposing the models.
 * @param uri - AT-URI of the repost record; its host is stored as `authorDid`.
 * @param cid - CID of the repost record.
 * @param obj - The repost record (subject, optional `via`, `createdAt`).
 * @param timestamp - Value stored as `indexedAt`.
 * @returns The document as it exists after the upsert.
 */
const insertFn = async (
  db: Database,
  uri: AtUri,
  cid: Cid,
  obj: RepostRecord,
  timestamp: string,
): Promise<IndexedRepost | null> => {
  // `via` is optional on the record; a missing or empty value is stored as null.
  const viaObj = obj.via as { uri: string; cid: string } | undefined;
  const via = viaObj?.uri || null;
  const viaCid = viaObj?.cid || null;

  const repost = {
    uri: uri.toString(),
    cid: cid.toString(),
    authorDid: uri.host,
    subject: obj.subject.uri,
    subjectCid: obj.subject.cid,
    via,
    viaCid,
    createdAt: normalizeDatetimeAlways(obj.createdAt),
    indexedAt: timestamp,
  };

  // Upsert keyed on the record URI so that re-processing the same record
  // overwrites the existing document instead of raising a duplicate key error.
  const insertedRepost = await db.models.Repost.findOneAndUpdate(
    { uri: repost.uri },
    { $set: repost },
    { upsert: true, returnDocument: "after" },
  );
  return insertedRepost;
};

/**
 * Looks for an already-indexed repost of the same subject by the same author
 * (the host of `uri`).
 *
 * @returns The AT-URI of the matching repost, or `null` when there is none.
 */
const findDuplicate = async (
  db: Database,
  uri: AtUri,
  obj: RepostRecord,
): Promise<AtUri | null> => {
  const found = await db.models.Repost.findOne({
    authorDid: uri.host,
    subject: obj.subject.uri,
  }).lean();
  return found ? new AtUri(found.uri) : null;
};

/**
 * Builds the notifications produced by a newly indexed repost.
 *
 * Returns an empty list when the reposter is the author of the subject.
 * Otherwise returns a `"repost"` notification for the subject's author and,
 * when the repost was made via another user's repost, a
 * `"repost-via-repost"` notification for that user.
 */
const notifsForInsert = (obj: IndexedRepost) => {
  const subjectUri = new AtUri(obj.subject);
  // prevent self-notifications
  const isRepostFromSubjectUser = subjectUri.host === obj.authorDid;
  if (isRepostFromSubjectUser) {
    return [];
  }

  const notifs: Array<{
    did: string;
    reason: string;
    author: string;
    recordUri: string;
    recordCid: string;
    sortAt: string;
    reasonSubject?: string;
  }> = [
    // Notification to the author of the reposted record.
    {
      did: subjectUri.host,
      author: obj.authorDid,
      recordUri: obj.uri,
      recordCid: obj.cid,
      reason: "repost" as const,
      reasonSubject: subjectUri.toString(),
      sortAt: obj.createdAt,
    },
  ];

  if (obj.via) {
    const viaUri = new AtUri(obj.via);
    const isRepostFromViaSubjectUser = viaUri.host === obj.authorDid;
    // prevent self-notifications
    if (!isRepostFromViaSubjectUser) {
      notifs.push(
        // Notification to the reposter via whose repost the repost was made.
        {
          did: viaUri.host,
          author: obj.authorDid,
          recordUri: obj.uri,
          recordCid: obj.cid,
          reason: "repost-via-repost" as const,
          reasonSubject: viaUri.toString(),
          sortAt: obj.createdAt,
        },
      );
    }
  }

  return notifs;
};

/**
 * Removes the repost document with the given URI.
 *
 * @returns The deleted document, or `null` when nothing matched.
 */
const deleteFn = async (
  db: Database,
  uri: AtUri,
): Promise<IndexedRepost | null> => {
  const deleted = await db.models.Repost.findOneAndDelete({
    uri: uri.toString(),
  });
  return deleted;
};

/**
 * Computes the notification changes for a deleted repost.
 *
 * No new notifications are created. The deleted record's URI is listed in
 * `toDelete` unless a replacement record (`replacedBy`) exists.
 */
const notifsForDelete = (
  deleted: IndexedRepost,
  replacedBy: IndexedRepost | null,
) => {
  const toDelete = replacedBy ? [] : [deleted.uri];
  return { notifs: [], toDelete };
};

/**
 * Recomputes the repost counters affected by `repost` and stores them on the
 * subject post and on the reposter's profile.
 *
 * Neither update uses `upsert`, so a post or profile that has not been
 * indexed is left untouched; no separate existence lookup is needed.
 */
const updateAggregates = async (db: Database, repost: IndexedRepost) => {
  const repostCount = await db.models.Repost.countDocuments({
    subject: repost.subject,
  });

  // Without `upsert`, this is a no-op when the subject post is not indexed.
  await db.models.Post.findOneAndUpdate(
    { uri: repost.subject },
    { $set: { repostCount } },
    { returnDocument: "after" },
  );

  const authorRepostCount = await db.models.Repost.countDocuments({
    authorDid: repost.authorDid,
  });

  // Without `upsert`, this is a no-op when the author has no profile document.
  await db.models.Profile.findOneAndUpdate(
    { authorDid: repost.authorDid },
    { $set: { repostCount: authorRepostCount } },
    { returnDocument: "after" },
  );
};

export type PluginType = RecordProcessor<typeof schema, IndexedRepost>;

/**
 * Creates the record processor for repost records, wiring in the insert,
 * duplicate-lookup, delete, notification and aggregate handlers defined in
 * this module.
 */
export const makePlugin = (
  db: Database,
  background: BackgroundQueue,
): PluginType => {
  return new RecordProcessor(db, background, {
    schema,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  });
};

export default makePlugin;