search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: bot-sends-DM delivery (@pub-search.waow.tech)

bsky chat convos require two distinct members — you can't DM yourself.
switching to a dedicated bot account that DMs subscribers.

changes:
- new src/bsky_bot.zig: app-password login + cached session + chat.bsky
sendMessage via atproto-proxy. self-healing on 401.
- notifications delivery now always DMs the subscription owner (the
subscriber), sent FROM the bot account
- oauth SCOPE drops transition:chat.bsky (not needed anymore — the bot
has its own bsky session via app password)
- subscriptions CRUD drops destinationKind/destinationValue from the
client-facing contract; lexicon makes them optional
- frontend no longer asks for recipient

fly secrets to set: BSKY_BOT_HANDLE (default pub-search.waow.tech),
BSKY_BOT_APP_PASSWORD (staged).

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

+331 -179
+1
.claude/scheduled_tasks.lock
··· 1 + {"sessionId":"9d0da28e-d8a6-468f-8a0a-b610f4d4683b","pid":55385,"acquiredAt":1776804529237}
+286
backend/src/bsky_bot.zig
··· 1 + //! bsky DM sender for the pub-search bot account. 2 + //! 3 + //! The bot (@pub-search.waow.tech) authenticates with an app password, 4 + //! caches its session (accessJwt + refreshJwt + PDS URL), and sends DMs 5 + //! via `chat.bsky.convo.*` proxied through its PDS. 6 + //! 7 + //! Why a bot instead of each user DMing themselves: bsky chat convos 8 + //! require at least two distinct members, so users can't "DM themselves" 9 + //! through the chat service. The bot is the second member. 10 + //! 11 + //! Session management is lazy + self-healing: login on first use, retry 12 + //! once on 401 after clearing the session. 13 + 14 + const std = @import("std"); 15 + const Io = std.Io; 16 + const http = std.http; 17 + const json = std.json; 18 + const Allocator = std.mem.Allocator; 19 + const zat = @import("zat"); 20 + const logfire = @import("logfire"); 21 + 22 + const BSKY_CHAT_PROXY = "did:web:api.bsky.chat#bsky_chat"; 23 + 24 + var cfg_io: Io = undefined; 25 + var cfg_alloc: Allocator = undefined; 26 + var cfg_handle: []const u8 = ""; 27 + var cfg_password: []const u8 = ""; 28 + var cfg_set = false; 29 + 30 + var mutex: Io.Mutex = .init; 31 + // cached state (heap-owned, guarded by mutex) 32 + var cached_pds: ?[]u8 = null; 33 + var cached_access: ?[]u8 = null; 34 + var cached_refresh: ?[]u8 = null; 35 + 36 + pub fn init(alloc: Allocator, io: Io, handle: []const u8, password: []const u8) void { 37 + cfg_alloc = alloc; 38 + cfg_io = io; 39 + cfg_handle = handle; 40 + cfg_password = password; 41 + cfg_set = true; 42 + } 43 + 44 + pub fn isConfigured() bool { 45 + return cfg_set and cfg_handle.len > 0 and cfg_password.len > 0; 46 + } 47 + 48 + /// Send a DM from the bot to `to_did` with the given text. Handles login 49 + /// and 401 recovery transparently. 50 + pub fn sendDm(arena: Allocator, to_did: []const u8, text: []const u8) !void { 51 + if (!isConfigured()) return error.BotNotConfigured; 52 + 53 + const convo_id = try callWithRetry(arena, to_did, null); 54 + _ = try callWithRetry(arena, convo_id, text); 55 + } 56 + 57 + /// Shared retry wrapper. If `text` is null, resolves a convo; otherwise 58 + /// sends `text`. Returns the convo_id for the resolve case, empty for send. 59 + fn callWithRetry(arena: Allocator, arg: []const u8, text: ?[]const u8) ![]const u8 { 60 + var attempted_relogin = false; 61 + while (true) { 62 + try ensureSession(arena); 63 + const sn = try snapshot(arena); 64 + 65 + const result = if (text) |t| 66 + sendMessage(arena, sn, arg, t) 67 + else 68 + getConvoForMembers(arena, sn, arg); 69 + 70 + if (result) |ok| { 71 + return ok; 72 + } else |err| { 73 + if (err == error.Unauthorized and !attempted_relogin) { 74 + logfire.info("bsky_bot: 401 — clearing cached session and retrying", .{}); 75 + clearSession(); 76 + attempted_relogin = true; 77 + continue; 78 + } 79 + return err; 80 + } 81 + } 82 + } 83 + 84 + // --------------------------------------------------------------------------- 85 + // session management 86 + // --------------------------------------------------------------------------- 87 + 88 + const Snapshot = struct { pds: []const u8, access: []const u8 }; 89 + 90 + fn snapshot(arena: Allocator) !Snapshot { 91 + mutex.lockUncancelable(cfg_io); 92 + defer mutex.unlock(cfg_io); 93 + const pds = cached_pds orelse return error.NoSession; 94 + const access = cached_access orelse return error.NoSession; 95 + return .{ 96 + .pds = try arena.dupe(u8, pds), 97 + .access = try arena.dupe(u8, access), 98 + }; 99 + } 100 + 101 + fn ensureSession(arena: Allocator) !void { 102 + { 103 + mutex.lockUncancelable(cfg_io); 104 + defer mutex.unlock(cfg_io); 105 + if (cached_access != null and cached_pds != null) return; 106 + } 107 + try login(arena); 108 + } 109 + 110 + fn clearSession() void { 111 + mutex.lockUncancelable(cfg_io); 112 + defer mutex.unlock(cfg_io); 113 + if (cached_access) |a| { 114 + cfg_alloc.free(a); 115 + cached_access = null; 116 + } 117 + if (cached_refresh) |r| { 118 + cfg_alloc.free(r); 119 + cached_refresh = null; 120 + } 121 + // keep cached_pds so we don't re-resolve on every re-login 122 + } 123 + 124 + fn login(arena: Allocator) !void { 125 + const pds_url = try resolvePds(arena); 126 + 127 + const body = try std.fmt.allocPrint(arena, 128 + \\{{"identifier":"{s}","password":"{s}"}} 129 + , .{ cfg_handle, cfg_password }); 130 + 131 + const result = try httpPost(arena, pds_url, "/xrpc/com.atproto.server.createSession", body, null, false); 132 + if (result.status != .ok) { 133 + logfire.err("bsky_bot: createSession failed status={t} body={s}", .{ result.status, result.body[0..@min(result.body.len, 400)] }); 134 + return error.LoginFailed; 135 + } 136 + const parsed = try json.parseFromSlice(json.Value, arena, result.body, .{}); 137 + defer parsed.deinit(); 138 + const access_v = parsed.value.object.get("accessJwt") orelse return error.MissingAccessJwt; 139 + const refresh_v = parsed.value.object.get("refreshJwt") orelse return error.MissingRefreshJwt; 140 + if (access_v != .string or refresh_v != .string) return error.BadLoginResponse; 141 + 142 + mutex.lockUncancelable(cfg_io); 143 + defer mutex.unlock(cfg_io); 144 + if (cached_pds) |p| cfg_alloc.free(p); 145 + if (cached_access) |a| cfg_alloc.free(a); 146 + if (cached_refresh) |r| cfg_alloc.free(r); 147 + cached_pds = try cfg_alloc.dupe(u8, pds_url); 148 + cached_access = try cfg_alloc.dupe(u8, access_v.string); 149 + cached_refresh = try cfg_alloc.dupe(u8, refresh_v.string); 150 + logfire.info("bsky_bot: logged in handle={s} pds={s}", .{ cfg_handle, pds_url }); 151 + } 152 + 153 + fn resolvePds(arena: Allocator) ![]const u8 { 154 + var hres = zat.HandleResolver.init(cfg_io, arena); 155 + defer hres.deinit(); 156 + const parsed_handle = zat.Handle.parse(cfg_handle) orelse return error.InvalidHandle; 157 + const did = try hres.resolve(parsed_handle); 158 + 159 + var dres = zat.DidResolver.init(cfg_io, arena); 160 + defer dres.deinit(); 161 + var did_doc = try dres.resolve(zat.Did.parse(did) orelse return error.InvalidDid); 162 + defer did_doc.deinit(); 163 + return try arena.dupe(u8, did_doc.pdsEndpoint() orelse return error.NoPds); 164 + } 165 + 166 + // --------------------------------------------------------------------------- 167 + // xrpc calls 168 + // --------------------------------------------------------------------------- 169 + 170 + fn getConvoForMembers(arena: Allocator, sn: Snapshot, to_did: []const u8) ![]const u8 { 171 + const path = try std.fmt.allocPrint(arena, "/xrpc/chat.bsky.convo.getConvoForMembers?members={s}", .{to_did}); 172 + const result = try httpGet(arena, sn.pds, path, sn.access, true); 173 + if (result.status == .unauthorized) return error.Unauthorized; 174 + if (result.status != .ok) { 175 + logfire.err("bsky_bot: getConvoForMembers status={t} body={s}", .{ result.status, result.body[0..@min(result.body.len, 400)] }); 176 + return error.FetchFailed; 177 + } 178 + const parsed = try json.parseFromSlice(json.Value, arena, result.body, .{}); 179 + defer parsed.deinit(); 180 + const convo = parsed.value.object.get("convo") orelse return error.MissingConvo; 181 + if (convo != .object) return error.MissingConvo; 182 + const id = convo.object.get("id") orelse return error.MissingConvoId; 183 + if (id != .string) return error.MissingConvoId; 184 + return try arena.dupe(u8, id.string); 185 + } 186 + 187 + fn sendMessage(arena: Allocator, sn: Snapshot, convo_id: []const u8, text: []const u8) ![]const u8 { 188 + var body_buf: std.Io.Writer.Allocating = .init(arena); 189 + var jw: json.Stringify = .{ .writer = &body_buf.writer }; 190 + try jw.beginObject(); 191 + try jw.objectField("convoId"); 192 + try jw.write(convo_id); 193 + try jw.objectField("message"); 194 + try jw.beginObject(); 195 + try jw.objectField("text"); 196 + try jw.write(text); 197 + try jw.endObject(); 198 + try jw.endObject(); 199 + const body = try body_buf.toOwnedSlice(); 200 + 201 + const result = try httpPost(arena, sn.pds, "/xrpc/chat.bsky.convo.sendMessage", body, sn.access, true); 202 + if (result.status == .unauthorized) return error.Unauthorized; 203 + if (result.status != .ok) { 204 + logfire.err("bsky_bot: sendMessage status={t} body={s}", .{ result.status, result.body[0..@min(result.body.len, 400)] }); 205 + return error.FetchFailed; 206 + } 207 + return ""; 208 + } 209 + 210 + // --------------------------------------------------------------------------- 211 + // HTTP helpers (bearer + optional chat-proxy) 212 + // --------------------------------------------------------------------------- 213 + 214 + const HttpResult = struct { status: http.Status, body: []u8 }; 215 + 216 + fn buildHeaders(access: ?[]const u8, chat_proxy: bool, auth_buf: []u8, out: *[2]http.Header) !u8 { 217 + var n: u8 = 0; 218 + if (access) |a| { 219 + const auth = try std.fmt.bufPrint(auth_buf, "Bearer {s}", .{a}); 220 + out[n] = .{ .name = "Authorization", .value = auth }; 221 + n += 1; 222 + } 223 + if (chat_proxy) { 224 + out[n] = .{ .name = "atproto-proxy", .value = BSKY_CHAT_PROXY }; 225 + n += 1; 226 + } 227 + return n; 228 + } 229 + 230 + fn httpGet(arena: Allocator, base: []const u8, path: []const u8, access: []const u8, chat_proxy: bool) !HttpResult { 231 + const url = try std.fmt.allocPrint(arena, "{s}{s}", .{ base, path }); 232 + 233 + var auth_buf: [4096]u8 = undefined; 234 + var hdrs: [2]http.Header = undefined; 235 + const n = try buildHeaders(access, chat_proxy, &auth_buf, &hdrs); 236 + 237 + var client: http.Client = .{ .allocator = arena, .io = cfg_io }; 238 + defer client.deinit(); 239 + 240 + var req = try client.request(.GET, try std.Uri.parse(url), .{ 241 + .extra_headers = hdrs[0..n], 242 + .headers = .{ .accept_encoding = .{ .override = "identity" } }, 243 + }); 244 + defer req.deinit(); 245 + 246 + try req.sendBodiless(); 247 + var redirect_buf: [1]u8 = undefined; 248 + var response = try req.receiveHead(&redirect_buf); 249 + var aw: std.Io.Writer.Allocating = .init(arena); 250 + const reader = response.reader(&.{}); 251 + _ = reader.streamRemaining(&aw.writer) catch {}; 252 + return .{ .status = response.head.status, .body = try aw.toOwnedSlice() }; 253 + } 254 + 255 + fn httpPost(arena: Allocator, base: []const u8, path: []const u8, body: []const u8, access: ?[]const u8, chat_proxy: bool) !HttpResult { 256 + const url = try std.fmt.allocPrint(arena, "{s}{s}", .{ base, path }); 257 + 258 + var auth_buf: [4096]u8 = undefined; 259 + var hdrs: [2]http.Header = undefined; 260 + const n = try buildHeaders(access, chat_proxy, &auth_buf, &hdrs); 261 + 262 + var client: http.Client = .{ .allocator = arena, .io = cfg_io }; 263 + defer client.deinit(); 264 + 265 + var req = try client.request(.POST, try std.Uri.parse(url), .{ 266 + .extra_headers = hdrs[0..n], 267 + .headers = .{ 268 + .content_type = .{ .override = "application/json" }, 269 + .accept_encoding = .{ .override = "identity" }, 270 + }, 271 + }); 272 + defer req.deinit(); 273 + 274 + req.transfer_encoding = .{ .content_length = body.len }; 275 + var body_writer = try req.sendBodyUnflushed(&.{}); 276 + try body_writer.writer.writeAll(body); 277 + try body_writer.end(); 278 + try req.connection.?.flush(); 279 + 280 + var redirect_buf: [1]u8 = undefined; 281 + var response = try req.receiveHead(&redirect_buf); 282 + var aw: std.Io.Writer.Allocating = .init(arena); 283 + const reader = response.reader(&.{}); 284 + _ = reader.streamRemaining(&aw.writer) catch {}; 285 + return .{ .status = response.head.status, .body = try aw.toOwnedSlice() }; 286 + }
+12
backend/src/main.zig
··· 10 10 const state = @import("state.zig"); 11 11 const oauth = @import("oauth.zig"); 12 12 const notifications = @import("notifications.zig"); 13 + const bsky_bot = @import("bsky_bot.zig"); 13 14 14 15 const SOCKET_TIMEOUT_SECS = 5; 15 16 ··· 71 72 72 73 // notifications module needs io + alloc for the delivery queue & worker 73 74 notifications.init(allocator, io); 75 + 76 + // bsky bot (sends subscription DMs on behalf of @pub-search.waow.tech) 77 + bsky_bot.init( 78 + allocator, 79 + io, 80 + getenv("BSKY_BOT_HANDLE") orelse "pub-search.waow.tech", 81 + getenv("BSKY_BOT_APP_PASSWORD") orelse "", 82 + ); 83 + if (!bsky_bot.isConfigured()) { 84 + logfire.warn("BSKY_BOT_APP_PASSWORD not set — bsky DM delivery will fail", .{}); 85 + } 74 86 75 87 // init local db and other services in background (slow) 76 88 const init_thread = try Thread.spawn(.{}, initServices, .{ allocator, io });
+18 -53
backend/src/notifications.zig
··· 22 22 const Allocator = std.mem.Allocator; 23 23 const logfire = @import("logfire"); 24 24 const db = @import("db.zig"); 25 - const oauth = @import("oauth.zig"); 26 - const store = @import("state.zig"); 25 + const bsky_bot = @import("bsky_bot.zig"); 27 26 28 27 pub const SUBSCRIPTION_COLLECTION = "tech.waow.pub-search.subscription"; 29 28 ··· 33 32 const QUEUE_CAPACITY = 1024; 34 33 35 34 const DeliveryJob = struct { 35 + /// the subscriber — who receives the DM from the bot 36 36 owner_did: []u8, 37 - recipient_did: []u8, 38 37 sub_rkey: []u8, 39 38 trigger_kind: []u8, 40 39 trigger_value: []u8, ··· 43 42 44 43 fn deinit(self: *DeliveryJob, a: Allocator) void { 45 44 a.free(self.owner_did); 46 - a.free(self.recipient_did); 47 45 a.free(self.sub_rkey); 48 46 a.free(self.trigger_kind); 49 47 a.free(self.trigger_value); ··· 58 56 var dropped_count = std.atomic.Value(u64).init(0); 59 57 var delivered_count = std.atomic.Value(u64).init(0); 60 58 var failed_count = std.atomic.Value(u64).init(0); 61 - var skipped_no_session_count = std.atomic.Value(u64).init(0); 62 59 63 60 // --------------------------------------------------------------------------- 64 61 // init + schema ··· 232 229 const local = db.getLocalDbRaw() orelse return; 233 230 234 231 var rows = local.query( 235 - \\SELECT rkey, owner_did, destination_kind, destination_value, trigger_kind, trigger_value 232 + \\SELECT rkey, owner_did, trigger_kind, trigger_value 236 233 \\FROM subscriptions 237 234 \\WHERE trigger_kind = ? AND trigger_value = ? 238 235 , .{ kind, value }) catch |err| { ··· 244 241 while (rows.next()) |row| { 245 242 const rkey = row.text(0); 246 243 const owner_did = row.text(1); 247 - const dest_kind = row.text(2); 248 - const dest_value = row.text(3); 249 - const trigger_kind = row.text(4); 250 - const trigger_value = row.text(5); 251 - 252 - if (!std.mem.eql(u8, dest_kind, "bsky")) continue; 244 + const trigger_kind = row.text(2); 245 + const trigger_value = row.text(3); 253 246 254 247 enqueue(.{ 255 248 .owner_did = owner_did, 256 - .recipient_did = dest_value, 257 249 .sub_rkey = rkey, 258 250 .trigger_kind = trigger_kind, 259 251 .trigger_value = trigger_value, ··· 274 266 const local = db.getLocalDbRaw() orelse return error.LocalDbUnavailable; 275 267 276 268 var rows = try local.query( 277 - \\SELECT destination_kind, destination_value, trigger_kind, trigger_value 269 + \\SELECT trigger_kind, trigger_value 278 270 \\FROM subscriptions WHERE owner_did = ? AND rkey = ? 279 271 , .{ owner_did, rkey }); 280 272 defer rows.deinit(); ··· 283 275 logfire.warn("testFire: sub not found owner={s} rkey={s}", .{ owner_did, rkey }); 284 276 return error.NotFound; 285 277 }; 286 - const dest_kind = row.text(0); 287 - const dest_value = row.text(1); 288 - const trigger_kind = row.text(2); 289 - const trigger_value = row.text(3); 278 + const trigger_kind = row.text(0); 279 + const trigger_value = row.text(1); 290 280 291 - if (!std.mem.eql(u8, dest_kind, "bsky")) return error.UnsupportedDestination; 292 - 293 - logfire.info("testFire: enqueuing sub rkey={s} owner={s} recipient={s}", .{ rkey, owner_did, dest_value }); 281 + logfire.info("testFire: enqueuing sub rkey={s} owner={s}", .{ rkey, owner_did }); 294 282 295 283 try enqueue(.{ 296 284 .owner_did = owner_did, 297 - .recipient_did = dest_value, 298 285 .sub_rkey = rkey, 299 286 .trigger_kind = trigger_kind, 300 287 .trigger_value = trigger_value, ··· 309 296 310 297 const EnqueueInput = struct { 311 298 owner_did: []const u8, 312 - recipient_did: []const u8, 313 299 sub_rkey: []const u8, 314 300 trigger_kind: []const u8, 315 301 trigger_value: []const u8, ··· 328 314 329 315 const job: DeliveryJob = .{ 330 316 .owner_did = try alloc.dupe(u8, in.owner_did), 331 - .recipient_did = try alloc.dupe(u8, in.recipient_did), 332 317 .sub_rkey = try alloc.dupe(u8, in.sub_rkey), 333 318 .trigger_kind = try alloc.dupe(u8, in.trigger_kind), 334 319 .trigger_value = try alloc.dupe(u8, in.trigger_value), ··· 365 350 var job = dequeueBlocking(io) orelse continue; 366 351 defer job.deinit(alloc); 367 352 368 - deliver(alloc, &job) catch |err| switch (err) { 369 - error.NoSession => { 370 - _ = skipped_no_session_count.fetchAdd(1, .monotonic); 371 - logfire.warn("notifications: skip delivery sub={s} — subscriber has no live session", .{job.sub_rkey}); 372 - }, 373 - else => { 374 - _ = failed_count.fetchAdd(1, .monotonic); 375 - logfire.warn("notifications: delivery failed sub={s}: {}", .{ job.sub_rkey, err }); 376 - }, 353 + deliver(alloc, &job) catch |err| { 354 + _ = failed_count.fetchAdd(1, .monotonic); 355 + logfire.warn("notifications: delivery failed sub={s}: {}", .{ job.sub_rkey, err }); 377 356 }; 378 357 } 379 358 } ··· 383 362 defer arena.deinit(); 384 363 const a = arena.allocator(); 385 364 386 - logfire.info("deliver: starting sub={s} owner={s} recipient={s}", .{ job.sub_rkey, job.owner_did, job.recipient_did }); 387 - 388 - const session = (try store.getSession(a, job.owner_did)) orelse { 389 - logfire.warn("deliver: NO session in memory for owner={s}", .{job.owner_did}); 390 - return error.NoSession; 391 - }; 392 - 393 - const convo_id = oauth.chatGetConvoForMembers(a, session, &.{job.recipient_did}) catch |err| { 394 - logfire.warn("deliver: getConvoForMembers failed sub={s} err={}", .{ job.sub_rkey, err }); 395 - return err; 396 - }; 397 - logfire.info("deliver: convo_id={s} sub={s}", .{ convo_id, job.sub_rkey }); 365 + logfire.info("deliver: starting sub={s} to_did={s}", .{ job.sub_rkey, job.owner_did }); 398 366 399 367 const text = try std.fmt.allocPrint(a, 400 368 \\new on pub-search — matched your {s}="{s}" subscription ··· 403 371 \\{s} 404 372 , .{ job.trigger_kind, job.trigger_value, job.doc_title, job.doc_url }); 405 373 406 - oauth.chatSendMessage(a, session, convo_id, text) catch |err| { 407 - logfire.warn("deliver: sendMessage failed sub={s} err={}", .{ job.sub_rkey, err }); 408 - return err; 409 - }; 410 - logfire.info("deliver: DM sent sub={s} recipient={s}", .{ job.sub_rkey, job.recipient_did }); 374 + try bsky_bot.sendDm(a, job.owner_did, text); 375 + 376 + logfire.info("deliver: DM sent sub={s}", .{job.sub_rkey}); 411 377 _ = delivered_count.fetchAdd(1, .monotonic); 412 378 } 413 379 414 - pub fn stats() struct { delivered: u64, failed: u64, dropped: u64, skipped_no_session: u64, queued: usize } { 415 - const io = global_io orelse return .{ .delivered = 0, .failed = 0, .dropped = 0, .skipped_no_session = 0, .queued = 0 }; 380 + pub fn stats() struct { delivered: u64, failed: u64, dropped: u64, queued: usize } { 381 + const io = global_io orelse return .{ .delivered = 0, .failed = 0, .dropped = 0, .queued = 0 }; 416 382 queue_mutex.lockUncancelable(io); 417 383 defer queue_mutex.unlock(io); 418 384 return .{ 419 385 .delivered = delivered_count.load(.monotonic), 420 386 .failed = failed_count.load(.monotonic), 421 387 .dropped = dropped_count.load(.monotonic), 422 - .skipped_no_session = skipped_no_session_count.load(.monotonic), 423 388 .queued = queue.items.len, 424 389 }; 425 390 }
-68
backend/src/oauth.zig
··· 1098 1098 defer alloc.free(resp); 1099 1099 } 1100 1100 1101 - // --------------------------------------------------------------------------- 1102 - // chat.bsky sender — proxied through the user's PDS with atproto-proxy header 1103 - // so DMs are sent as the authenticated subscriber. 1104 - // --------------------------------------------------------------------------- 1105 - 1106 - const BSKY_CHAT_PROXY = "did:web:api.bsky.chat#bsky_chat"; 1107 - 1108 - /// GET chat.bsky.convo.getConvoForMembers — returns a convoId suitable for 1109 - /// sendMessage. members is a slice of DIDs (1+, excluding the caller). 1110 - pub fn chatGetConvoForMembers(alloc: Allocator, session: store.Session, member_dids: []const []const u8) ![]const u8 { 1111 - var path_buf: std.Io.Writer.Allocating = .init(alloc); 1112 - try path_buf.writer.writeAll("/xrpc/chat.bsky.convo.getConvoForMembers"); 1113 - for (member_dids, 0..) |d, i| { 1114 - const sep: u8 = if (i == 0) '?' else '&'; 1115 - try path_buf.writer.print("{c}members={s}", .{ sep, d }); 1116 - } 1117 - const path = try path_buf.toOwnedSlice(); 1118 - defer alloc.free(path); 1119 - 1120 - const resp = try pdsAuthedRequest( 1121 - alloc, 1122 - session, 1123 - "GET", 1124 - path, 1125 - null, 1126 - "application/json", 1127 - &.{.{ .name = "atproto-proxy", .value = BSKY_CHAT_PROXY }}, 1128 - ); 1129 - defer alloc.free(resp); 1130 - 1131 - const parsed = try json.parseFromSlice(json.Value, alloc, resp, .{}); 1132 - defer parsed.deinit(); 1133 - 1134 - const convo = parsed.value.object.get("convo") orelse return error.MissingConvo; 1135 - if (convo != .object) return error.MissingConvo; 1136 - const id = convo.object.get("id") orelse return error.MissingConvoId; 1137 - if (id != .string) return error.MissingConvoId; 1138 - return alloc.dupe(u8, id.string); 1139 - } 1140 - 1141 - /// POST chat.bsky.convo.sendMessage. 1142 - pub fn chatSendMessage(alloc: Allocator, session: store.Session, convo_id: []const u8, text: []const u8) !void { 1143 - // JSON-escape the text into the body 1144 - var body_buf: std.Io.Writer.Allocating = .init(alloc); 1145 - var jw: json.Stringify = .{ .writer = &body_buf.writer }; 1146 - try jw.beginObject(); 1147 - try jw.objectField("convoId"); 1148 - try jw.write(convo_id); 1149 - try jw.objectField("message"); 1150 - try jw.beginObject(); 1151 - try jw.objectField("text"); 1152 - try jw.write(text); 1153 - try jw.endObject(); 1154 - try jw.endObject(); 1155 - const body = try body_buf.toOwnedSlice(); 1156 - defer alloc.free(body); 1157 - 1158 - const resp = try pdsAuthedRequest( 1159 - alloc, 1160 - session, 1161 - "POST", 1162 - "/xrpc/chat.bsky.convo.sendMessage", 1163 - body, 1164 - "application/json", 1165 - &.{.{ .name = "atproto-proxy", .value = BSKY_CHAT_PROXY }}, 1166 - ); 1167 - defer alloc.free(resp); 1168 - }
+9 -39
backend/src/server/subscriptions.zig
··· 20 20 const SUBSCRIPTION_COLLECTION = notifications.SUBSCRIPTION_COLLECTION; 21 21 22 22 const ALLOWED_TRIGGER_KINDS = [_][]const u8{ "author", "publication", "platform", "tag" }; 23 - const ALLOWED_DEST_KINDS = [_][]const u8{"bsky"}; 24 23 25 24 fn contains(haystack: []const []const u8, needle: []const u8) bool { 26 25 for (haystack) |h| if (mem.eql(u8, h, needle)) return true; ··· 68 67 const CreateBody = struct { 69 68 triggerKind: []const u8, 70 69 triggerValue: []const u8, 71 - destinationKind: []const u8, 72 - destinationValue: []const u8, 73 - secret: ?[]const u8 = null, 74 70 label: ?[]const u8 = null, 75 71 }; 76 72 ··· 110 106 try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid triggerKind\"}"); 111 107 return; 112 108 } 113 - if (!contains(&ALLOWED_DEST_KINDS, parsed.destinationKind)) { 114 - try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid destinationKind\"}"); 115 - return; 116 - } 117 109 if (parsed.triggerValue.len == 0 or parsed.triggerValue.len > 512) { 118 110 try sendJsonStatus(request, .bad_request, "{\"error\":\"triggerValue length\"}"); 119 111 return; 120 112 } 121 - if (parsed.destinationValue.len == 0 or parsed.destinationValue.len > 512) { 122 - try sendJsonStatus(request, .bad_request, "{\"error\":\"destinationValue length\"}"); 123 - return; 124 - } 125 - // for bsky: resolve handle → DID at create time if needed, so we store 126 - // a stable identifier the chat.convo.getConvoForMembers call can use. 127 - var resolved_dest = parsed.destinationValue; 128 - if (mem.eql(u8, parsed.destinationKind, "bsky") and !mem.startsWith(u8, resolved_dest, "did:")) { 129 - const zat = @import("zat"); 130 - var resolver = zat.HandleResolver.init(io, alloc); 131 - defer resolver.deinit(); 132 - const parsed_handle = zat.Handle.parse(resolved_dest) orelse { 133 - try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid bsky handle\"}"); 134 - return; 135 - }; 136 - resolved_dest = resolver.resolve(parsed_handle) catch { 137 - try sendJsonStatus(request, .bad_request, "{\"error\":\"could not resolve bsky handle\"}"); 138 - return; 139 - }; 140 - } 141 113 142 - const secret: []const u8 = ""; // no longer used; retained in schema for future signed destinations 143 - _ = parsed.secret; 114 + // destination is always the bot DMing the subscriber — implicit, not 115 + // something the client specifies. stored as empty-string placeholders 116 + // in the local mirror for schema continuity. 117 + const dest_kind: []const u8 = "bsky"; 118 + const dest_value: []const u8 = ""; 144 119 const label: []const u8 = parsed.label orelse ""; 145 120 const created_at = try isoNow(alloc, io); 146 121 ··· 154 129 try jw.write(parsed.triggerKind); 155 130 try jw.objectField("triggerValue"); 156 131 try jw.write(parsed.triggerValue); 157 - try jw.objectField("destinationKind"); 158 - try jw.write(parsed.destinationKind); 159 - try jw.objectField("destinationValue"); 160 - try jw.write(resolved_dest); 161 132 if (label.len > 0) { 162 133 try jw.objectField("label"); 163 134 try jw.write(label); ··· 185 156 .rkey = rkey, 186 157 .trigger_kind = parsed.triggerKind, 187 158 .trigger_value = parsed.triggerValue, 188 - .destination_kind = parsed.destinationKind, 189 - .destination_value = resolved_dest, 190 - .secret = secret, 159 + .destination_kind = dest_kind, 160 + .destination_value = dest_value, 161 + .secret = "", 191 162 .label = label, 192 163 .created_at = created_at, 193 164 }) catch |err| { ··· 290 261 notifications.testFire(alloc, did, rkey) catch |err| { 291 262 const msg = switch (err) { 292 263 error.NotFound => "{\"error\":\"subscription not found\"}", 293 - error.UnsupportedDestination => "{\"error\":\"cannot test-fire this destination kind\"}", 294 264 else => "{\"error\":\"test fire failed\"}", 295 265 }; 296 266 try sendJsonStatus(request, .bad_request, msg); 297 267 return; 298 268 }; 299 - try sendJson(request, "{\"ok\":true,\"note\":\"delivery enqueued — check your webhook receiver\"}"); 269 + try sendJson(request, "{\"ok\":true,\"note\":\"delivery enqueued — check bsky DMs\"}"); 300 270 } 301 271 302 272 pub fn handleDelete(request: *http.Server.Request, rkey: []const u8, io: Io) !void {
+2 -12
lexicons/tech/waow/pub-search/subscription.json
··· 5 5 "main": { 6 6 "type": "record", 7 7 "key": "tid", 8 - "description": "Subscribe to new documents indexed by pub-search that match a trigger. pub-search fires a delivery (e.g. webhook) each time a matching document is ingested. The record lives on the user's PDS so subscriptions are portable and inspectable.", 8 + "description": "Subscribe to new documents indexed by pub-search that match a trigger. When a matching document is ingested, the @pub-search.waow.tech bot DMs the subscriber on bsky chat. The record lives on the user's PDS so subscriptions are portable and inspectable.", 9 9 "record": { 10 10 "type": "object", 11 - "required": ["triggerKind", "triggerValue", "destinationKind", "destinationValue", "createdAt"], 11 + "required": ["triggerKind", "triggerValue", "createdAt"], 12 12 "properties": { 13 13 "triggerKind": { 14 14 "type": "string", ··· 19 19 "type": "string", 20 20 "maxLength": 512, 21 21 "description": "Value to match. For author: a DID. For publication: an at-uri. For platform: the platform name. For tag: the tag string." 22 - }, 23 - "destinationKind": { 24 - "type": "string", 25 - "knownValues": ["bsky"], 26 - "description": "Where to send notifications. 'bsky' sends a direct message via chat.bsky on behalf of the authenticated subscriber." 27 - }, 28 - "destinationValue": { 29 - "type": "string", 30 - "maxLength": 512, 31 - "description": "For bsky: the recipient's DID or handle. The subscriber must have chat DM'ing permission with the recipient." 32 22 }, 33 23 "label": { 34 24 "type": "string",
+3 -7
site/subscriptions.html
··· 337 337 function indexSubs(subs) { 338 338 const out = {}; 339 339 for (const s of subs) { 340 - if (s.triggerKind === 'publication' && s.destinationKind === 'bsky') { 341 - out[s.triggerValue] = s.rkey; 342 - } 340 + if (s.triggerKind === 'publication') out[s.triggerValue] = s.rkey; 343 341 } 344 342 return out; 345 343 } 346 344 347 - function renderPubs(pubs, subIndex, selfDid) { 345 + function renderPubs(pubs, subIndex) { 348 346 const el = $('pubs'); 349 347 if (!pubs.length) { 350 348 el.innerHTML = '<div class="empty">you don\'t have any <code>site.standard.publication</code> records on your PDS yet.</div>'; ··· 383 381 body: JSON.stringify({ 384 382 triggerKind: 'publication', 385 383 triggerValue: p.uri, 386 - destinationKind: 'bsky', 387 - destinationValue: selfDid, 388 384 label: p.name || p.url || p.uri, 389 385 }), 390 386 }); ··· 444 440 hide($('login')); show($('pubs-section')); 445 441 446 442 const [pubs, subs] = await Promise.all([fetchPubs(), fetchSubs()]); 447 - renderPubs(pubs, indexSubs(subs), me.did); 443 + renderPubs(pubs, indexSubs(subs)); 448 444 } 449 445 450 446 $('login-form').addEventListener('submit', (e) => {