search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: bsky DM subscriptions for standard.site publications

new page at pub-search.waow.tech/subscriptions — sign in with atproto
oauth, see your site.standard.publication records, toggle on to get a
bsky DM whenever pub-search indexes a new doc under that publication.

architecture:
- oauth client uses zat (pattern lifted from ken); scope =
`atproto repo:tech.waow.pub-search.subscription transition:chat.bsky`
- subscription records live on the user's PDS as
tech.waow.pub-search.subscription — portable, inspectable
- local sqlite mirror (new `subscriptions` table) keyed by (owner, rkey)
so match at ingest time is an indexed lookup
- indexer.insertDocument gains a void-returning hook that queries
matching subs and enqueues deliveries on a bounded in-memory queue
(drops when full; never blocks the tap worker)
- a single worker thread drains the queue and sends via
chat.bsky.convo.sendMessage, proxied through the subscriber's PDS

notes:
- sessions are in-memory (ken pattern) so deliveries for users who
haven't signed in since the last backend restart are skipped —
tracked in the skipped_no_session counter
- frontend is served from CF Pages; cross-origin cookies use
SameSite=None; Secure with credentialed CORS back to the backend
- load-bearing paths (search, atlas, tap, reconciler, embedder, sync)
are not touched

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

+2734 -2
+2
.gitignore
··· 3 3 dist/ 4 4 .env 5 5 *.db 6 + *.db-shm 7 + *.db-wal 6 8 .zig-cache/ 7 9 zig-out/ 8 10 .loq_cache
+16
backend/src/ingest/indexer.zig
··· 2 2 const Io = std.Io; 3 3 const logfire = @import("logfire"); 4 4 const db = @import("../db.zig"); 5 + const notifications = @import("../notifications.zig"); 5 6 6 7 /// Hash title+content for cross-platform dedup. 7 8 /// Returns a 16-char hex string (wyhash of "title\x00content"). ··· 235 236 &.{ uri, tag }, 236 237 ) catch {}; 237 238 } 239 + 240 + // fire notifications for any subscribers watching this author / 241 + // publication / platform / tag. non-blocking — failures don't affect 242 + // the indexer path. 243 + notifications.onDocumentIndexed(.{ 244 + .uri = uri, 245 + .did = did, 246 + .title = title, 247 + .platform = actual_platform, 248 + .publication_uri = pub_uri, 249 + .base_path = base_path, 250 + .path = path orelse "", 251 + .created_at = created_at orelse "", 252 + .tags = tags, 253 + }); 238 254 } 239 255 240 256 pub fn insertPublication(
+35
backend/src/main.zig
··· 7 7 const metrics = @import("metrics.zig"); 8 8 const server = @import("server.zig"); 9 9 const ingest = @import("ingest.zig"); 10 + const state = @import("state.zig"); 11 + const oauth = @import("oauth.zig"); 12 + const notifications = @import("notifications.zig"); 10 13 11 14 const SOCKET_TIMEOUT_SECS = 5; 12 15 ··· 49 52 // init turso client synchronously (fast, needed for search fallback) 50 53 try db.initTurso(io); 51 54 55 + // init oauth session store (in-memory; ok because subscription records 56 + // live on user PDSes — sessions are just the bearer for CRUD) 57 + state.init(io, allocator); 58 + oauth.init(.{ 59 + .io = io, 60 + .client_id = getenv("OAUTH_CLIENT_ID") orelse "https://leaflet-search-backend.fly.dev/oauth-client-metadata.json", 61 + .redirect_uri = getenv("OAUTH_REDIRECT_URI") orelse "https://leaflet-search-backend.fly.dev/oauth/callback", 62 + // frontend_origin is where the subscriptions page lives. it's the 63 + // origin CORS allows credentials from and where the oauth callback 64 + // redirects after login. default: prod frontend on CF Pages. 65 + .frontend_origin = getenv("FRONTEND_ORIGIN") orelse "https://pub-search.waow.tech", 66 + .client_key_hex = getenv("OAUTH_CLIENT_SECRET_KEY") orelse "", 67 + }); 68 + if (oauth.config().client_key_hex.len != 64) { 69 + logfire.warn("OAUTH_CLIENT_SECRET_KEY not set (need 64 hex chars) — oauth flows will fail", .{}); 70 + } 71 + 72 + // notifications module needs io + alloc for the delivery queue & worker 73 + notifications.init(allocator, io); 74 + 52 75 // init local db and other services in background (slow) 53 76 const init_thread = try Thread.spawn(.{}, initServices, .{ allocator, io }); 54 77 init_thread.detach(); ··· 74 97 } 75 98 } 76 99 100 + fn getenv(name: [*:0]const u8) ?[]const u8 { 101 + return if (std.c.getenv(name)) |p| std.mem.span(p) else null; 102 + } 103 + 77 104 fn initServices(allocator: std.mem.Allocator, io: Io) void { 78 105 // run schema migrations first (idempotent, but may be slow if turso is laggy) 79 106 db.initSchema(); ··· 81 108 // init local db (slow - turso already initialized) 82 109 db.initLocalDb(io); 83 110 db.startSync(io); 111 + 112 + // notifications schema + worker — depends on local db being open 113 + notifications.initSchema() catch |err| { 114 + std.debug.print("notifications: initSchema failed: {}\n", .{err}); 115 + }; 116 + notifications.startWorker() catch |err| { 117 + std.debug.print("notifications: startWorker failed: {}\n", .{err}); 118 + }; 84 119 85 120 // start activity tracker 86 121 metrics.activity.init(io);
+407
backend/src/notifications.zig
··· 1 + //! Subscription storage + match + bsky DM delivery. 2 + //! 3 + //! Subscriptions live as `tech.waow.pub-search.subscription` records on each 4 + //! user's PDS (portable, inspectable, publicly listable). We keep a local 5 + //! SQLite mirror so match at ingest time is O(log n). 6 + //! 7 + //! Delivery: chat.bsky DMs. The subscription *owner* (the person who made 8 + //! the subscription) is who the DM is sent *from* — so we pull their oauth 9 + //! session on each delivery and call chat.bsky.convo.getConvoForMembers 10 + //! + sendMessage via their PDS (proxied to did:web:api.bsky.chat). 11 + //! 12 + //! Session liveness: sessions are in-memory. If the subscriber hasn't 13 + //! logged in since the last backend restart, deliveries for their subs 14 + //! are skipped until they next sign in. This is a known limitation of 15 + //! the ken-style memory store we're reusing. Persistent session storage 16 + //! is a separate upgrade. 17 + 18 + const std = @import("std"); 19 + const Io = std.Io; 20 + const http = std.http; 21 + const json = std.json; 22 + const Allocator = std.mem.Allocator; 23 + const logfire = @import("logfire"); 24 + const db = @import("db.zig"); 25 + const oauth = @import("oauth.zig"); 26 + const store = @import("state.zig"); 27 + 28 + pub const SUBSCRIPTION_COLLECTION = "tech.waow.pub-search.subscription"; 29 + 30 + var global_io: ?Io = null; 31 + var global_alloc: ?Allocator = null; 32 + 33 + const QUEUE_CAPACITY = 1024; 34 + 35 + const DeliveryJob = struct { 36 + owner_did: []u8, 37 + recipient_did: []u8, 38 + sub_rkey: []u8, 39 + trigger_kind: []u8, 40 + trigger_value: []u8, 41 + doc_title: []u8, 42 + doc_url: []u8, // resolved frontend url if possible, else at-uri 43 + 44 + fn deinit(self: *DeliveryJob, a: Allocator) void { 45 + a.free(self.owner_did); 46 + a.free(self.recipient_did); 47 + a.free(self.sub_rkey); 48 + a.free(self.trigger_kind); 49 + a.free(self.trigger_value); 50 + a.free(self.doc_title); 51 + a.free(self.doc_url); 52 + } 53 + }; 54 + 55 + var queue: std.ArrayListUnmanaged(DeliveryJob) = .empty; 56 + var queue_mutex: Io.Mutex = .init; 57 + var queue_cond: Io.Condition = .init; 58 + var dropped_count = std.atomic.Value(u64).init(0); 59 + var delivered_count = std.atomic.Value(u64).init(0); 60 + var failed_count = std.atomic.Value(u64).init(0); 61 + var skipped_no_session_count = std.atomic.Value(u64).init(0); 62 + 63 + // --------------------------------------------------------------------------- 64 + // init + schema 65 + // --------------------------------------------------------------------------- 66 + 67 + pub fn init(allocator: Allocator, io: Io) void { 68 + global_io = io; 69 + global_alloc = allocator; 70 + } 71 + 72 + pub fn initSchema() !void { 73 + const local = db.getLocalDbRaw() orelse { 74 + std.log.warn("notifications: local db not available, skipping schema init", .{}); 75 + return; 76 + }; 77 + 78 + local.lock(); 79 + defer local.unlock(); 80 + const c = local.getConn() orelse return error.NotOpen; 81 + 82 + c.exec( 83 + \\CREATE TABLE IF NOT EXISTS subscriptions ( 84 + \\ id INTEGER PRIMARY KEY AUTOINCREMENT, 85 + \\ owner_did TEXT NOT NULL, 86 + \\ rkey TEXT NOT NULL, 87 + \\ trigger_kind TEXT NOT NULL, 88 + \\ trigger_value TEXT NOT NULL, 89 + \\ destination_kind TEXT NOT NULL, 90 + \\ destination_value TEXT NOT NULL, 91 + \\ secret TEXT DEFAULT '', 92 + \\ label TEXT DEFAULT '', 93 + \\ created_at TEXT NOT NULL, 94 + \\ UNIQUE(owner_did, rkey) 95 + \\) 96 + , .{}) catch |err| { 97 + std.log.err("notifications: failed to create subscriptions table: {}", .{err}); 98 + return err; 99 + }; 100 + 101 + c.exec("CREATE INDEX IF NOT EXISTS idx_sub_match ON subscriptions(trigger_kind, trigger_value)", .{}) catch {}; 102 + c.exec("CREATE INDEX IF NOT EXISTS idx_sub_owner ON subscriptions(owner_did)", .{}) catch {}; 103 + 104 + std.log.info("notifications: schema ready", .{}); 105 + } 106 + 107 + // --------------------------------------------------------------------------- 108 + // CRUD on the local mirror 109 + // --------------------------------------------------------------------------- 110 + 111 + pub const NewSubscription = struct { 112 + owner_did: []const u8, 113 + rkey: []const u8, 114 + trigger_kind: []const u8, 115 + trigger_value: []const u8, 116 + destination_kind: []const u8, 117 + destination_value: []const u8, 118 + secret: []const u8, 119 + label: []const u8, 120 + created_at: []const u8, 121 + }; 122 + 123 + pub fn insert(s: NewSubscription) !void { 124 + const local = db.getLocalDbRaw() orelse return error.LocalDbUnavailable; 125 + try local.exec( 126 + \\INSERT OR REPLACE INTO subscriptions 127 + \\ (owner_did, rkey, trigger_kind, trigger_value, destination_kind, destination_value, secret, label, created_at) 128 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 129 + , .{ 130 + s.owner_did, 131 + s.rkey, 132 + s.trigger_kind, 133 + s.trigger_value, 134 + s.destination_kind, 135 + s.destination_value, 136 + s.secret, 137 + s.label, 138 + s.created_at, 139 + }); 140 + } 141 + 142 + pub fn deleteByRkey(owner_did: []const u8, rkey: []const u8) !void { 143 + const local = db.getLocalDbRaw() orelse return error.LocalDbUnavailable; 144 + try local.exec("DELETE FROM subscriptions WHERE owner_did = ? AND rkey = ?", .{ owner_did, rkey }); 145 + } 146 + 147 + pub fn listByOwnerJson(arena: Allocator, owner_did: []const u8) ![]const u8 { 148 + const local = db.getLocalDbRaw() orelse return error.LocalDbUnavailable; 149 + 150 + var rows = try local.query( 151 + \\SELECT rkey, trigger_kind, trigger_value, destination_kind, destination_value, label, created_at 152 + \\FROM subscriptions WHERE owner_did = ? ORDER BY created_at DESC 153 + , .{owner_did}); 154 + defer rows.deinit(); 155 + 156 + var out: std.Io.Writer.Allocating = .init(arena); 157 + errdefer out.deinit(); 158 + var jw: json.Stringify = .{ .writer = &out.writer }; 159 + 160 + try jw.beginArray(); 161 + while (rows.next()) |row| { 162 + try jw.beginObject(); 163 + try jw.objectField("rkey"); 164 + try jw.write(row.text(0)); 165 + try jw.objectField("triggerKind"); 166 + try jw.write(row.text(1)); 167 + try jw.objectField("triggerValue"); 168 + try jw.write(row.text(2)); 169 + try jw.objectField("destinationKind"); 170 + try jw.write(row.text(3)); 171 + try jw.objectField("destinationValue"); 172 + try jw.write(row.text(4)); 173 + try jw.objectField("label"); 174 + try jw.write(row.text(5)); 175 + try jw.objectField("createdAt"); 176 + try jw.write(row.text(6)); 177 + try jw.endObject(); 178 + } 179 + try jw.endArray(); 180 + 181 + return try out.toOwnedSlice(); 182 + } 183 + 184 + // --------------------------------------------------------------------------- 185 + // match + enqueue 186 + // --------------------------------------------------------------------------- 187 + 188 + pub const DocIndexed = struct { 189 + uri: []const u8, 190 + did: []const u8, 191 + title: []const u8, 192 + platform: []const u8, 193 + publication_uri: []const u8, 194 + base_path: []const u8, 195 + path: []const u8, 196 + created_at: []const u8, 197 + tags: []const []const u8, 198 + }; 199 + 200 + pub fn onDocumentIndexed(doc: DocIndexed) void { 201 + const alloc = global_alloc orelse return; 202 + var arena = std.heap.ArenaAllocator.init(alloc); 203 + defer arena.deinit(); 204 + const a = arena.allocator(); 205 + 206 + const doc_url = buildDocUrl(a, doc) catch return; 207 + 208 + matchAndEnqueue(a, "author", doc.did, doc.title, doc_url); 209 + if (doc.publication_uri.len > 0) { 210 + matchAndEnqueue(a, "publication", doc.publication_uri, doc.title, doc_url); 211 + } 212 + matchAndEnqueue(a, "platform", doc.platform, doc.title, doc_url); 213 + for (doc.tags) |tag| { 214 + matchAndEnqueue(a, "tag", tag, doc.title, doc_url); 215 + } 216 + } 217 + 218 + fn buildDocUrl(arena: Allocator, doc: DocIndexed) ![]const u8 { 219 + // pub-search frontend routes map (did, platform, path) → public URL. 220 + // if base_path + path are present we can reconstruct the canonical 221 + // frontend link; otherwise fall back to the at-uri. 222 + if (doc.base_path.len > 0 and doc.path.len > 0) { 223 + return try std.fmt.allocPrint(arena, "https://{s}{s}", .{ doc.base_path, doc.path }); 224 + } 225 + if (doc.base_path.len > 0) { 226 + return try std.fmt.allocPrint(arena, "https://{s}", .{doc.base_path}); 227 + } 228 + return try arena.dupe(u8, doc.uri); 229 + } 230 + 231 + fn matchAndEnqueue(arena: Allocator, kind: []const u8, value: []const u8, doc_title: []const u8, doc_url: []const u8) void { 232 + const local = db.getLocalDbRaw() orelse return; 233 + 234 + var rows = local.query( 235 + \\SELECT rkey, owner_did, destination_kind, destination_value, trigger_kind, trigger_value 236 + \\FROM subscriptions 237 + \\WHERE trigger_kind = ? AND trigger_value = ? 238 + , .{ kind, value }) catch |err| { 239 + std.log.warn("notifications: match query failed ({s}={s}): {}", .{ kind, value, err }); 240 + return; 241 + }; 242 + defer rows.deinit(); 243 + 244 + while (rows.next()) |row| { 245 + const rkey = row.text(0); 246 + const owner_did = row.text(1); 247 + const dest_kind = row.text(2); 248 + const dest_value = row.text(3); 249 + const trigger_kind = row.text(4); 250 + const trigger_value = row.text(5); 251 + 252 + if (!std.mem.eql(u8, dest_kind, "bsky")) continue; 253 + 254 + enqueue(.{ 255 + .owner_did = owner_did, 256 + .recipient_did = dest_value, 257 + .sub_rkey = rkey, 258 + .trigger_kind = trigger_kind, 259 + .trigger_value = trigger_value, 260 + .doc_title = doc_title, 261 + .doc_url = doc_url, 262 + }, arena) catch |err| { 263 + std.log.warn("notifications: enqueue failed: {}", .{err}); 264 + _ = dropped_count.fetchAdd(1, .monotonic); 265 + }; 266 + } 267 + } 268 + 269 + // --------------------------------------------------------------------------- 270 + // test fire — build a synthetic delivery for a single sub 271 + // --------------------------------------------------------------------------- 272 + 273 + pub fn testFire(arena: Allocator, owner_did: []const u8, rkey: []const u8) !void { 274 + const local = db.getLocalDbRaw() orelse return error.LocalDbUnavailable; 275 + 276 + var rows = try local.query( 277 + \\SELECT destination_kind, destination_value, trigger_kind, trigger_value 278 + \\FROM subscriptions WHERE owner_did = ? AND rkey = ? 279 + , .{ owner_did, rkey }); 280 + defer rows.deinit(); 281 + 282 + const row = rows.next() orelse return error.NotFound; 283 + const dest_kind = row.text(0); 284 + const dest_value = row.text(1); 285 + const trigger_kind = row.text(2); 286 + const trigger_value = row.text(3); 287 + 288 + if (!std.mem.eql(u8, dest_kind, "bsky")) return error.UnsupportedDestination; 289 + 290 + try enqueue(.{ 291 + .owner_did = owner_did, 292 + .recipient_did = dest_value, 293 + .sub_rkey = rkey, 294 + .trigger_kind = trigger_kind, 295 + .trigger_value = trigger_value, 296 + .doc_title = "[test delivery] pub-search subscription fire", 297 + .doc_url = "https://pub-search.waow.tech/subscriptions", 298 + }, arena); 299 + } 300 + 301 + // --------------------------------------------------------------------------- 302 + // queue 303 + // --------------------------------------------------------------------------- 304 + 305 + const EnqueueInput = struct { 306 + owner_did: []const u8, 307 + recipient_did: []const u8, 308 + sub_rkey: []const u8, 309 + trigger_kind: []const u8, 310 + trigger_value: []const u8, 311 + doc_title: []const u8, 312 + doc_url: []const u8, 313 + }; 314 + 315 + fn enqueue(in: EnqueueInput, _: Allocator) !void { 316 + const alloc = global_alloc orelse return error.NotInitialized; 317 + const io = global_io orelse return error.NotInitialized; 318 + 319 + queue_mutex.lockUncancelable(io); 320 + defer queue_mutex.unlock(io); 321 + 322 + if (queue.items.len >= QUEUE_CAPACITY) return error.QueueFull; 323 + 324 + const job: DeliveryJob = .{ 325 + .owner_did = try alloc.dupe(u8, in.owner_did), 326 + .recipient_did = try alloc.dupe(u8, in.recipient_did), 327 + .sub_rkey = try alloc.dupe(u8, in.sub_rkey), 328 + .trigger_kind = try alloc.dupe(u8, in.trigger_kind), 329 + .trigger_value = try alloc.dupe(u8, in.trigger_value), 330 + .doc_title = try alloc.dupe(u8, in.doc_title), 331 + .doc_url = try alloc.dupe(u8, in.doc_url), 332 + }; 333 + try queue.append(alloc, job); 334 + queue_cond.signal(io); 335 + } 336 + 337 + fn dequeueBlocking(io: Io) ?DeliveryJob { 338 + queue_mutex.lockUncancelable(io); 339 + defer queue_mutex.unlock(io); 340 + while (queue.items.len == 0) { 341 + queue_cond.wait(io, &queue_mutex) catch return null; 342 + } 343 + return queue.orderedRemove(0); 344 + } 345 + 346 + // --------------------------------------------------------------------------- 347 + // worker 348 + // --------------------------------------------------------------------------- 349 + 350 + pub fn startWorker() !void { 351 + const io = global_io orelse return error.NotInitialized; 352 + const t = try std.Thread.spawn(.{}, workerLoop, .{io}); 353 + t.detach(); 354 + std.log.info("notifications: bsky DM worker started", .{}); 355 + } 356 + 357 + fn workerLoop(io: Io) void { 358 + const alloc = global_alloc orelse return; 359 + while (true) { 360 + var job = dequeueBlocking(io) orelse continue; 361 + defer job.deinit(alloc); 362 + 363 + deliver(alloc, &job) catch |err| switch (err) { 364 + error.NoSession => { 365 + _ = skipped_no_session_count.fetchAdd(1, .monotonic); 366 + logfire.warn("notifications: skip delivery sub={s} — subscriber has no live session", .{job.sub_rkey}); 367 + }, 368 + else => { 369 + _ = failed_count.fetchAdd(1, .monotonic); 370 + logfire.warn("notifications: delivery failed sub={s}: {}", .{ job.sub_rkey, err }); 371 + }, 372 + }; 373 + } 374 + } 375 + 376 + fn deliver(alloc: Allocator, job: *const DeliveryJob) !void { 377 + var arena = std.heap.ArenaAllocator.init(alloc); 378 + defer arena.deinit(); 379 + const a = arena.allocator(); 380 + 381 + const session = (try store.getSession(a, job.owner_did)) orelse return error.NoSession; 382 + 383 + const convo_id = try oauth.chatGetConvoForMembers(a, session, &.{job.recipient_did}); 384 + 385 + const text = try std.fmt.allocPrint(a, 386 + \\new on pub-search — matched your {s}="{s}" subscription 387 + \\ 388 + \\{s} 389 + \\{s} 390 + , .{ job.trigger_kind, job.trigger_value, job.doc_title, job.doc_url }); 391 + 392 + try oauth.chatSendMessage(a, session, convo_id, text); 393 + _ = delivered_count.fetchAdd(1, .monotonic); 394 + } 395 + 396 + pub fn stats() struct { delivered: u64, failed: u64, dropped: u64, skipped_no_session: u64, queued: usize } { 397 + const io = global_io orelse return .{ .delivered = 0, .failed = 0, .dropped = 0, .skipped_no_session = 0, .queued = 0 }; 398 + queue_mutex.lockUncancelable(io); 399 + defer queue_mutex.unlock(io); 400 + return .{ 401 + .delivered = delivered_count.load(.monotonic), 402 + .failed = failed_count.load(.monotonic), 403 + .dropped = dropped_count.load(.monotonic), 404 + .skipped_no_session = skipped_no_session_count.load(.monotonic), 405 + .queued = queue.items.len, 406 + }; 407 + }
+1109
backend/src/oauth.zig
··· 1 + //! OAuth client + authenticated PDS request helpers. 2 + //! 3 + //! lifted from ken/backend/src/oauth.zig (which in turn was lifted from 4 + //! pollz/backend/src/http.zig). the OAuth flow itself (PAR, token exchange, 5 + //! DPoP nonce retry, token refresh) is mechanical protocol work and there's 6 + //! no value in re-inventing it. what differs here: 7 + //! - scope: `atproto repo:tech.waow.pub-search.subscription` 8 + //! - cookie name: pubsearch_session 9 + //! - redirect post-callback: /subscriptions.html 10 + //! 11 + //! reference: https://atproto.com/specs/oauth 12 + 13 + const std = @import("std"); 14 + const Io = std.Io; 15 + const http = std.http; 16 + const mem = std.mem; 17 + const json = std.json; 18 + const Allocator = mem.Allocator; 19 + 20 + const zat = @import("zat"); 21 + const zat_oauth = zat.oauth; 22 + const store = @import("state.zig"); 23 + 24 + // `transition:chat.bsky` grants access to chat.bsky.* xrpc endpoints 25 + // (proxied through the user's PDS to did:web:api.bsky.chat). needed so 26 + // subscribers can receive DM deliveries on their own behalf. 27 + pub const SCOPE = "atproto repo:tech.waow.pub-search.subscription transition:chat.bsky"; 28 + 29 + pub const Config = struct { 30 + io: Io, 31 + client_id: []const u8, 32 + redirect_uri: []const u8, 33 + frontend_origin: []const u8, 34 + client_key_hex: []const u8, // 64 hex chars (32 bytes p256 private) 35 + }; 36 + 37 + var cfg: Config = undefined; 38 + var cfg_set: bool = false; 39 + 40 + pub fn init(c: Config) void { 41 + cfg = c; 42 + cfg_set = true; 43 + } 44 + 45 + pub fn config() Config { 46 + std.debug.assert(cfg_set); 47 + return cfg; 48 + } 49 + 50 + pub fn getClientKeypair() !zat.Keypair { 51 + if (cfg.client_key_hex.len != 64) return error.InvalidClientKey; 52 + var key_bytes: [32]u8 = undefined; 53 + _ = std.fmt.hexToBytes(&key_bytes, cfg.client_key_hex) catch return error.InvalidClientKey; 54 + return zat.Keypair.fromSecretKey(.p256, key_bytes); 55 + } 56 + 57 + pub fn keypairFromHex(hex: []const u8) !zat.Keypair { 58 + if (hex.len != 64) return error.InvalidKeyHex; 59 + var key_bytes: [32]u8 = undefined; 60 + _ = std.fmt.hexToBytes(&key_bytes, hex) catch return error.InvalidKeyHex; 61 + return zat.Keypair.fromSecretKey(.p256, key_bytes); 62 + } 63 + 64 + // --------------------------------------------------------------------------- 65 + // basic HTTP helpers 66 + // --------------------------------------------------------------------------- 67 + 68 + pub fn httpGet(alloc: Allocator, url: []const u8) ![]u8 { 69 + var client: std.http.Client = .{ .allocator = alloc, .io = cfg.io }; 70 + defer client.deinit(); 71 + 72 + var aw: std.Io.Writer.Allocating = .init(alloc); 73 + const result = client.fetch(.{ 74 + .location = .{ .url = url }, 75 + .response_writer = &aw.writer, 76 + .headers = .{ .accept_encoding = .{ .override = "identity" } }, 77 + }) catch { 78 + aw.deinit(); 79 + return error.FetchFailed; 80 + }; 81 + if (result.status != .ok) { 82 + aw.deinit(); 83 + return error.FetchFailed; 84 + } 85 + return aw.toOwnedSlice() catch error.FetchFailed; 86 + } 87 + 88 + pub const HttpResult = struct { 89 + status: http.Status, 90 + body: []u8, 91 + dpop_nonce: ?[]const u8, 92 + }; 93 + 94 + pub fn doPost( 95 + alloc: Allocator, 96 + url: []const u8, 97 + payload: []const u8, 98 + extra_headers: []const http.Header, 99 + ) !HttpResult { 100 + var client: std.http.Client = .{ .allocator = alloc, .io = cfg.io }; 101 + defer client.deinit(); 102 + 103 + var req = try client.request(.POST, try std.Uri.parse(url), .{ 104 + .extra_headers = extra_headers, 105 + .headers = .{ 106 + .content_type = .{ .override = "application/x-www-form-urlencoded" }, 107 + .accept_encoding = .{ .override = "identity" }, 108 + }, 109 + }); 110 + defer req.deinit(); 111 + 112 + req.transfer_encoding = .{ .content_length = payload.len }; 113 + var body_writer = try req.sendBodyUnflushed(&.{}); 114 + try body_writer.writer.writeAll(payload); 115 + try body_writer.end(); 116 + try req.connection.?.flush(); 117 + 118 + var redirect_buf: [1]u8 = undefined; 119 + var response = req.receiveHead(&redirect_buf) catch return error.FetchFailed; 120 + 121 + var dpop_nonce: ?[]const u8 = null; 122 + var it = response.head.iterateHeaders(); 123 + while (it.next()) |h| { 124 + if (std.ascii.eqlIgnoreCase(h.name, "dpop-nonce")) { 125 + dpop_nonce = try alloc.dupe(u8, h.value); 126 + break; 127 + } 128 + } 129 + 130 + var aw: std.Io.Writer.Allocating = .init(alloc); 131 + const reader = response.reader(&.{}); 132 + _ = reader.streamRemaining(&aw.writer) catch { 133 + aw.deinit(); 134 + return error.FetchFailed; 135 + }; 136 + const resp_body = aw.toOwnedSlice() catch return error.FetchFailed; 137 + 138 + return .{ .status = response.head.status, .body = resp_body, .dpop_nonce = dpop_nonce }; 139 + } 140 + 141 + pub fn isDpopNonceError(status: http.Status, body: []const u8) bool { 142 + if (status != .bad_request and status != .unauthorized) return false; 143 + return mem.indexOf(u8, body, "use_dpop_nonce") != null; 144 + } 145 + 146 + pub fn isWwwAuthNonceError(status: http.Status, www_auth: ?[]const u8) bool { 147 + if (status != .unauthorized) return false; 148 + const h = www_auth orelse return false; 149 + return mem.indexOf(u8, h, "use_dpop_nonce") != null; 150 + } 151 + 152 + // --------------------------------------------------------------------------- 153 + // oauth metadata discovery 154 + // --------------------------------------------------------------------------- 155 + 156 + pub fn fetchAuthServerUrl(alloc: Allocator, pds_url: []const u8) ![]const u8 { 157 + const url = try std.fmt.allocPrint(alloc, "{s}/.well-known/oauth-protected-resource", .{pds_url}); 158 + defer alloc.free(url); 159 + 160 + const body = try httpGet(alloc, url); 161 + defer alloc.free(body); 162 + 163 + const parsed = try json.parseFromSlice(json.Value, alloc, body, .{}); 164 + defer parsed.deinit(); 165 + 166 + const servers = parsed.value.object.get("authorization_servers") orelse return error.NoAuthServers; 167 + if (servers != .array or servers.array.items.len == 0) return error.NoAuthServers; 168 + const first = servers.array.items[0]; 169 + if (first != .string) return error.NoAuthServers; 170 + return alloc.dupe(u8, first.string); 171 + } 172 + 173 + pub fn fetchAuthServerMeta(alloc: Allocator, authserver_url: []const u8) !json.Parsed(json.Value) { 174 + const url = try std.fmt.allocPrint(alloc, "{s}/.well-known/oauth-authorization-server", .{authserver_url}); 175 + defer alloc.free(url); 176 + const body = try httpGet(alloc, url); 177 + return json.parseFromSlice(json.Value, alloc, body, .{}); 178 + } 179 + 180 + pub fn jsonGetString(value: json.Value, key: []const u8) ?[]const u8 { 181 + if (value != .object) return null; 182 + const v = value.object.get(key) orelse return null; 183 + if (v != .string) return null; 184 + return v.string; 185 + } 186 + 187 + // --------------------------------------------------------------------------- 188 + // PAR + token exchange + refresh 189 + // --------------------------------------------------------------------------- 190 + 191 + pub const ParResult = struct { request_uri: []const u8, dpop_nonce: []const u8 }; 192 + pub const ParParams = struct { 193 + par_url: []const u8, 194 + authserver_url: []const u8, 195 + client_id: []const u8, 196 + redirect_uri: []const u8, 197 + scope: []const u8, 198 + state: []const u8, 199 + pkce_challenge: []const u8, 200 + handle: []const u8, 201 + client_keypair: *const zat.Keypair, 202 + dpop_keypair: *const zat.Keypair, 203 + }; 204 + 205 + pub fn sendParRequest(alloc: Allocator, params: ParParams) !ParResult { 206 + const client_assertion = try zat_oauth.createClientAssertion( 207 + alloc, 208 + cfg.io, 209 + params.client_keypair, 210 + params.client_id, 211 + params.authserver_url, 212 + ); 213 + defer alloc.free(client_assertion); 214 + 215 + const dpop_proof = try zat_oauth.createDpopProof( 216 + alloc, 217 + cfg.io, 218 + params.dpop_keypair, 219 + "POST", 220 + params.par_url, 221 + null, 222 + null, 223 + ); 224 + defer alloc.free(dpop_proof); 225 + 226 + const form_params = [_][2][]const u8{ 227 + .{ "response_type", "code" }, 228 + .{ "code_challenge", params.pkce_challenge }, 229 + .{ "code_challenge_method", "S256" }, 230 + .{ "redirect_uri", params.redirect_uri }, 231 + .{ "scope", params.scope }, 232 + .{ "state", params.state }, 233 + .{ "login_hint", params.handle }, 234 + .{ "client_id", params.client_id }, 235 + .{ "client_assertion_type", "urn:ietf:params:oauth:client-assertion-type:jwt-bearer" }, 236 + .{ "client_assertion", client_assertion }, 237 + }; 238 + const form_body = try zat_oauth.formEncode(alloc, &form_params); 239 + defer alloc.free(form_body); 240 + 241 + var result = try doPost(alloc, params.par_url, form_body, &.{ 242 + .{ .name = "DPoP", .value = dpop_proof }, 243 + }); 244 + 245 + if (isDpopNonceError(result.status, result.body)) { 246 + const nonce = result.dpop_nonce orelse return error.MissingDpopNonce; 247 + alloc.free(result.body); 248 + const proof2 = try zat_oauth.createDpopProof( 249 + alloc, 250 + cfg.io, 251 + params.dpop_keypair, 252 + "POST", 253 + params.par_url, 254 + nonce, 255 + null, 256 + ); 257 + defer alloc.free(proof2); 258 + result = try doPost(alloc, params.par_url, form_body, &.{ 259 + .{ .name = "DPoP", .value = proof2 }, 260 + }); 261 + } 262 + defer alloc.free(result.body); 263 + 264 + if (result.status != .ok and result.status != .created) { 265 + std.log.warn("PAR error ({t}): {s}", .{ result.status, result.body }); 266 + return error.ParFailed; 267 + } 268 + 269 + const parsed = try json.parseFromSlice(json.Value, alloc, result.body, .{}); 270 + defer parsed.deinit(); 271 + const request_uri = jsonGetString(parsed.value, "request_uri") orelse return error.MissingRequestUri; 272 + 273 + return .{ 274 + .request_uri = try alloc.dupe(u8, request_uri), 275 + .dpop_nonce = if (result.dpop_nonce) |n| try alloc.dupe(u8, n) else try alloc.dupe(u8, ""), 276 + }; 277 + } 278 + 279 + pub const TokenResult = struct { 280 + access_token: []const u8, 281 + refresh_token: []const u8, 282 + sub: []const u8, 283 + dpop_nonce: []const u8, 284 + }; 285 + 286 + pub const TokenParams = struct { 287 + token_url: []const u8, 288 + authserver_url: []const u8, 289 + client_id: []const u8, 290 + redirect_uri: []const u8, 291 + code: []const u8, 292 + pkce_verifier: []const u8, 293 + client_keypair: *const zat.Keypair, 294 + dpop_keypair: *const zat.Keypair, 295 + dpop_nonce: []const u8, 296 + }; 297 + 298 + pub fn sendTokenRequest(alloc: Allocator, params: TokenParams) !TokenResult { 299 + const client_assertion = try zat_oauth.createClientAssertion( 300 + alloc, 301 + cfg.io, 302 + params.client_keypair, 303 + params.client_id, 304 + params.authserver_url, 305 + ); 306 + defer alloc.free(client_assertion); 307 + 308 + const dpop_proof = try zat_oauth.createDpopProof( 309 + alloc, 310 + cfg.io, 311 + params.dpop_keypair, 312 + "POST", 313 + params.token_url, 314 + if (params.dpop_nonce.len > 0) params.dpop_nonce else null, 315 + null, 316 + ); 317 + defer alloc.free(dpop_proof); 318 + 319 + const form_params = [_][2][]const u8{ 320 + .{ "grant_type", "authorization_code" }, 321 + .{ "code", params.code }, 322 + .{ "redirect_uri", params.redirect_uri }, 323 + .{ "code_verifier", params.pkce_verifier }, 324 + .{ "client_id", params.client_id }, 325 + .{ "client_assertion_type", "urn:ietf:params:oauth:client-assertion-type:jwt-bearer" }, 326 + .{ "client_assertion", client_assertion }, 327 + }; 328 + const form_body = try zat_oauth.formEncode(alloc, &form_params); 329 + defer alloc.free(form_body); 330 + 331 + var result = try doPost(alloc, params.token_url, form_body, &.{ 332 + .{ .name = "DPoP", .value = dpop_proof }, 333 + }); 334 + 335 + if (isDpopNonceError(result.status, result.body)) { 336 + const nonce = result.dpop_nonce orelse return error.MissingDpopNonce; 337 + alloc.free(result.body); 338 + const proof2 = try zat_oauth.createDpopProof( 339 + alloc, 340 + cfg.io, 341 + params.dpop_keypair, 342 + "POST", 343 + params.token_url, 344 + nonce, 345 + null, 346 + ); 347 + defer alloc.free(proof2); 348 + result = try doPost(alloc, params.token_url, form_body, &.{ 349 + .{ .name = "DPoP", .value = proof2 }, 350 + }); 351 + } 352 + defer alloc.free(result.body); 353 + 354 + if (result.status != .ok) { 355 + std.log.warn("token exchange error ({t}): {s}", .{ result.status, result.body }); 356 + return error.TokenExchangeFailed; 357 + } 358 + 359 + const parsed = try json.parseFromSlice(json.Value, alloc, result.body, .{}); 360 + defer parsed.deinit(); 361 + 362 + return .{ 363 + .access_token = try alloc.dupe(u8, jsonGetString(parsed.value, "access_token") orelse return error.MissingAccessToken), 364 + .refresh_token = try alloc.dupe(u8, jsonGetString(parsed.value, "refresh_token") orelse return error.MissingRefreshToken), 365 + .sub = try alloc.dupe(u8, jsonGetString(parsed.value, "sub") orelse return error.MissingSub), 366 + .dpop_nonce = if (result.dpop_nonce) |n| try alloc.dupe(u8, n) else try alloc.dupe(u8, ""), 367 + }; 368 + } 369 + 370 + // --------------------------------------------------------------------------- 371 + // authenticated PDS request (DPoP + nonce retry + 401 refresh) 372 + // --------------------------------------------------------------------------- 373 + 374 + pub const PdsError = error{ 375 + Unauthorized, 376 + FetchFailed, 377 + InvalidSessionKey, 378 + AuthHeaderTooLong, 379 + DpopNonceRetryExhausted, 380 + OutOfMemory, 381 + }; 382 + 383 + pub fn pdsAuthedRequest( 384 + alloc: Allocator, 385 + session: store.Session, 386 + method_str: []const u8, 387 + path: []const u8, 388 + body: ?[]const u8, 389 + content_type: []const u8, 390 + extra_headers: []const http.Header, 391 + ) ![]u8 { 392 + const dpop_keypair = keypairFromHex(session.dpop_private_key) catch return error.InvalidSessionKey; 393 + 394 + var access_token_buf: [2048]u8 = undefined; 395 + @memcpy(access_token_buf[0..session.access_token.len], session.access_token); 396 + var access_token_len = session.access_token.len; 397 + var refreshed = false; 398 + 399 + for (0..2) |attempt| { 400 + const access_token = access_token_buf[0..access_token_len]; 401 + const url = try std.fmt.allocPrint(alloc, "{s}{s}", .{ session.pds_url, path }); 402 + defer alloc.free(url); 403 + 404 + const ath = try zat_oauth.accessTokenHash(alloc, access_token); 405 + defer alloc.free(ath); 406 + 407 + var nonce: ?[]const u8 = if (session.dpop_pds_nonce.len > 0) session.dpop_pds_nonce else null; 408 + 409 + const inner = for (0..2) |_| { 410 + const dpop_proof = try zat_oauth.createDpopProof(alloc, cfg.io, &dpop_keypair, method_str, url, nonce, ath); 411 + defer alloc.free(dpop_proof); 412 + 413 + var auth_hdr_buf: [4096]u8 = undefined; 414 + const auth_header = std.fmt.bufPrint(&auth_hdr_buf, "DPoP {s}", .{access_token}) catch return error.AuthHeaderTooLong; 415 + 416 + const http_method: http.Method = if (mem.eql(u8, method_str, "POST")) .POST else .GET; 417 + 418 + var client: std.http.Client = .{ .allocator = alloc, .io = cfg.io }; 419 + defer client.deinit(); 420 + 421 + // concat auth + dpop + caller-provided extra headers 422 + var hdrs_buf: [16]http.Header = undefined; 423 + hdrs_buf[0] = .{ .name = "Authorization", .value = auth_header }; 424 + hdrs_buf[1] = .{ .name = "DPoP", .value = dpop_proof }; 425 + var n_hdrs: usize = 2; 426 + for (extra_headers) |h| { 427 + if (n_hdrs >= hdrs_buf.len) break; 428 + hdrs_buf[n_hdrs] = h; 429 + n_hdrs += 1; 430 + } 431 + var req = try client.request(http_method, try std.Uri.parse(url), .{ 432 + .extra_headers = hdrs_buf[0..n_hdrs], 433 + .headers = .{ 434 + .content_type = .{ .override = content_type }, 435 + .accept_encoding = .{ .override = "identity" }, 436 + }, 437 + }); 438 + defer req.deinit(); 439 + 440 + if (body) |b| { 441 + req.transfer_encoding = .{ .content_length = b.len }; 442 + var body_writer = try req.sendBodyUnflushed(&.{}); 443 + try body_writer.writer.writeAll(b); 444 + try body_writer.end(); 445 + try req.connection.?.flush(); 446 + } else { 447 + try req.sendBodiless(); 448 + } 449 + 450 + var redirect_buf: [1]u8 = undefined; 451 + var response = req.receiveHead(&redirect_buf) catch return error.FetchFailed; 452 + 453 + var new_nonce: ?[]const u8 = null; 454 + var www_auth: ?[]const u8 = null; 455 + var hit = response.head.iterateHeaders(); 456 + while (hit.next()) |h| { 457 + if (std.ascii.eqlIgnoreCase(h.name, "dpop-nonce")) { 458 + new_nonce = h.value; 459 + } else if (std.ascii.eqlIgnoreCase(h.name, "www-authenticate")) { 460 + www_auth = h.value; 461 + } 462 + } 463 + 464 + var aw: std.Io.Writer.Allocating = .init(alloc); 465 + const reader = response.reader(&.{}); 466 + _ = reader.streamRemaining(&aw.writer) catch { 467 + aw.deinit(); 468 + return error.FetchFailed; 469 + }; 470 + const resp_body = aw.toOwnedSlice() catch return error.FetchFailed; 471 + 472 + if (new_nonce) |n| store.updateSessionNonce(session.did, .pds, n); 473 + 474 + const is_nonce_err = new_nonce != null and (isDpopNonceError(response.head.status, resp_body) or isWwwAuthNonceError(response.head.status, www_auth)); 475 + if (is_nonce_err) { 476 + alloc.free(resp_body); 477 + nonce = try alloc.dupe(u8, new_nonce.?); 478 + continue; 479 + } 480 + break .{ response.head.status, resp_body }; 481 + } else { 482 + return error.DpopNonceRetryExhausted; 483 + }; 484 + 485 + const status = inner[0]; 486 + const resp_body = inner[1]; 487 + 488 + if (status != .unauthorized) { 489 + return resp_body; 490 + } 491 + 492 + alloc.free(resp_body); 493 + if (attempt > 0 or refreshed) return error.Unauthorized; 494 + 495 + std.log.info("access token rejected for {s}, refreshing", .{session.did}); 496 + const new_tokens = refreshAccessToken(alloc, session, &dpop_keypair) catch return error.Unauthorized; 497 + if (new_tokens.access_token.len > access_token_buf.len) return error.AuthHeaderTooLong; 498 + @memcpy(access_token_buf[0..new_tokens.access_token.len], new_tokens.access_token); 499 + access_token_len = new_tokens.access_token.len; 500 + refreshed = true; 501 + } 502 + 503 + return error.Unauthorized; 504 + } 505 + 506 + fn refreshAccessToken( 507 + alloc: Allocator, 508 + session: store.Session, 509 + dpop_keypair: *const zat.Keypair, 510 + ) !TokenResult { 511 + var authserver_meta = try fetchAuthServerMeta(alloc, session.authserver_iss); 512 + defer authserver_meta.deinit(); 513 + 514 + const token_url = jsonGetString(authserver_meta.value, "token_endpoint") orelse return error.MissingTokenEndpoint; 515 + 516 + const client_keypair = getClientKeypair() catch return error.InvalidSessionKey; 517 + const client_id = cfg.client_id; 518 + 519 + const client_assertion = try zat_oauth.createClientAssertion(alloc, cfg.io, &client_keypair, client_id, session.authserver_iss); 520 + defer alloc.free(client_assertion); 521 + 522 + var authserver_nonce: ?[]const u8 = if (session.dpop_authserver_nonce.len > 0) session.dpop_authserver_nonce else null; 523 + 524 + for (0..2) |_| { 525 + const dpop_proof = try zat_oauth.createDpopProof(alloc, cfg.io, dpop_keypair, "POST", token_url, authserver_nonce, null); 526 + defer alloc.free(dpop_proof); 527 + 528 + const form_params = [_][2][]const u8{ 529 + .{ "grant_type", "refresh_token" }, 530 + .{ "refresh_token", session.refresh_token }, 531 + .{ "client_id", client_id }, 532 + .{ "client_assertion_type", "urn:ietf:params:oauth:client-assertion-type:jwt-bearer" }, 533 + .{ "client_assertion", client_assertion }, 534 + }; 535 + const form_body = try zat_oauth.formEncode(alloc, &form_params); 536 + defer alloc.free(form_body); 537 + 538 + const result = try doPost(alloc, token_url, form_body, &.{ 539 + .{ .name = "DPoP", .value = dpop_proof }, 540 + }); 541 + 542 + if (result.dpop_nonce) |n| store.updateSessionNonce(session.did, .authserver, n); 543 + 544 + if (isDpopNonceError(result.status, result.body)) { 545 + authserver_nonce = result.dpop_nonce; 546 + alloc.free(result.body); 547 + continue; 548 + } 549 + 550 + defer alloc.free(result.body); 551 + if (result.status != .ok) { 552 + std.log.warn("token refresh error ({t}): {s}", .{ result.status, result.body }); 553 + return error.TokenRefreshFailed; 554 + } 555 + 556 + const parsed = try json.parseFromSlice(json.Value, alloc, result.body, .{}); 557 + defer parsed.deinit(); 558 + 559 + const new_access = try alloc.dupe(u8, jsonGetString(parsed.value, "access_token") orelse return error.MissingAccessToken); 560 + const new_refresh = try alloc.dupe(u8, jsonGetString(parsed.value, "refresh_token") orelse return error.MissingRefreshToken); 561 + 562 + store.updateSessionTokens(session.did, new_access, new_refresh); 563 + 564 + return .{ 565 + .access_token = new_access, 566 + .refresh_token = new_refresh, 567 + .sub = try alloc.dupe(u8, session.did), 568 + .dpop_nonce = if (result.dpop_nonce) |n| try alloc.dupe(u8, n) else try alloc.dupe(u8, ""), 569 + }; 570 + } 571 + return error.TokenRefreshFailed; 572 + } 573 + 574 + // --------------------------------------------------------------------------- 575 + // HTTP route handlers — called from server.zig's dispatcher 576 + // --------------------------------------------------------------------------- 577 + 578 + pub fn handleClientMetadata(request: *http.Server.Request) !void { 579 + var arena = std.heap.ArenaAllocator.init(std.heap.smp_allocator); 580 + defer arena.deinit(); 581 + const alloc = arena.allocator(); 582 + 583 + const keypair = getClientKeypair() catch { 584 + try sendError(request, .internal_server_error, "server configuration error"); 585 + return; 586 + }; 587 + const jwk = keypair.jwk(alloc) catch { 588 + try sendError(request, .internal_server_error, "key error"); 589 + return; 590 + }; 591 + 592 + var body: std.ArrayList(u8) = .empty; 593 + try body.print(alloc, 594 + \\{{ 595 + \\ "client_id": "{s}", 596 + \\ "client_name": "pub-search", 597 + \\ "client_uri": "{s}", 598 + \\ "application_type": "web", 599 + \\ "grant_types": ["authorization_code", "refresh_token"], 600 + \\ "response_types": ["code"], 601 + \\ "redirect_uris": ["{s}"], 602 + \\ "token_endpoint_auth_method": "private_key_jwt", 603 + \\ "token_endpoint_auth_signing_alg": "ES256", 604 + \\ "scope": "{s}", 605 + \\ "dpop_bound_access_tokens": true, 606 + \\ "jwks": {{"keys": [{s}]}} 607 + \\}} 608 + , .{ cfg.client_id, getClientOrigin(), cfg.redirect_uri, SCOPE, jwk }); 609 + 610 + try sendJson(request, body.items); 611 + } 612 + 613 + pub fn handleJwks(request: *http.Server.Request) !void { 614 + var arena = std.heap.ArenaAllocator.init(std.heap.smp_allocator); 615 + defer arena.deinit(); 616 + const alloc = arena.allocator(); 617 + 618 + const keypair = getClientKeypair() catch { 619 + try sendError(request, .internal_server_error, "server configuration error"); 620 + return; 621 + }; 622 + const jwks = zat_oauth.jwksJson(alloc, &keypair) catch { 623 + try sendError(request, .internal_server_error, "key error"); 624 + return; 625 + }; 626 + try sendJson(request, jwks); 627 + } 628 + 629 + pub fn handleLogin(request: *http.Server.Request) !void { 630 + var arena = std.heap.ArenaAllocator.init(std.heap.smp_allocator); 631 + defer arena.deinit(); 632 + const alloc = arena.allocator(); 633 + 634 + const target = request.head.target; 635 + const handle_str = extractQueryParam(target, "handle") orelse { 636 + try sendError(request, .bad_request, "missing handle parameter"); 637 + return; 638 + }; 639 + 640 + var handle_resolver = zat.HandleResolver.init(cfg.io, alloc); 641 + defer handle_resolver.deinit(); 642 + const did = handle_resolver.resolve(zat.Handle.parse(handle_str) orelse { 643 + try sendError(request, .bad_request, "invalid handle"); 644 + return; 645 + }) catch { 646 + try sendError(request, .bad_request, "could not resolve handle"); 647 + return; 648 + }; 649 + 650 + var did_resolver = zat.DidResolver.init(cfg.io, alloc); 651 + defer did_resolver.deinit(); 652 + var did_doc = did_resolver.resolve(zat.Did.parse(did) orelse { 653 + try sendError(request, .bad_request, "invalid DID"); 654 + return; 655 + }) catch { 656 + try sendError(request, .bad_request, "could not resolve DID"); 657 + return; 658 + }; 659 + defer did_doc.deinit(); 660 + 661 + const pds_url = did_doc.pdsEndpoint() orelse { 662 + try sendError(request, .bad_request, "no PDS endpoint"); 663 + return; 664 + }; 665 + 666 + const authserver_url = fetchAuthServerUrl(alloc, pds_url) catch { 667 + try sendError(request, .bad_request, "could not discover auth server"); 668 + return; 669 + }; 670 + 671 + var authserver_meta = fetchAuthServerMeta(alloc, authserver_url) catch { 672 + try sendError(request, .bad_request, "could not fetch auth server metadata"); 673 + return; 674 + }; 675 + defer authserver_meta.deinit(); 676 + 677 + const authserver_iss = jsonGetString(authserver_meta.value, "issuer") orelse { 678 + try sendError(request, .bad_request, "auth server missing issuer"); 679 + return; 680 + }; 681 + const par_url = jsonGetString(authserver_meta.value, "pushed_authorization_request_endpoint") orelse { 682 + try sendError(request, .bad_request, "auth server missing PAR endpoint"); 683 + return; 684 + }; 685 + const authorization_endpoint = jsonGetString(authserver_meta.value, "authorization_endpoint") orelse { 686 + try sendError(request, .bad_request, "auth server missing authorization endpoint"); 687 + return; 688 + }; 689 + 690 + const pkce_verifier = try zat_oauth.generatePkceVerifier(alloc, cfg.io); 691 + const pkce_challenge = try zat_oauth.generatePkceChallenge(alloc, pkce_verifier); 692 + const state = try zat_oauth.generateState(alloc, cfg.io); 693 + 694 + var dpop_key_bytes: [32]u8 = undefined; 695 + cfg.io.random(&dpop_key_bytes); 696 + const dpop_keypair = zat.Keypair.fromSecretKey(.p256, dpop_key_bytes) catch { 697 + try sendError(request, .internal_server_error, "key generation failed"); 698 + return; 699 + }; 700 + 701 + const client_keypair = getClientKeypair() catch { 702 + try sendError(request, .internal_server_error, "server configuration error"); 703 + return; 704 + }; 705 + 706 + const par_result = sendParRequest(alloc, .{ 707 + .par_url = par_url, 708 + .authserver_url = authserver_iss, 709 + .client_id = cfg.client_id, 710 + .redirect_uri = cfg.redirect_uri, 711 + .scope = SCOPE, 712 + .state = state, 713 + .pkce_challenge = pkce_challenge, 714 + .handle = handle_str, 715 + .client_keypair = &client_keypair, 716 + .dpop_keypair = &dpop_keypair, 717 + }) catch { 718 + try sendError(request, .bad_gateway, "PAR request failed"); 719 + return; 720 + }; 721 + 722 + const dpop_hex = std.fmt.bytesToHex(dpop_key_bytes, .lower); 723 + store.insertAuthRequest( 724 + state, 725 + authserver_iss, 726 + did, 727 + handle_str, 728 + pds_url, 729 + pkce_verifier, 730 + SCOPE, 731 + par_result.dpop_nonce, 732 + &dpop_hex, 733 + ) catch { 734 + try sendError(request, .internal_server_error, "could not store auth request"); 735 + return; 736 + }; 737 + 738 + var redirect_url: std.ArrayList(u8) = .empty; 739 + try redirect_url.print(alloc, "{s}?request_uri={s}&client_id={s}&state={s}", .{ 740 + authorization_endpoint, par_result.request_uri, cfg.client_id, state, 741 + }); 742 + try sendRedirect(request, redirect_url.items); 743 + } 744 + 745 + pub fn handleCallback(request: *http.Server.Request) !void { 746 + var arena = std.heap.ArenaAllocator.init(std.heap.smp_allocator); 747 + defer arena.deinit(); 748 + const alloc = arena.allocator(); 749 + 750 + const target = request.head.target; 751 + const code = extractQueryParam(target, "code") orelse { 752 + try sendError(request, .bad_request, "missing code"); 753 + return; 754 + }; 755 + const state = extractQueryParam(target, "state") orelse { 756 + try sendError(request, .bad_request, "missing state"); 757 + return; 758 + }; 759 + const iss_raw = extractQueryParam(target, "iss"); 760 + const iss = if (iss_raw) |raw| blk: { 761 + const buf = try alloc.dupe(u8, raw); 762 + break :blk std.Uri.percentDecodeBackwards(buf, buf); 763 + } else null; 764 + 765 + const auth_req = (try store.getAuthRequest(alloc, state)) orelse { 766 + try sendError(request, .bad_request, "unknown state — login may have expired"); 767 + return; 768 + }; 769 + 770 + if (iss) |issuer| { 771 + if (!mem.eql(u8, issuer, auth_req.authserver_iss)) { 772 + try sendError(request, .bad_request, "issuer mismatch"); 773 + return; 774 + } 775 + } 776 + 777 + const dpop_keypair = keypairFromHex(auth_req.dpop_private_key) catch { 778 + try sendError(request, .internal_server_error, "invalid stored key"); 779 + return; 780 + }; 781 + 782 + const client_keypair = getClientKeypair() catch { 783 + try sendError(request, .internal_server_error, "server configuration error"); 784 + return; 785 + }; 786 + 787 + var authserver_meta = fetchAuthServerMeta(alloc, auth_req.authserver_iss) catch { 788 + try sendError(request, .bad_gateway, "could not fetch auth server metadata"); 789 + return; 790 + }; 791 + defer authserver_meta.deinit(); 792 + 793 + const token_url = jsonGetString(authserver_meta.value, "token_endpoint") orelse { 794 + try sendError(request, .bad_gateway, "auth server missing token endpoint"); 795 + return; 796 + }; 797 + 798 + const token_result = sendTokenRequest(alloc, .{ 799 + .token_url = token_url, 800 + .authserver_url = auth_req.authserver_iss, 801 + .client_id = cfg.client_id, 802 + .redirect_uri = cfg.redirect_uri, 803 + .code = code, 804 + .pkce_verifier = auth_req.pkce_verifier, 805 + .client_keypair = &client_keypair, 806 + .dpop_keypair = &dpop_keypair, 807 + .dpop_nonce = auth_req.dpop_authserver_nonce, 808 + }) catch { 809 + try sendError(request, .bad_gateway, "token exchange failed"); 810 + return; 811 + }; 812 + 813 + if (!mem.eql(u8, token_result.sub, auth_req.did)) { 814 + try sendError(request, .bad_request, "token subject mismatch"); 815 + return; 816 + } 817 + 818 + store.upsertSession( 819 + auth_req.did, 820 + auth_req.handle, 821 + auth_req.pds_url, 822 + auth_req.authserver_iss, 823 + token_result.access_token, 824 + token_result.refresh_token, 825 + token_result.dpop_nonce, 826 + "", 827 + auth_req.dpop_private_key, 828 + ) catch { 829 + try sendError(request, .internal_server_error, "could not store session"); 830 + return; 831 + }; 832 + store.deleteAuthRequest(state); 833 + 834 + // redirect back to the subscriptions page with ?logged_in={handle} 835 + var redirect_url: std.ArrayList(u8) = .empty; 836 + // use .html so the redirect lands on a file regardless of host server — 837 + // Cloudflare Pages serves /subscriptions and /subscriptions.html the same, 838 + // but plain static servers (like python -m http.server) only know the file. 839 + try redirect_url.print(alloc, "{s}/subscriptions.html?logged_in={s}", .{ cfg.frontend_origin, auth_req.handle }); 840 + 841 + const session_token = store.createSessionToken(auth_req.did) catch { 842 + try sendError(request, .internal_server_error, "could not create session token"); 843 + return; 844 + }; 845 + 846 + // SameSite=None + Secure — the frontend is on a different origin than 847 + // the backend (pub-search.waow.tech vs leaflet-search-backend.fly.dev) 848 + // so we need cross-site cookies. Secure is required alongside None. 849 + var cookie_buf: [512]u8 = undefined; 850 + const cookie = std.fmt.bufPrint( 851 + &cookie_buf, 852 + "pubsearch_session={s}; HttpOnly; Secure; SameSite=None; Path=/; Max-Age=2592000", 853 + .{session_token}, 854 + ) catch { 855 + try sendError(request, .internal_server_error, "cookie error"); 856 + return; 857 + }; 858 + 859 + try request.respond("", .{ 860 + .status = .found, 861 + .extra_headers = &.{ 862 + .{ .name = "location", .value = redirect_url.items }, 863 + .{ .name = "set-cookie", .value = cookie }, 864 + }, 865 + }); 866 + } 867 + 868 + pub fn handleLogout(request: *http.Server.Request) !void { 869 + var it = request.iterateHeaders(); 870 + while (it.next()) |h| { 871 + if (std.ascii.eqlIgnoreCase(h.name, "cookie")) { 872 + if (parseCookieValue(h.value, "pubsearch_session")) |token| { 873 + if (store.resolveSessionToken(token)) |did| { 874 + store.deleteSession(did); 875 + } 876 + store.deleteSessionToken(token); 877 + } 878 + break; 879 + } 880 + } 881 + try request.respond("{\"ok\":true}", .{ 882 + .status = .ok, 883 + .extra_headers = &.{ 884 + .{ .name = "content-type", .value = "application/json" }, 885 + .{ .name = "set-cookie", .value = "pubsearch_session=; HttpOnly; Secure; SameSite=None; Path=/; Max-Age=0" }, 886 + }, 887 + }); 888 + } 889 + 890 + // --------------------------------------------------------------------------- 891 + // response helpers + cookie parsing 892 + // --------------------------------------------------------------------------- 893 + 894 + pub fn getSessionDid(request: *http.Server.Request) ?[]const u8 { 895 + var it = request.iterateHeaders(); 896 + while (it.next()) |h| { 897 + if (std.ascii.eqlIgnoreCase(h.name, "cookie")) { 898 + const token = parseCookieValue(h.value, "pubsearch_session") orelse continue; 899 + return store.resolveSessionToken(token); 900 + } 901 + } 902 + return null; 903 + } 904 + 905 + fn parseCookieValue(cookie_header: []const u8, name: []const u8) ?[]const u8 { 906 + var it = mem.splitSequence(u8, cookie_header, "; "); 907 + while (it.next()) |pair| { 908 + if (mem.startsWith(u8, pair, name)) { 909 + if (pair.len > name.len and pair[name.len] == '=') { 910 + return pair[name.len + 1 ..]; 911 + } 912 + } 913 + } 914 + return null; 915 + } 916 + 917 + fn extractQueryParam(target: []const u8, name: []const u8) ?[]const u8 { 918 + const q_idx = mem.indexOf(u8, target, "?") orelse return null; 919 + const query = target[q_idx + 1 ..]; 920 + var it = mem.splitScalar(u8, query, '&'); 921 + while (it.next()) |pair| { 922 + const eq_idx = mem.indexOf(u8, pair, "=") orelse continue; 923 + if (mem.eql(u8, pair[0..eq_idx], name)) { 924 + return pair[eq_idx + 1 ..]; 925 + } 926 + } 927 + return null; 928 + } 929 + 930 + fn getClientOrigin() []const u8 { 931 + const cid = cfg.client_id; 932 + const scheme_end = mem.indexOf(u8, cid, "://") orelse return cid; 933 + const after = cid[scheme_end + 3 ..]; 934 + const path_start = mem.indexOf(u8, after, "/") orelse return cid; 935 + return cid[0 .. scheme_end + 3 + path_start]; 936 + } 937 + 938 + fn sendError(request: *http.Server.Request, status: http.Status, message: []const u8) !void { 939 + var buf: [512]u8 = undefined; 940 + const body = std.fmt.bufPrint(&buf, "{{\"error\":\"{s}\"}}", .{message}) catch "{\"error\":\"internal error\"}"; 941 + try request.respond(body, .{ 942 + .status = status, 943 + .extra_headers = &.{ 944 + .{ .name = "content-type", .value = "application/json" }, 945 + .{ .name = "access-control-allow-origin", .value = cfg.frontend_origin }, 946 + .{ .name = "access-control-allow-credentials", .value = "true" }, 947 + .{ .name = "vary", .value = "origin" }, 948 + }, 949 + }); 950 + } 951 + 952 + fn sendJson(request: *http.Server.Request, body: []const u8) !void { 953 + try request.respond(body, .{ 954 + .status = .ok, 955 + .extra_headers = &.{ 956 + .{ .name = "content-type", .value = "application/json" }, 957 + .{ .name = "access-control-allow-origin", .value = cfg.frontend_origin }, 958 + .{ .name = "access-control-allow-credentials", .value = "true" }, 959 + .{ .name = "vary", .value = "origin" }, 960 + }, 961 + }); 962 + } 963 + 964 + fn sendRedirect(request: *http.Server.Request, location: []const u8) !void { 965 + try request.respond("", .{ 966 + .status = .found, 967 + .extra_headers = &.{ 968 + .{ .name = "location", .value = location }, 969 + }, 970 + }); 971 + } 972 + 973 + // --------------------------------------------------------------------------- 974 + // high-level PDS write operations — used by notifications.zig to write 975 + // subscription records to the user's PDS. 976 + // --------------------------------------------------------------------------- 977 + 978 + pub fn createRecord( 979 + alloc: Allocator, 980 + session: store.Session, 981 + collection: []const u8, 982 + record_json: []const u8, 983 + ) ![]u8 { 984 + const body = try std.fmt.allocPrint( 985 + alloc, 986 + "{{\"repo\":\"{s}\",\"collection\":\"{s}\",\"record\":{s}}}", 987 + .{ session.did, collection, record_json }, 988 + ); 989 + defer alloc.free(body); 990 + 991 + const resp = try pdsAuthedRequest( 992 + alloc, 993 + session, 994 + "POST", 995 + "/xrpc/com.atproto.repo.createRecord", 996 + body, 997 + "application/json", 998 + &.{}, 999 + ); 1000 + defer alloc.free(resp); 1001 + 1002 + const parsed = json.parseFromSlice(json.Value, alloc, resp, .{}) catch |err| { 1003 + const preview_len = @min(resp.len, 400); 1004 + std.log.warn( 1005 + "createRecord: response was not json ({t}). body[0..{d}]={s}", 1006 + .{ err, preview_len, resp[0..preview_len] }, 1007 + ); 1008 + return error.ParseFailed; 1009 + }; 1010 + defer parsed.deinit(); 1011 + 1012 + const uri_v = parsed.value.object.get("uri") orelse return error.MissingUri; 1013 + if (uri_v != .string) return error.MissingUri; 1014 + return alloc.dupe(u8, uri_v.string); 1015 + } 1016 + 1017 + pub fn deleteRecord( 1018 + alloc: Allocator, 1019 + session: store.Session, 1020 + collection: []const u8, 1021 + rkey: []const u8, 1022 + ) !void { 1023 + const body = try std.fmt.allocPrint( 1024 + alloc, 1025 + "{{\"repo\":\"{s}\",\"collection\":\"{s}\",\"rkey\":\"{s}\"}}", 1026 + .{ session.did, collection, rkey }, 1027 + ); 1028 + defer alloc.free(body); 1029 + 1030 + const resp = try pdsAuthedRequest( 1031 + alloc, 1032 + session, 1033 + "POST", 1034 + "/xrpc/com.atproto.repo.deleteRecord", 1035 + body, 1036 + "application/json", 1037 + &.{}, 1038 + ); 1039 + defer alloc.free(resp); 1040 + } 1041 + 1042 + // --------------------------------------------------------------------------- 1043 + // chat.bsky sender — proxied through the user's PDS with atproto-proxy header 1044 + // so DMs are sent as the authenticated subscriber. 1045 + // --------------------------------------------------------------------------- 1046 + 1047 + const BSKY_CHAT_PROXY = "did:web:api.bsky.chat#bsky_chat"; 1048 + 1049 + /// GET chat.bsky.convo.getConvoForMembers — returns a convoId suitable for 1050 + /// sendMessage. members is a slice of DIDs (1+, excluding the caller). 1051 + pub fn chatGetConvoForMembers(alloc: Allocator, session: store.Session, member_dids: []const []const u8) ![]const u8 { 1052 + var path_buf: std.Io.Writer.Allocating = .init(alloc); 1053 + try path_buf.writer.writeAll("/xrpc/chat.bsky.convo.getConvoForMembers"); 1054 + for (member_dids, 0..) |d, i| { 1055 + const sep: u8 = if (i == 0) '?' else '&'; 1056 + try path_buf.writer.print("{c}members={s}", .{ sep, d }); 1057 + } 1058 + const path = try path_buf.toOwnedSlice(); 1059 + defer alloc.free(path); 1060 + 1061 + const resp = try pdsAuthedRequest( 1062 + alloc, 1063 + session, 1064 + "GET", 1065 + path, 1066 + null, 1067 + "application/json", 1068 + &.{.{ .name = "atproto-proxy", .value = BSKY_CHAT_PROXY }}, 1069 + ); 1070 + defer alloc.free(resp); 1071 + 1072 + const parsed = try json.parseFromSlice(json.Value, alloc, resp, .{}); 1073 + defer parsed.deinit(); 1074 + 1075 + const convo = parsed.value.object.get("convo") orelse return error.MissingConvo; 1076 + if (convo != .object) return error.MissingConvo; 1077 + const id = convo.object.get("id") orelse return error.MissingConvoId; 1078 + if (id != .string) return error.MissingConvoId; 1079 + return alloc.dupe(u8, id.string); 1080 + } 1081 + 1082 + /// POST chat.bsky.convo.sendMessage. 1083 + pub fn chatSendMessage(alloc: Allocator, session: store.Session, convo_id: []const u8, text: []const u8) !void { 1084 + // JSON-escape the text into the body 1085 + var body_buf: std.Io.Writer.Allocating = .init(alloc); 1086 + var jw: json.Stringify = .{ .writer = &body_buf.writer }; 1087 + try jw.beginObject(); 1088 + try jw.objectField("convoId"); 1089 + try jw.write(convo_id); 1090 + try jw.objectField("message"); 1091 + try jw.beginObject(); 1092 + try jw.objectField("text"); 1093 + try jw.write(text); 1094 + try jw.endObject(); 1095 + try jw.endObject(); 1096 + const body = try body_buf.toOwnedSlice(); 1097 + defer alloc.free(body); 1098 + 1099 + const resp = try pdsAuthedRequest( 1100 + alloc, 1101 + session, 1102 + "POST", 1103 + "/xrpc/chat.bsky.convo.sendMessage", 1104 + body, 1105 + "application/json", 1106 + &.{.{ .name = "atproto-proxy", .value = BSKY_CHAT_PROXY }}, 1107 + ); 1108 + defer alloc.free(resp); 1109 + }
+34 -2
backend/src/server.zig
··· 11 11 const metrics = @import("metrics.zig"); 12 12 const search = @import("server/search.zig"); 13 13 const dashboard = @import("server/dashboard.zig"); 14 + const subs = @import("server/subscriptions.zig"); 15 + const oauth = @import("oauth.zig"); 14 16 15 17 const HTTP_BUF_SIZE = 65536; 16 18 const QUERY_PARAM_BUF_SIZE = 64; ··· 94 96 try handleSimilar(request, target, io); 95 97 } else if (mem.eql(u8, path, "/activity")) { 96 98 try handleActivity(request); 99 + } else if (mem.eql(u8, path, "/oauth-client-metadata.json")) { 100 + try oauth.handleClientMetadata(request); 101 + } else if (mem.eql(u8, path, "/oauth/jwks")) { 102 + try oauth.handleJwks(request); 103 + } else if (mem.eql(u8, path, "/oauth/login")) { 104 + try oauth.handleLogin(request); 105 + } else if (mem.eql(u8, path, "/oauth/callback")) { 106 + try oauth.handleCallback(request); 107 + } else if (mem.eql(u8, path, "/oauth/logout") and request.head.method == .POST) { 108 + try oauth.handleLogout(request); 109 + } else if (mem.eql(u8, path, "/api/me")) { 110 + try subs.handleMe(request, io); 111 + } else if (mem.eql(u8, path, "/api/my-publications")) { 112 + try subs.handleMyPublications(request, io); 113 + } else if (mem.eql(u8, path, "/api/subscriptions") and request.head.method == .GET) { 114 + try subs.handleList(request, io); 115 + } else if (mem.eql(u8, path, "/api/subscriptions") and request.head.method == .POST) { 116 + try subs.handleCreate(request, io); 117 + } else if (mem.startsWith(u8, path, "/api/subscriptions/") and mem.endsWith(u8, path, "/test") and request.head.method == .POST) { 118 + const rest = path["/api/subscriptions/".len..]; 119 + const rkey = rest[0 .. rest.len - "/test".len]; 120 + try subs.handleTestFire(request, rkey, io); 121 + } else if (mem.startsWith(u8, path, "/api/subscriptions/") and request.head.method == .DELETE) { 122 + const rkey = path["/api/subscriptions/".len..]; 123 + try subs.handleDelete(request, rkey, io); 97 124 } else { 98 125 try sendNotFound(request); 99 126 } 100 127 } 128 + 101 129 102 130 fn handleSearch(request: *http.Server.Request, target: []const u8, io: Io) !void { 103 131 const start_time = microTimestamp(io); ··· 412 440 } 413 441 414 442 fn sendCorsHeaders(request: *http.Server.Request, body: []const u8) !void { 443 + // credentials + specific origin required for the cross-site cookie 444 + // exchange with the subscriptions frontend on pub-search.waow.tech. 415 445 try request.respond(body, .{ 416 446 .status = .no_content, 417 447 .extra_headers = &.{ 418 - .{ .name = "access-control-allow-origin", .value = "*" }, 419 - .{ .name = "access-control-allow-methods", .value = "GET, OPTIONS" }, 448 + .{ .name = "access-control-allow-origin", .value = oauth.config().frontend_origin }, 449 + .{ .name = "access-control-allow-methods", .value = "GET, POST, DELETE, OPTIONS" }, 420 450 .{ .name = "access-control-allow-headers", .value = "content-type" }, 451 + .{ .name = "access-control-allow-credentials", .value = "true" }, 452 + .{ .name = "vary", .value = "origin" }, 421 453 }, 422 454 }); 423 455 }
+372
backend/src/server/subscriptions.zig
··· 1 + //! HTTP handlers for subscription CRUD. Paired with oauth session auth. 2 + //! 3 + //! Endpoints: 4 + //! GET /api/me — returns {did, handle} or 401 5 + //! GET /api/subscriptions — list the caller's subscriptions (local mirror) 6 + //! POST /api/subscriptions — create (writes to PDS + mirror) 7 + //! DELETE /api/subscriptions/:rkey — delete from PDS + mirror 8 + 9 + const std = @import("std"); 10 + const Io = std.Io; 11 + const http = std.http; 12 + const mem = std.mem; 13 + const json = std.json; 14 + const Allocator = std.mem.Allocator; 15 + 16 + const oauth = @import("../oauth.zig"); 17 + const store = @import("../state.zig"); 18 + const notifications = @import("../notifications.zig"); 19 + 20 + const SUBSCRIPTION_COLLECTION = notifications.SUBSCRIPTION_COLLECTION; 21 + 22 + const ALLOWED_TRIGGER_KINDS = [_][]const u8{ "author", "publication", "platform", "tag" }; 23 + const ALLOWED_DEST_KINDS = [_][]const u8{"bsky"}; 24 + 25 + fn contains(haystack: []const []const u8, needle: []const u8) bool { 26 + for (haystack) |h| if (mem.eql(u8, h, needle)) return true; 27 + return false; 28 + } 29 + 30 + pub fn handleMe(request: *http.Server.Request, io: Io) !void { 31 + _ = io; 32 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 33 + defer arena.deinit(); 34 + const alloc = arena.allocator(); 35 + 36 + const did = oauth.getSessionDid(request) orelse { 37 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"not signed in\"}"); 38 + return; 39 + }; 40 + const session = (try store.getSession(alloc, did)) orelse { 41 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"session expired\"}"); 42 + return; 43 + }; 44 + 45 + const body = try std.fmt.allocPrint(alloc, "{{\"did\":\"{s}\",\"handle\":\"{s}\"}}", .{ session.did, session.handle }); 46 + try sendJson(request, body); 47 + } 48 + 49 + pub fn handleList(request: *http.Server.Request, io: Io) !void { 50 + _ = io; 51 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 52 + defer arena.deinit(); 53 + const alloc = arena.allocator(); 54 + 55 + const did = oauth.getSessionDid(request) orelse { 56 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"not signed in\"}"); 57 + return; 58 + }; 59 + 60 + const body = notifications.listByOwnerJson(alloc, did) catch |err| { 61 + std.log.warn("handleList: listByOwnerJson failed: {}", .{err}); 62 + try sendJsonStatus(request, .internal_server_error, "{\"error\":\"failed to list subscriptions\"}"); 63 + return; 64 + }; 65 + try sendJson(request, body); 66 + } 67 + 68 + const CreateBody = struct { 69 + triggerKind: []const u8, 70 + triggerValue: []const u8, 71 + destinationKind: []const u8, 72 + destinationValue: []const u8, 73 + secret: ?[]const u8 = null, 74 + label: ?[]const u8 = null, 75 + }; 76 + 77 + pub fn handleCreate(request: *http.Server.Request, io: Io) !void { 78 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 79 + defer arena.deinit(); 80 + const alloc = arena.allocator(); 81 + 82 + const did = oauth.getSessionDid(request) orelse { 83 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"not signed in\"}"); 84 + return; 85 + }; 86 + const session = (try store.getSession(alloc, did)) orelse { 87 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"session expired\"}"); 88 + return; 89 + }; 90 + 91 + // read body (cap at 8KB — subscriptions are tiny) 92 + const body_reader = request.readerExpectContinue(&.{}) catch { 93 + try sendJsonStatus(request, .bad_request, "{\"error\":\"failed to read body\"}"); 94 + return; 95 + }; 96 + const body_bytes = body_reader.allocRemaining(alloc, Io.Limit.limited(8 * 1024)) catch { 97 + try sendJsonStatus(request, .bad_request, "{\"error\":\"failed to read body\"}"); 98 + return; 99 + }; 100 + 101 + const parsed = json.parseFromSliceLeaky(CreateBody, alloc, body_bytes, .{ 102 + .ignore_unknown_fields = true, 103 + }) catch { 104 + try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid json\"}"); 105 + return; 106 + }; 107 + 108 + // validate 109 + if (!contains(&ALLOWED_TRIGGER_KINDS, parsed.triggerKind)) { 110 + try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid triggerKind\"}"); 111 + return; 112 + } 113 + if (!contains(&ALLOWED_DEST_KINDS, parsed.destinationKind)) { 114 + try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid destinationKind\"}"); 115 + return; 116 + } 117 + if (parsed.triggerValue.len == 0 or parsed.triggerValue.len > 512) { 118 + try sendJsonStatus(request, .bad_request, "{\"error\":\"triggerValue length\"}"); 119 + return; 120 + } 121 + if (parsed.destinationValue.len == 0 or parsed.destinationValue.len > 512) { 122 + try sendJsonStatus(request, .bad_request, "{\"error\":\"destinationValue length\"}"); 123 + return; 124 + } 125 + // for bsky: resolve handle → DID at create time if needed, so we store 126 + // a stable identifier the chat.convo.getConvoForMembers call can use. 127 + var resolved_dest = parsed.destinationValue; 128 + if (mem.eql(u8, parsed.destinationKind, "bsky") and !mem.startsWith(u8, resolved_dest, "did:")) { 129 + const zat = @import("zat"); 130 + var resolver = zat.HandleResolver.init(io, alloc); 131 + defer resolver.deinit(); 132 + const parsed_handle = zat.Handle.parse(resolved_dest) orelse { 133 + try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid bsky handle\"}"); 134 + return; 135 + }; 136 + resolved_dest = resolver.resolve(parsed_handle) catch { 137 + try sendJsonStatus(request, .bad_request, "{\"error\":\"could not resolve bsky handle\"}"); 138 + return; 139 + }; 140 + } 141 + 142 + const secret: []const u8 = ""; // no longer used; retained in schema for future signed destinations 143 + _ = parsed.secret; 144 + const label: []const u8 = parsed.label orelse ""; 145 + const created_at = try isoNow(alloc, io); 146 + 147 + // build record JSON for the PDS 148 + var rec_buf: std.Io.Writer.Allocating = .init(alloc); 149 + var jw: json.Stringify = .{ .writer = &rec_buf.writer }; 150 + try jw.beginObject(); 151 + try jw.objectField("$type"); 152 + try jw.write(SUBSCRIPTION_COLLECTION); 153 + try jw.objectField("triggerKind"); 154 + try jw.write(parsed.triggerKind); 155 + try jw.objectField("triggerValue"); 156 + try jw.write(parsed.triggerValue); 157 + try jw.objectField("destinationKind"); 158 + try jw.write(parsed.destinationKind); 159 + try jw.objectField("destinationValue"); 160 + try jw.write(resolved_dest); 161 + if (label.len > 0) { 162 + try jw.objectField("label"); 163 + try jw.write(label); 164 + } 165 + try jw.objectField("createdAt"); 166 + try jw.write(created_at); 167 + try jw.endObject(); 168 + 169 + const record_json = try rec_buf.toOwnedSlice(); 170 + 171 + // write to user's PDS 172 + const at_uri = oauth.createRecord(alloc, session, SUBSCRIPTION_COLLECTION, record_json) catch |err| { 173 + std.log.warn("handleCreate: createRecord failed: {}", .{err}); 174 + try sendJsonStatus(request, .bad_gateway, "{\"error\":\"failed to write record to PDS\"}"); 175 + return; 176 + }; 177 + const rkey = extractRkey(at_uri) orelse { 178 + try sendJsonStatus(request, .bad_gateway, "{\"error\":\"PDS returned malformed at-uri\"}"); 179 + return; 180 + }; 181 + 182 + // mirror locally for fast match at ingest time 183 + notifications.insert(.{ 184 + .owner_did = session.did, 185 + .rkey = rkey, 186 + .trigger_kind = parsed.triggerKind, 187 + .trigger_value = parsed.triggerValue, 188 + .destination_kind = parsed.destinationKind, 189 + .destination_value = resolved_dest, 190 + .secret = secret, 191 + .label = label, 192 + .created_at = created_at, 193 + }) catch |err| { 194 + std.log.warn("handleCreate: local mirror insert failed: {}", .{err}); 195 + // don't rollback the PDS write — the firehose will eventually reconcile 196 + }; 197 + 198 + const out = try std.fmt.allocPrint(alloc, "{{\"ok\":true,\"rkey\":\"{s}\",\"uri\":\"{s}\"}}", .{ rkey, at_uri }); 199 + try sendJson(request, out); 200 + } 201 + 202 + /// GET /api/my-publications — list the authenticated user's 203 + /// site.standard.publication records by hitting their PDS directly 204 + /// (records are public; no DPoP needed). This is the source of truth 205 + /// for the toggle list in the subscriptions UI. 206 + pub fn handleMyPublications(request: *http.Server.Request, io: Io) !void { 207 + _ = io; 208 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 209 + defer arena.deinit(); 210 + const alloc = arena.allocator(); 211 + 212 + const did = oauth.getSessionDid(request) orelse { 213 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"not signed in\"}"); 214 + return; 215 + }; 216 + const session = (try store.getSession(alloc, did)) orelse { 217 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"session expired\"}"); 218 + return; 219 + }; 220 + 221 + const url = try std.fmt.allocPrint(alloc, "{s}/xrpc/com.atproto.repo.listRecords?repo={s}&collection=site.standard.publication&limit=100", .{ 222 + session.pds_url, session.did, 223 + }); 224 + 225 + const body = oauth.httpGet(alloc, url) catch |err| { 226 + std.log.warn("handleMyPublications: listRecords failed: {}", .{err}); 227 + try sendJsonStatus(request, .bad_gateway, "{\"error\":\"failed to fetch publications from PDS\"}"); 228 + return; 229 + }; 230 + 231 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 232 + try sendJsonStatus(request, .bad_gateway, "{\"error\":\"PDS returned invalid json\"}"); 233 + return; 234 + }; 235 + defer parsed.deinit(); 236 + 237 + const records = parsed.value.object.get("records") orelse { 238 + try sendJson(request, "[]"); 239 + return; 240 + }; 241 + if (records != .array) { 242 + try sendJson(request, "[]"); 243 + return; 244 + } 245 + 246 + var out: std.Io.Writer.Allocating = .init(alloc); 247 + var jw: json.Stringify = .{ .writer = &out.writer }; 248 + try jw.beginArray(); 249 + for (records.array.items) |rec| { 250 + if (rec != .object) continue; 251 + const uri_v = rec.object.get("uri") orelse continue; 252 + if (uri_v != .string) continue; 253 + const val = rec.object.get("value") orelse continue; 254 + if (val != .object) continue; 255 + 256 + try jw.beginObject(); 257 + try jw.objectField("uri"); 258 + try jw.write(uri_v.string); 259 + if (val.object.get("name")) |n| if (n == .string) { 260 + try jw.objectField("name"); 261 + try jw.write(n.string); 262 + }; 263 + if (val.object.get("url")) |u| if (u == .string) { 264 + try jw.objectField("url"); 265 + try jw.write(u.string); 266 + }; 267 + if (val.object.get("description")) |d| if (d == .string) { 268 + try jw.objectField("description"); 269 + try jw.write(d.string); 270 + }; 271 + try jw.endObject(); 272 + } 273 + try jw.endArray(); 274 + 275 + const result = try out.toOwnedSlice(); 276 + try sendJson(request, result); 277 + } 278 + 279 + pub fn handleTestFire(request: *http.Server.Request, rkey: []const u8, io: Io) !void { 280 + _ = io; 281 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 282 + defer arena.deinit(); 283 + const alloc = arena.allocator(); 284 + 285 + const did = oauth.getSessionDid(request) orelse { 286 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"not signed in\"}"); 287 + return; 288 + }; 289 + 290 + notifications.testFire(alloc, did, rkey) catch |err| { 291 + const msg = switch (err) { 292 + error.NotFound => "{\"error\":\"subscription not found\"}", 293 + error.UnsupportedDestination => "{\"error\":\"cannot test-fire this destination kind\"}", 294 + else => "{\"error\":\"test fire failed\"}", 295 + }; 296 + try sendJsonStatus(request, .bad_request, msg); 297 + return; 298 + }; 299 + try sendJson(request, "{\"ok\":true,\"note\":\"delivery enqueued — check your webhook receiver\"}"); 300 + } 301 + 302 + pub fn handleDelete(request: *http.Server.Request, rkey: []const u8, io: Io) !void { 303 + _ = io; 304 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 305 + defer arena.deinit(); 306 + const alloc = arena.allocator(); 307 + 308 + const did = oauth.getSessionDid(request) orelse { 309 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"not signed in\"}"); 310 + return; 311 + }; 312 + const session = (try store.getSession(alloc, did)) orelse { 313 + try sendJsonStatus(request, .unauthorized, "{\"error\":\"session expired\"}"); 314 + return; 315 + }; 316 + 317 + // delete from PDS first so partial failure leaves us with a dangling 318 + // local row the user can re-sweep; better than a dangling PDS record. 319 + oauth.deleteRecord(alloc, session, SUBSCRIPTION_COLLECTION, rkey) catch |err| { 320 + std.log.warn("handleDelete: PDS deleteRecord failed: {}", .{err}); 321 + try sendJsonStatus(request, .bad_gateway, "{\"error\":\"failed to delete record on PDS\"}"); 322 + return; 323 + }; 324 + notifications.deleteByRkey(session.did, rkey) catch |err| { 325 + std.log.warn("handleDelete: local deleteByRkey failed: {}", .{err}); 326 + }; 327 + 328 + try sendJson(request, "{\"ok\":true}"); 329 + } 330 + 331 + // --------------------------------------------------------------------------- 332 + // helpers 333 + // --------------------------------------------------------------------------- 334 + 335 + fn extractRkey(at_uri: []const u8) ?[]const u8 { 336 + const idx = mem.lastIndexOfScalar(u8, at_uri, '/') orelse return null; 337 + if (idx + 1 >= at_uri.len) return null; 338 + return at_uri[idx + 1 ..]; 339 + } 340 + 341 + fn isoNow(alloc: Allocator, io: Io) ![]const u8 { 342 + // ISO-8601 UTC timestamp for the `createdAt` field (matches lexicon format: datetime) 343 + const now_secs: i64 = @intCast(@divTrunc(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 344 + const epoch_secs: std.time.epoch.EpochSeconds = .{ .secs = @intCast(now_secs) }; 345 + const day = epoch_secs.getDaySeconds(); 346 + const year_day = epoch_secs.getEpochDay().calculateYearDay(); 347 + const md = year_day.calculateMonthDay(); 348 + return try std.fmt.allocPrint(alloc, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ 349 + year_day.year, 350 + @intFromEnum(md.month), 351 + md.day_index + 1, 352 + day.getHoursIntoDay(), 353 + day.getMinutesIntoHour(), 354 + day.getSecondsIntoMinute(), 355 + }); 356 + } 357 + 358 + fn sendJson(request: *http.Server.Request, body: []const u8) !void { 359 + try sendJsonStatus(request, .ok, body); 360 + } 361 + 362 + fn sendJsonStatus(request: *http.Server.Request, status: http.Status, body: []const u8) !void { 363 + try request.respond(body, .{ 364 + .status = status, 365 + .extra_headers = &.{ 366 + .{ .name = "content-type", .value = "application/json" }, 367 + .{ .name = "access-control-allow-origin", .value = oauth.config().frontend_origin }, 368 + .{ .name = "access-control-allow-credentials", .value = "true" }, 369 + .{ .name = "vary", .value = "origin" }, 370 + }, 371 + }); 372 + }
+293
backend/src/state.zig
··· 1 + //! in-memory oauth session store. 2 + //! 3 + //! lifted from ken/backend/src/state.zig. OAuth sessions live only in memory; 4 + //! on backend restart users re-auth. acceptable UX because subscriptions 5 + //! themselves live on the user's PDS (source of truth) + a local mirror 6 + //! keyed by DID, which survives restarts. the session is just the bearer 7 + //! for creating/deleting records. 8 + 9 + const std = @import("std"); 10 + const Io = std.Io; 11 + const Allocator = std.mem.Allocator; 12 + 13 + var gpa: Allocator = undefined; 14 + var io: Io = undefined; 15 + pub var mutex: Io.Mutex = .init; 16 + 17 + const StoredAuthRequest = struct { 18 + state: []u8, 19 + authserver_iss: []u8, 20 + did: []u8, 21 + handle: []u8, 22 + pds_url: []u8, 23 + pkce_verifier: []u8, 24 + scope: []u8, 25 + dpop_authserver_nonce: []u8, 26 + dpop_private_key: []u8, 27 + created_at: i64, 28 + 29 + fn deinit(self: *StoredAuthRequest, a: Allocator) void { 30 + a.free(self.state); 31 + a.free(self.authserver_iss); 32 + a.free(self.did); 33 + a.free(self.handle); 34 + a.free(self.pds_url); 35 + a.free(self.pkce_verifier); 36 + a.free(self.scope); 37 + a.free(self.dpop_authserver_nonce); 38 + a.free(self.dpop_private_key); 39 + } 40 + }; 41 + 42 + const StoredSession = struct { 43 + did: []u8, 44 + handle: []u8, 45 + pds_url: []u8, 46 + authserver_iss: []u8, 47 + access_token: []u8, 48 + refresh_token: []u8, 49 + dpop_authserver_nonce: []u8, 50 + dpop_pds_nonce: []u8, 51 + dpop_private_key: []u8, 52 + created_at: i64, 53 + 54 + fn deinit(self: *StoredSession, a: Allocator) void { 55 + a.free(self.did); 56 + a.free(self.handle); 57 + a.free(self.pds_url); 58 + a.free(self.authserver_iss); 59 + a.free(self.access_token); 60 + a.free(self.refresh_token); 61 + a.free(self.dpop_authserver_nonce); 62 + a.free(self.dpop_pds_nonce); 63 + a.free(self.dpop_private_key); 64 + } 65 + }; 66 + 67 + var auth_requests: std.StringHashMap(StoredAuthRequest) = undefined; 68 + var sessions: std.StringHashMap(StoredSession) = undefined; 69 + /// opaque session cookie token (hex-encoded 32 random bytes) → DID. the 70 + /// cookie never contains the DID itself — DIDs are public identifiers. 71 + var session_tokens: std.StringHashMap([]u8) = undefined; 72 + 73 + fn timestamp() i64 { 74 + return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 75 + } 76 + 77 + pub fn init(app_io: Io, app_allocator: Allocator) void { 78 + io = app_io; 79 + gpa = app_allocator; 80 + auth_requests = std.StringHashMap(StoredAuthRequest).init(gpa); 81 + sessions = std.StringHashMap(StoredSession).init(gpa); 82 + session_tokens = std.StringHashMap([]u8).init(gpa); 83 + std.log.info("state: in-memory (oauth sessions reset on restart)", .{}); 84 + } 85 + 86 + pub fn close() void {} 87 + 88 + pub const AuthRequest = struct { 89 + state: []const u8, 90 + authserver_iss: []const u8, 91 + did: []const u8, 92 + handle: []const u8, 93 + pds_url: []const u8, 94 + pkce_verifier: []const u8, 95 + scope: []const u8, 96 + dpop_authserver_nonce: []const u8, 97 + dpop_private_key: []const u8, 98 + }; 99 + 100 + pub const Session = struct { 101 + did: []const u8, 102 + handle: []const u8, 103 + pds_url: []const u8, 104 + authserver_iss: []const u8, 105 + access_token: []const u8, 106 + refresh_token: []const u8, 107 + dpop_authserver_nonce: []const u8, 108 + dpop_pds_nonce: []const u8, 109 + dpop_private_key: []const u8, 110 + }; 111 + 112 + pub fn insertAuthRequest( 113 + state: []const u8, 114 + authserver_iss: []const u8, 115 + did: []const u8, 116 + handle: []const u8, 117 + pds_url: []const u8, 118 + pkce_verifier: []const u8, 119 + scope: []const u8, 120 + dpop_nonce: []const u8, 121 + dpop_private_key_hex: []const u8, 122 + ) !void { 123 + mutex.lockUncancelable(io); 124 + defer mutex.unlock(io); 125 + 126 + const stored: StoredAuthRequest = .{ 127 + .state = try gpa.dupe(u8, state), 128 + .authserver_iss = try gpa.dupe(u8, authserver_iss), 129 + .did = try gpa.dupe(u8, did), 130 + .handle = try gpa.dupe(u8, handle), 131 + .pds_url = try gpa.dupe(u8, pds_url), 132 + .pkce_verifier = try gpa.dupe(u8, pkce_verifier), 133 + .scope = try gpa.dupe(u8, scope), 134 + .dpop_authserver_nonce = try gpa.dupe(u8, dpop_nonce), 135 + .dpop_private_key = try gpa.dupe(u8, dpop_private_key_hex), 136 + .created_at = timestamp(), 137 + }; 138 + if (auth_requests.fetchRemove(stored.state)) |kv| { 139 + var prev = kv.value; 140 + prev.deinit(gpa); 141 + } 142 + try auth_requests.put(stored.state, stored); 143 + } 144 + 145 + pub fn getAuthRequest(arena: Allocator, state: []const u8) !?AuthRequest { 146 + mutex.lockUncancelable(io); 147 + defer mutex.unlock(io); 148 + 149 + const stored = auth_requests.getPtr(state) orelse return null; 150 + return AuthRequest{ 151 + .state = try arena.dupe(u8, stored.state), 152 + .authserver_iss = try arena.dupe(u8, stored.authserver_iss), 153 + .did = try arena.dupe(u8, stored.did), 154 + .handle = try arena.dupe(u8, stored.handle), 155 + .pds_url = try arena.dupe(u8, stored.pds_url), 156 + .pkce_verifier = try arena.dupe(u8, stored.pkce_verifier), 157 + .scope = try arena.dupe(u8, stored.scope), 158 + .dpop_authserver_nonce = try arena.dupe(u8, stored.dpop_authserver_nonce), 159 + .dpop_private_key = try arena.dupe(u8, stored.dpop_private_key), 160 + }; 161 + } 162 + 163 + pub fn deleteAuthRequest(state: []const u8) void { 164 + mutex.lockUncancelable(io); 165 + defer mutex.unlock(io); 166 + if (auth_requests.fetchRemove(state)) |kv| { 167 + var stored = kv.value; 168 + stored.deinit(gpa); 169 + } 170 + } 171 + 172 + pub fn upsertSession( 173 + did: []const u8, 174 + handle: []const u8, 175 + pds_url: []const u8, 176 + authserver_iss: []const u8, 177 + access_token: []const u8, 178 + refresh_token: []const u8, 179 + dpop_authserver_nonce: []const u8, 180 + dpop_pds_nonce: []const u8, 181 + dpop_private_key_hex: []const u8, 182 + ) !void { 183 + mutex.lockUncancelable(io); 184 + defer mutex.unlock(io); 185 + 186 + const new_session: StoredSession = .{ 187 + .did = try gpa.dupe(u8, did), 188 + .handle = try gpa.dupe(u8, handle), 189 + .pds_url = try gpa.dupe(u8, pds_url), 190 + .authserver_iss = try gpa.dupe(u8, authserver_iss), 191 + .access_token = try gpa.dupe(u8, access_token), 192 + .refresh_token = try gpa.dupe(u8, refresh_token), 193 + .dpop_authserver_nonce = try gpa.dupe(u8, dpop_authserver_nonce), 194 + .dpop_pds_nonce = try gpa.dupe(u8, dpop_pds_nonce), 195 + .dpop_private_key = try gpa.dupe(u8, dpop_private_key_hex), 196 + .created_at = timestamp(), 197 + }; 198 + 199 + if (sessions.fetchRemove(new_session.did)) |kv| { 200 + var prev = kv.value; 201 + prev.deinit(gpa); 202 + } 203 + try sessions.put(new_session.did, new_session); 204 + } 205 + 206 + pub fn getSession(arena: Allocator, did: []const u8) !?Session { 207 + mutex.lockUncancelable(io); 208 + defer mutex.unlock(io); 209 + 210 + const stored = sessions.getPtr(did) orelse return null; 211 + return Session{ 212 + .did = try arena.dupe(u8, stored.did), 213 + .handle = try arena.dupe(u8, stored.handle), 214 + .pds_url = try arena.dupe(u8, stored.pds_url), 215 + .authserver_iss = try arena.dupe(u8, stored.authserver_iss), 216 + .access_token = try arena.dupe(u8, stored.access_token), 217 + .refresh_token = try arena.dupe(u8, stored.refresh_token), 218 + .dpop_authserver_nonce = try arena.dupe(u8, stored.dpop_authserver_nonce), 219 + .dpop_pds_nonce = try arena.dupe(u8, stored.dpop_pds_nonce), 220 + .dpop_private_key = try arena.dupe(u8, stored.dpop_private_key), 221 + }; 222 + } 223 + 224 + pub fn deleteSession(did: []const u8) void { 225 + mutex.lockUncancelable(io); 226 + defer mutex.unlock(io); 227 + if (sessions.fetchRemove(did)) |kv| { 228 + var stored = kv.value; 229 + stored.deinit(gpa); 230 + } 231 + } 232 + 233 + pub fn updateSessionNonce(did: []const u8, field: enum { authserver, pds }, nonce: []const u8) void { 234 + mutex.lockUncancelable(io); 235 + defer mutex.unlock(io); 236 + const stored = sessions.getPtr(did) orelse return; 237 + const new_val = gpa.dupe(u8, nonce) catch return; 238 + switch (field) { 239 + .authserver => { 240 + gpa.free(stored.dpop_authserver_nonce); 241 + stored.dpop_authserver_nonce = new_val; 242 + }, 243 + .pds => { 244 + gpa.free(stored.dpop_pds_nonce); 245 + stored.dpop_pds_nonce = new_val; 246 + }, 247 + } 248 + } 249 + 250 + pub fn updateSessionTokens(did: []const u8, access_token: []const u8, refresh_token: []const u8) void { 251 + mutex.lockUncancelable(io); 252 + defer mutex.unlock(io); 253 + const stored = sessions.getPtr(did) orelse return; 254 + const new_at = gpa.dupe(u8, access_token) catch return; 255 + const new_rt = gpa.dupe(u8, refresh_token) catch { 256 + gpa.free(new_at); 257 + return; 258 + }; 259 + gpa.free(stored.access_token); 260 + gpa.free(stored.refresh_token); 261 + stored.access_token = new_at; 262 + stored.refresh_token = new_rt; 263 + } 264 + 265 + pub fn createSessionToken(did: []const u8) ![]const u8 { 266 + mutex.lockUncancelable(io); 267 + defer mutex.unlock(io); 268 + 269 + var rand_bytes: [32]u8 = undefined; 270 + io.random(&rand_bytes); 271 + 272 + const token = std.fmt.bytesToHex(rand_bytes, .lower); 273 + 274 + const key = try gpa.dupe(u8, &token); 275 + const val = try gpa.dupe(u8, did); 276 + try session_tokens.put(key, val); 277 + return key; 278 + } 279 + 280 + pub fn resolveSessionToken(token: []const u8) ?[]const u8 { 281 + mutex.lockUncancelable(io); 282 + defer mutex.unlock(io); 283 + return session_tokens.get(token); 284 + } 285 + 286 + pub fn deleteSessionToken(token: []const u8) void { 287 + mutex.lockUncancelable(io); 288 + defer mutex.unlock(io); 289 + if (session_tokens.fetchRemove(token)) |kv| { 290 + gpa.free(kv.key); 291 + gpa.free(kv.value); 292 + } 293 + }
+46
lexicons/tech/waow/pub-search/subscription.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "tech.waow.pub-search.subscription", 4 + "defs": { 5 + "main": { 6 + "type": "record", 7 + "key": "tid", 8 + "description": "Subscribe to new documents indexed by pub-search that match a trigger. pub-search fires a delivery (e.g. webhook) each time a matching document is ingested. The record lives on the user's PDS so subscriptions are portable and inspectable.", 9 + "record": { 10 + "type": "object", 11 + "required": ["triggerKind", "triggerValue", "destinationKind", "destinationValue", "createdAt"], 12 + "properties": { 13 + "triggerKind": { 14 + "type": "string", 15 + "knownValues": ["author", "publication", "platform", "tag"], 16 + "description": "What to match against. 'author' matches document.did; 'publication' matches document.publicationUri; 'platform' matches leaflet/pckt/offprint/greengale/whitewind/other; 'tag' matches any tag on the document." 17 + }, 18 + "triggerValue": { 19 + "type": "string", 20 + "maxLength": 512, 21 + "description": "Value to match. For author: a DID. For publication: an at-uri. For platform: the platform name. For tag: the tag string." 22 + }, 23 + "destinationKind": { 24 + "type": "string", 25 + "knownValues": ["bsky"], 26 + "description": "Where to send notifications. 'bsky' sends a direct message via chat.bsky on behalf of the authenticated subscriber." 27 + }, 28 + "destinationValue": { 29 + "type": "string", 30 + "maxLength": 512, 31 + "description": "For bsky: the recipient's DID or handle. The subscriber must have chat DM'ing permission with the recipient." 32 + }, 33 + "label": { 34 + "type": "string", 35 + "maxLength": 128, 36 + "description": "Human-readable name for this subscription." 37 + }, 38 + "createdAt": { 39 + "type": "string", 40 + "format": "datetime" 41 + } 42 + } 43 + } 44 + } 45 + } 46 + }
+420
site/subscriptions.html
··· 1 + <!DOCTYPE html> 2 + <html lang="en"> 3 + <head> 4 + <meta charset="UTF-8"> 5 + <meta name="viewport" content="width=device-width, initial-scale=1.0"> 6 + <link rel="icon" type="image/svg+xml" href="/favicon.svg"> 7 + <title>subscriptions - pub search</title> 8 + <meta name="description" content="get a bsky DM when pub-search indexes a new document under one of your standard.site publications"> 9 + <meta name="robots" content="noindex"> 10 + <script> 11 + (function() { 12 + var t = localStorage.getItem('theme') || 'dark'; 13 + if (t === 'system') t = matchMedia('(prefers-color-scheme: light)').matches ? 'light' : 'dark'; 14 + document.documentElement.setAttribute('data-theme', t); 15 + })(); 16 + </script> 17 + <style> 18 + :root, [data-theme="dark"] { 19 + --bg: #0a0a0a; 20 + --bg-subtle: #111; 21 + --bg-hover: #151515; 22 + --border: #222; 23 + --border-subtle: #252525; 24 + --border-focus: #333; 25 + --text: #ccc; 26 + --text-bright: #fff; 27 + --text-secondary: #888; 28 + --text-dim: #555; 29 + --text-muted: #444; 30 + --error: #c44; 31 + --ok: #1B7340; 32 + } 33 + [data-theme="light"] { 34 + --bg: #f5f5f0; 35 + --bg-subtle: #eee; 36 + --bg-hover: #e4e4e0; 37 + --border: #ddd; 38 + --border-subtle: #ccc; 39 + --border-focus: #bbb; 40 + --text: #333; 41 + --text-bright: #111; 42 + --text-secondary: #555; 43 + --text-dim: #888; 44 + --text-muted: #999; 45 + --error: #c44; 46 + --ok: #1B7340; 47 + } 48 + 49 + * { box-sizing: border-box; margin: 0; padding: 0; } 50 + 51 + body { 52 + font-family: monospace; 53 + background: var(--bg); 54 + color: var(--text); 55 + min-height: 100vh; 56 + padding: 1rem; 57 + font-size: 14px; 58 + line-height: 1.6; 59 + } 60 + 61 + .container { 62 + max-width: 600px; 63 + margin: 0 auto; 64 + } 65 + 66 + a { color: #1B7340; text-decoration: none; } 67 + a:hover { color: #2a9d5c; } 68 + 69 + h1 { 70 + font-size: 12px; 71 + font-weight: normal; 72 + margin-bottom: 1.5rem; 73 + } 74 + h1 a.title { color: var(--text-secondary); } 75 + h1 a.title:hover { color: var(--text-bright); } 76 + h1 .sep { color: var(--text-muted); margin: 0 0.25rem; } 77 + h1 .section { color: var(--text-dim); } 78 + 79 + p { margin-bottom: 1rem; color: var(--text-secondary); font-size: 13px; } 80 + p.hint { font-size: 12px; color: var(--text-dim); } 81 + 82 + code { 83 + font-family: monospace; 84 + color: #1B7340; 85 + background: var(--bg-subtle); 86 + padding: 1px 5px; 87 + border: 1px solid var(--border); 88 + font-size: 12px; 89 + } 90 + 91 + input[type="text"] { 92 + width: 100%; 93 + padding: 0.5rem; 94 + font-family: monospace; 95 + font-size: 16px; 96 + background: var(--bg-subtle); 97 + border: 1px solid var(--border-focus); 98 + color: var(--text); 99 + } 100 + input[type="text"]:focus { outline: 1px solid #1B7340; } 101 + 102 + button { 103 + padding: 0.5rem 1rem; 104 + font-family: monospace; 105 + font-size: 14px; 106 + background: var(--bg-subtle); 107 + border: 1px solid var(--border-focus); 108 + color: var(--text); 109 + cursor: pointer; 110 + } 111 + button:hover { background: var(--border); color: var(--text-bright); } 112 + button:disabled { color: var(--text-dim); cursor: not-allowed; } 113 + 114 + .me { 115 + float: right; 116 + font-size: 11px; 117 + color: var(--text-dim); 118 + } 119 + .me a { color: var(--text-dim); } 120 + .me a:hover { color: var(--text); } 121 + 122 + .status { 123 + font-size: 12px; 124 + color: var(--ok); 125 + margin-top: 0.5rem; 126 + min-height: 1.6em; 127 + } 128 + .status.err { color: var(--error); } 129 + 130 + .card { 131 + border: 1px solid var(--border); 132 + padding: 0.75rem 1rem; 133 + margin-bottom: 1rem; 134 + background: var(--bg-subtle); 135 + } 136 + 137 + .pub { 138 + display: flex; 139 + align-items: center; 140 + gap: 1rem; 141 + padding: 0.75rem 0; 142 + border-bottom: 1px solid var(--border); 143 + } 144 + .pub:last-child { border-bottom: none; } 145 + .pub-meta { flex: 1; min-width: 0; } 146 + .pub-name { 147 + color: var(--text-bright); 148 + font-size: 13px; 149 + margin-bottom: 2px; 150 + word-break: break-word; 151 + } 152 + .pub-url { 153 + color: var(--text-secondary); 154 + font-size: 11px; 155 + word-break: break-all; 156 + } 157 + .pub-url a { color: var(--text-secondary); } 158 + .pub-url a:hover { color: #1B7340; } 159 + 160 + /* toggle */ 161 + .toggle { 162 + position: relative; 163 + width: 36px; 164 + height: 20px; 165 + flex-shrink: 0; 166 + } 167 + .toggle input { opacity: 0; width: 0; height: 0; } 168 + .toggle .track { 169 + position: absolute; 170 + inset: 0; 171 + background: var(--border); 172 + border: 1px solid var(--border-focus); 173 + border-radius: 10px; 174 + cursor: pointer; 175 + transition: background 0.15s; 176 + } 177 + .toggle .thumb { 178 + position: absolute; 179 + top: 2px; 180 + left: 2px; 181 + width: 14px; 182 + height: 14px; 183 + background: var(--text-dim); 184 + border-radius: 50%; 185 + transition: left 0.15s, background 0.15s; 186 + } 187 + .toggle input:checked ~ .track { background: #1B7340; border-color: #1B7340; } 188 + .toggle input:checked ~ .thumb { left: 18px; background: #fff; } 189 + .toggle input:disabled ~ .track { opacity: 0.5; cursor: wait; } 190 + 191 + .test-btn { 192 + padding: 0.2rem 0.5rem; 193 + font-size: 11px; 194 + background: none; 195 + border: 1px solid var(--border-focus); 196 + color: var(--text-dim); 197 + } 198 + .test-btn:hover { color: var(--text); background: var(--border); } 199 + 200 + .empty { color: var(--text-dim); font-size: 12px; padding: 1rem 0; } 201 + 202 + .hidden { display: none !important; } 203 + 204 + .theme-toggle { 205 + position: fixed; 206 + top: 1rem; 207 + right: 1rem; 208 + background: none; 209 + border: none; 210 + color: var(--text-muted); 211 + cursor: pointer; 212 + font-size: 16px; 213 + padding: 0; 214 + } 215 + .theme-toggle:hover { color: var(--text); background: none; } 216 + </style> 217 + </head> 218 + <body> 219 + <button class="theme-toggle" id="theme-toggle" title="toggle theme">◐</button> 220 + 221 + <div class="container"> 222 + <h1> 223 + <a class="title" href="/">pub search</a> 224 + <span class="sep">/</span> 225 + <span class="section">subscriptions</span> 226 + <span class="me" id="me"></span> 227 + </h1> 228 + 229 + <p>get a bsky DM whenever pub-search indexes a new document under one of your <code>site.standard.publication</code> records.</p> 230 + 231 + <section id="login" class="card hidden"> 232 + <p class="hint">sign in with your atproto handle:</p> 233 + <form id="login-form" style="display:flex;gap:0.5rem"> 234 + <input id="handle" type="text" placeholder="you.bsky.social" autocomplete="off" autocapitalize="off" required style="flex:1"> 235 + <button type="submit">sign in</button> 236 + </form> 237 + <div id="login-status" class="status"></div> 238 + </section> 239 + 240 + <section id="pubs-section" class="card hidden"> 241 + <div id="pubs"></div> 242 + <div id="pubs-status" class="status"></div> 243 + </section> 244 + </div> 245 + 246 + <script> 247 + // API_URL selection: 248 + // - ?api=https://... (persisted to localStorage, for dev vs ngrok) 249 + // - localStorage.pubsearch_api 250 + // - same-origin if served from ngrok 251 + // - default: prod backend 252 + (function captureApiParam() { 253 + const p = new URLSearchParams(location.search).get('api'); 254 + if (p) { 255 + localStorage.setItem('pubsearch_api', p); 256 + const u = new URL(location.href); 257 + u.searchParams.delete('api'); 258 + history.replaceState(null, '', u.toString()); 259 + } 260 + })(); 261 + const API_URL = 262 + localStorage.getItem('pubsearch_api') || 263 + ((location.hostname.endsWith('ngrok.app') || location.hostname.endsWith('ngrok-free.app')) 264 + ? location.origin 265 + : 'https://leaflet-search-backend.fly.dev'); 266 + 267 + const themeToggle = document.getElementById('theme-toggle'); 268 + themeToggle.onclick = () => { 269 + const cur = document.documentElement.getAttribute('data-theme') || 'dark'; 270 + const next = cur === 'dark' ? 'light' : 'dark'; 271 + document.documentElement.setAttribute('data-theme', next); 272 + localStorage.setItem('theme', next); 273 + }; 274 + 275 + const $ = (id) => document.getElementById(id); 276 + const escapeHtml = (s) => (s || '').replace(/[&<>"']/g, c => ({'&':'&amp;','<':'&lt;','>':'&gt;','"':'&quot;',"'":'&#39;'}[c])); 277 + const show = (el) => el.classList.remove('hidden'); 278 + const hide = (el) => el.classList.add('hidden'); 279 + 280 + const api = (path, opts = {}) => 281 + fetch(API_URL + path, { credentials: 'include', ...opts }); 282 + 283 + async function fetchMe() { 284 + try { const r = await api('/api/me'); if (r.status === 200) return r.json(); } catch {} 285 + return null; 286 + } 287 + async function fetchPubs() { 288 + try { const r = await api('/api/my-publications'); if (r.ok) return r.json(); } catch {} 289 + return []; 290 + } 291 + async function fetchSubs() { 292 + try { const r = await api('/api/subscriptions'); if (r.ok) return r.json(); } catch {} 293 + return []; 294 + } 295 + 296 + // build a lookup map: publication at-uri → subscription rkey 297 + function indexSubs(subs) { 298 + const out = {}; 299 + for (const s of subs) { 300 + if (s.triggerKind === 'publication' && s.destinationKind === 'bsky') { 301 + out[s.triggerValue] = s.rkey; 302 + } 303 + } 304 + return out; 305 + } 306 + 307 + function renderPubs(pubs, subIndex, selfDid) { 308 + const el = $('pubs'); 309 + if (!pubs.length) { 310 + el.innerHTML = '<div class="empty">you don\'t have any <code>site.standard.publication</code> records on your PDS yet.</div>'; 311 + return; 312 + } 313 + el.innerHTML = ''; 314 + for (const p of pubs) { 315 + const subbed = !!subIndex[p.uri]; 316 + const row = document.createElement('div'); 317 + row.className = 'pub'; 318 + row.innerHTML = ` 319 + <div class="pub-meta"> 320 + <div class="pub-name">${escapeHtml(p.name || '(untitled)')}</div> 321 + <div class="pub-url">${p.url ? `<a href="${escapeHtml(p.url)}" target="_blank" rel="noopener">${escapeHtml(p.url)}</a>` : escapeHtml(p.uri)}</div> 322 + </div> 323 + <button class="test-btn" title="send a test DM">test</button> 324 + <label class="toggle" title="${subbed ? 'DM me off' : 'DM me on'}"> 325 + <input type="checkbox" ${subbed ? 'checked' : ''}> 326 + <span class="track"></span> 327 + <span class="thumb"></span> 328 + </label> 329 + `; 330 + const checkbox = row.querySelector('input[type=checkbox]'); 331 + const testBtn = row.querySelector('.test-btn'); 332 + testBtn.style.display = subbed ? '' : 'none'; 333 + 334 + checkbox.onchange = async () => { 335 + checkbox.disabled = true; 336 + const status = $('pubs-status'); 337 + status.textContent = ''; status.classList.remove('err'); 338 + try { 339 + if (checkbox.checked) { 340 + const r = await api('/api/subscriptions', { 341 + method: 'POST', 342 + headers: { 'content-type': 'application/json' }, 343 + body: JSON.stringify({ 344 + triggerKind: 'publication', 345 + triggerValue: p.uri, 346 + destinationKind: 'bsky', 347 + destinationValue: selfDid, 348 + label: p.name || p.url || p.uri, 349 + }), 350 + }); 351 + const j = await r.json().catch(() => ({})); 352 + if (!r.ok) throw new Error(j.error || ('error ' + r.status)); 353 + subIndex[p.uri] = j.rkey; 354 + testBtn.style.display = ''; 355 + } else { 356 + const rkey = subIndex[p.uri]; 357 + if (!rkey) return; 358 + const r = await api('/api/subscriptions/' + encodeURIComponent(rkey), { method: 'DELETE' }); 359 + if (!r.ok) throw new Error('delete failed'); 360 + delete subIndex[p.uri]; 361 + testBtn.style.display = 'none'; 362 + } 363 + } catch (err) { 364 + status.textContent = err.message || 'failed'; 365 + status.classList.add('err'); 366 + checkbox.checked = !checkbox.checked; // revert 367 + } finally { 368 + checkbox.disabled = false; 369 + } 370 + }; 371 + 372 + testBtn.onclick = async () => { 373 + const rkey = subIndex[p.uri]; 374 + if (!rkey) return; 375 + const status = $('pubs-status'); 376 + testBtn.disabled = true; 377 + status.textContent = ''; status.classList.remove('err'); 378 + const r = await api('/api/subscriptions/' + encodeURIComponent(rkey) + '/test', { method: 'POST' }); 379 + const j = await r.json().catch(() => ({})); 380 + if (r.ok) status.textContent = 'test DM sent'; 381 + else { status.textContent = j.error || 'test failed'; status.classList.add('err'); } 382 + testBtn.disabled = false; 383 + setTimeout(() => { status.textContent = ''; }, 3000); 384 + }; 385 + 386 + el.appendChild(row); 387 + } 388 + } 389 + 390 + async function render() { 391 + const me = await fetchMe(); 392 + const meEl = $('me'); 393 + if (!me) { 394 + meEl.innerHTML = ''; 395 + show($('login')); hide($('pubs-section')); 396 + return; 397 + } 398 + meEl.innerHTML = `@${escapeHtml(me.handle)} <a href="#" id="logout">(sign out)</a>`; 399 + $('logout').onclick = async (e) => { 400 + e.preventDefault(); 401 + await api('/oauth/logout', { method: 'POST' }); 402 + render(); 403 + }; 404 + hide($('login')); show($('pubs-section')); 405 + 406 + const [pubs, subs] = await Promise.all([fetchPubs(), fetchSubs()]); 407 + renderPubs(pubs, indexSubs(subs), me.did); 408 + } 409 + 410 + $('login-form').addEventListener('submit', (e) => { 411 + e.preventDefault(); 412 + const h = $('handle').value.trim(); 413 + if (!h) return; 414 + window.location.href = API_URL + '/oauth/login?handle=' + encodeURIComponent(h); 415 + }); 416 + 417 + render(); 418 + </script> 419 + </body> 420 + </html>