···99import { Moderation } from "./routes/moderation.ts";
1010import { Actors } from "./routes/actors.ts";
1111import { Identity } from "./routes/identity.ts";
1212+import { Notifications } from "./routes/notifs.ts";
1213import { Records } from "./routes/records.ts";
1314import { Relationships } from "./routes/relationships.ts";
1415import { Interactions } from "./routes/interactions.ts";
···4243 public moderation: Moderation;
4344 public actors: Actors;
4445 public identity: Identity;
4646+ public notifications: Notifications;
4547 public records: Records;
4648 public relationships: Relationships;
4749 public interactions: Interactions;
···7173 this.moderation = new Moderation(db);
7274 this.actors = new Actors(db);
7375 this.identity = new Identity(idResolver);
7676+ this.notifications = new Notifications(db);
7477 this.records = new Records(db);
7578 this.relationships = new Relationships(db);
7679 this.interactions = new Interactions(db);
+43-5
data-plane/indexing/processor.ts
···44import { lexicons } from "../../lex/lexicons.ts";
55import { BackgroundQueue } from "../background.ts";
66import { Database } from "../db/index.ts";
77+import { chunkArray } from "@atp/common";
7889// @NOTE re: insertions and deletions. Due to how record updates are handled,
910// (insertFn) should have the same effect as (insertFn -> deleteFn -> insertFn).
···292293 }
293294 }
294295295295- handleNotifs(op: { deleted?: S; inserted?: S }) {
296296- let _notifs: Notif[] = [];
296296+ async handleNotifs(op: { deleted?: S; inserted?: S }) {
297297+ let notifs: Notif[] = [];
298298+ const runOnCommit: ((db: Database) => Promise<void>)[] = [];
297299 if (op.deleted) {
298300 const forDelete = this.params.notifsForDelete(
299301 op.deleted,
300302 op.inserted ?? null,
301303 );
302302- _notifs = forDelete.notifs;
304304+ if (forDelete.toDelete.length > 0) {
305305+ // Notifs can be deleted in background: they are expensive to delete and
306306+ // listNotifications already excludes notifs with missing records.
307307+ runOnCommit.push(async (db) => {
308308+ await db.models.Notification.deleteMany({
309309+ recordUri: { $in: forDelete.toDelete },
310310+ });
311311+ });
312312+ }
313313+ notifs = forDelete.notifs;
303314 } else if (op.inserted) {
304304- _notifs = this.params.notifsForInsert(op.inserted);
315315+ notifs = this.params.notifsForInsert(op.inserted);
316316+ }
317317+ for (const chunk of chunkArray(notifs, 500)) {
318318+ runOnCommit.push(async (db) => {
319319+ const filtered = await this.filterNotifsForThreadMutes(chunk);
320320+ if (filtered.length > 0) {
321321+ await db.models.Notification.insertMany(
322322+ filtered.map((n) => ({
323323+ did: n.did,
324324+ recordUri: n.recordUri,
325325+ recordCid: n.recordCid,
326326+ author: n.author,
327327+ reason: n.reason,
328328+ reasonSubject: n.reasonSubject ?? null,
329329+ sortAt: n.sortAt,
330330+ })),
331331+ );
332332+ }
333333+ });
334334+ }
335335+ // Need to ensure notif deletion always happens before creation, otherwise delete may clobber in a race.
336336+ for (const fn of runOnCommit) {
337337+ await fn(this.appDb); // these could be backgrounded
305338 }
339339+ }
306340307307- // TODO: Implement notification handling
341341+ // Filter notifications for thread mutes (placeholder for future implementation)
342342+ filterNotifsForThreadMutes(notifs: Notif[]): Promise<Notif[]> {
343343+ // TODO: Implement thread mute filtering
344344+ // For now, return all notifications unfiltered
345345+ return Promise.resolve(notifs);
308346 }
309347310348 aggregateOnCommit(indexed: S) {
+239
data-plane/routes/notifs.ts
···11+import { Database } from "../db/index.ts";
22+import { GenericKeyset } from "../db/pagination.ts";
33+44+type SortAtCidResult = { sortAt: string; recordCid: string };
55+type SortAtCidLabeledResult = { primary: string; secondary: string };
66+77+class SortAtCidKeyset extends GenericKeyset<
88+ SortAtCidResult,
99+ SortAtCidLabeledResult
1010+> {
1111+ constructor() {
1212+ super("sortAt", "recordCid");
1313+ }
1414+1515+ labelResult(result: SortAtCidResult): SortAtCidLabeledResult {
1616+ const sortAt = result.sortAt || new Date().toISOString();
1717+ return { primary: sortAt, secondary: result.recordCid };
1818+ }
1919+2020+ labeledResultToCursor(labeled: SortAtCidLabeledResult) {
2121+ const timestamp = new Date(labeled.primary).getTime();
2222+ if (isNaN(timestamp)) {
2323+ throw new Error("Invalid date for cursor");
2424+ }
2525+ const secondsBase36 = Math.floor(timestamp / 1000).toString(36);
2626+ return {
2727+ primary: secondsBase36,
2828+ secondary: labeled.secondary,
2929+ };
3030+ }
3131+3232+ cursorToLabeledResult(cursor: { primary: string; secondary: string }) {
3333+ const seconds = parseInt(cursor.primary, 36);
3434+ if (isNaN(seconds)) {
3535+ throw new Error("Malformed cursor: invalid timestamp");
3636+ }
3737+ const primaryDate = new Date(seconds * 1000);
3838+ if (isNaN(primaryDate.getTime())) {
3939+ throw new Error("Malformed cursor: invalid date");
4040+ }
4141+ return {
4242+ primary: primaryDate.toISOString(),
4343+ secondary: cursor.secondary,
4444+ };
4545+ }
4646+}
4747+4848+export interface Notification {
4949+ recipientDid: string;
5050+ uri: string;
5151+ cid: string;
5252+ reason: string;
5353+ reasonSubject?: string;
5454+ sortAt: string;
5555+ authorDid: string;
5656+ priority?: boolean;
5757+}
5858+5959+export class Notifications {
6060+ private db: Database;
6161+ private sortAtCidKeyset: SortAtCidKeyset;
6262+6363+ constructor(db: Database) {
6464+ this.db = db;
6565+ this.sortAtCidKeyset = new SortAtCidKeyset();
6666+ }
6767+6868+ async getNotifications(
6969+ actorDid: string,
7070+ limit = 50,
7171+ cursor?: string,
7272+ priority?: boolean,
7373+ ): Promise<{ notifications: Notification[]; cursor?: string }> {
7474+ // Get follows for priority filtering
7575+ let priorityDids: string[] | undefined;
7676+ if (priority) {
7777+ const follows = await this.db.models.Follow.find({
7878+ authorDid: actorDid,
7979+ }).select("subject");
8080+ priorityDids = follows.map((f) => f.subject);
8181+ if (priorityDids.length === 0) {
8282+ return { notifications: [], cursor: undefined };
8383+ }
8484+ }
8585+8686+ // Build base query
8787+ const baseFilter: Record<string, unknown> = { did: actorDid };
8888+8989+ // If priority, filter to only notifications from followed users
9090+ if (priorityDids) {
9191+ baseFilter.author = { $in: priorityDids };
9292+ }
9393+9494+ // Get notifications
9595+ const notifsQuery = this.db.models.Notification.find(baseFilter);
9696+9797+ // Apply pagination
9898+ const paginatedQuery = this.sortAtCidKeyset.paginate(notifsQuery, {
9999+ limit,
100100+ cursor,
101101+ direction: "desc",
102102+ });
103103+104104+ const notifs = await paginatedQuery.exec();
105105+106106+ // Filter out notifications with missing reasonSubject records
107107+ const filteredNotifs = await this.filterValidReasonSubjects(notifs);
108108+109109+ // Get priority status for each notification
110110+ const followedDids = priorityDids ?? await this.getFollowedDids(actorDid);
111111+ const followedSet = new Set(followedDids);
112112+113113+ // Generate cursor from the last item if we have results
114114+ let nextCursor: string | undefined;
115115+ if (notifs.length === limit && notifs.length > 0) {
116116+ const lastNotif = notifs[notifs.length - 1];
117117+ nextCursor = this.sortAtCidKeyset.pack({
118118+ primary: lastNotif.sortAt,
119119+ secondary: lastNotif.recordCid,
120120+ });
121121+ }
122122+123123+ const notifications = filteredNotifs.map((notif) => ({
124124+ recipientDid: actorDid,
125125+ uri: notif.recordUri,
126126+ cid: notif.recordCid,
127127+ reason: notif.reason,
128128+ reasonSubject: notif.reasonSubject ?? undefined,
129129+ sortAt: notif.sortAt,
130130+ authorDid: notif.author,
131131+ priority: followedSet.has(notif.author),
132132+ }));
133133+134134+ return {
135135+ notifications,
136136+ cursor: nextCursor,
137137+ };
138138+ }
139139+140140+ async getNotificationSeen(
141141+ actorDid: string,
142142+ _priority?: boolean,
143143+ ): Promise<{ timestamp?: string }> {
144144+ const actor = await this.db.models.Actor.findOne({ did: actorDid });
145145+ if (!actor) {
146146+ return {};
147147+ }
148148+149149+ // For now, we don't have lastSeenNotifs on Actor model
150150+ // This would need to be added to track notification seen status
151151+ // Returning empty for now
152152+ return {};
153153+ }
154154+155155+ async getUnreadNotificationCount(
156156+ actorDid: string,
157157+ lastSeen?: string,
158158+ priority?: boolean,
159159+ ): Promise<{ count: number }> {
160160+ const baseFilter: Record<string, unknown> = { did: actorDid };
161161+162162+ // Filter by lastSeen if provided
163163+ if (lastSeen) {
164164+ baseFilter.sortAt = { $gt: lastSeen };
165165+ }
166166+167167+ // If priority, filter to only notifications from followed users
168168+ if (priority) {
169169+ const follows = await this.db.models.Follow.find({
170170+ authorDid: actorDid,
171171+ }).select("subject");
172172+ const priorityDids = follows.map((f) => f.subject);
173173+ if (priorityDids.length === 0) {
174174+ return { count: 0 };
175175+ }
176176+ baseFilter.author = { $in: priorityDids };
177177+ }
178178+179179+ const count = await this.db.models.Notification.countDocuments(baseFilter);
180180+181181+ return { count };
182182+ }
183183+184184+ async updateNotificationSeen(
185185+ _actorDid: string,
186186+ _timestamp: string,
187187+ _priority?: boolean,
188188+ ): Promise<void> {
189189+ // This would require adding notification seen tracking to the Actor model
190190+ // or creating a separate ActorState model
191191+ // For now, this is a no-op
192192+ }
193193+194194+ // Helper methods
195195+196196+ private async getFollowedDids(actorDid: string): Promise<string[]> {
197197+ const follows = await this.db.models.Follow.find({
198198+ authorDid: actorDid,
199199+ }).select("subject");
200200+ return follows.map((f) => f.subject);
201201+ }
202202+203203+ private async filterValidReasonSubjects(
204204+ notifs: Array<{
205205+ recordUri: string;
206206+ recordCid: string;
207207+ author: string;
208208+ reason: string;
209209+ reasonSubject: string | null;
210210+ sortAt: string;
211211+ }>,
212212+ ): Promise<
213213+ Array<{
214214+ recordUri: string;
215215+ recordCid: string;
216216+ author: string;
217217+ reason: string;
218218+ reasonSubject: string | null;
219219+ sortAt: string;
220220+ }>
221221+ > {
222222+ // Filter out notifications where reasonSubject exists but the record doesn't
223223+ const notifsWithSubject = notifs.filter((n) => n.reasonSubject);
224224+ if (notifsWithSubject.length === 0) {
225225+ return notifs;
226226+ }
227227+228228+ const subjectUris = notifsWithSubject.map((n) => n.reasonSubject as string);
229229+ const existingRecords = await this.db.models.Record.find({
230230+ uri: { $in: subjectUris },
231231+ }).select("uri");
232232+233233+ const existingUris = new Set(existingRecords.map((r) => r.uri));
234234+235235+ return notifs.filter(
236236+ (n) => !n.reasonSubject || existingUris.has(n.reasonSubject),
237237+ );
238238+ }
239239+}
···4848 Media,
4949 MediaView,
5050 NotFoundPost,
5151+ NotificationView,
5152 VideoMedia,
5253 VideoMediaView,
5354} from "./types.ts";
···6667import { cidFromBlobJson } from "./util.ts";
6768import { uriToDid } from "../utils/uris.ts";
6869import { mapDefined } from "@atp/common";
6969-import { FeedItem } from "../hydration/feed.ts";
7070+import { FeedItem, Like, Post, Reply, Repost } from "../hydration/feed.ts";
7071import {
7172 QueryParams as GetThreadQueryParams,
7273 ThreadItem,
···8283 LabelerViewDetailed,
8384} from "../lex/types/so/sprk/labeler/defs.ts";
8485import { isSelfLabels } from "../lex/types/com/atproto/label/defs.ts";
8686+import { Follow } from "../hydration/graph.ts";
8787+import { RecordInfo } from "../hydration/util.ts";
8888+import { Notification } from "../data-plane/routes/notifs.ts";
85898690export class Views {
8791 public indexedAtEpoch?: Date | undefined;
···10301034 if (actor?.upstreamStatus === "takendown") return true;
10311035 if (actor?.upstreamStatus === "suspended") return true;
10321036 return false;
10371037+ }
10381038+10391039+ viewerSeesNeedsReview(
10401040+ { did, uri }: { did?: string; uri?: string },
10411041+ state: HydrationState,
10421042+ ): boolean {
10431043+ const { labels, profileViewers, ctx } = state;
10441044+ did = did || (uri && uriToDid(uri));
10451045+ if (!did) {
10461046+ return true;
10471047+ }
10481048+ if (
10491049+ labels?.get(did)?.needsReview ||
10501050+ (uri && labels?.get(uri)?.needsReview)
10511051+ ) {
10521052+ // content marked as needs review
10531053+ return ctx?.viewer === did || !!profileViewers?.get(did)?.following;
10541054+ }
10551055+ return true;
10561056+ }
10571057+10581058+ notification(
10591059+ notif: Notification,
10601060+ lastSeenAt: string | undefined,
10611061+ state: HydrationState,
10621062+ ): Un$Typed<NotificationView> | undefined {
10631063+ if (!notif.sortAt || !notif.reason) return;
10641064+ const uri = new AtUri(notif.uri);
10651065+ const authorDid = notif.authorDid;
10661066+ const author = this.profile(authorDid, state);
10671067+ if (!author) return;
10681068+10691069+ let recordInfo:
10701070+ | Post
10711071+ | Reply
10721072+ | Like
10731073+ | Repost
10741074+ | Follow
10751075+ | RecordInfo<ProfileRecord>
10761076+ | undefined
10771077+ | null;
10781078+10791079+ if (uri.collection === ids.SoSprkFeedPost) {
10801080+ recordInfo = state.posts?.get(notif.uri);
10811081+ } else if (uri.collection === ids.SoSprkFeedReply) {
10821082+ recordInfo = state.replies?.get(notif.uri);
10831083+ } else if (uri.collection === ids.SoSprkFeedLike) {
10841084+ recordInfo = state.likes?.get(notif.uri);
10851085+ } else if (uri.collection === ids.SoSprkFeedRepost) {
10861086+ recordInfo = state.reposts?.get(notif.uri);
10871087+ } else if (uri.collection === ids.SoSprkGraphFollow) {
10881088+ recordInfo = state.follows?.get(notif.uri);
10891089+ } else if (uri.collection === ids.SoSprkActorProfile) {
10901090+ const actor = state.actors?.get(authorDid);
10911091+ recordInfo = actor && actor.profile && actor.profileCid
10921092+ ? {
10931093+ record: actor.profile,
10941094+ cid: actor.profileCid,
10951095+ sortedAt: actor.sortedAt ?? new Date(0), // @NOTE will be present since profile record is present
10961096+ indexedAt: actor.indexedAt ?? new Date(0), // @NOTE will be present since profile record is present
10971097+ takedownRef: actor.profileTakedownRef,
10981098+ }
10991099+ : undefined;
11001100+ }
11011101+ if (!recordInfo) return;
11021102+11031103+ const labels = state.labels?.getBySubject(notif.uri) ?? [];
11041104+ // selfLabels only applies to posts and profiles, safe to pass the record
11051105+ const selfLabels = isPostRecord(recordInfo.record) ||
11061106+ isProfileRecord(recordInfo.record)
11071107+ ? this.selfLabels({
11081108+ uri: notif.uri,
11091109+ cid: recordInfo.cid,
11101110+ record: recordInfo.record,
11111111+ })
11121112+ : [];
11131113+ const indexedAt = notif.sortAt;
11141114+ return {
11151115+ uri: notif.uri,
11161116+ cid: recordInfo.cid,
11171117+ author,
11181118+ reason: notif.reason as NotificationView["reason"],
11191119+ reasonSubject: notif.reasonSubject || undefined,
11201120+ record: recordInfo.record,
11211121+ // @NOTE works with a hack in listNotifications so that when there's no last-seen time,
11221122+ // the user's first notification is marked unread, and all previous read. in this case,
11231123+ // the last seen time will be equal to the first notification's indexed time.
11241124+ isRead: lastSeenAt ? lastSeenAt > indexedAt : true,
11251125+ indexedAt: notif.sortAt,
11261126+ labels: [...labels, ...selfLabels],
11271127+ };
10331128 }
10341129}
+4
views/types.ts
···1515} from "../lex/types/so/sprk/feed/defs.ts";
1616import { LabelerView } from "../lex/types/so/sprk/labeler/defs.ts";
17171818+export type {
1919+ Notification as NotificationView,
2020+} from "../lex/types/so/sprk/notification/listNotifications.ts";
2121+1822export {
1923 isMain as isImagesMedia,
2024 type Main as ImagesMedia,