[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { Cid } from "@atp/lex";
2import { AtUri, normalizeDatetimeAlways } from "@atp/syntax";
3import * as so from "../../../lex/so.ts";
4import { BackgroundQueue } from "../../background.ts";
5import { Database } from "../../db/index.ts";
6import { RepostDocument } from "../../db/models.ts";
7import { RecordProcessor } from "../processor.ts";
8
/** Schema object for `so.sprk.feed.repost`; handed to the RecordProcessor as `schema`. */
const { main: schema } = so.sprk.feed.repost;

/** Shape of a repost record as typed by the generated `so` lexicon module. */
type RepostRecord = so.sprk.feed.repost.Main;

/** Shape of a repost as stored through the database models. */
type IndexedRepost = RepostDocument;
12
/**
 * Indexes a repost record by upserting it into the Repost collection.
 *
 * @param db - Database handle exposing `models.Repost`.
 * @param uri - AT URI of the repost record; its host is stored as `authorDid`.
 * @param cid - CID of the repost record.
 * @param obj - The repost record (subject reference, optional `via` reference, `createdAt`).
 * @param timestamp - Value stored as `indexedAt`.
 * @returns The result of the upsert, requested as the document after the update.
 */
const insertFn = async (
  db: Database,
  uri: AtUri,
  cid: Cid,
  obj: RepostRecord,
  timestamp: string,
): Promise<IndexedRepost | null> => {
  // `via` is optional; a missing (or empty) uri/cid is persisted as null.
  const viaRef = obj.via as { uri: string; cid: string } | undefined;
  const recordUri = uri.toString();

  const fields = {
    uri: recordUri,
    cid: cid.toString(),
    authorDid: uri.host,
    subject: obj.subject.uri,
    subjectCid: obj.subject.cid,
    via: viaRef?.uri || null,
    viaCid: viaRef?.cid || null,
    createdAt: normalizeDatetimeAlways(obj.createdAt),
    indexedAt: timestamp,
  };

  // Upsert keyed on the record uri: indexing the same record again updates the
  // existing document instead of attempting a second insert.
  return await db.models.Repost.findOneAndUpdate(
    { uri: recordUri },
    { $set: fields },
    { upsert: true, returnDocument: "after" },
  );
};
44
/**
 * Looks for an already-indexed repost of the same subject by the same author.
 *
 * @param db - Database handle exposing `models.Repost`.
 * @param uri - AT URI of the incoming repost; only its host (the author) is used.
 * @param obj - The incoming repost record; only `subject.uri` is used.
 * @returns The AT URI of the matching stored repost, or null when there is none.
 */
const findDuplicate = async (
  db: Database,
  uri: AtUri,
  obj: RepostRecord,
): Promise<AtUri | null> => {
  const existing = await db.models.Repost
    .findOne({ authorDid: uri.host, subject: obj.subject.uri })
    .lean();

  if (!existing) {
    return null;
  }
  return new AtUri(existing.uri);
};
56
/**
 * Builds the notifications produced by a newly indexed repost.
 *
 * - The owner of the reposted record gets a "repost" notification.
 * - When the repost was made via someone else's repost, the owner of that
 *   repost gets a "repost-via-repost" notification.
 *
 * Nobody is notified about their own action. Note that when the reposter owns
 * the subject, the function returns early with no notifications at all, so the
 * via notification is skipped in that case too.
 *
 * @param obj - The indexed repost.
 * @returns The notifications to create (possibly empty).
 */
const notifsForInsert = (obj: IndexedRepost) => {
  type RepostNotif = {
    did: string;
    reason: string;
    author: string;
    recordUri: string;
    recordCid: string;
    sortAt: string;
    reasonSubject?: string;
  };

  // Every notification shares the repost's own author/uri/cid/createdAt and
  // differs only in recipient, reason and the uri the reason refers to.
  const buildNotif = (
    recipient: string,
    reason: string,
    reasonSubject: string,
  ): RepostNotif => ({
    did: recipient,
    author: obj.authorDid,
    recordUri: obj.uri,
    recordCid: obj.cid,
    reason,
    reasonSubject,
    sortAt: obj.createdAt,
  });

  const subjectUri = new AtUri(obj.subject);
  // Reposting your own record: no notifications.
  if (subjectUri.host === obj.authorDid) {
    return [];
  }

  // Notify the author of the reposted record.
  const notifs: RepostNotif[] = [
    buildNotif(subjectUri.host, "repost", subjectUri.toString()),
  ];

  if (obj.via) {
    const viaUri = new AtUri(obj.via);
    // Notify the owner of the intermediate repost, unless that is the reposter.
    if (viaUri.host !== obj.authorDid) {
      notifs.push(
        buildNotif(viaUri.host, "repost-via-repost", viaUri.toString()),
      );
    }
  }

  return notifs;
};
108
/**
 * Removes the indexed repost stored under the given record uri.
 *
 * @param db - Database handle exposing `models.Repost`.
 * @param uri - AT URI of the repost record to remove.
 * @returns Whatever `findOneAndDelete` resolves to for that uri.
 */
const deleteFn = async (
  db: Database,
  uri: AtUri,
): Promise<IndexedRepost | null> => {
  const filter = { uri: uri.toString() };
  return await db.models.Repost.findOneAndDelete(filter);
};
118
/**
 * Decides which notifications to drop when a repost is removed.
 *
 * @param deleted - The repost that was removed.
 * @param replacedBy - A repost taking its place, or null when there is none.
 * @returns No new notifications, plus the removed repost's uri in `toDelete`
 *   unless a replacement exists (then `toDelete` is empty).
 */
const notifsForDelete = (
  deleted: IndexedRepost,
  replacedBy: IndexedRepost | null,
) => {
  const toDelete: Array<IndexedRepost["uri"]> = [];
  if (!replacedBy) {
    toDelete.push(deleted.uri);
  }
  return { notifs: [], toDelete };
};
126
/**
 * Refreshes the denormalized repost counters touched by a repost.
 *
 * Two independent refreshes run concurrently:
 * - the number of reposts of `repost.subject` is written to `repostCount` on
 *   the Post whose `uri` is that subject;
 * - the number of reposts authored by `repost.authorDid` is written to
 *   `repostCount` on the Profile with that `authorDid`.
 *
 * Neither update upserts, so when the target Post/Profile is not indexed the
 * update matches nothing and is a no-op. That makes a separate existence
 * lookup before each update unnecessary.
 *
 * @param db - Database handle exposing the Repost, Post and Profile models.
 * @param repost - The repost whose subject and author counters are refreshed.
 */
const updateAggregates = async (db: Database, repost: IndexedRepost) => {
  const refreshSubjectCount = async () => {
    const repostCount = await db.models.Repost.countDocuments({
      subject: repost.subject,
    });
    await db.models.Post.findOneAndUpdate(
      { uri: repost.subject },
      { $set: { repostCount } },
    );
  };

  const refreshAuthorCount = async () => {
    const authorRepostCount = await db.models.Repost.countDocuments({
      authorDid: repost.authorDid,
    });
    await db.models.Profile.findOneAndUpdate(
      { authorDid: repost.authorDid },
      { $set: { repostCount: authorRepostCount } },
    );
  };

  await Promise.all([refreshSubjectCount(), refreshAuthorCount()]);
};
160
/** RecordProcessor specialised for the repost schema and its indexed form. */
export type PluginType = RecordProcessor<typeof schema, IndexedRepost>;

/**
 * Creates the repost plugin by wiring this module's handlers into a
 * RecordProcessor.
 *
 * @param db - Database handle passed through to the processor.
 * @param background - Background queue passed through to the processor.
 * @returns The configured repost RecordProcessor.
 */
export const makePlugin = (
  db: Database,
  background: BackgroundQueue,
): PluginType =>
  new RecordProcessor(db, background, {
    schema,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  });

export default makePlugin;