AppView in a box as a Vite plugin thing hatk.dev
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add on-commit hooks and push notification delivery

- Add on-commit hook primitive with collection filtering
- Add APNs HTTP/2 push delivery module with JWT auth and token self-cleaning
- Add _push_tokens table and registerToken/unregisterToken XRPC endpoints
- Add PushConfig to defineConfig
- Wire fireOnCommitHooks into indexer flushBuffer (creates) and processMessage (deletes)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+401 -12
+1 -1
packages/hatk/package.json
··· 1 1 { 2 2 "name": "@hatk/hatk", 3 - "version": "0.0.1-alpha.50", 3 + "version": "0.0.1-alpha.51", 4 4 "license": "MIT", 5 5 "bin": { 6 6 "hatk": "dist/cli.js"
+16 -1
packages/hatk/src/config.ts
··· 38 38 maxRetries: number // max retry attempts for failed repos (default 5) 39 39 } 40 40 41 + export interface ApnsPushConfig { 42 + keyFile: string 43 + keyId: string 44 + teamId: string 45 + bundleId: string 46 + production?: boolean // defaults to true; set false for sandbox 47 + } 48 + 49 + export interface PushConfig { 50 + apns: ApnsPushConfig 51 + } 52 + 41 53 export interface HatkConfig { 42 54 relay: string 43 55 plc: string // PLC directory URL for DID resolution ··· 49 61 backfill: BackfillConfig 50 62 ftsRebuildInterval: number // rebuild FTS index every N writes (lower = fresher search) 51 63 oauth: OAuthConfig | null 64 + push: PushConfig | null // push notification delivery (null to disable) 52 65 admins: string[] // DIDs allowed to access /admin/* endpoints 53 66 } 54 67 55 68 /** Input type for defineConfig — fields that have defaults are optional. */ 56 - export type HatkConfigInput = Partial<Omit<HatkConfig, 'oauth' | 'backfill'>> & { 69 + export type HatkConfigInput = Partial<Omit<HatkConfig, 'oauth' | 'backfill' | 'push'>> & { 57 70 oauth?: (Partial<OAuthConfig> & { clients: OAuthClientConfig[] }) | null 58 71 backfill?: Partial<BackfillConfig> 72 + push?: PushConfig | null 59 73 } 60 74 61 75 /** Identity function that provides type inference for hatk config files. */ ··· 111 125 }, 112 126 ftsRebuildInterval: parseInt(env.FTS_REBUILD_INTERVAL || '') || parsed.ftsRebuildInterval || 5000, 113 127 oauth: null, 128 + push: parsed.push || null, 114 129 admins: env.ADMINS ? env.ADMINS.split(',').map((s) => s.trim()) : parsed.admins || [], 115 130 } 116 131
+9
packages/hatk/src/database/db.ts
··· 154 154 await run(`CREATE INDEX IF NOT EXISTS idx_reports_status ON _reports(status)`) 155 155 await run(`CREATE INDEX IF NOT EXISTS idx_reports_subject_uri ON _reports(subject_uri)`) 156 156 157 + // Push notification tokens 158 + await run(`CREATE TABLE IF NOT EXISTS _push_tokens ( 159 + did TEXT NOT NULL, 160 + token TEXT NOT NULL, 161 + platform TEXT NOT NULL, 162 + created_at TEXT NOT NULL, 163 + PRIMARY KEY (did, token) 164 + )`) 165 + 157 166 // OAuth tables 158 167 await port.executeMultiple(OAUTH_DDL) 159 168
+112 -9
packages/hatk/src/hooks.ts
··· 4 4 * Place hook modules in the `hooks/` directory. Currently supported hooks: 5 5 * 6 6 * - `on-login.ts` — called after each successful OAuth login 7 + * - `on-commit-*.ts` — called after records are indexed from the firehose 7 8 * 8 - * Each hook default-exports an async function that receives an event-specific 9 - * context object. 9 + * Each hook default-exports the result of `defineHook()`. 10 10 * 11 11 * @example 12 12 * ```ts 13 13 * // hooks/on-login.ts 14 - * import type { OnLoginCtx } from '@hatk/hatk/hooks' 14 + * import { defineHook } from '$hatk' 15 15 * 16 - * export default async function (ctx: OnLoginCtx) { 17 - * // Ensure the user's repo is backfilled on first login 16 + * export default defineHook("on-login", async (ctx) => { 18 17 * await ctx.ensureRepo(ctx.did) 19 - * } 18 + * }) 19 + * ``` 20 + * 21 + * @example 22 + * ```ts 23 + * // hooks/on-commit-favorite.ts 24 + * import { defineHook } from '$hatk' 25 + * 26 + * export default defineHook("on-commit", { collections: ["social.grain.favorite"] }, 27 + * async ({ action, collection, record, repo, uri, db, lookup, push }) => { 28 + * if (action !== "create") return 29 + * // send push notification, etc. 30 + * } 31 + * ) 20 32 * ``` 21 33 */ 22 34 import { existsSync } from 'node:fs' ··· 27 39 import { setRepoStatus, runSQL } from './database/db.ts' 28 40 import { triggerAutoBackfill, awaitBackfill } from './indexer.ts' 29 41 import { buildBaseContext, type BaseContext } from './hydrate.ts' 42 + import { buildPushInterface, isPushEnabled, type PushInterface } from './push.ts' 30 43 31 44 /** Context passed to the on-login hook after a successful OAuth login. */ 32 45 export type OnLoginCtx = Omit<BaseContext, 'db'> & { ··· 58 71 ) => Promise<void> 59 72 } 60 73 61 - export function defineHook(event: 'on-login', handler: (ctx: OnLoginCtx) => Promise<void>) { 62 - return { __type: 'hook' as const, event, handler } 74 + /** Context passed to on-commit hooks after a record is indexed. */ 75 + export type OnCommitCtx = { 76 + /** Whether the record was created or deleted. */ 77 + action: 'create' | 'delete' 78 + /** The collection NSID that matched. */ 79 + collection: string 80 + /** The record value (null for deletes). */ 81 + record: Record<string, any> | null 82 + /** DID of the committing actor. */ 83 + repo: string 84 + /** AT URI of the record. */ 85 + uri: string 86 + /** Database access (read and write). */ 87 + db: { 88 + query: (sql: string, params?: unknown[]) => Promise<unknown[]> 89 + run: (sql: string, params?: unknown[]) => Promise<void> 90 + } 91 + /** Typed record lookup (same as BaseContext). */ 92 + lookup: BaseContext['lookup'] 93 + /** Push notification delivery. */ 94 + push: PushInterface 95 + } 96 + 97 + interface OnCommitHookEntry { 98 + collections: Set<string> 99 + handler: (ctx: OnCommitCtx) => Promise<void> 100 + } 101 + 102 + // Overloaded defineHook for both event types 103 + export function defineHook( 104 + event: 'on-login', 105 + handler: (ctx: OnLoginCtx) => Promise<void>, 106 + ): { __type: 'hook'; event: 'on-login'; handler: (ctx: OnLoginCtx) => Promise<void> } 107 + export function defineHook( 108 + event: 'on-commit', 109 + options: { collections: string[] }, 110 + handler: (ctx: OnCommitCtx) => Promise<void>, 111 + ): { __type: 'hook'; event: 'on-commit'; collections: string[]; handler: (ctx: OnCommitCtx) => Promise<void> } 112 + export function defineHook(event: string, ...args: any[]): any { 113 + if (event === 'on-login') { 114 + return { __type: 'hook' as const, event, handler: args[0] } 115 + } 116 + if (event === 'on-commit') { 117 + const options = args[0] as { collections: string[] } 118 + const handler = args[1] as (ctx: OnCommitCtx) => Promise<void> 119 + return { __type: 'hook' as const, event, collections: options.collections, handler } 120 + } 121 + throw new Error(`Unknown hook event: ${event}`) 63 122 } 64 123 65 124 type OnLoginHook = (ctx: OnLoginCtx) => Promise<void> 66 125 67 126 let onLoginHook: OnLoginHook | null = null 127 + const onCommitHooks: OnCommitHookEntry[] = [] 68 128 69 129 /** 70 130 * Discover and load the on-login hook from the project's `hooks/` directory. ··· 88 148 } 89 149 90 150 /** Register a hook from a scanned server/ module. */ 91 - export function registerHook(event: string, handler: Function): void { 151 + export function registerHook(event: string, handler: Function, options?: any): void { 92 152 if (event === 'on-login') { 93 153 onLoginHook = handler as OnLoginHook 94 154 log('[hooks] on-login hook registered') 155 + } else if (event === 'on-commit') { 156 + const collections = new Set<string>(options?.collections || []) 157 + onCommitHooks.push({ collections, handler: handler as (ctx: OnCommitCtx) => Promise<void> }) 158 + log(`[hooks] on-commit hook registered (collections: ${[...collections].join(', ')})`) 95 159 } 96 160 } 97 161 ··· 127 191 emit('hooks', 'on_login_error', { did, error: err.message }) 128 192 } 129 193 } 194 + 195 + /** 196 + * Fire on-commit hooks for a batch of indexed records. 197 + * Runs async and non-blocking — errors are logged but never throw. 198 + */ 199 + export function fireOnCommitHooks(items: Array<{ 200 + action: 'create' | 'delete' 201 + collection: string 202 + uri: string 203 + authorDid: string 204 + record: Record<string, any> | null 205 + }>): void { 206 + if (onCommitHooks.length === 0) return 207 + 208 + const base = buildBaseContext(null) 209 + const push = isPushEnabled() ? buildPushInterface() : { send: async () => {} } 210 + 211 + for (const item of items) { 212 + for (const hook of onCommitHooks) { 213 + if (hook.collections.size > 0 && !hook.collections.has(item.collection)) continue 214 + hook.handler({ 215 + action: item.action, 216 + collection: item.collection, 217 + record: item.record, 218 + repo: item.authorDid, 219 + uri: item.uri, 220 + db: { query: base.db.query, run: runSQL }, 221 + lookup: base.lookup, 222 + push, 223 + }).catch((err: any) => { 224 + emit('hooks', 'on_commit_error', { 225 + collection: item.collection, 226 + uri: item.uri, 227 + error: err.message, 228 + }) 229 + }) 230 + } 231 + } 232 + }
+17
packages/hatk/src/indexer.ts
··· 14 14 import { rebuildAllIndexes } from './database/fts.ts' 15 15 import { log, emit, timer } from './logger.ts' 16 16 import { runLabelRules } from './labels.ts' 17 + import { fireOnCommitHooks } from './hooks.ts' 17 18 import { getLexiconArray } from './database/schema.ts' 18 19 import { validateRecord } from '@bigmoves/lexicon' 19 20 ··· 95 96 value: item.record, 96 97 }).catch(() => {}) 97 98 } 99 + 100 + // Fire on-commit hooks for inserted records (async, non-blocking) 101 + fireOnCommitHooks(inserted.map((item) => ({ 102 + action: 'create' as const, 103 + collection: item.collection, 104 + uri: item.uri, 105 + authorDid: item.authorDid, 106 + record: item.record, 107 + }))) 98 108 99 109 // Aggregate collection counts and unique DIDs for wide event 100 110 const collections: Record<string, number> = {} ··· 413 423 414 424 if (op.action === 'delete') { 415 425 deleteRecord(collection, uri) 426 + fireOnCommitHooks([{ 427 + action: 'delete', 428 + collection, 429 + uri, 430 + authorDid: did, 431 + record: null, 432 + }]) 416 433 continue 417 434 } 418 435
+10
packages/hatk/src/main.ts
··· 24 24 import { initOAuth } from './oauth/server.ts' 25 25 import { parseSessionCookie, getSessionCookieName } from './oauth/session.ts' 26 26 import { loadOnLoginHook } from './hooks.ts' 27 + import { initPush, isPushEnabled } from './push.ts' 27 28 import { initSetup } from './setup.ts' 28 29 import { initServer } from './server-init.ts' 29 30 ··· 149 150 if (config.oauth) { 150 151 await initOAuth(config.oauth, config.plc, config.relay) 151 152 log(`[main] OAuth initialized (issuer: ${config.oauth.issuer})`) 153 + } 154 + 155 + if (config.push) { 156 + initPush(config.push, configDir) 157 + if (isPushEnabled()) { 158 + log(`[main] Push initialized (APNs bundle: ${config.push.apns.bundleId})`) 159 + } else { 160 + log(`[main] Push configured but key file missing — push disabled`) 161 + } 152 162 } 153 163 154 164
+215
packages/hatk/src/push.ts
··· 1 + /** 2 + * Push notification delivery via APNs HTTP/2. 3 + * 4 + * Provides `push.send()` for use in on-commit hook context. Looks up device 5 + * tokens, builds APNs payloads, and sends via HTTP/2. Self-cleans invalid 6 + * tokens on Apple 410 responses. Fire-and-forget — failures are logged via 7 + * `emit()` but never throw. 8 + */ 9 + import { connect, type ClientHttp2Session } from 'node:http2' 10 + import { readFileSync } from 'node:fs' 11 + import { createSign } from 'node:crypto' 12 + import { resolve } from 'node:path' 13 + import { emit } from './logger.ts' 14 + import { runSQL, querySQL } from './database/db.ts' 15 + 16 + export interface ApnsConfig { 17 + keyFile: string 18 + keyId: string 19 + teamId: string 20 + bundleId: string 21 + production?: boolean 22 + } 23 + 24 + export interface PushConfig { 25 + apns: ApnsConfig 26 + } 27 + 28 + export interface PushPayload { 29 + did: string 30 + title: string 31 + body: string 32 + data?: Record<string, string> 33 + collapseId?: string 34 + } 35 + 36 + export interface PushInterface { 37 + send: (payload: PushPayload) => Promise<void> 38 + } 39 + 40 + let pushConfig: PushConfig | null = null 41 + let apnsKey: string | null = null 42 + let cachedJwt: { token: string; expires: number } | null = null 43 + let http2Session: ClientHttp2Session | null = null 44 + 45 + /** Initialize push with config. Must be called before send(). */ 46 + export function initPush(config: PushConfig, configDir: string): void { 47 + pushConfig = config 48 + const keyPath = resolve(configDir, config.apns.keyFile) 49 + try { 50 + apnsKey = readFileSync(keyPath, 'utf8') 51 + } catch { 52 + emit('push', 'init_error', { error: `APNs key file not found: ${keyPath}` }) 53 + pushConfig = null 54 + } 55 + } 56 + 57 + /** Check if push is configured and available. */ 58 + export function isPushEnabled(): boolean { 59 + return pushConfig !== null && apnsKey !== null 60 + } 61 + 62 + /** Build the push interface injected into hook contexts. */ 63 + export function buildPushInterface(): PushInterface { 64 + return { send } 65 + } 66 + 67 + /** Create a JWT for APNs authentication (cached for 50 minutes). */ 68 + function getApnsJwt(): string { 69 + if (cachedJwt && Date.now() < cachedJwt.expires) return cachedJwt.token 70 + if (!pushConfig || !apnsKey) throw new Error('Push not initialized') 71 + 72 + const header = Buffer.from(JSON.stringify({ 73 + alg: 'ES256', 74 + kid: pushConfig.apns.keyId, 75 + })).toString('base64url') 76 + 77 + const now = Math.floor(Date.now() / 1000) 78 + const claims = Buffer.from(JSON.stringify({ 79 + iss: pushConfig.apns.teamId, 80 + iat: now, 81 + })).toString('base64url') 82 + 83 + const signer = createSign('SHA256') 84 + signer.update(`${header}.${claims}`) 85 + const signature = signer.sign(apnsKey, 'base64url') 86 + 87 + const token = `${header}.${claims}.${signature}` 88 + cachedJwt = { token, expires: Date.now() + 50 * 60 * 1000 } 89 + return token 90 + } 91 + 92 + /** Get or create an HTTP/2 connection to APNs. */ 93 + function getHttp2Session(): ClientHttp2Session { 94 + if (http2Session && !http2Session.closed && !http2Session.destroyed) { 95 + return http2Session 96 + } 97 + const host = pushConfig?.apns.production !== false 98 + ? 'https://api.push.apple.com' 99 + : 'https://api.sandbox.push.apple.com' 100 + http2Session = connect(host) 101 + http2Session.on('error', () => { 102 + http2Session = null 103 + }) 104 + http2Session.on('close', () => { 105 + http2Session = null 106 + }) 107 + return http2Session 108 + } 109 + 110 + /** Send a push notification to all devices registered for a DID. */ 111 + async function send(payload: PushPayload): Promise<void> { 112 + if (!pushConfig || !apnsKey) return 113 + 114 + const tokens = await querySQL( 115 + `SELECT token, platform FROM _push_tokens WHERE did = $1`, 116 + [payload.did], 117 + ) as { token: string; platform: string }[] 118 + 119 + if (tokens.length === 0) return 120 + 121 + const jwt = getApnsJwt() 122 + const apnsPayload = JSON.stringify({ 123 + aps: { 124 + alert: { title: payload.title, body: payload.body }, 125 + sound: 'default', 126 + }, 127 + ...(payload.data || {}), 128 + }) 129 + 130 + for (const { token, platform } of tokens) { 131 + if (platform !== 'apns') continue 132 + sendToApns(token, apnsPayload, jwt, payload).catch(() => {}) 133 + } 134 + } 135 + 136 + /** Send a single APNs push and handle the response. */ 137 + async function sendToApns( 138 + token: string, 139 + payload: string, 140 + jwt: string, 141 + original: PushPayload, 142 + ): Promise<void> { 143 + const session = getHttp2Session() 144 + const headers: Record<string, string> = { 145 + ':method': 'POST', 146 + ':path': `/3/device/${token}`, 147 + 'authorization': `bearer ${jwt}`, 148 + 'apns-topic': pushConfig!.apns.bundleId, 149 + 'apns-push-type': 'alert', 150 + } 151 + if (original.collapseId) { 152 + headers['apns-collapse-id'] = original.collapseId 153 + } 154 + 155 + return new Promise<void>((resolve) => { 156 + const req = session.request(headers) 157 + req.setTimeout(15_000, () => { 158 + req.close() 159 + emit('push', 'send_error', { did: original.did, error: 'APNs request timed out' }) 160 + resolve() 161 + }) 162 + let status = 0 163 + let body = '' 164 + 165 + req.on('response', (headers) => { 166 + status = headers[':status'] as number 167 + }) 168 + req.on('data', (chunk: Buffer) => { 169 + body += chunk.toString() 170 + }) 171 + req.on('end', async () => { 172 + if (status === 200) { 173 + emit('push', 'sent', { did: original.did, token: token.slice(0, 8) + '...' }) 174 + } else if (status === 410) { 175 + // Token is no longer valid — remove it 176 + await removeToken(token).catch(() => {}) 177 + emit('push', 'token_removed', { did: original.did, reason: 'expired' }) 178 + } else { 179 + emit('push', 'send_error', { 180 + did: original.did, 181 + status, 182 + body: body.slice(0, 200), 183 + }) 184 + } 185 + resolve() 186 + }) 187 + req.on('error', (err: Error) => { 188 + emit('push', 'send_error', { did: original.did, error: err.message }) 189 + resolve() 190 + }) 191 + 192 + req.write(payload) 193 + req.end() 194 + }) 195 + } 196 + 197 + /** Register a push token for a DID. Upserts on conflict. */ 198 + export async function registerToken(did: string, token: string, platform: string): Promise<void> { 199 + await runSQL( 200 + `INSERT INTO _push_tokens (did, token, platform, created_at) 201 + VALUES ($1, $2, $3, $4) 202 + ON CONFLICT (did, token) DO UPDATE SET platform = excluded.platform`, 203 + [did, token, platform, new Date().toISOString()], 204 + ) 205 + } 206 + 207 + /** Remove a push token. */ 208 + export async function removeToken(token: string): Promise<void> { 209 + await runSQL(`DELETE FROM _push_tokens WHERE token = $1`, [token]) 210 + } 211 + 212 + /** Unregister a specific token for a DID. */ 213 + export async function unregisterToken(did: string, token: string): Promise<void> { 214 + await runSQL(`DELETE FROM _push_tokens WHERE did = $1 AND token = $2`, [did, token]) 215 + }
+1 -1
packages/hatk/src/server-init.ts
··· 44 44 45 45 // 4. Register hooks 46 46 for (const entry of scanned.hooks) { 47 - registerHook(entry.mod.event, entry.mod.handler) 47 + registerHook(entry.mod.event, entry.mod.handler, entry.mod) 48 48 } 49 49 50 50 // 5. Register labels (clear first for hot-reload)
+20
packages/hatk/src/server.ts
··· 205 205 return pdsUploadBlob(oauth, viewer, input as any, 'application/octet-stream') 206 206 }) 207 207 208 + registerCoreXrpcHandler('dev.hatk.push.registerToken', async (_params, _cursor, _limit, viewer, input) => { 209 + if (!viewer) throw new InvalidRequestError('Authentication required') 210 + const body = input as { token?: string; platform?: string } 211 + if (!body.token || typeof body.token !== 'string') throw new InvalidRequestError('Missing or invalid token') 212 + const platform = body.platform || 'apns' 213 + if (!['apns', 'fcm', 'web'].includes(platform)) throw new InvalidRequestError('Invalid platform') 214 + const { registerToken } = await import('./push.ts') 215 + await registerToken(viewer.did, body.token, platform) 216 + return { success: true } 217 + }) 218 + 219 + registerCoreXrpcHandler('dev.hatk.push.unregisterToken', async (_params, _cursor, _limit, viewer, input) => { 220 + if (!viewer) throw new InvalidRequestError('Authentication required') 221 + const body = input as { token?: string } 222 + if (!body.token || typeof body.token !== 'string') throw new InvalidRequestError('Missing or invalid token') 223 + const { unregisterToken } = await import('./push.ts') 224 + await unregisterToken(viewer.did, body.token) 225 + return { success: true } 226 + }) 227 + 208 228 registerCoreXrpcHandler('dev.hatk.createReport', async (_params, _cursor, _limit, viewer, input) => { 209 229 if (!viewer) throw new InvalidRequestError('Authentication required') 210 230 const body = input as { subject?: any; label?: string; reason?: string }