AppView in a box as a Vite plugin thing hatk.dev
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add applyWrites support for batched PDS writes

Adds pdsApplyWrites to proxy batched create/update/delete operations
through com.atproto.repo.applyWrites with local indexing and label rules.
Exposes it as dev.hatk.applyWrites XRPC handler and on XrpcContext.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+201 -2
+1 -1
packages/hatk/package.json
··· 1 1 { 2 2 "name": "@hatk/hatk", 3 - "version": "0.0.1-alpha.55", 3 + "version": "0.0.1-alpha.56", 4 4 "license": "MIT", 5 5 "bin": { 6 6 "hatk": "dist/cli.js"
+87
packages/hatk/src/lexicons/dev/hatk/applyWrites.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "dev.hatk.applyWrites", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Apply multiple record writes in a single atomic PDS transaction.", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["writes"], 13 + "properties": { 14 + "writes": { 15 + "type": "array", 16 + "items": { 17 + "type": "union", 18 + "refs": ["#create", "#update", "#delete"] 19 + } 20 + } 21 + } 22 + } 23 + }, 24 + "output": { 25 + "encoding": "application/json", 26 + "schema": { 27 + "type": "object", 28 + "properties": { 29 + "results": { 30 + "type": "array", 31 + "items": { 32 + "type": "union", 33 + "refs": ["#createResult", "#updateResult", "#deleteResult"] 34 + } 35 + } 36 + } 37 + } 38 + } 39 + }, 40 + "create": { 41 + "type": "object", 42 + "required": ["collection", "value"], 43 + "properties": { 44 + "collection": { "type": "string" }, 45 + "rkey": { "type": "string" }, 46 + "value": { "type": "unknown" } 47 + } 48 + }, 49 + "update": { 50 + "type": "object", 51 + "required": ["collection", "rkey", "value"], 52 + "properties": { 53 + "collection": { "type": "string" }, 54 + "rkey": { "type": "string" }, 55 + "value": { "type": "unknown" } 56 + } 57 + }, 58 + "delete": { 59 + "type": "object", 60 + "required": ["collection", "rkey"], 61 + "properties": { 62 + "collection": { "type": "string" }, 63 + "rkey": { "type": "string" } 64 + } 65 + }, 66 + "createResult": { 67 + "type": "object", 68 + "required": ["uri", "cid"], 69 + "properties": { 70 + "uri": { "type": "string", "format": "at-uri" }, 71 + "cid": { "type": "string", "format": "cid" } 72 + } 73 + }, 74 + "updateResult": { 75 + "type": "object", 76 + "required": ["uri", "cid"], 77 + "properties": { 78 + "uri": { "type": "string", "format": "at-uri" }, 79 + "cid": { "type": "string", "format": "cid" } 80 + } 81 + }, 82 + "deleteResult": { 83 + "type": "object", 84 + "properties": {} 85 + } 86 + } 87 + }
+93
packages/hatk/src/pds-proxy.ts
··· 262 262 return pdsRes.body as { uri?: string; cid?: string } 263 263 } 264 264 265 + export interface ApplyWritesOp { 266 + $type: string 267 + collection: string 268 + rkey?: string 269 + value?: Record<string, unknown> 270 + } 271 + 272 + export interface ApplyWritesResult { 273 + $type: string 274 + uri?: string 275 + cid?: string 276 + } 277 + 278 + /** Map dev.hatk.applyWrites#* types to com.atproto.repo.applyWrites#* for PDS. */ 279 + function toPdsWriteType($type: string): string { 280 + return $type.replace('dev.hatk.applyWrites#', 'com.atproto.repo.applyWrites#') 281 + } 282 + 283 + function isCreateOrUpdate($type: string): boolean { 284 + const mapped = toPdsWriteType($type) 285 + return mapped === 'com.atproto.repo.applyWrites#create' || mapped === 'com.atproto.repo.applyWrites#update' 286 + } 287 + 288 + export async function pdsApplyWrites( 289 + oauthConfig: OAuthConfig, 290 + viewer: { did: string }, 291 + input: { writes: ApplyWritesOp[] }, 292 + ): Promise<{ results?: ApplyWritesResult[] }> { 293 + // Validate all create/update records before sending 294 + for (const write of input.writes) { 295 + if (isCreateOrUpdate(write.$type) && write.value) { 296 + const validationError = validateRecord(getLexiconArray(), write.collection, write.value) 297 + if (validationError) { 298 + throw new ProxyError( 299 + 400, 300 + `InvalidRecord: ${validationError.path ? validationError.path + ': ' : ''}${validationError.message}`, 301 + ) 302 + } 303 + } 304 + } 305 + 306 + const session = await getSession(viewer.did) 307 + if (!session) throw new ProxyError(401, 'No PDS session for user') 308 + 309 + const pdsUrl = `${session.pds_endpoint}/xrpc/com.atproto.repo.applyWrites` 310 + const pdsBody = { 311 + repo: viewer.did, 312 + writes: input.writes.map((w) => ({ ...w, $type: toPdsWriteType(w.$type) })), 313 + } 314 + 315 + const pdsRes = await proxyToPds(oauthConfig, session, 'POST', pdsUrl, pdsBody) 316 + if (!pdsRes.ok) throw new ProxyError(pdsRes.status, String(pdsRes.body.error || 'PDS applyWrites failed')) 317 + 318 + // Index results locally 319 + const results = (pdsRes.body.results as ApplyWritesResult[]) ?? [] 320 + for (let i = 0; i < input.writes.length; i++) { 321 + const write = input.writes[i] 322 + const result = results[i] 323 + try { 324 + const mapped = toPdsWriteType(write.$type) 325 + if (mapped === 'com.atproto.repo.applyWrites#create' && result?.uri && result?.cid && write.value) { 326 + await insertRecord(write.collection, result.uri, result.cid, viewer.did, write.value) 327 + await runLabelRules({ 328 + uri: result.uri, 329 + cid: result.cid, 330 + did: viewer.did, 331 + collection: write.collection, 332 + value: write.value, 333 + }) 334 + } else if (mapped === 'com.atproto.repo.applyWrites#update' && result?.uri && result?.cid && write.value) { 335 + await insertRecord(write.collection, result.uri, result.cid, viewer.did, write.value) 336 + await runLabelRules({ 337 + uri: result.uri, 338 + cid: result.cid, 339 + did: viewer.did, 340 + collection: write.collection, 341 + value: write.value, 342 + }) 343 + } else if (mapped === 'com.atproto.repo.applyWrites#delete' && write.rkey) { 344 + const uri = `at://${viewer.did}/${write.collection}/${write.rkey}` 345 + await dbDeleteRecord(write.collection, uri) 346 + } 347 + } catch (err: unknown) { 348 + emit('pds-proxy', 'local_index_error', { 349 + op: 'applyWrites', 350 + error: err instanceof Error ? err.message : String(err), 351 + }) 352 + } 353 + } 354 + 355 + return pdsRes.body as { results?: ApplyWritesResult[] } 356 + } 357 + 265 358 export async function pdsUploadBlob( 266 359 oauthConfig: OAuthConfig, 267 360 viewer: { did: string },
+6
packages/hatk/src/server.ts
··· 60 60 pdsCreateRecord, 61 61 pdsDeleteRecord, 62 62 pdsPutRecord, 63 + pdsApplyWrites, 63 64 pdsUploadBlob, 64 65 ProxyError, 65 66 ScopeMissingProxyError, ··· 203 204 registerCoreXrpcHandler('dev.hatk.uploadBlob', async (_params, _cursor, _limit, viewer, input) => { 204 205 if (!viewer) throw new InvalidRequestError('Authentication required') 205 206 return pdsUploadBlob(oauth, viewer, input as any, 'application/octet-stream') 207 + }) 208 + 209 + registerCoreXrpcHandler('dev.hatk.applyWrites', async (_params, _cursor, _limit, viewer, input) => { 210 + if (!viewer) throw new InvalidRequestError('Authentication required') 211 + return pdsApplyWrites(oauth, viewer, input as any) 206 212 }) 207 213 208 214 registerCoreXrpcHandler('dev.hatk.push.registerToken', async (_params, _cursor, _limit, viewer, input) => {
+14 -1
packages/hatk/src/xrpc.ts
··· 40 40 import { getLexicon } from './database/schema.ts' 41 41 import type { Row, FlatRow } from './lex-types.ts' 42 42 import type { OAuthConfig } from './config.ts' 43 - import { pdsCreateRecord, pdsPutRecord, pdsDeleteRecord } from './pds-proxy.ts' 43 + import { pdsCreateRecord, pdsPutRecord, pdsDeleteRecord, pdsApplyWrites } from './pds-proxy.ts' 44 44 45 45 export type { Row, FlatRow } 46 46 ··· 115 115 collection: string, 116 116 rkey: string, 117 117 ) => Promise<void> 118 + applyWrites: ( 119 + writes: Array<{ 120 + $type: string 121 + collection: string 122 + rkey?: string 123 + value?: Record<string, unknown> 124 + }>, 125 + ) => Promise<{ results?: Array<{ $type: string; uri?: string; cid?: string }> }> 118 126 } 119 127 120 128 /** Internal representation of a loaded XRPC handler module. */ ··· 195 203 if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 196 204 if (!viewer) throw new Error('Authentication required to write records') 197 205 await pdsDeleteRecord(_oauthConfig, viewer, { collection, rkey }) 206 + }, 207 + applyWrites: async (writes) => { 208 + if (!_oauthConfig) throw new Error('No OAuth config — cannot write to PDS') 209 + if (!viewer) throw new Error('Authentication required to write records') 210 + return pdsApplyWrites(_oauthConfig, viewer, { writes }) 198 211 }, 199 212 } 200 213 }