AppView in a box as a Vite plugin thing hatk.dev
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add createRecord/putRecord/deleteRecord helpers to OnLoginCtx and XrpcContext

Wraps existing pds-proxy functions so hooks and XRPC handlers can write
records through the PDS with local indexing, without raw SQL. Also adds
awaitBackfill so ensureRepo properly waits for in-flight backfills.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+648 -17
+112
docs/plans/2026-03-20-hook-record-helpers-design.md
··· 1 + # Hook & XRPC Record Helpers Design 2 + 3 + **Goal:** Add `createRecord`, `deleteRecord`, `putRecord` helpers to `OnLoginCtx` and `XrpcContext` so server-side code can write records through the PDS with local indexing, without raw SQL. 4 + 5 + **Motivation:** Writing records currently requires raw SQL with manual column mapping, JSON serialization, and AT URI construction. The framework already has PDS proxy functions (`pdsCreateRecord`, etc.) that validate against lexicons, write to the user's PDS, and index locally — but they're only wired up to the core HTTP handlers, not available to hook or XRPC authors. 6 + 7 + ## Design 8 + 9 + ### Where the helpers live 10 + 11 + - `XrpcContext` — already has an authenticated viewer and PDS session access 12 + - `OnLoginCtx` — has a viewer (the user who just logged in) with a fresh PDS session 13 + - `BaseContext` — stays read-only (feeds can run without a viewer) 14 + 15 + ### New fields 16 + 17 + ```typescript 18 + createRecord: ( 19 + collection: string, 20 + record: Record<string, unknown>, 21 + opts?: { rkey?: string }, 22 + ) => Promise<{ uri?: string; cid?: string }> 23 + 24 + putRecord: ( 25 + collection: string, 26 + rkey: string, 27 + record: Record<string, unknown>, 28 + ) => Promise<{ uri?: string; cid?: string }> 29 + 30 + deleteRecord: ( 31 + collection: string, 32 + rkey: string, 33 + ) => Promise<void> 34 + ``` 35 + 36 + ### What they do 37 + 38 + Each helper wraps the existing `pdsCreateRecord` / `pdsPutRecord` / `pdsDeleteRecord` from `pds-proxy.ts`: 39 + 40 + 1. Validate record against lexicons 41 + 2. Authenticate to user's PDS via DPoP (with nonce retry + token refresh) 42 + 3. Write to PDS 43 + 4. Index locally (insert/delete in SQLite) 44 + 5. Log errors on index failure (record still exists on PDS, will sync via firehose) 45 + 46 + ### Wiring 47 + 48 + Both `XrpcContext` and `OnLoginCtx` need `oauthConfig` to call the PDS proxy functions. 49 + 50 + **XrpcContext:** `buildXrpcContext` in `xrpc.ts` already runs inside `initXrpc` which has `oauthConfig`. Add the helpers there. 51 + 52 + **OnLoginCtx:** `fireOnLoginHook` is called from `oauth/server.ts` `handleCallback` which has `config: OAuthConfig`. Pass it through: 53 + 54 + ```typescript 55 + // oauth/server.ts 56 + await fireOnLoginHook(did, config) 57 + ``` 58 + 59 + ### Usage in hooks 60 + 61 + ```typescript 62 + export default defineHook('on-login', async (ctx) => { 63 + await ctx.ensureRepo(ctx.did) 64 + 65 + const existing = await ctx.lookup('social.grain.actor.profile', 'did', [ctx.did]) 66 + if (existing.has(ctx.did)) return 67 + 68 + const bsky = await ctx.lookup('app.bsky.actor.profile', 'did', [ctx.did]) 69 + const profile = bsky.get(ctx.did) 70 + if (!profile) return 71 + 72 + await ctx.createRecord('social.grain.actor.profile', { 73 + displayName: profile.value.displayName, 74 + description: profile.value.description, 75 + avatar: profile.value.avatar, 76 + createdAt: new Date().toISOString(), 77 + }, { rkey: 'self' }) 78 + }) 79 + ``` 80 + 81 + ### Usage in XRPC handlers 82 + 83 + ```typescript 84 + export default defineQuery('my.app.doThing', async (ctx) => { 85 + await ctx.createRecord('my.app.activity', { 86 + action: 'did-thing', 87 + createdAt: new Date().toISOString(), 88 + }) 89 + return ctx.ok({ success: true }) 90 + }) 91 + ``` 92 + 93 + ## Files to change 94 + 95 + **hatk framework:** 96 + - `hooks.ts` — Add helpers to `OnLoginCtx`, update `fireOnLoginHook(did, oauthConfig)` 97 + - `xrpc.ts` — Add helpers to `XrpcContext`, wire in `buildXrpcContext` 98 + - `oauth/server.ts` — Pass `config` to `fireOnLoginHook` 99 + 100 + **Templates:** 101 + - `server/on-login.ts` — Replace raw SQL with `ctx.createRecord` 102 + 103 + ## Future: typed records 104 + 105 + The helpers can later be typed with `RecordRegistry` so `createRecord('social.grain.actor.profile', ...)` validates the record shape at compile time, matching how `defineFeed` and `defineQuery` work. Not required for v1. 106 + 107 + ## Edge cases 108 + 109 + - **No PDS session:** Throws `ProxyError(401)`. Caught by hook/XRPC error handling. Login/request still succeeds. 110 + - **PDS write fails:** Error propagates to caller. In hooks, caught and logged. In XRPCs, returns error to client. 111 + - **Local index fails after PDS write:** Already handled in pds-proxy.ts — logged, record syncs back via firehose. 112 + - **No viewer (unauthenticated XRPC):** Helpers should not be called. Throws if no session exists.
+367
docs/plans/2026-03-20-hook-record-helpers-implementation.md
··· 1 + # Hook & XRPC Record Helpers Implementation Plan 2 + 3 + > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. 4 + 5 + **Goal:** Add `createRecord`, `putRecord`, `deleteRecord` helpers to `OnLoginCtx` and `XrpcContext` so server-side code can write records through the PDS with local indexing, without raw SQL. 6 + 7 + **Architecture:** The helpers wrap existing `pdsCreateRecord`/`pdsPutRecord`/`pdsDeleteRecord` from `pds-proxy.ts`. Both contexts need `oauthConfig` plumbed through — `OnLoginCtx` gets it via `fireOnLoginHook(did, config)` from `oauth/server.ts`, and `XrpcContext` gets it via a module-level setter called during boot. 8 + 9 + **Tech Stack:** TypeScript, AT Protocol PDS proxy, SQLite indexing 10 + 11 + --- 12 + 13 + ### Task 1: Add `oauthConfig` to `fireOnLoginHook` and wire record helpers into `OnLoginCtx` 14 + 15 + **Files:** 16 + - Modify: `packages/hatk/src/hooks.ts` 17 + - Modify: `packages/hatk/src/oauth/server.ts:540` 18 + 19 + **Step 1: Update `OnLoginCtx` type to include record helpers** 20 + 21 + In `packages/hatk/src/hooks.ts`, add the record helper types to `OnLoginCtx`: 22 + 23 + ```typescript 24 + import type { OAuthConfig } from './config.ts' 25 + import { pdsCreateRecord, pdsPutRecord, pdsDeleteRecord } from './pds-proxy.ts' 26 + 27 + export type OnLoginCtx = Omit<BaseContext, 'db'> & { 28 + did: string 29 + db: { 30 + query: (sql: string, params?: unknown[]) => Promise<unknown[]> 31 + run: (sql: string, params?: unknown[]) => Promise<void> 32 + } 33 + ensureRepo: (did: string) => Promise<void> 34 + createRecord: ( 35 + collection: string, 36 + record: Record<string, unknown>, 37 + opts?: { rkey?: string }, 38 + ) => Promise<{ uri?: string; cid?: string }> 39 + putRecord: ( 40 + collection: string, 41 + rkey: string, 42 + record: Record<string, unknown>, 43 + ) => Promise<{ uri?: string; cid?: string }> 44 + deleteRecord: ( 45 + collection: string, 46 + rkey: string, 47 + ) => Promise<void> 48 + } 49 + ``` 50 + 51 + **Step 2: Update `fireOnLoginHook` to accept `oauthConfig` and build helpers** 52 + 53 + Change the signature and body of `fireOnLoginHook`: 54 + 55 + ```typescript 56 + export async function fireOnLoginHook(did: string, oauthConfig?: OAuthConfig | null): Promise<void> { 57 + if (!onLoginHook) return 58 + try { 59 + const base = buildBaseContext({ did }) 60 + const viewer = { did } 61 + 62 + const hookPromise = onLoginHook({ 63 + ...base, 64 + did, 65 + db: { query: base.db.query, run: runSQL }, 66 + ensureRepo, 67 + createRecord: async (collection, record, opts) => { 68 + if (!oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 69 + return pdsCreateRecord(oauthConfig, viewer, { collection, record, rkey: opts?.rkey }) 70 + }, 71 + putRecord: async (collection, rkey, record) => { 72 + if (!oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 73 + return pdsPutRecord(oauthConfig, viewer, { collection, rkey, record }) 74 + }, 75 + deleteRecord: async (collection, rkey) => { 76 + if (!oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 77 + await pdsDeleteRecord(oauthConfig, viewer, { collection, rkey }) 78 + }, 79 + }) 80 + const timeout = new Promise<void>((_, reject) => 81 + setTimeout(() => reject(new Error('on-login hook timed out after 30s')), 30_000) 82 + ) 83 + await Promise.race([hookPromise, timeout]) 84 + } catch (err: any) { 85 + emit('hooks', 'on_login_error', { did, error: err.message }) 86 + } 87 + } 88 + ``` 89 + 90 + **Step 3: Pass `config` in `oauth/server.ts`** 91 + 92 + In `packages/hatk/src/oauth/server.ts`, line 540, change: 93 + 94 + ```typescript 95 + // Before: 96 + await fireOnLoginHook(did) 97 + 98 + // After: 99 + await fireOnLoginHook(did, config) 100 + ``` 101 + 102 + **Step 4: Verify the build** 103 + 104 + Run: `cd /Users/chadmiller/code/hatk && npx tsc --noEmit -p packages/hatk/tsconfig.json` 105 + Expected: No type errors 106 + 107 + **Step 5: Commit** 108 + 109 + ```bash 110 + git add packages/hatk/src/hooks.ts packages/hatk/src/oauth/server.ts 111 + git commit -m "feat: add createRecord/putRecord/deleteRecord helpers to OnLoginCtx" 112 + ``` 113 + 114 + --- 115 + 116 + ### Task 2: Add record helpers to `XrpcContext` 117 + 118 + **Files:** 119 + - Modify: `packages/hatk/src/xrpc.ts` 120 + 121 + **Step 1: Add module-level `oauthConfig` setter and record helper types to `XrpcContext`** 122 + 123 + Add imports and a module-level config variable at the top of `xrpc.ts`: 124 + 125 + ```typescript 126 + import type { OAuthConfig } from './config.ts' 127 + import { pdsCreateRecord, pdsPutRecord, pdsDeleteRecord } from './pds-proxy.ts' 128 + 129 + let _oauthConfig: OAuthConfig | null = null 130 + 131 + export function configureOAuth(config: OAuthConfig | null) { 132 + _oauthConfig = config 133 + } 134 + ``` 135 + 136 + Add the helper fields to the `XrpcContext` interface: 137 + 138 + ```typescript 139 + export interface XrpcContext< 140 + P = Record<string, string>, 141 + Records extends Record<string, any> = Record<string, any>, 142 + I = unknown, 143 + > extends BaseContext { 144 + // ... existing fields ... 145 + createRecord: ( 146 + collection: string, 147 + record: Record<string, unknown>, 148 + opts?: { rkey?: string }, 149 + ) => Promise<{ uri?: string; cid?: string }> 150 + putRecord: ( 151 + collection: string, 152 + rkey: string, 153 + record: Record<string, unknown>, 154 + ) => Promise<{ uri?: string; cid?: string }> 155 + deleteRecord: ( 156 + collection: string, 157 + rkey: string, 158 + ) => Promise<void> 159 + } 160 + ``` 161 + 162 + **Step 2: Wire helpers into `buildXrpcContext`** 163 + 164 + Update the `buildXrpcContext` function to include the record helpers. The helpers use the `viewer` param already available in the function: 165 + 166 + ```typescript 167 + export function buildXrpcContext( 168 + params: Record<string, string>, 169 + cursor: string | undefined, 170 + limit: number, 171 + viewer: { did: string; handle?: string } | null, 172 + input?: unknown, 173 + ): XrpcContext { 174 + const base = buildBaseContext(viewer) 175 + return { 176 + ...base, 177 + db: { query: querySQL, run: runSQL }, 178 + params, 179 + input: input || {}, 180 + cursor, 181 + limit, 182 + packCursor, 183 + unpackCursor, 184 + isTakendown: isTakendownDid, 185 + filterTakendownDids, 186 + search: searchRecords, 187 + resolve: resolveRecords as any, 188 + exists: async (collection, filters) => { 189 + const conditions = Object.entries(filters).map(([field, value]) => ({ field, value })) 190 + const uri = await findUriByFields(collection, conditions) 191 + return uri !== null 192 + }, 193 + createRecord: async (collection, record, opts) => { 194 + if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 195 + if (!viewer) throw new Error('Authentication required to write records') 196 + return pdsCreateRecord(_oauthConfig, viewer, { collection, record, rkey: opts?.rkey }) 197 + }, 198 + putRecord: async (collection, rkey, record) => { 199 + if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 200 + if (!viewer) throw new Error('Authentication required to write records') 201 + return pdsPutRecord(_oauthConfig, viewer, { collection, rkey, record }) 202 + }, 203 + deleteRecord: async (collection, rkey) => { 204 + if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 205 + if (!viewer) throw new Error('Authentication required to write records') 206 + await pdsDeleteRecord(_oauthConfig, viewer, { collection, rkey }) 207 + }, 208 + } 209 + } 210 + ``` 211 + 212 + **Step 3: Verify the build** 213 + 214 + Run: `cd /Users/chadmiller/code/hatk && npx tsc --noEmit -p packages/hatk/tsconfig.json` 215 + Expected: No type errors 216 + 217 + **Step 4: Commit** 218 + 219 + ```bash 220 + git add packages/hatk/src/xrpc.ts 221 + git commit -m "feat: add createRecord/putRecord/deleteRecord helpers to XrpcContext" 222 + ``` 223 + 224 + --- 225 + 226 + ### Task 3: Call `configureOAuth` during boot 227 + 228 + **Files:** 229 + - Modify: `packages/hatk/src/main.ts` 230 + - Modify: `packages/hatk/src/dev-entry.ts` 231 + 232 + **Step 1: Wire `configureOAuth` in `main.ts`** 233 + 234 + Import and call `configureOAuth` alongside the existing `registerCoreHandlers` call: 235 + 236 + ```typescript 237 + import { initXrpc, listXrpc, configureRelay, callXrpc, configureOAuth } from './xrpc.ts' 238 + 239 + // After line 127 (registerCoreHandlers): 240 + configureOAuth(config.oauth) 241 + ``` 242 + 243 + **Step 2: Wire `configureOAuth` in `dev-entry.ts`** 244 + 245 + ```typescript 246 + import { configureOAuth } from './xrpc.ts' 247 + 248 + // After line 76 (registerCoreHandlers): 249 + configureOAuth(config.oauth) 250 + ``` 251 + 252 + **Step 3: Verify the build** 253 + 254 + Run: `cd /Users/chadmiller/code/hatk && npx tsc --noEmit -p packages/hatk/tsconfig.json` 255 + Expected: No type errors 256 + 257 + **Step 4: Commit** 258 + 259 + ```bash 260 + git add packages/hatk/src/main.ts packages/hatk/src/dev-entry.ts 261 + git commit -m "feat: wire configureOAuth at boot for XRPC record helpers" 262 + ``` 263 + 264 + --- 265 + 266 + ### Task 4: Update grain template `on-login.ts` to use `ctx.createRecord` 267 + 268 + **Files:** 269 + - Modify: `/Users/chadmiller/code/hatk-template-grain/server/on-login.ts` 270 + 271 + **Step 1: Replace raw SQL with `ctx.createRecord`** 272 + 273 + ```typescript 274 + import { defineHook, type GrainActorProfile, type BskyActorProfile } from "$hatk"; 275 + 276 + export default defineHook("on-login", async (ctx) => { 277 + const { did, ensureRepo, lookup } = ctx; 278 + 279 + // Backfill the user's repo and wait for completion 280 + await ensureRepo(did); 281 + 282 + // Check if user already has a grain profile 283 + const grainProfiles = await lookup<GrainActorProfile>("social.grain.actor.profile", "did", [did]); 284 + if (grainProfiles.has(did)) return; 285 + 286 + // No grain profile — copy from bsky profile if available 287 + const bskyProfiles = await lookup<BskyActorProfile>("app.bsky.actor.profile", "did", [did]); 288 + const bsky = bskyProfiles.get(did); 289 + if (!bsky) return; 290 + 291 + const record: Record<string, unknown> = { 292 + createdAt: new Date().toISOString(), 293 + }; 294 + if (bsky.value.displayName) record.displayName = bsky.value.displayName; 295 + if (bsky.value.description) record.description = bsky.value.description; 296 + if (bsky.value.avatar) record.avatar = bsky.value.avatar; 297 + 298 + await ctx.createRecord("social.grain.actor.profile", record, { rkey: "self" }); 299 + }); 300 + ``` 301 + 302 + **Step 2: Verify the template builds** 303 + 304 + Run: `cd /Users/chadmiller/code/hatk-template-grain && npx tsc --noEmit` 305 + Expected: No type errors (after hatk types are regenerated) 306 + 307 + **Step 3: Commit** 308 + 309 + ```bash 310 + git add server/on-login.ts 311 + git commit -m "refactor: replace raw SQL with ctx.createRecord in on-login hook" 312 + ``` 313 + 314 + --- 315 + 316 + ### Task 5: Update documentation 317 + 318 + **Files:** 319 + - Modify: `packages/hatk/docs/site/guides/hooks.md` 320 + 321 + **Step 1: Add record helpers to the hook context table** 322 + 323 + Add three rows to the context table in `hooks.md`: 324 + 325 + ```markdown 326 + | `createRecord` | `(collection, record, opts?) => Promise<{uri?, cid?}>` | Write a record to the user's PDS and index locally | 327 + | `putRecord` | `(collection, rkey, record) => Promise<{uri?, cid?}>` | Create or update a record on the user's PDS | 328 + | `deleteRecord` | `(collection, rkey) => Promise<void>` | Delete a record from the user's PDS and local index | 329 + ``` 330 + 331 + **Step 2: Update the "Populating records on first login" example** 332 + 333 + Replace the raw SQL example with the `ctx.createRecord` version: 334 + 335 + ```typescript 336 + // server/on-login.ts 337 + import { defineHook, type BskyActorProfile, type MyAppProfile } from '$hatk' 338 + 339 + export default defineHook('on-login', async (ctx) => { 340 + const { did, ensureRepo, lookup } = ctx 341 + 342 + await ensureRepo(did) 343 + 344 + // Check if user already has an app profile 345 + const existing = await lookup<MyAppProfile>('my.app.profile', 'did', [did]) 346 + if (existing.has(did)) return 347 + 348 + // Copy from Bluesky profile 349 + const bsky = await lookup<BskyActorProfile>('app.bsky.actor.profile', 'did', [did]) 350 + const profile = bsky.get(did) 351 + if (!profile) return 352 + 353 + await ctx.createRecord('my.app.profile', { 354 + displayName: profile.value.displayName, 355 + description: profile.value.description, 356 + avatar: profile.value.avatar, 357 + createdAt: new Date().toISOString(), 358 + }, { rkey: 'self' }) 359 + }) 360 + ``` 361 + 362 + **Step 3: Commit** 363 + 364 + ```bash 365 + git add packages/hatk/docs/site/guides/hooks.md 366 + git commit -m "docs: update hooks guide with record helper examples" 367 + ```
+46 -5
docs/site/guides/hooks.md
··· 18 18 }) 19 19 ``` 20 20 21 - This is three lines, but it's important: without it, a new user's existing records won't appear until the firehose (the AT Protocol's real-time event stream) delivers them. `ensureRepo` fetches the user's repository from their PDS and indexes it right away. 21 + This is three lines, but it's important: without it, a new user's existing records won't appear until the firehose (the AT Protocol's real-time event stream) delivers them. `ensureRepo` fetches the user's repository from their PDS, indexes it, and **waits for the backfill to complete** before returning. 22 + 23 + ### Populating records on first login 24 + 25 + Since the hook has full database and record access, you can check for records and create them if needed. For example, copying a user's Bluesky profile to a custom profile collection on first login: 26 + 27 + ```typescript 28 + // server/on-login.ts 29 + import { defineHook, type BskyActorProfile, type MyAppProfile } from '$hatk' 30 + 31 + export default defineHook('on-login', async (ctx) => { 32 + const { did, ensureRepo, lookup } = ctx 33 + 34 + await ensureRepo(did) 35 + 36 + // Check if user already has an app profile 37 + const existing = await lookup<MyAppProfile>('my.app.profile', 'did', [did]) 38 + if (existing.has(did)) return 39 + 40 + // Copy from Bluesky profile 41 + const bsky = await lookup<BskyActorProfile>('app.bsky.actor.profile', 'did', [did]) 42 + const profile = bsky.get(did) 43 + if (!profile) return 44 + 45 + await ctx.createRecord('my.app.profile', { 46 + displayName: profile.value.displayName, 47 + description: profile.value.description, 48 + avatar: profile.value.avatar, 49 + createdAt: new Date().toISOString(), 50 + }, { rkey: 'self' }) 51 + }) 52 + ``` 22 53 23 54 ## Hook context 24 55 25 - The `on-login` handler receives: 56 + The `on-login` handler receives a context object with database access, record helpers, and the login event data: 26 57 27 58 | Field | Type | Description | 28 59 | --- | --- | --- | 29 - | `did` | string | The DID (decentralized identifier) of the user who logged in | 30 - | `ensureRepo` | `(did: string) => Promise<void>` | Marks the user's repo as pending and triggers a backfill from their PDS | 60 + | `did` | `string` | The DID of the user who logged in | 61 + | `ensureRepo` | `(did: string) => Promise<void>` | Backfills the user's repo from their PDS and waits for completion | 62 + | `db.query` | `(sql, params?) => Promise<unknown[]>` | Run a read query | 63 + | `db.run` | `(sql, params?) => Promise<void>` | Run a write query (INSERT, UPDATE, DELETE) | 64 + | `lookup` | `(collection, field, values) => Promise<Map>` | Look up records by field values | 65 + | `count` | `(collection, field, values) => Promise<Map>` | Count records by field values | 66 + | `getRecords` | `(collection, uris) => Promise<Map>` | Fetch records by URI | 67 + | `labels` | `(uris) => Promise<Map>` | Get labels for URIs | 68 + | `blobUrl` | `(did, ref, preset?) => string` | Generate a blob URL | 69 + | `createRecord` | `(collection, record, opts?) => Promise<{uri?, cid?}>` | Write a record to the user's PDS and index locally | 70 + | `putRecord` | `(collection, rkey, record) => Promise<{uri?, cid?}>` | Create or update a record on the user's PDS | 71 + | `deleteRecord` | `(collection, rkey) => Promise<void>` | Delete a record from the user's PDS and local index | 31 72 32 73 ## Error handling 33 74 34 - If a hook throws, the error is logged but does not block the login flow. The user still completes authentication successfully. 75 + If a hook throws, the error is logged but does not block the login flow. The user still completes authentication successfully. Hooks have a 30-second timeout — if the hook takes longer, it is cancelled and the login proceeds.
+3
docs/site/guides/xrpc-handlers.md
··· 102 102 | `unpackCursor` | function | Decode a cursor back into `{ primary, cid }` | 103 103 | `isTakendown` | function | Check if a DID has been taken down | 104 104 | `filterTakendownDids` | function | Filter a list of DIDs, returning those taken down | 105 + | `createRecord` | function | Write a record to the viewer's PDS and index locally | 106 + | `putRecord` | function | Create or update a record on the viewer's PDS | 107 + | `deleteRecord` | function | Delete a record from the viewer's PDS and local index | 105 108 106 109 ### `ctx.ok()` 107 110
+2 -1
packages/hatk/src/dev-entry.ts
··· 9 9 import { createAdapter } from './database/adapter-factory.ts' 10 10 import { getDialect } from './database/dialect.ts' 11 11 import { setSearchPort } from './database/fts.ts' 12 - import { configureRelay } from './xrpc.ts' 12 + import { configureRelay, configureOAuth } from './xrpc.ts' 13 13 import { initOAuth } from './oauth/server.ts' 14 14 import { initServer } from './server-init.ts' 15 15 import { createHandler, registerCoreHandlers } from './server.ts' ··· 74 74 75 75 // Register built-in dev.hatk.* handlers so callXrpc() can find them 76 76 registerCoreHandlers(collections, config.oauth) 77 + configureOAuth(config.oauth) 77 78 78 79 if (config.oauth) { 79 80 await initOAuth(config.oauth, config.plc, config.relay)
+58 -9
packages/hatk/src/hooks.ts
··· 21 21 */ 22 22 import { existsSync } from 'node:fs' 23 23 import { resolve } from 'node:path' 24 - import { log } from './logger.ts' 25 - import { setRepoStatus } from './database/db.ts' 26 - import { triggerAutoBackfill } from './indexer.ts' 24 + import type { OAuthConfig } from './config.ts' 25 + import { pdsCreateRecord, pdsPutRecord, pdsDeleteRecord } from './pds-proxy.ts' 26 + import { log, emit } from './logger.ts' 27 + import { setRepoStatus, runSQL } from './database/db.ts' 28 + import { triggerAutoBackfill, awaitBackfill } from './indexer.ts' 29 + import { buildBaseContext, type BaseContext } from './hydrate.ts' 27 30 28 31 /** Context passed to the on-login hook after a successful OAuth login. */ 29 - export type OnLoginCtx = { 32 + export type OnLoginCtx = Omit<BaseContext, 'db'> & { 30 33 /** DID of the user who just logged in. */ 31 34 did: string 32 - /** Trigger a backfill for a DID if it hasn't been indexed yet. */ 35 + /** Database access with both read and write. */ 36 + db: { 37 + query: (sql: string, params?: unknown[]) => Promise<unknown[]> 38 + run: (sql: string, params?: unknown[]) => Promise<void> 39 + } 40 + /** Trigger a backfill for a DID and wait for it to complete. */ 33 41 ensureRepo: (did: string) => Promise<void> 42 + /** Write a record to the user's PDS and index locally. */ 43 + createRecord: ( 44 + collection: string, 45 + record: Record<string, unknown>, 46 + opts?: { rkey?: string }, 47 + ) => Promise<{ uri?: string; cid?: string }> 48 + /** Create or update a record on the user's PDS and index locally. */ 49 + putRecord: ( 50 + collection: string, 51 + rkey: string, 52 + record: Record<string, unknown>, 53 + ) => Promise<{ uri?: string; cid?: string }> 54 + /** Delete a record from the user's PDS and local index. */ 55 + deleteRecord: ( 56 + collection: string, 57 + rkey: string, 58 + ) => Promise<void> 34 59 } 35 60 36 61 export function defineHook(event: 'on-login', handler: (ctx: OnLoginCtx) => Promise<void>) { ··· 55 80 log('[hooks] on-login hook loaded') 56 81 } 57 82 58 - /** Mark a DID as pending and trigger auto-backfill. */ 83 + /** Mark a DID as pending, trigger auto-backfill, and wait for completion. */ 59 84 async function ensureRepo(did: string): Promise<void> { 60 85 await setRepoStatus(did, 'pending') 61 86 triggerAutoBackfill(did) 87 + await awaitBackfill(did) 62 88 } 63 89 64 90 /** Register a hook from a scanned server/ module. */ ··· 70 96 } 71 97 72 98 /** Fire the on-login hook if loaded. Errors are logged but never block login. */ 73 - export async function fireOnLoginHook(did: string): Promise<void> { 99 + export async function fireOnLoginHook(did: string, oauthConfig: OAuthConfig | null): Promise<void> { 74 100 if (!onLoginHook) return 75 101 try { 76 - await onLoginHook({ did, ensureRepo }) 102 + const base = buildBaseContext({ did }) 103 + const viewer = { did } 104 + const hookPromise = onLoginHook({ 105 + ...base, 106 + did, 107 + db: { query: base.db.query, run: runSQL }, 108 + ensureRepo, 109 + createRecord: async (collection, record, opts) => { 110 + if (!oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 111 + return pdsCreateRecord(oauthConfig, viewer, { collection, record, rkey: opts?.rkey }) 112 + }, 113 + putRecord: async (collection, rkey, record) => { 114 + if (!oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 115 + return pdsPutRecord(oauthConfig, viewer, { collection, rkey, record }) 116 + }, 117 + deleteRecord: async (collection, rkey) => { 118 + if (!oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 119 + await pdsDeleteRecord(oauthConfig, viewer, { collection, rkey }) 120 + }, 121 + }) 122 + const timeout = new Promise<void>((_, reject) => 123 + setTimeout(() => reject(new Error('on-login hook timed out after 30s')), 30_000) 124 + ) 125 + await Promise.race([hookPromise, timeout]) 77 126 } catch (err: any) { 78 - console.error('[hooks] onLogin hook error:', err.message) 127 + emit('hooks', 'on_login_error', { did, error: err.message }) 79 128 } 80 129 }
+19
packages/hatk/src/indexer.ts
··· 39 39 40 40 // Track in-flight backfills to avoid duplicates 41 41 const backfillInFlight = new Set<string>() 42 + const backfillPromises = new Map<string, { promise: Promise<void>; resolve: () => void }>() 42 43 const pendingReschedule = new Set<string>() 43 44 44 45 // In-memory cache of repo status to avoid flooding the DB read queue ··· 157 158 * `maxConcurrentBackfills`. Failed backfills retry with exponential delay up 158 159 * to `maxRetries`. 159 160 */ 161 + /** Wait for a DID's backfill to complete if one is in flight. */ 162 + export function awaitBackfill(did: string): Promise<void> { 163 + const entry = backfillPromises.get(did) 164 + return entry ? entry.promise : Promise.resolve() 165 + } 166 + 160 167 export async function triggerAutoBackfill(did: string, attempt = 0): Promise<void> { 161 168 if (backfillInFlight.has(did)) return 162 169 if (backfillInFlight.size >= maxConcurrentBackfills) { ··· 171 178 } 172 179 backfillInFlight.add(did) 173 180 pendingBuffers.set(did, []) 181 + if (!backfillPromises.has(did)) { 182 + let resolveBackfill!: () => void 183 + const promise = new Promise<void>((r) => { resolveBackfill = r }) 184 + backfillPromises.set(did, { promise, resolve: resolveBackfill }) 185 + } 174 186 if (attempt === 0) await setRepoStatus(did, 'pending') 175 187 const elapsed = timer() 176 188 ··· 213 225 error, 214 226 retry_count: currentRetryCount, 215 227 }) 228 + 229 + // Resolve awaiting callers (e.g. on-login hooks) 230 + const entry = backfillPromises.get(did) 231 + if (entry) { 232 + entry.resolve() 233 + backfillPromises.delete(did) 234 + } 216 235 217 236 if (status === 'error' && currentRetryCount < indexerMaxRetries) { 218 237 const delaySecs = Math.min(currentRetryCount * 60, 3600)
+2 -1
packages/hatk/src/main.ts
··· 11 11 import { getDialect } from './database/dialect.ts' 12 12 import { setSearchPort } from './database/fts.ts' 13 13 import { initFeeds, listFeeds } from './feeds.ts' 14 - import { initXrpc, listXrpc, configureRelay, callXrpc } from './xrpc.ts' 14 + import { initXrpc, listXrpc, configureRelay, configureOAuth, callXrpc } from './xrpc.ts' 15 15 import { initOpengraph } from './opengraph.ts' 16 16 import { initLabels, getLabelDefinitions } from './labels.ts' 17 17 import { startIndexer } from './indexer.ts' ··· 125 125 126 126 // Register built-in dev.hatk.* handlers so callXrpc() can find them 127 127 registerCoreHandlers(collections, config.oauth) 128 + configureOAuth(config.oauth) 128 129 129 130 // Write db/schema.sql (after setup, so setup-created tables are included) 130 131 try {
+1 -1
packages/hatk/src/oauth/server.ts
··· 537 537 tokenExpiresAt: tokenData.expires_in ? Math.floor(Date.now() / 1000) + tokenData.expires_in : undefined, 538 538 }) 539 539 540 - await fireOnLoginHook(did) 540 + await fireOnLoginHook(did, config) 541 541 542 542 // Generate authorization code for the client 543 543 const clientCode = randomToken()
+38
packages/hatk/src/xrpc.ts
··· 39 39 import type { BaseContext } from './hydrate.ts' 40 40 import { getLexicon } from './database/schema.ts' 41 41 import type { Row, FlatRow } from './lex-types.ts' 42 + import type { OAuthConfig } from './config.ts' 43 + import { pdsCreateRecord, pdsPutRecord, pdsDeleteRecord } from './pds-proxy.ts' 42 44 43 45 export type { Row, FlatRow } 46 + 47 + let _oauthConfig: OAuthConfig | null = null 48 + 49 + /** Set the OAuth config used for record write helpers. Called once during boot. */ 50 + export function configureOAuth(config: OAuthConfig | null) { 51 + _oauthConfig = config 52 + } 44 53 45 54 /** Thrown from XRPC handlers to return a 400 response with an error message. */ 46 55 export class InvalidRequestError extends Error { ··· 92 101 ) => Promise<{ records: Row<Records[K]>[]; cursor?: string }> 93 102 resolve: <R = unknown>(uris: string[]) => Promise<Row<R>[]> 94 103 exists: (collection: string, filters: Record<string, string>) => Promise<boolean> 104 + createRecord: ( 105 + collection: string, 106 + record: Record<string, unknown>, 107 + opts?: { rkey?: string }, 108 + ) => Promise<{ uri?: string; cid?: string }> 109 + putRecord: ( 110 + collection: string, 111 + rkey: string, 112 + record: Record<string, unknown>, 113 + ) => Promise<{ uri?: string; cid?: string }> 114 + deleteRecord: ( 115 + collection: string, 116 + rkey: string, 117 + ) => Promise<void> 95 118 } 96 119 97 120 /** Internal representation of a loaded XRPC handler module. */ ··· 157 180 const conditions = Object.entries(filters).map(([field, value]) => ({ field, value })) 158 181 const uri = await findUriByFields(collection, conditions) 159 182 return uri !== null 183 + }, 184 + createRecord: async (collection, record, opts) => { 185 + if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 186 + if (!viewer) throw new Error('Authentication required to write records') 187 + return pdsCreateRecord(_oauthConfig, viewer, { collection, record, rkey: opts?.rkey }) 188 + }, 189 + putRecord: async (collection, rkey, record) => { 190 + if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 191 + if (!viewer) throw new Error('Authentication required to write records') 192 + return pdsPutRecord(_oauthConfig, viewer, { collection, rkey, record }) 193 + }, 194 + deleteRecord: async (collection, rkey) => { 195 + if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 196 + if (!viewer) throw new Error('Authentication required to write records') 197 + await pdsDeleteRecord(_oauthConfig, viewer, { collection, rkey }) 160 198 }, 161 199 } 162 200 }