[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { CID } from "multiformats/cid";
2import { AtUri } from "@atp/syntax";
3import * as lex from "../../../lex/lexicons.ts";
4import {
5 isMain as isMediaImages,
6 Main as MediaImages,
7} from "../../../lex/types/so/sprk/media/images.ts";
8import { Main as MediaImage } from "../../../lex/types/so/sprk/media/image.ts";
9import {
10 isMain as isMediaVideo,
11 Main as MediaVideo,
12} from "../../../lex/types/so/sprk/media/video.ts";
13import { Record as PostRecord } from "../../../lex/types/so/sprk/feed/post.ts";
14import { Record as GateRecord } from "../../../lex/types/so/sprk/feed/threadgate.ts";
15import {
16 isLink,
17 isMention,
18} from "../../../lex/types/so/sprk/richtext/facet.ts";
19import { BackgroundQueue } from "../../background.ts";
20import { Database } from "../../db/index.ts";
21import { PostDocument } from "../../db/models.ts";
22import { getDescendents } from "../../util.ts";
23import { RecordProcessor } from "../processor.ts";
24
25type PostAncestor = {
26 uri: string;
27 height: number;
28};
29type PostDescendent = {
30 uri: string;
31 depth: number;
32 cid: string;
33 creator: string;
34 sortAt: string;
35};
36type IndexedPost = {
37 post: PostDocument;
38 facets?: { type: "mention" | "link"; value: string }[];
39 medias?: Array<{
40 position?: number;
41 imageCid?: string;
42 alt?: string | null;
43 thumbCid?: string | null;
44 videoCid?: string;
45 }>;
46 ancestors?: PostAncestor[];
47 descendents?: PostDescendent[];
48 threadgate?: GateRecord;
49};
50
51const lexId = lex.ids.SoSprkFeedPost;
52
53const REPLY_NOTIF_DEPTH = 5;
54
55const insertFn = async (
56 db: Database,
57 uri: AtUri,
58 cid: CID,
59 obj: PostRecord,
60 timestamp: string,
61): Promise<IndexedPost | null> => {
62 const post = {
63 uri: uri.toString(),
64 cid: cid.toString(),
65 authorDid: uri.host,
66 caption: obj.caption
67 ? {
68 text: obj.caption.text || "",
69 facets: obj.caption.facets || [],
70 }
71 : undefined,
72 media: obj.media || null,
73 sound: obj.sound || null,
74 langs: obj.langs || [],
75 labels: obj.labels || null,
76 tags: obj.tags || [],
77 createdAt: obj.createdAt,
78 indexedAt: timestamp,
79 };
80
81 const insertedPost = await db.models.Post.findOneAndUpdate(
82 { uri: post.uri },
83 { $set: post },
84 { upsert: true, new: true },
85 );
86
87 const facets = (obj.caption?.facets || [])
88 .flatMap((facet) => facet.features)
89 .flatMap((feature) => {
90 if (isMention(feature)) {
91 return {
92 type: "mention" as const,
93 value: feature.did,
94 };
95 }
96 if (isLink(feature)) {
97 return {
98 type: "link" as const,
99 value: feature.uri,
100 };
101 }
102 return [];
103 });
104
105 // Media processing - medias are stored inline in the Post model
106 const medias: Array<{
107 position?: number;
108 imageCid?: string;
109 alt?: string | null;
110 thumbCid?: string | null;
111 videoCid?: string;
112 }> = [];
113 const postMedias = separateMedia(obj.media);
114 for (const postMedia of postMedias) {
115 if (isMediaImages(postMedia)) {
116 const { images } = postMedia as MediaImages;
117 const imagesMedia = images.map((
118 img: MediaImage,
119 i: number,
120 ) => ({
121 position: i,
122 imageCid: img.image.ref.toString(),
123 alt: img.alt,
124 }));
125 medias.push(...imagesMedia);
126 } else if (isMediaVideo(postMedia)) {
127 const media = postMedia as MediaVideo;
128 const videoMedia = {
129 postUri: uri.toString(),
130 videoCid: media.video.ref.toString(),
131 alt: media.alt ?? null,
132 };
133 medias.push(videoMedia);
134 }
135 }
136
137 const descendents = await getDescendents(db, {
138 uri: post.uri,
139 depth: REPLY_NOTIF_DEPTH,
140 });
141
142 return {
143 post: insertedPost,
144 facets,
145 medias,
146 descendents,
147 };
148};
149
150const findDuplicate = (): AtUri | null => {
151 return null;
152};
153
154const notifsForInsert = (obj: IndexedPost) => {
155 const notifs: Array<{
156 did: string;
157 reason: string;
158 author: string;
159 recordUri: string;
160 recordCid: string;
161 sortAt: string;
162 reasonSubject?: string;
163 }> = [];
164 const notified = new Set([obj.post.authorDid]);
165 const maybeNotify = (notif: {
166 did: string;
167 reason: string;
168 author: string;
169 recordUri: string;
170 recordCid: string;
171 sortAt: string;
172 reasonSubject?: string;
173 }) => {
174 if (!notified.has(notif.did)) {
175 notified.add(notif.did);
176 notifs.push(notif);
177 }
178 };
179 for (const facet of obj.facets ?? []) {
180 if (facet.type === "mention") {
181 maybeNotify({
182 did: facet.value,
183 reason: "mention",
184 author: obj.post.authorDid,
185 recordUri: obj.post.uri,
186 recordCid: obj.post.cid,
187 sortAt: obj.post.createdAt,
188 });
189 }
190 }
191
192 const threadgateHiddenReplies = obj.threadgate?.hiddenReplies || [];
193
194 // reply notifications
195 for (const ancestor of obj.ancestors ?? []) {
196 if (ancestor.uri === obj.post.uri) continue; // no need to notify for own post
197 if (ancestor.height < REPLY_NOTIF_DEPTH) {
198 const ancestorUri = new AtUri(ancestor.uri);
199 maybeNotify({
200 did: ancestorUri.host,
201 reason: "reply",
202 reasonSubject: ancestorUri.toString(),
203 author: obj.post.authorDid,
204 recordUri: obj.post.uri,
205 recordCid: obj.post.cid,
206 sortAt: obj.post.createdAt,
207 });
208 // found hidden reply, don't notify any higher ancestors
209 if (threadgateHiddenReplies.includes(ancestorUri.toString())) break;
210 }
211 }
212
213 // descendents indicate out-of-order indexing: need to notify
214 // the current post and upwards.
215 for (const descendent of obj.descendents ?? []) {
216 for (const ancestor of obj.ancestors ?? []) {
217 const totalHeight = descendent.depth + ancestor.height;
218 if (totalHeight < REPLY_NOTIF_DEPTH) {
219 const ancestorUri = new AtUri(ancestor.uri);
220 maybeNotify({
221 did: ancestorUri.host,
222 reason: "reply",
223 reasonSubject: ancestorUri.toString(),
224 author: descendent.creator,
225 recordUri: descendent.uri,
226 recordCid: descendent.cid,
227 sortAt: descendent.sortAt,
228 });
229 }
230 }
231 }
232
233 return notifs;
234};
235
236const deleteFn = async (
237 db: Database,
238 uri: AtUri,
239): Promise<IndexedPost | null> => {
240 const uriStr = uri.toString();
241 const deleted = await db.models.Post.findOneAndDelete({ uri: uriStr });
242
243 if (!deleted) {
244 return null;
245 }
246
247 return {
248 post: deleted,
249 facets: [],
250 };
251};
252
253const notifsForDelete = (
254 deleted: IndexedPost,
255 replacedBy: IndexedPost | null,
256) => {
257 const notifs = replacedBy ? notifsForInsert(replacedBy) : [];
258 return {
259 notifs,
260 toDelete: [deleted.post.uri],
261 };
262};
263
264const updateAggregates = async (db: Database, postIdx: IndexedPost) => {
265 // Update posts count for author
266 const postsCount = await db.models.Post.countDocuments({
267 authorDid: postIdx.post.authorDid,
268 });
269
270 // First check if profile exists to avoid creating one with null URI
271 const existingProfile = await db.models.Profile.findOne({
272 authorDid: postIdx.post.authorDid,
273 });
274
275 if (existingProfile) {
276 // Only update existing profiles
277 await db.models.Profile.findOneAndUpdate(
278 { authorDid: postIdx.post.authorDid },
279 { $set: { postsCount } },
280 { new: true },
281 );
282 }
283};
284
285export type PluginType = RecordProcessor<PostRecord, IndexedPost>;
286
287export const makePlugin = (
288 db: Database,
289 background: BackgroundQueue,
290): PluginType => {
291 return new RecordProcessor(db, background, {
292 lexId,
293 insertFn,
294 findDuplicate,
295 deleteFn,
296 notifsForInsert,
297 notifsForDelete,
298 updateAggregates,
299 });
300};
301
302export default makePlugin;
303
304function separateMedia(
305 media: PostRecord["media"],
306): Array<NonNullable<PostRecord["media"]>> {
307 if (!media) {
308 return [];
309 }
310 return [media];
311}