[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { Cid } from "@atp/lex";
2import { AtUri, normalizeDatetimeAlways } from "@atp/syntax";
3import * as so from "../../../lex/so.ts";
4import { BackgroundQueue } from "../../background.ts";
5import { Database } from "../../db/index.ts";
6import { FollowDocument } from "../../db/models.ts";
7import { RecordProcessor } from "../processor.ts";
8
/** Lexicon schema object (`so.sprk.graph.follow.main`) handed to the record processor. */
const schema = so.sprk.graph.follow.main;

/** Follow record type as declared by the lexicon (`so.sprk.graph.follow.Main`). */
type FollowRecord = so.sprk.graph.follow.Main;

/** Follow document type read from and written to `db.models.Follow`. */
type IndexedFollow = FollowDocument;
12
/**
 * Upserts the follow document for a record and returns the stored document.
 *
 * The document is matched by the record URI, so indexing the same URI again
 * overwrites the stored fields (including `indexedAt`) instead of adding a
 * second document.
 *
 * NOTE(review): because this is an upsert with `returnDocument: "after"`, a
 * URI that was already indexed presumably also yields a document rather than
 * `null` — confirm the caller does not rely on `null` to detect re-indexing.
 *
 * @param db - Database handle exposing the `Follow` model.
 * @param uri - AT-URI of the follow record; its host is stored as `authorDid`.
 * @param cid - CID of the record, stored in string form.
 * @param obj - The follow record; `subject` and `createdAt` are read.
 * @param timestamp - Value stored as `indexedAt`.
 * @returns Whatever `Follow.findOneAndUpdate` resolves to for this URI.
 */
const insertFn = async (
  db: Database,
  uri: AtUri,
  cid: Cid,
  obj: FollowRecord,
  timestamp: string,
): Promise<IndexedFollow | null> => {
  const follow = {
    uri: uri.toString(),
    cid: cid.toString(),
    authorDid: uri.host,
    subject: obj.subject,
    // Record-supplied timestamps are normalized before being stored.
    createdAt: normalizeDatetimeAlways(obj.createdAt),
    indexedAt: timestamp,
  };

  return db.models.Follow.findOneAndUpdate(
    { uri: follow.uri },
    { $set: follow },
    { upsert: true, returnDocument: "after" },
  );
};
36
/**
 * Looks up a stored follow made by the same author (`uri.host`) towards the
 * same `subject`.
 *
 * Only the `uri` field of the match is selected. The query does not exclude
 * the document stored under `uri` itself.
 * NOTE(review): callers that need a *different* record presumably compare the
 * result against `uri` — confirm in `RecordProcessor`.
 *
 * @param db - Database handle exposing the `Follow` model.
 * @param uri - AT-URI of the record being checked; only its host is used.
 * @param obj - The follow record; only `subject` is used.
 * @returns The URI of a matching stored follow, or `null` when none exists.
 */
const findDuplicate = async (
  db: Database,
  uri: AtUri,
  obj: FollowRecord,
): Promise<AtUri | null> => {
  const found = await db.models.Follow.findOne({
    authorDid: uri.host,
    subject: obj.subject,
  })
    .select("uri")
    .lean();

  return found ? new AtUri(found.uri) : null;
};
48
/**
 * Builds the notification list for a newly indexed follow.
 *
 * Exactly one notification is produced: it is addressed to `obj.subject`,
 * attributed to `obj.authorDid`, points at the follow record (`uri` / `cid`),
 * and is sorted by the follow's `createdAt`.
 *
 * @param obj - The indexed follow document.
 * @returns A one-element array containing the `"follow"` notification.
 */
const notifsForInsert = (obj: IndexedFollow) => {
  return [
    {
      did: obj.subject,
      author: obj.authorDid,
      recordUri: obj.uri,
      recordCid: obj.cid,
      reason: "follow" as const,
      // Kept as an explicit key: a follow has no reason subject.
      reasonSubject: undefined,
      sortAt: obj.createdAt,
    },
  ];
};
62
/**
 * Deletes the follow document stored under the given record URI.
 *
 * @param db - Database handle exposing the `Follow` model.
 * @param uri - AT-URI of the follow record; matched in string form.
 * @returns Whatever `Follow.findOneAndDelete` resolves to for that URI
 *   (typed here as the removed document or `null`).
 */
const deleteFn = async (
  db: Database,
  uri: AtUri,
): Promise<IndexedFollow | null> => {
  return db.models.Follow.findOneAndDelete({ uri: uri.toString() });
};
72
/**
 * Builds the notification changes for a deleted follow.
 *
 * No new notifications are ever produced. The deleted record's URI is listed
 * in `toDelete` only when nothing replaced the record.
 *
 * @param deleted - The follow document that was removed.
 * @param replacedBy - The document that took its place, or `null` if none.
 * @returns `{ notifs: [], toDelete }`, where `toDelete` is `[deleted.uri]`
 *   when `replacedBy` is `null` and empty otherwise.
 */
const notifsForDelete = (
  deleted: IndexedFollow,
  replacedBy: IndexedFollow | null,
) => {
  const toDelete = replacedBy ? [] : [deleted.uri];
  return { notifs: [], toDelete };
};
80
/**
 * Recomputes the follow counters touched by `follow` and writes them to the
 * matching profile documents.
 *
 * - `followersCount` on the profile whose `authorDid` equals `follow.subject`
 *   becomes the number of follow documents with that `subject`.
 * - `followsCount` on the profile whose `authorDid` equals `follow.authorDid`
 *   becomes the number of follow documents with that `authorDid`.
 *
 * Both values are recounted from the `Follow` collection rather than
 * incremented. The profile updates are issued without `upsert`, so an account
 * that has no profile document is simply left untouched — no separate
 * existence lookup is needed to avoid creating an empty profile.
 *
 * The two refreshes touch different fields and are independent, so they run
 * concurrently.
 *
 * @param db - Database handle exposing the `Follow` and `Profile` models.
 * @param follow - The follow document whose subject and author are refreshed.
 */
const updateAggregates = async (db: Database, follow: IndexedFollow) => {
  // Followers of the subject: every follow document pointing at them.
  const refreshFollowersCount = async () => {
    const followersCount = await db.models.Follow.countDocuments({
      subject: follow.subject,
    });
    await db.models.Profile.findOneAndUpdate(
      { authorDid: follow.subject },
      { $set: { followersCount } },
      { returnDocument: "after" },
    );
  };

  // Accounts followed by the author: every follow document they created.
  const refreshFollowsCount = async () => {
    const followsCount = await db.models.Follow.countDocuments({
      authorDid: follow.authorDid,
    });
    await db.models.Profile.findOneAndUpdate(
      { authorDid: follow.authorDid },
      { $set: { followsCount } },
      { returnDocument: "after" },
    );
  };

  await Promise.all([refreshFollowersCount(), refreshFollowsCount()]);
};
120
/** Record processor type for follow records and their indexed documents. */
export type PluginType = RecordProcessor<typeof schema, IndexedFollow>;

/**
 * Creates the record processor for follow records, wiring this module's
 * insert, duplicate-lookup, delete, notification and aggregate handlers to
 * the follow schema.
 *
 * @param db - Database handle passed to the processor.
 * @param background - Background queue passed to the processor.
 * @returns The configured `RecordProcessor` instance.
 */
export const makePlugin = (
  db: Database,
  background: BackgroundQueue,
): PluginType => {
  return new RecordProcessor(db, background, {
    schema,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  });
};

export default makePlugin;