WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at bc5c0dc421fb0a09f8ed2b35d180f7eb67bd4e7e 1081 lines 34 kB view raw
import type {
  CommitCreateEvent,
  CommitDeleteEvent,
  CommitUpdateEvent,
} from "@skyware/jetstream";
import type { Database, DbOrTransaction } from "@atbb/db";
import type { Logger } from "@atbb/logger";
import {
  posts,
  forums,
  categories,
  boards,
  users,
  memberships,
  modActions,
  roles,
} from "@atbb/db";
import { eq, and } from "drizzle-orm";
import { parseAtUri } from "./at-uri.js";
import { BanEnforcer } from "./ban-enforcer.js";
import {
  SpaceAtbbPost as Post,
  SpaceAtbbForumForum as Forum,
  SpaceAtbbForumCategory as Category,
  SpaceAtbbForumBoard as Board,
  SpaceAtbbMembership as Membership,
  SpaceAtbbModAction as ModAction,
  SpaceAtbbForumRole as Role,
} from "@atbb/lexicon";

// ── Collection Config Types ─────────────────────────────

/**
 * Configuration for a data-driven collection handler.
 * Encodes the per-collection logic that differs across the indexed
 * collections (post, forum, category, board, role, membership, modAction),
 * while the generic handler methods supply the shared
 * try/catch/log/throw scaffolding.
 */
interface CollectionConfig<TRecord> {
  /** Human-readable name for logging (e.g. "Post", "Forum") */
  name: string;
  /** Drizzle table reference */
  table: any;
  /**
   * "hard" = DELETE FROM. Informational only: genericDelete does not read
   * this field. The post config carries the value too, but post deletes go
   * through handlePostDelete, which tombstones the row instead.
   */
  deleteStrategy: "hard";
  /** Call ensureUser(event.did) before insert? (user-owned records) */
  ensureUserOnCreate?: boolean;
  /**
   * Transform event+record into DB insert values.
   * Runs inside a transaction.
   * Return null to skip the insert (e.g. when a required foreign key is missing).
   */
  toInsertValues: (
    event: any,
    record: TRecord,
    tx: DbOrTransaction
  ) => Promise<Record<string, any> | null>;
  /**
   * Transform event+record into DB update set values.
   * Runs inside a transaction. Return null to skip the update.
   */
  toUpdateValues: (
    event: any,
    record: TRecord,
    tx: DbOrTransaction
  ) => Promise<Record<string, any> | null>;
}

/**
 * Indexer class for processing AT Proto firehose events
 * Converts events into database records for the atBB AppView
 */
export class Indexer {
  private banEnforcer: BanEnforcer;

  constructor(private db: Database, private logger: Logger) {
    this.banEnforcer = new BanEnforcer(db, logger);
  }

  // ── Collection Configs ──────────────────────────────────

  private postConfig: CollectionConfig<Post.Record> = {
    name: "Post",
    table: posts,
    deleteStrategy: "hard",
    ensureUserOnCreate: true,
    toInsertValues: async (event, record, tx) => {
      // Look up parent/root for replies
      let rootId: bigint | null = null;
      let parentId: bigint | null = null;

      if (Post.isReplyRef(record.reply)) {
        rootId = await this.getPostIdByUri(record.reply.root.uri, tx);
        parentId = await this.getPostIdByUri(record.reply.parent.uri, tx);
      } else if (record.reply) {
        // reply ref present but $type omitted — rootPostId/parentPostId will be null,
        // making this reply unreachable in thread navigation (data corruption).
        this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", {
          operation: "Post CREATE",
          postDid: event.did,
          postRkey: event.commit.rkey,
          errorId: "POST_REPLY_REF_MISSING_TYPE",
        });
      }

