WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import type {
2 CommitCreateEvent,
3 CommitDeleteEvent,
4 CommitUpdateEvent,
5} from "@skyware/jetstream";
6import type { Database, DbOrTransaction } from "@atbb/db";
7import type { Logger } from "@atbb/logger";
8import {
9 posts,
10 forums,
11 categories,
12 boards,
13 users,
14 memberships,
15 modActions,
16 roles,
17} from "@atbb/db";
18import { eq, and } from "drizzle-orm";
19import { parseAtUri } from "./at-uri.js";
20import { BanEnforcer } from "./ban-enforcer.js";
21import {
22 SpaceAtbbPost as Post,
23 SpaceAtbbForumForum as Forum,
24 SpaceAtbbForumCategory as Category,
25 SpaceAtbbForumBoard as Board,
26 SpaceAtbbMembership as Membership,
27 SpaceAtbbModAction as ModAction,
28 SpaceAtbbForumRole as Role,
29} from "@atbb/lexicon";
30
31// ── Collection Config Types ─────────────────────────────
32
33/**
34 * Configuration for a data-driven collection handler.
35 * Encodes the per-collection logic that differs across the 5 indexed types,
36 * while the generic handler methods supply the shared try/catch/log/throw scaffolding.
37 */
38interface CollectionConfig<TRecord> {
39 /** Human-readable name for logging (e.g. "Post", "Forum") */
40 name: string;
41 /** Drizzle table reference */
42 table: any;
43 /** "hard" = DELETE FROM (all non-post collections) */
44 deleteStrategy: "hard";
45 /** Call ensureUser(event.did) before insert? (user-owned records) */
46 ensureUserOnCreate?: boolean;
47 /**
48 * Transform event+record into DB insert values.
49 * Return null to skip the insert (e.g. when a required foreign key is missing).
50 */
51 toInsertValues: (
52 event: any,
53 record: TRecord,
54 tx: DbOrTransaction
55 ) => Promise<Record<string, any> | null>;
56 /**
57 * Transform event+record into DB update set values.
58 * Runs inside a transaction. Return null to skip the update.
59 */
60 toUpdateValues: (
61 event: any,
62 record: TRecord,
63 tx: DbOrTransaction
64 ) => Promise<Record<string, any> | null>;
65}
66
67
68/**
69 * Indexer class for processing AT Proto firehose events
70 * Converts events into database records for the atBB AppView
71 */
72export class Indexer {
73 private banEnforcer: BanEnforcer;
74
75 constructor(private db: Database, private logger: Logger) {
76 this.banEnforcer = new BanEnforcer(db, logger);
77 }
78
79 // ── Collection Configs ──────────────────────────────────
80
81 private postConfig: CollectionConfig<Post.Record> = {
82 name: "Post",
83 table: posts,
84 deleteStrategy: "hard",
85 ensureUserOnCreate: true,
86 toInsertValues: async (event, record, tx) => {
87 // Look up parent/root for replies
88 let rootId: bigint | null = null;
89 let parentId: bigint | null = null;
90
91 if (Post.isReplyRef(record.reply)) {
92 rootId = await this.getPostIdByUri(record.reply.root.uri, tx);
93 parentId = await this.getPostIdByUri(record.reply.parent.uri, tx);
94 } else if (record.reply) {
95 // reply ref present but $type omitted — rootPostId/parentPostId will be null,
96 // making this reply unreachable in thread navigation (data corruption).
97 this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", {
98 operation: "Post CREATE",
99 postDid: event.did,
100 postRkey: event.commit.rkey,
101 errorId: "POST_REPLY_REF_MISSING_TYPE",
102 });
103 }
104
105 // Look up board ID if board reference exists
106 let boardId: bigint | null = null;
107 if (record.board?.board.uri) {
108 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
109 if (!boardId) {
110 this.logger.error("Failed to index post: board not found", {
111 operation: "Post CREATE",
112 postDid: event.did,
113 postRkey: event.commit.rkey,
114 boardUri: record.board.board.uri,
115 errorId: "POST_BOARD_MISSING",
116 });
117 throw new Error(`Board not found: ${record.board.board.uri}`);
118 }
119 }
120
121 return {
122 did: event.did,
123 rkey: event.commit.rkey,
124 cid: event.commit.cid,
125 title: record.reply ? null : (record.title ?? null),
126 text: record.text,
127 forumUri: record.forum?.forum.uri ?? null,
128 boardUri: record.board?.board.uri ?? null,
129 boardId,
130 rootPostId: rootId,
131 rootUri: record.reply?.root.uri ?? null,
132 parentPostId: parentId,
133 parentUri: record.reply?.parent.uri ?? null,
134 createdAt: new Date(record.createdAt),
135 indexedAt: new Date(),
136 };
137 },
138 toUpdateValues: async (event, record, tx) => {
139 // Look up board ID if board reference exists
140 let boardId: bigint | null = null;
141 if (record.board?.board.uri) {
142 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
143 if (!boardId) {
144 this.logger.error("Failed to index post: board not found", {
145 operation: "Post UPDATE",
146 postDid: event.did,
147 postRkey: event.commit.rkey,
148 boardUri: record.board.board.uri,
149 errorId: "POST_BOARD_MISSING",
150 });
151 throw new Error(`Board not found: ${record.board.board.uri}`);
152 }
153 }
154
155 return {
156 cid: event.commit.cid,
157 title: record.reply ? null : (record.title ?? null),
158 text: record.text,
159 forumUri: record.forum?.forum.uri ?? null,
160 boardUri: record.board?.board.uri ?? null,
161 boardId,
162 indexedAt: new Date(),
163 };
164 },
165 };
166
167 private forumConfig: CollectionConfig<Forum.Record> = {
168 name: "Forum",
169 table: forums,
170 deleteStrategy: "hard",
171 ensureUserOnCreate: true,
172 toInsertValues: async (event, record) => ({
173 did: event.did,
174 rkey: event.commit.rkey,
175 cid: event.commit.cid,
176 name: record.name,
177 description: record.description ?? null,
178 indexedAt: new Date(),
179 }),
180 toUpdateValues: async (event, record) => ({
181 cid: event.commit.cid,
182 name: record.name,
183 description: record.description ?? null,
184 indexedAt: new Date(),
185 }),
186 };
187
188 private categoryConfig: CollectionConfig<Category.Record> = {
189 name: "Category",
190 table: categories,
191 deleteStrategy: "hard",
192 toInsertValues: async (event, record, tx) => {
193 // Categories are owned by the Forum DID, so event.did IS the forum DID
194 const forumId = await this.getForumIdByDid(event.did, tx);
195
196 if (!forumId) {
197 this.logger.warn("Category: Forum not found for DID", {
198 operation: "Category CREATE",
199 did: event.did,
200 });
201 return null;
202 }
203
204 return {
205 did: event.did,
206 rkey: event.commit.rkey,
207 cid: event.commit.cid,
208 forumId,
209 name: record.name,
210 description: record.description ?? null,
211 slug: record.slug ?? null,
212 sortOrder: record.sortOrder ?? 0,
213 createdAt: new Date(record.createdAt),
214 indexedAt: new Date(),
215 };
216 },
217 toUpdateValues: async (event, record, tx) => {
218 // Categories are owned by the Forum DID, so event.did IS the forum DID
219 const forumId = await this.getForumIdByDid(event.did, tx);
220
221 if (!forumId) {
222 this.logger.warn("Category: Forum not found for DID", {
223 operation: "Category UPDATE",
224 did: event.did,
225 });
226 return null;
227 }
228
229 return {
230 cid: event.commit.cid,
231 forumId,
232 name: record.name,
233 description: record.description ?? null,
234 slug: record.slug ?? null,
235 sortOrder: record.sortOrder ?? 0,
236 indexedAt: new Date(),
237 };
238 },
239 };
240
241 private boardConfig: CollectionConfig<Board.Record> = {
242 name: "Board",
243 table: boards,
244 deleteStrategy: "hard",
245 toInsertValues: async (event, record, tx) => {
246 // Boards are owned by Forum DID
247 const categoryId = await this.getCategoryIdByUri(
248 record.category.category.uri,
249 tx
250 );
251
252 if (!categoryId) {
253 this.logger.error("Failed to index board: category not found", {
254 operation: "Board CREATE",
255 boardDid: event.did,
256 boardRkey: event.commit.rkey,
257 categoryUri: record.category.category.uri,
258 errorId: "BOARD_CATEGORY_MISSING",
259 });
260 throw new Error(`Category not found: ${record.category.category.uri}`);
261 }
262
263 return {
264 did: event.did,
265 rkey: event.commit.rkey,
266 cid: event.commit.cid,
267 name: record.name,
268 description: record.description ?? null,
269 slug: record.slug ?? null,
270 sortOrder: record.sortOrder ?? null,
271 categoryId,
272 categoryUri: record.category.category.uri,
273 createdAt: new Date(record.createdAt),
274 indexedAt: new Date(),
275 };
276 },
277 toUpdateValues: async (event, record, tx) => {
278 const categoryId = await this.getCategoryIdByUri(
279 record.category.category.uri,
280 tx
281 );
282
283 if (!categoryId) {
284 this.logger.error("Failed to index board: category not found", {
285 operation: "Board UPDATE",
286 boardDid: event.did,
287 boardRkey: event.commit.rkey,
288 categoryUri: record.category.category.uri,
289 errorId: "BOARD_CATEGORY_MISSING",
290 });
291 throw new Error(`Category not found: ${record.category.category.uri}`);
292 }
293
294 return {
295 cid: event.commit.cid,
296 name: record.name,
297 description: record.description ?? null,
298 slug: record.slug ?? null,
299 sortOrder: record.sortOrder ?? null,
300 categoryId,
301 categoryUri: record.category.category.uri,
302 indexedAt: new Date(),
303 };
304 },
305 };
306
307 private roleConfig: CollectionConfig<Role.Record> = {
308 name: "Role",
309 table: roles,
310 deleteStrategy: "hard",
311 toInsertValues: async (event, record) => ({
312 did: event.did,
313 rkey: event.commit.rkey,
314 cid: event.commit.cid,
315 name: record.name,
316 description: record.description ?? null,
317 permissions: record.permissions,
318 priority: record.priority,
319 createdAt: new Date(record.createdAt),
320 indexedAt: new Date(),
321 }),
322 toUpdateValues: async (event, record) => ({
323 cid: event.commit.cid,
324 name: record.name,
325 description: record.description ?? null,
326 permissions: record.permissions,
327 priority: record.priority,
328 indexedAt: new Date(),
329 }),
330 };
331
332 private membershipConfig: CollectionConfig<Membership.Record> = {
333 name: "Membership",
334 table: memberships,
335 deleteStrategy: "hard",
336 ensureUserOnCreate: true,
337 toInsertValues: async (event, record, tx) => {
338 // Look up forum by URI (inside transaction)
339 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
340
341 if (!forumId) {
342 this.logger.warn("Membership: Forum not found", {
343 operation: "Membership CREATE",
344 forumUri: record.forum.forum.uri,
345 });
346 return null;
347 }
348
349 return {
350 did: event.did,
351 rkey: event.commit.rkey,
352 cid: event.commit.cid,
353 forumId,
354 forumUri: record.forum.forum.uri,
355 role: null, // TODO: Extract role name from roleUri or lexicon
356 roleUri: record.role?.role.uri ?? null,
357 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
358 createdAt: new Date(record.createdAt),
359 indexedAt: new Date(),
360 };
361 },
362 toUpdateValues: async (event, record, tx) => {
363 // Look up forum by URI (may have changed)
364 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
365
366 if (!forumId) {
367 this.logger.warn("Membership: Forum not found", {
368 operation: "Membership UPDATE",
369 forumUri: record.forum.forum.uri,
370 });
371 return null;
372 }
373
374 return {
375 cid: event.commit.cid,
376 forumId,
377 forumUri: record.forum.forum.uri,
378 role: null, // TODO: Extract role name from roleUri or lexicon
379 roleUri: record.role?.role.uri ?? null,
380 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
381 indexedAt: new Date(),
382 };
383 },
384 };
385
386 private modActionConfig: CollectionConfig<ModAction.Record> = {
387 name: "ModAction",
388 table: modActions,
389 deleteStrategy: "hard",
390 toInsertValues: async (event, record, tx) => {
391 // ModActions are owned by the Forum DID, so event.did IS the forum DID
392 const forumId = await this.getForumIdByDid(event.did, tx);
393
394 if (!forumId) {
395 this.logger.warn("ModAction: Forum not found for DID", {
396 operation: "ModAction CREATE",
397 did: event.did,
398 });
399 return null;
400 }
401
402 // Ensure moderator exists
403 await this.ensureUser(record.createdBy, tx);
404
405 // Determine subject type (post or user)
406 let subjectPostUri: string | null = null;
407 let subjectDid: string | null = null;
408
409 if (record.subject.post) {
410 subjectPostUri = record.subject.post.uri;
411 }
412 if (record.subject.did) {
413 subjectDid = record.subject.did;
414 }
415
416 return {
417 did: event.did,
418 rkey: event.commit.rkey,
419 cid: event.commit.cid,
420 forumId,
421 action: record.action,
422 subjectPostUri,
423 subjectDid,
424 reason: record.reason ?? null,
425 createdBy: record.createdBy,
426 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
427 createdAt: new Date(record.createdAt),
428 indexedAt: new Date(),
429 };
430 },
431 toUpdateValues: async (event, record, tx) => {
432 // ModActions are owned by the Forum DID, so event.did IS the forum DID
433 const forumId = await this.getForumIdByDid(event.did, tx);
434
435 if (!forumId) {
436 this.logger.warn("ModAction: Forum not found for DID", {
437 operation: "ModAction UPDATE",
438 did: event.did,
439 });
440 return null;
441 }
442
443 // Determine subject type (post or user)
444 let subjectPostUri: string | null = null;
445 let subjectDid: string | null = null;
446
447 if (record.subject.post) {
448 subjectPostUri = record.subject.post.uri;
449 }
450 if (record.subject.did) {
451 subjectDid = record.subject.did;
452 }
453
454 return {
455 cid: event.commit.cid,
456 forumId,
457 action: record.action,
458 subjectPostUri,
459 subjectDid,
460 reason: record.reason ?? null,
461 createdBy: record.createdBy,
462 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
463 indexedAt: new Date(),
464 };
465 },
466 };
467
468 // ── Generic Handler Methods ─────────────────────────────
469
470 /**
471 * Generic create handler. Wraps the insert in a transaction,
472 * optionally ensures the user exists, and delegates to the
473 * config's toInsertValues callback for collection-specific logic.
474 */
475 private async genericCreate<TRecord>(
476 config: CollectionConfig<TRecord>,
477 event: any
478 ): Promise<boolean> {
479 try {
480 const record = event.commit.record as unknown as TRecord;
481 let skipped = false;
482
483 await this.db.transaction(async (tx) => {
484 if (config.ensureUserOnCreate) {
485 await this.ensureUser(event.did, tx);
486 }
487
488 const values = await config.toInsertValues(event, record, tx);
489 if (!values) {
490 skipped = true;
491 return; // Skip insert (e.g. foreign key not found)
492 }
493
494 await tx.insert(config.table).values(values);
495 });
496
497 // Only log success if insert actually happened
498 if (!skipped) {
499 this.logger.info(`${config.name} created`, {
500 did: event.did,
501 rkey: event.commit.rkey,
502 });
503 }
504 return !skipped;
505 } catch (error) {
506 this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, {
507 did: event.did,
508 rkey: event.commit.rkey,
509 error: error instanceof Error ? error.message : String(error),
510 });
511 throw error;
512 }
513 }
514
515 /**
516 * Generic update handler. Wraps the update in a transaction
517 * and delegates to the config's toUpdateValues callback for
518 * collection-specific logic.
519 */
520 private async genericUpdate<TRecord>(
521 config: CollectionConfig<TRecord>,
522 event: any
523 ) {
524 try {
525 const record = event.commit.record as unknown as TRecord;
526 let skipped = false;
527
528 await this.db.transaction(async (tx) => {
529 const values = await config.toUpdateValues(event, record, tx);
530 if (!values) {
531 skipped = true;
532 return; // Skip update (e.g. foreign key not found)
533 }
534
535 await tx
536 .update(config.table)
537 .set(values)
538 .where(
539 and(
540 eq(config.table.did, event.did),
541 eq(config.table.rkey, event.commit.rkey)
542 )
543 );
544 });
545
546 // Only log success if update actually happened
547 if (!skipped) {
548 this.logger.info(`${config.name} updated`, {
549 did: event.did,
550 rkey: event.commit.rkey,
551 });
552 }
553 } catch (error) {
554 this.logger.error(`Failed to update ${config.name.toLowerCase()}`, {
555 did: event.did,
556 rkey: event.commit.rkey,
557 error: error instanceof Error ? error.message : String(error),
558 });
559 throw error;
560 }
561 }
562
563 /**
564 * Generic delete handler. Hard-deletes a record (DELETE FROM).
565 * Posts use handlePostDelete instead (always tombstone).
566 */
567 private async genericDelete(config: CollectionConfig<any>, event: any) {
568 try {
569 await this.db
570 .delete(config.table)
571 .where(
572 and(
573 eq(config.table.did, event.did),
574 eq(config.table.rkey, event.commit.rkey)
575 )
576 );
577
578 this.logger.info(`${config.name} deleted`, {
579 did: event.did,
580 rkey: event.commit.rkey,
581 });
582 } catch (error) {
583 this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, {
584 did: event.did,
585 rkey: event.commit.rkey,
586 error: error instanceof Error ? error.message : String(error),
587 });
588 throw error;
589 }
590 }
591
592 // ── Post Handlers ───────────────────────────────────────
593
594 async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">) {
595 const banned = await this.banEnforcer.isBanned(event.did);
596 if (banned) {
597 this.logger.info("Skipping post from banned user", {
598 did: event.did,
599 rkey: event.commit.rkey,
600 });
601 return;
602 }
603 await this.genericCreate(this.postConfig, event);
604 }
605
606 async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">) {
607 await this.genericUpdate(this.postConfig, event);
608 }
609
610 /**
611 * Handles a user-initiated post delete from the PDS.
612 * Always tombstones: replaces personal content with a placeholder and marks
613 * deletedByUser=true. The row is kept so threads referencing this post as
614 * their root or parent remain intact. Personal content is gone; structure is preserved.
615 */
616 async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">) {
617 const { did, commit: { rkey } } = event;
618 try {
619 await this.db
620 .update(posts)
621 .set({ text: "[user deleted this post]", deletedByUser: true })
622 .where(and(eq(posts.did, did), eq(posts.rkey, rkey)));
623 this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey });
624 } catch (error) {
625 this.logger.error("Failed to tombstone post", {
626 did,
627 rkey,
628 error: error instanceof Error ? error.message : String(error),
629 });
630 throw error;
631 }
632 }
633
634 // ── Forum Handlers ──────────────────────────────────────
635
636 async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">) {
637 await this.genericCreate(this.forumConfig, event);
638 }
639
640 async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">) {
641 await this.genericUpdate(this.forumConfig, event);
642 }
643
644 async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">) {
645 await this.genericDelete(this.forumConfig, event);
646 }
647
648 // ── Category Handlers ───────────────────────────────────
649
650 async handleCategoryCreate(
651 event: CommitCreateEvent<"space.atbb.forum.category">
652 ) {
653 await this.genericCreate(this.categoryConfig, event);
654 }
655
656 async handleCategoryUpdate(
657 event: CommitUpdateEvent<"space.atbb.forum.category">
658 ) {
659 await this.genericUpdate(this.categoryConfig, event);
660 }
661
662 async handleCategoryDelete(
663 event: CommitDeleteEvent<"space.atbb.forum.category">
664 ) {
665 await this.genericDelete(this.categoryConfig, event);
666 }
667
668 // ── Board Handlers ──────────────────────────────────────
669
670 async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">) {
671 await this.genericCreate(this.boardConfig, event);
672 }
673
674 async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">) {
675 await this.genericUpdate(this.boardConfig, event);
676 }
677
678 async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">) {
679 await this.genericDelete(this.boardConfig, event);
680 }
681
682 // ── Role Handlers ───────────────────────────────────────
683
684 async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">) {
685 await this.genericCreate(this.roleConfig, event);
686 }
687
688 async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">) {
689 await this.genericUpdate(this.roleConfig, event);
690 }
691
692 async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">) {
693 await this.genericDelete(this.roleConfig, event);
694 }
695
696 // ── Membership Handlers ─────────────────────────────────
697
698 async handleMembershipCreate(
699 event: CommitCreateEvent<"space.atbb.membership">
700 ) {
701 await this.genericCreate(this.membershipConfig, event);
702 }
703
704 async handleMembershipUpdate(
705 event: CommitUpdateEvent<"space.atbb.membership">
706 ) {
707 await this.genericUpdate(this.membershipConfig, event);
708 }
709
710 async handleMembershipDelete(
711 event: CommitDeleteEvent<"space.atbb.membership">
712 ) {
713 await this.genericDelete(this.membershipConfig, event);
714 }
715
716 // ── ModAction Handlers ──────────────────────────────────
717
718 async handleModActionCreate(
719 event: CommitCreateEvent<"space.atbb.modAction">
720 ) {
721 const record = event.commit.record as unknown as ModAction.Record;
722 const isBan =
723 record.action === "space.atbb.modAction.ban" && record.subject.did;
724 const isUnban =
725 record.action === "space.atbb.modAction.unban" && record.subject.did;
726
727 try {
728 if (isBan) {
729 // Custom atomic path: insert ban record + applyBan in one transaction
730 let skipped = false;
731 await this.db.transaction(async (tx) => {
732 const forumId = await this.getForumIdByDid(event.did, tx);
733 if (!forumId) {
734 this.logger.warn("ModAction (ban): Forum not found for DID", {
735 operation: "ModAction CREATE",
736 did: event.did,
737 });
738 skipped = true;
739 return;
740 }
741 await this.ensureUser(record.createdBy, tx);
742 await tx.insert(modActions).values({
743 did: event.did,
744 rkey: event.commit.rkey,
745 cid: event.commit.cid,
746 forumId,
747 action: record.action,
748 subjectPostUri: null,
749 subjectDid: record.subject.did ?? null,
750 reason: record.reason ?? null,
751 createdBy: record.createdBy,
752 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
753 createdAt: new Date(record.createdAt),
754 indexedAt: new Date(),
755 });
756 await this.banEnforcer.applyBan(record.subject.did!, tx);
757 });
758 if (!skipped) {
759 this.logger.info("ModAction (ban) created", {
760 did: event.did,
761 rkey: event.commit.rkey,
762 });
763 }
764 } else if (isUnban) {
765 // Custom atomic path: insert unban record + liftBan in one transaction
766 let skipped = false;
767 await this.db.transaction(async (tx) => {
768 const forumId = await this.getForumIdByDid(event.did, tx);
769 if (!forumId) {
770 this.logger.warn("ModAction (unban): Forum not found for DID", {
771 operation: "ModAction CREATE",
772 did: event.did,
773 });
774 skipped = true;
775 return;
776 }
777 await this.ensureUser(record.createdBy, tx);
778 await tx.insert(modActions).values({
779 did: event.did,
780 rkey: event.commit.rkey,
781 cid: event.commit.cid,
782 forumId,
783 action: record.action,
784 subjectPostUri: null,
785 subjectDid: record.subject.did ?? null,
786 reason: record.reason ?? null,
787 createdBy: record.createdBy,
788 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
789 createdAt: new Date(record.createdAt),
790 indexedAt: new Date(),
791 });
792 await this.banEnforcer.liftBan(record.subject.did!, tx);
793 });
794 if (!skipped) {
795 this.logger.info("ModAction (unban) created", {
796 did: event.did,
797 rkey: event.commit.rkey,
798 });
799 }
800 } else {
801 // Generic path for all other mod actions (mute, pin, lock, delete, etc.)
802 await this.genericCreate(this.modActionConfig, event);
803
804 // Ban/unban without subject.did — shouldn't happen but log if it does
805 if (
806 record.action === "space.atbb.modAction.ban" ||
807 record.action === "space.atbb.modAction.unban"
808 ) {
809 this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", {
810 did: event.did,
811 rkey: event.commit.rkey,
812 action: record.action,
813 });
814 }
815 }
816 } catch (error) {
817 this.logger.error("Failed to index ModAction create", {
818 did: event.did,
819 rkey: event.commit.rkey,
820 error: error instanceof Error ? error.message : String(error),
821 });
822 throw error;
823 }
824 }
825
826 async handleModActionUpdate(
827 event: CommitUpdateEvent<"space.atbb.modAction">
828 ) {
829 await this.genericUpdate(this.modActionConfig, event);
830 }
831
832 async handleModActionDelete(
833 event: CommitDeleteEvent<"space.atbb.modAction">
834 ) {
835 try {
836 await this.db.transaction(async (tx) => {
837 // 1. Read before delete to capture action type and subject
838 const [existing] = await tx
839 .select({
840 action: modActions.action,
841 subjectDid: modActions.subjectDid,
842 })
843 .from(modActions)
844 .where(
845 and(
846 eq(modActions.did, event.did),
847 eq(modActions.rkey, event.commit.rkey)
848 )
849 )
850 .limit(1);
851
852 // 2. Hard delete the record
853 await tx
854 .delete(modActions)
855 .where(
856 and(
857 eq(modActions.did, event.did),
858 eq(modActions.rkey, event.commit.rkey)
859 )
860 );
861
862 // 3. Restore posts if the deleted record was a ban
863 if (
864 existing?.action === "space.atbb.modAction.ban" &&
865 existing?.subjectDid
866 ) {
867 await this.banEnforcer.liftBan(existing.subjectDid, tx);
868 }
869 });
870
871 this.logger.info("ModAction deleted", {
872 did: event.did,
873 rkey: event.commit.rkey,
874 });
875 } catch (error) {
876 this.logger.error("Failed to delete modAction", {
877 did: event.did,
878 rkey: event.commit.rkey,
879 error: error instanceof Error ? error.message : String(error),
880 });
881 throw error;
882 }
883 }
884
885 // ── Reaction Handlers (Stub) ────────────────────────────
886
887 async handleReactionCreate(
888 event: CommitCreateEvent<"space.atbb.reaction">
889 ) {
890 this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey });
891 // TODO: Add reactions table to schema
892 }
893
894 async handleReactionUpdate(
895 event: CommitUpdateEvent<"space.atbb.reaction">
896 ) {
897 this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey });
898 // TODO: Add reactions table to schema
899 }
900
901 async handleReactionDelete(
902 event: CommitDeleteEvent<"space.atbb.reaction">
903 ) {
904 this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey });
905 // TODO: Add reactions table to schema
906 }
907
908 // ── Helper Methods ──────────────────────────────────────
909
910 /**
911 * Ensure a user exists in the database. Creates if not exists.
912 * @param dbOrTx - Database instance or transaction
913 */
914 private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db) {
915 try {
916 const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1);
917
918 if (existing.length === 0) {
919 await dbOrTx.insert(users).values({
920 did,
921 handle: null, // Will be updated by identity events
922 indexedAt: new Date(),
923 });
924 this.logger.info("Created user", { did });
925 }
926 } catch (error) {
927 this.logger.error("Failed to ensure user exists", {
928 did,
929 error: error instanceof Error ? error.message : String(error),
930 });
931 throw error;
932 }
933 }
934
935 /**
936 * Look up a forum ID by its AT URI
937 * @param dbOrTx - Database instance or transaction
938 */
939 private async getForumIdByUri(
940 forumUri: string,
941 dbOrTx: DbOrTransaction = this.db
942 ): Promise<bigint | null> {
943 const parsed = parseAtUri(forumUri);
944 if (!parsed) return null;
945
946 try {
947 const result = await dbOrTx
948 .select({ id: forums.id })
949 .from(forums)
950 .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey)))
951 .limit(1);
952
953 return result.length > 0 ? result[0].id : null;
954 } catch (error) {
955 this.logger.error("Database error in getForumIdByUri", {
956 operation: "getForumIdByUri",
957 forumUri,
958 error: error instanceof Error ? error.message : String(error),
959 });
960 throw error;
961 }
962 }
963
964 /**
965 * Look up a forum ID by the forum's DID
966 * Used for records owned by the forum (categories, modActions)
967 * @param dbOrTx - Database instance or transaction
968 */
969 private async getForumIdByDid(
970 forumDid: string,
971 dbOrTx: DbOrTransaction = this.db
972 ): Promise<bigint | null> {
973 try {
974 const result = await dbOrTx
975 .select({ id: forums.id })
976 .from(forums)
977 .where(eq(forums.did, forumDid))
978 .limit(1);
979
980 return result.length > 0 ? result[0].id : null;
981 } catch (error) {
982 this.logger.error("Database error in getForumIdByDid", {
983 operation: "getForumIdByDid",
984 forumDid,
985 error: error instanceof Error ? error.message : String(error),
986 });
987 throw error;
988 }
989 }
990
991 /**
992 * Look up a post ID by its AT URI
993 * @param dbOrTx - Database instance or transaction
994 */
995 private async getPostIdByUri(
996 postUri: string,
997 dbOrTx: DbOrTransaction = this.db
998 ): Promise<bigint | null> {
999 const parsed = parseAtUri(postUri);
1000 if (!parsed) return null;
1001
1002 try {
1003 const result = await dbOrTx
1004 .select({ id: posts.id })
1005 .from(posts)
1006 .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey)))
1007 .limit(1);
1008
1009 return result.length > 0 ? result[0].id : null;
1010 } catch (error) {
1011 this.logger.error("Database error in getPostIdByUri", {
1012 operation: "getPostIdByUri",
1013 postUri,
1014 error: error instanceof Error ? error.message : String(error),
1015 });
1016 throw error;
1017 }
1018 }
1019
1020 /**
1021 * Look up board ID by AT URI (at://did/collection/rkey)
1022 * @param uri - AT URI of the board
1023 * @param dbOrTx - Database instance or transaction
1024 */
1025 private async getBoardIdByUri(
1026 uri: string,
1027 dbOrTx: DbOrTransaction = this.db
1028 ): Promise<bigint | null> {
1029 const parsed = parseAtUri(uri);
1030 if (!parsed) return null;
1031
1032 try {
1033 const [result] = await dbOrTx
1034 .select({ id: boards.id })
1035 .from(boards)
1036 .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey)))
1037 .limit(1);
1038 return result?.id ?? null;
1039 } catch (error) {
1040 this.logger.error("Database error in getBoardIdByUri", {
1041 operation: "getBoardIdByUri",
1042 uri,
1043 did: parsed.did,
1044 rkey: parsed.rkey,
1045 error: error instanceof Error ? error.message : String(error),
1046 });
1047 throw error;
1048 }
1049 }
1050
1051 /**
1052 * Look up category ID by AT URI (at://did/collection/rkey)
1053 * @param uri - AT URI of the category
1054 * @param dbOrTx - Database instance or transaction
1055 */
1056 private async getCategoryIdByUri(
1057 uri: string,
1058 dbOrTx: DbOrTransaction = this.db
1059 ): Promise<bigint | null> {
1060 const parsed = parseAtUri(uri);
1061 if (!parsed) return null;
1062
1063 try {
1064 const [result] = await dbOrTx
1065 .select({ id: categories.id })
1066 .from(categories)
1067 .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey)))
1068 .limit(1);
1069 return result?.id ?? null;
1070 } catch (error) {
1071 this.logger.error("Database error in getCategoryIdByUri", {
1072 operation: "getCategoryIdByUri",
1073 uri,
1074 did: parsed.did,
1075 rkey: parsed.rkey,
1076 error: error instanceof Error ? error.message : String(error),
1077 });
1078 throw error;
1079 }
1080 }
1081}