      // Look up board ID if board reference exists (throws when unresolved)
      const boardId = await this.resolvePostBoardId(event, record, tx, "Post CREATE");

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        // Replies never carry a title, even if the record supplies one
        title: record.reply ? null : (record.title ?? null),
        text: record.text,
        forumUri: record.forum?.forum.uri ?? null,
        boardUri: record.board?.board.uri ?? null,
        boardId,
        rootPostId: rootId,
        rootUri: record.reply?.root.uri ?? null,
        parentPostId: parentId,
        parentUri: record.reply?.parent.uri ?? null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      // Look up board ID if board reference exists (throws when unresolved)
      const boardId = await this.resolvePostBoardId(event, record, tx, "Post UPDATE");

      // Reply linkage (root/parent) is intentionally not part of the update set
      return {
        cid: event.commit.cid,
        title: record.reply ? null : (record.title ?? null),
        text: record.text,
        forumUri: record.forum?.forum.uri ?? null,
        boardUri: record.board?.board.uri ?? null,
        boardId,
        indexedAt: new Date(),
      };
    },
  };

  private forumConfig: CollectionConfig<Forum.Record> = {
    name: "Forum",
    table: forums,
    deleteStrategy: "hard",
    ensureUserOnCreate: true,
    toInsertValues: async (event, record) => ({
      did: event.did,
      rkey: event.commit.rkey,
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      indexedAt: new Date(),
    }),
    toUpdateValues: async (event, record) => ({
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      indexedAt: new Date(),
    }),
  };

  private categoryConfig: CollectionConfig<Category.Record> = {
    name: "Category",
    table: categories,
    deleteStrategy: "hard",
    toInsertValues: async (event, record, tx) => {
      // Categories are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("Category: Forum not found for DID", {
          operation: "Category CREATE",
          did: event.did,
        });
        return null;
      }

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? 0,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      // Categories are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("Category: Forum not found for DID", {
          operation: "Category UPDATE",
          did: event.did,
        });
        return null;
      }

      return {
        cid: event.commit.cid,
        forumId,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? 0,
        indexedAt: new Date(),
      };
    },
  };

  private boardConfig: CollectionConfig<Board.Record> = {
    name: "Board",
    table: boards,
    deleteStrategy: "hard",
    toInsertValues: async (event, record, tx) => {
      // Boards are owned by Forum DID
      const categoryId = await this.resolveBoardCategoryId(event, record, tx, "Board CREATE");

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? null,
        categoryId,
        categoryUri: record.category.category.uri,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      const categoryId = await this.resolveBoardCategoryId(event, record, tx, "Board UPDATE");

      return {
        cid: event.commit.cid,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? null,
        categoryId,
        categoryUri: record.category.category.uri,
        indexedAt: new Date(),
      };
    },
  };

  private roleConfig: CollectionConfig<Role.Record> = {
    name: "Role",
    table: roles,
    deleteStrategy: "hard",
    toInsertValues: async (event, record) => ({
      did: event.did,
      rkey: event.commit.rkey,
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      permissions: record.permissions,
      priority: record.priority,
      createdAt: new Date(record.createdAt),
      indexedAt: new Date(),
    }),
    toUpdateValues: async (event, record) => ({
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      permissions: record.permissions,
      priority: record.priority,
      indexedAt: new Date(),
    }),
  };

  private membershipConfig: CollectionConfig<Membership.Record> = {
    name: "Membership",
    table: memberships,
    deleteStrategy: "hard",
    ensureUserOnCreate: true,
    toInsertValues: async (event, record, tx) => {
      // Look up forum by URI (inside transaction)
      const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);

      if (!forumId) {
        this.logger.warn("Membership: Forum not found", {
          operation: "Membership CREATE",
          forumUri: record.forum.forum.uri,
        });
        return null;
      }

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        forumUri: record.forum.forum.uri,
        role: null, // TODO: Extract role name from roleUri or lexicon
        roleUri: record.role?.role.uri ?? null,
        joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      // Look up forum by URI (may have changed)
      const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);

      if (!forumId) {
        this.logger.warn("Membership: Forum not found", {
          operation: "Membership UPDATE",
          forumUri: record.forum.forum.uri,
        });
        return null;
      }

      return {
        cid: event.commit.cid,
        forumId,
        forumUri: record.forum.forum.uri,
        role: null, // TODO: Extract role name from roleUri or lexicon
        roleUri: record.role?.role.uri ?? null,
        joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
        indexedAt: new Date(),
      };
    },
  };

  private modActionConfig: CollectionConfig<ModAction.Record> = {
    name: "ModAction",
    table: modActions,
    deleteStrategy: "hard",
    toInsertValues: async (event, record, tx) => {
      // ModActions are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("ModAction: Forum not found for DID", {
          operation: "ModAction CREATE",
          did: event.did,
        });
        return null;
      }

      // Ensure moderator exists
      await this.ensureUser(record.createdBy, tx);

      // Determine subject type (post or user)
      const { subjectPostUri, subjectDid } = this.extractModActionSubject(record);

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        action: record.action,
        subjectPostUri,
        subjectDid,
        reason: record.reason ?? null,
        createdBy: record.createdBy,
        expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      // ModActions are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("ModAction: Forum not found for DID", {
          operation: "ModAction UPDATE",
          did: event.did,
        });
        return null;
      }

      // Determine subject type (post or user)
      const { subjectPostUri, subjectDid } = this.extractModActionSubject(record);

      return {
        cid: event.commit.cid,
        forumId,
        action: record.action,
        subjectPostUri,
        subjectDid,
        reason: record.reason ?? null,
        createdBy: record.createdBy,
        expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
        indexedAt: new Date(),
      };
    },
  };

  // ── Config Support Helpers ──────────────────────────────

  /**
   * Resolve the numeric board ID for a post record.
   * @returns null when the record carries no board URI
   * @throws Error when a board URI is present but no matching board row exists
   */
  private async resolvePostBoardId(
    event: { did: string; commit: { rkey: string } },
    record: Post.Record,
    tx: DbOrTransaction,
    operation: "Post CREATE" | "Post UPDATE"
  ): Promise<bigint | null> {
    const boardUri = record.board?.board.uri;
    if (!boardUri) return null;

    const boardId = await this.getBoardIdByUri(boardUri, tx);
    if (!boardId) {
      this.logger.error("Failed to index post: board not found", {
        operation,
        postDid: event.did,
        postRkey: event.commit.rkey,
        boardUri,
        errorId: "POST_BOARD_MISSING",
      });
      throw new Error(`Board not found: ${boardUri}`);
    }
    return boardId;
  }

  /**
   * Resolve the numeric category ID a board record points at.
   * @throws Error when no matching category row exists
   */
  private async resolveBoardCategoryId(
    event: { did: string; commit: { rkey: string } },
    record: Board.Record,
    tx: DbOrTransaction,
    operation: "Board CREATE" | "Board UPDATE"
  ): Promise<bigint> {
    const categoryUri = record.category.category.uri;
    const categoryId = await this.getCategoryIdByUri(categoryUri, tx);

    if (!categoryId) {
      this.logger.error("Failed to index board: category not found", {
        operation,
        boardDid: event.did,
        boardRkey: event.commit.rkey,
        categoryUri,
        errorId: "BOARD_CATEGORY_MISSING",
      });
      throw new Error(`Category not found: ${categoryUri}`);
    }
    return categoryId;
  }

  /**
   * Pull the subject columns out of a mod action record.
   * Each side is null when the corresponding subject field is absent or falsy.
   */
  private extractModActionSubject(record: ModAction.Record): {
    subjectPostUri: string | null;
    subjectDid: string | null;
  } {
    return {
      subjectPostUri: record.subject.post ? record.subject.post.uri : null,
      subjectDid: record.subject.did ? record.subject.did : null,
    };
  }

  // ── Generic Handler Methods ─────────────────────────────

  /**
   * Generic create handler. Wraps the insert in a transaction,
   * optionally ensures the user exists, and delegates to the
   * config's toInsertValues callback for collection-specific logic.
   *
   * @returns true when a row was inserted, false when the config skipped it
   * @throws rethrows any error after logging it
   */
  private async genericCreate<TRecord>(
    config: CollectionConfig<TRecord>,
    event: any
  ): Promise<boolean> {
    try {
      const record = event.commit.record as unknown as TRecord;
      let skipped = false;

      await this.db.transaction(async (tx) => {
        if (config.ensureUserOnCreate) {
          await this.ensureUser(event.did, tx);
        }

        const values = await config.toInsertValues(event, record, tx);
        if (!values) {
          skipped = true;
          return; // Skip insert (e.g. foreign key not found)
        }

        await tx.insert(config.table).values(values);
      });

      // Only log success if insert actually happened
      if (!skipped) {
        this.logger.info(`${config.name} created`, {
          did: event.did,
          rkey: event.commit.rkey,
        });
      }
      return !skipped;
    } catch (error) {
      this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, {
        did: event.did,
        rkey: event.commit.rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Generic update handler. Wraps the update in a transaction
   * and delegates to the config's toUpdateValues callback for
   * collection-specific logic. Rows are matched on (did, rkey).
   *
   * @throws rethrows any error after logging it
   */
  private async genericUpdate<TRecord>(
    config: CollectionConfig<TRecord>,
    event: any
  ): Promise<void> {
    try {
      const record = event.commit.record as unknown as TRecord;
      let skipped = false;

      await this.db.transaction(async (tx) => {
        const values = await config.toUpdateValues(event, record, tx);
        if (!values) {
          skipped = true;
          return; // Skip update (e.g. foreign key not found)
        }

        await tx
          .update(config.table)
          .set(values)
          .where(
            and(
              eq(config.table.did, event.did),
              eq(config.table.rkey, event.commit.rkey)
            )
          );
      });

      // Only log success if update actually happened
      if (!skipped) {
        this.logger.info(`${config.name} updated`, {
          did: event.did,
          rkey: event.commit.rkey,
        });
      }
    } catch (error) {
      this.logger.error(`Failed to update ${config.name.toLowerCase()}`, {
        did: event.did,
        rkey: event.commit.rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Generic delete handler. Hard-deletes a record (DELETE FROM).
   * Posts use handlePostDelete instead (always tombstone).
   *
   * @throws rethrows any error after logging it
   */
  private async genericDelete(config: CollectionConfig<any>, event: any): Promise<void> {
    try {
      await this.db
        .delete(config.table)
        .where(
          and(
            eq(config.table.did, event.did),
            eq(config.table.rkey, event.commit.rkey)
          )
        );

      this.logger.info(`${config.name} deleted`, {
        did: event.did,
        rkey: event.commit.rkey,
      });
    } catch (error) {
      this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, {
        did: event.did,
        rkey: event.commit.rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  // ── Post Handlers ───────────────────────────────────────

  /**
   * Index a new post unless the author is currently banned
   * (checked before any transaction is opened).
   */
  async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">): Promise<void> {
    const banned = await this.banEnforcer.isBanned(event.did);
    if (banned) {
      this.logger.info("Skipping post from banned user", {
        did: event.did,
        rkey: event.commit.rkey,
      });
      return;
    }
    await this.genericCreate(this.postConfig, event);
  }

  async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">): Promise<void> {
    await this.genericUpdate(this.postConfig, event);
  }

  /**
   * Handles a user-initiated post delete from the PDS.
   * Always tombstones: replaces personal content with a placeholder and marks
   * deletedByUser=true. The row is kept so threads referencing this post as
   * their root or parent remain intact. Personal content is gone; structure is preserved.
   *
   * Only `text` and `deletedByUser` are written; every other column
   * (including `title`) is left as previously indexed.
   */
  async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">): Promise<void> {
    const { did, commit: { rkey } } = event;
    try {
      await this.db
        .update(posts)
        .set({ text: "[user deleted this post]", deletedByUser: true })
        .where(and(eq(posts.did, did), eq(posts.rkey, rkey)));
      this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey });
    } catch (error) {
      this.logger.error("Failed to tombstone post", {
        did,
        rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  // ── Forum Handlers ──────────────────────────────────────

  async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">): Promise<void> {
    await this.genericCreate(this.forumConfig, event);
  }

  async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">): Promise<void> {
    await this.genericUpdate(this.forumConfig, event);
  }

  async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">): Promise<void> {
    await this.genericDelete(this.forumConfig, event);
  }

  // ── Category Handlers ───────────────────────────────────

  async handleCategoryCreate(
    event: CommitCreateEvent<"space.atbb.forum.category">
  ): Promise<void> {
    await this.genericCreate(this.categoryConfig, event);
  }

  async handleCategoryUpdate(
    event: CommitUpdateEvent<"space.atbb.forum.category">
  ): Promise<void> {
    await this.genericUpdate(this.categoryConfig, event);
  }

  async handleCategoryDelete(
    event: CommitDeleteEvent<"space.atbb.forum.category">
  ): Promise<void> {
    await this.genericDelete(this.categoryConfig, event);
  }

  // ── Board Handlers ──────────────────────────────────────

  async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">): Promise<void> {
    await this.genericCreate(this.boardConfig, event);
  }

  async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">): Promise<void> {
    await this.genericUpdate(this.boardConfig, event);
  }

  async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">): Promise<void> {
    await this.genericDelete(this.boardConfig, event);
  }

  // ── Role Handlers ───────────────────────────────────────

  async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">): Promise<void> {
    await this.genericCreate(this.roleConfig, event);
  }

  async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">): Promise<void> {
    await this.genericUpdate(this.roleConfig, event);
  }

  async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">): Promise<void> {
    await this.genericDelete(this.roleConfig, event);
  }

  // ── Membership Handlers ─────────────────────────────────

  async handleMembershipCreate(
    event: CommitCreateEvent<"space.atbb.membership">
  ): Promise<void> {
    await this.genericCreate(this.membershipConfig, event);
  }

  async handleMembershipUpdate(
    event: CommitUpdateEvent<"space.atbb.membership">
  ): Promise<void> {
    await this.genericUpdate(this.membershipConfig, event);
  }

  async handleMembershipDelete(
    event: CommitDeleteEvent<"space.atbb.membership">
  ): Promise<void> {
    await this.genericDelete(this.membershipConfig, event);
  }

  // ── ModAction Handlers ──────────────────────────────────

  /**
   * Index a new mod action.
   * Ban/unban actions that name a subject DID take an atomic path that inserts
   * the record and applies/lifts the ban in one transaction; everything else
   * goes through the generic create path.
   *
   * @throws rethrows any error after logging it
   */
  async handleModActionCreate(
    event: CommitCreateEvent<"space.atbb.modAction">
  ): Promise<void> {
    const record = event.commit.record as unknown as ModAction.Record;
    const subjectDid = record.subject.did;
    const isBanAction = record.action === "space.atbb.modAction.ban";
    const isUnbanAction = record.action === "space.atbb.modAction.unban";

    try {
      if (isBanAction && subjectDid) {
        // Custom atomic path: insert ban record + applyBan in one transaction
        await this.createEnforcedModAction(event, record, subjectDid, "ban");
      } else if (isUnbanAction && subjectDid) {
        // Custom atomic path: insert unban record + liftBan in one transaction
        await this.createEnforcedModAction(event, record, subjectDid, "unban");
      } else {
        // Generic path for all other mod actions (mute, pin, lock, delete, etc.)
        await this.genericCreate(this.modActionConfig, event);

        // Ban/unban without subject.did — shouldn't happen but log if it does
        if (isBanAction || isUnbanAction) {
          this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", {
            did: event.did,
            rkey: event.commit.rkey,
            action: record.action,
          });
        }
      }
    } catch (error) {
      this.logger.error("Failed to index ModAction create", {
        did: event.did,
        rkey: event.commit.rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Insert a ban/unban mod action and run the matching enforcement step
   * (applyBan for "ban", liftBan for "unban") inside a single transaction.
   * If the owning forum is not indexed, a warning is logged and nothing is written.
   * Errors propagate to the caller, which owns logging.
   */
  private async createEnforcedModAction(
    event: CommitCreateEvent<"space.atbb.modAction">,
    record: ModAction.Record,
    subjectDid: string,
    kind: "ban" | "unban"
  ): Promise<void> {
    let skipped = false;

    await this.db.transaction(async (tx) => {
      // ModActions are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);
      if (!forumId) {
        this.logger.warn(`ModAction (${kind}): Forum not found for DID`, {
          operation: "ModAction CREATE",
          did: event.did,
        });
        skipped = true;
        return;
      }

      // Ensure moderator exists
      await this.ensureUser(record.createdBy, tx);

      await tx.insert(modActions).values({
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        action: record.action,
        subjectPostUri: null,
        subjectDid,
        reason: record.reason ?? null,
        createdBy: record.createdBy,
        expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      });

      // Enforcement runs after the insert so both commit or roll back together
      if (kind === "ban") {
        await this.banEnforcer.applyBan(subjectDid, tx);
      } else {
        await this.banEnforcer.liftBan(subjectDid, tx);
      }
    });

    if (!skipped) {
      this.logger.info(`ModAction (${kind}) created`, {
        did: event.did,
        rkey: event.commit.rkey,
      });
    }
  }

  /**
   * Update an indexed mod action through the generic path.
   * No ban enforcement is invoked here.
   */
  async handleModActionUpdate(
    event: CommitUpdateEvent<"space.atbb.modAction">
  ): Promise<void> {
    await this.genericUpdate(this.modActionConfig, event);
  }

  /**
   * Hard-delete a mod action; when the deleted row was a ban with a subject
   * DID, lift that ban in the same transaction.
   *
   * @throws rethrows any error after logging it
   */
  async handleModActionDelete(
    event: CommitDeleteEvent<"space.atbb.modAction">
  ): Promise<void> {
    try {
      await this.db.transaction(async (tx) => {
        // 1. Read before delete to capture action type and subject
        const [existing] = await tx
          .select({
            action: modActions.action,
            subjectDid: modActions.subjectDid,
          })
          .from(modActions)
          .where(
            and(
              eq(modActions.did, event.did),
              eq(modActions.rkey, event.commit.rkey)
            )
          )
          .limit(1);

        // 2. Hard delete the record
        await tx
          .delete(modActions)
          .where(
            and(
              eq(modActions.did, event.did),
              eq(modActions.rkey, event.commit.rkey)
            )
          );

        // 3. Restore posts if the deleted record was a ban
        if (
          existing?.action === "space.atbb.modAction.ban" &&
          existing?.subjectDid
        ) {
          await this.banEnforcer.liftBan(existing.subjectDid, tx);
        }
      });

      this.logger.info("ModAction deleted", {
        did: event.did,
        rkey: event.commit.rkey,
      });
    } catch (error) {
      this.logger.error("Failed to delete modAction", {
        did: event.did,
        rkey: event.commit.rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  // ── Reaction Handlers (Stub) ────────────────────────────

  async handleReactionCreate(
    event: CommitCreateEvent<"space.atbb.reaction">
  ): Promise<void> {
    this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey });
    // TODO: Add reactions table to schema
  }

  async handleReactionUpdate(
    event: CommitUpdateEvent<"space.atbb.reaction">
  ): Promise<void> {
    this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey });
    // TODO: Add reactions table to schema
  }

  async handleReactionDelete(
    event: CommitDeleteEvent<"space.atbb.reaction">
  ): Promise<void> {
    this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey });
    // TODO: Add reactions table to schema
  }

  // ── Helper Methods ──────────────────────────────────────

  /**
   * Ensure a user exists in the database. Creates if not exists.
   * @param dbOrTx - Database instance or transaction
   * @throws rethrows any database error after logging it
   */
  private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db): Promise<void> {
    try {
      // NOTE(review): select-then-insert is two statements; concurrent callers
      // for the same new DID could both see "missing" — confirm how the users
      // table and the caller's retry policy handle that.
      const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1);

      if (existing.length === 0) {
        await dbOrTx.insert(users).values({
          did,
          handle: null, // Will be updated by identity events
          indexedAt: new Date(),
        });
        this.logger.info("Created user", { did });
      }
    } catch (error) {
      this.logger.error("Failed to ensure user exists", {
        did,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Look up a forum ID by its AT URI
   * @param dbOrTx - Database instance or transaction
   * @returns the forum ID, or null when the URI does not parse or no row matches
   */
  private async getForumIdByUri(
    forumUri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(forumUri);
    if (!parsed) return null;

    try {
      const result = await dbOrTx
        .select({ id: forums.id })
        .from(forums)
        .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey)))
        .limit(1);

      return result.length > 0 ? result[0].id : null;
    } catch (error) {
      this.logger.error("Database error in getForumIdByUri", {
        operation: "getForumIdByUri",
        forumUri,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Look up a forum ID by the forum's DID
   * Used for records owned by the forum (categories, modActions)
   * @param dbOrTx - Database instance or transaction
   * @returns the forum ID, or null when no row matches
   */
  private async getForumIdByDid(
    forumDid: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    try {
      const result = await dbOrTx
        .select({ id: forums.id })
        .from(forums)
        .where(eq(forums.did, forumDid))
        .limit(1);

      return result.length > 0 ? result[0].id : null;
    } catch (error) {
      this.logger.error("Database error in getForumIdByDid", {
        operation: "getForumIdByDid",
        forumDid,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Look up a post ID by its AT URI
   * @param dbOrTx - Database instance or transaction
   * @returns the post ID, or null when the URI does not parse or no row matches
   */
  private async getPostIdByUri(
    postUri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(postUri);
    if (!parsed) return null;

    try {
      const result = await dbOrTx
        .select({ id: posts.id })
        .from(posts)
        .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey)))
        .limit(1);

      return result.length > 0 ? result[0].id : null;
    } catch (error) {
      this.logger.error("Database error in getPostIdByUri", {
        operation: "getPostIdByUri",
        postUri,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Look up board ID by AT URI (at://did/collection/rkey)
   * @param uri - AT URI of the board
   * @param dbOrTx - Database instance or transaction
   * @returns the board ID, or null when the URI does not parse or no row matches
   */
  private async getBoardIdByUri(
    uri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(uri);
    if (!parsed) return null;

    try {
      const [result] = await dbOrTx
        .select({ id: boards.id })
        .from(boards)
        .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey)))
        .limit(1);
      return result?.id ?? null;
    } catch (error) {
      this.logger.error("Database error in getBoardIdByUri", {
        operation: "getBoardIdByUri",
        uri,
        did: parsed.did,
        rkey: parsed.rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /**
   * Look up category ID by AT URI (at://did/collection/rkey)
   * @param uri - AT URI of the category
   * @param dbOrTx - Database instance or transaction
   * @returns the category ID, or null when the URI does not parse or no row matches
   */
  private async getCategoryIdByUri(
    uri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(uri);
    if (!parsed) return null;

    try {
      const [result] = await dbOrTx
        .select({ id: categories.id })
        .from(categories)
        .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey)))
        .limit(1);
      return result?.id ?? null;
    } catch (error) {
      this.logger.error("Database error in getCategoryIdByUri", {
        operation: "getCategoryIdByUri",
        uri,
        did: parsed.did,
        rkey: parsed.rkey,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }
}