semantic bufo search find-bufo.com
bufo
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(bot): auto-cleanup stale quote-posts (delete/block/detach)

track our quote-posts and clean them up when the original author deletes
their post, blocks us, or uses bluesky's "detach quote" feature. adds a
startup scan to bootstrap tracking for existing posts and clean any
already-stale embeds.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz e47d2d81 af9c8cf5

+535 -13
+169 -2
bot/src/bsky.zig
··· 225 225 return false; 226 226 } 227 227 228 - pub fn createQuotePost(self: *BskyClient, quote_uri: []const u8, quote_cid: []const u8, blob_json: []const u8, alt_text: []const u8) !void { 228 + pub fn createQuotePost(self: *BskyClient, quote_uri: []const u8, quote_cid: []const u8, blob_json: []const u8, alt_text: []const u8) ![]const u8 { 229 229 if (self.access_jwt == null or self.did == null) return error.NotLoggedIn; 230 230 231 231 var client = self.httpClient(); ··· 266 266 } 267 267 268 268 std.debug.print("posted successfully!\n", .{}); 269 + return self.parseRkeyFromResponse(aw.toArrayList().items); 269 270 } 270 271 271 272 pub fn getPostCid(self: *BskyClient, uri: []const u8) ![]const u8 { ··· 504 505 return error.VideoTimeout; 505 506 } 506 507 507 - pub fn createVideoQuotePost(self: *BskyClient, quote_uri: []const u8, quote_cid: []const u8, blob_json: []const u8, alt_text: []const u8) !void { 508 + pub fn createVideoQuotePost(self: *BskyClient, quote_uri: []const u8, quote_cid: []const u8, blob_json: []const u8, alt_text: []const u8) ![]const u8 { 508 509 if (self.access_jwt == null or self.did == null) return error.NotLoggedIn; 509 510 510 511 var client = self.httpClient(); ··· 545 546 } 546 547 547 548 std.debug.print("posted video successfully!\n", .{}); 549 + return self.parseRkeyFromResponse(aw.toArrayList().items); 550 + } 551 + 552 + pub fn deleteRecord(self: *BskyClient, rkey: []const u8) !void { 553 + if (self.access_jwt == null or self.did == null) return error.NotLoggedIn; 554 + 555 + var client = self.httpClient(); 556 + defer client.deinit(); 557 + 558 + var body_buf: std.ArrayList(u8) = .{}; 559 + defer body_buf.deinit(self.allocator); 560 + 561 + try body_buf.print(self.allocator, 562 + \\{{"repo":"{s}","collection":"app.bsky.feed.post","rkey":"{s}"}} 563 + , .{ self.did.?, rkey }); 564 + 565 + var auth_buf: [512]u8 = undefined; 566 + const auth_header = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{self.access_jwt.?}) catch return error.AuthTooLong; 567 + 568 + var aw: Io.Writer.Allocating = .init(self.allocator); 569 + defer aw.deinit(); 570 + 571 + const result = client.fetch(.{ 572 + .location = .{ .url = "https://bsky.social/xrpc/com.atproto.repo.deleteRecord" }, 573 + .method = .POST, 574 + .headers = .{ 575 + .content_type = .{ .override = "application/json" }, 576 + .authorization = .{ .override = auth_header }, 577 + }, 578 + .payload = body_buf.items, 579 + .response_writer = &aw.writer, 580 + }) catch |err| { 581 + std.debug.print("delete record failed: {}\n", .{err}); 582 + return err; 583 + }; 584 + 585 + if (result.status != .ok) { 586 + const response = aw.toArrayList(); 587 + std.debug.print("delete record failed with status: {} - {s}\n", .{ result.status, response.items }); 588 + return error.DeleteFailed; 589 + } 590 + 591 + std.debug.print("deleted record {s}\n", .{rkey}); 592 + } 593 + 594 + pub const FeedPost = struct { 595 + rkey: []const u8, 596 + original_uri: []const u8, 597 + original_did: []const u8, 598 + is_stale: bool, // viewNotFound or viewDetached 599 + }; 600 + 601 + /// fetch our own feed and return quote-posts with their embed status. 602 + /// caller must free each entry's duped strings and the returned slice. 603 + pub fn getAuthorFeed(self: *BskyClient, buf: []FeedPost) ![]FeedPost { 604 + if (self.did == null) return error.NotLoggedIn; 605 + 606 + var client = self.httpClient(); 607 + defer client.deinit(); 608 + 609 + var url_buf: [512]u8 = undefined; 610 + const url = std.fmt.bufPrint(&url_buf, "https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed?actor={s}&limit=100&filter=posts_no_replies", .{self.did.?}) catch return error.UrlTooLong; 611 + 612 + var aw: Io.Writer.Allocating = .init(self.allocator); 613 + defer aw.deinit(); 614 + 615 + const result = client.fetch(.{ 616 + .location = .{ .url = url }, 617 + .method = .GET, 618 + .response_writer = &aw.writer, 619 + }) catch |err| { 620 + std.debug.print("getAuthorFeed failed: {}\n", .{err}); 621 + return err; 622 + }; 623 + 624 + if (result.status != .ok) { 625 + std.debug.print("getAuthorFeed failed with status: {}\n", .{result.status}); 626 + return error.FeedFetchFailed; 627 + } 628 + 629 + const response = aw.toArrayList(); 630 + const parsed = json.parseFromSlice(json.Value, self.allocator, response.items, .{}) catch return error.ParseError; 631 + defer parsed.deinit(); 632 + 633 + const feed = parsed.value.object.get("feed") orelse return buf[0..0]; 634 + if (feed != .array) return buf[0..0]; 635 + 636 + var count: usize = 0; 637 + for (feed.array.items) |item| { 638 + if (count >= buf.len) break; 639 + if (item != .object) continue; 640 + 641 + const post_obj = item.object.get("post") orelse continue; 642 + if (post_obj != .object) continue; 643 + 644 + // get our post's URI to extract rkey 645 + const post_uri_val = post_obj.object.get("uri") orelse continue; 646 + if (post_uri_val != .string) continue; 647 + 648 + // check if it has an embed with a record (quote-post) 649 + const embed = post_obj.object.get("embed") orelse continue; 650 + if (embed != .object) continue; 651 + 652 + // look for record in embed (recordWithMedia or record embed) 653 + const record_obj = if (embed.object.get("record")) |r| r else continue; 654 + if (record_obj != .object) continue; 655 + 656 + // the embedded record view — check its $type 657 + // for recordWithMedia, the record contains a "record" with the actual view 658 + const inner_record = if (record_obj.object.get("record")) |r| r else record_obj; 659 + if (inner_record != .object) continue; 660 + 661 + const type_val = inner_record.object.get("$type") orelse continue; 662 + if (type_val != .string) continue; 663 + 664 + const is_stale = mem.eql(u8, type_val.string, "app.bsky.embed.record#viewNotFound") or 665 + mem.eql(u8, type_val.string, "app.bsky.embed.record#viewDetached") or 666 + mem.eql(u8, type_val.string, "app.bsky.embed.record#viewBlocked"); 667 + 668 + // extract the original post's URI from the inner record 669 + const orig_uri_val = inner_record.object.get("uri") orelse continue; 670 + if (orig_uri_val != .string) continue; 671 + 672 + // parse our rkey from our post URI 673 + var our_parts = mem.splitScalar(u8, post_uri_val.string[5..], '/'); 674 + _ = our_parts.next(); // did 675 + _ = our_parts.next(); // collection 676 + const our_rkey = our_parts.next() orelse continue; 677 + 678 + // parse original DID from original URI 679 + var orig_parts = mem.splitScalar(u8, orig_uri_val.string[5..], '/'); 680 + const orig_did = orig_parts.next() orelse continue; 681 + 682 + buf[count] = .{ 683 + .rkey = self.allocator.dupe(u8, our_rkey) catch continue, 684 + .original_uri = self.allocator.dupe(u8, orig_uri_val.string) catch { 685 + self.allocator.free(buf[count].rkey); 686 + continue; 687 + }, 688 + .original_did = self.allocator.dupe(u8, orig_did) catch { 689 + self.allocator.free(buf[count].rkey); 690 + self.allocator.free(buf[count].original_uri); 691 + continue; 692 + }, 693 + .is_stale = is_stale, 694 + }; 695 + count += 1; 696 + } 697 + 698 + return buf[0..count]; 699 + } 700 + 701 + fn parseRkeyFromResponse(self: *BskyClient, response: []const u8) ![]const u8 { 702 + const parsed = json.parseFromSlice(json.Value, self.allocator, response, .{}) catch return error.ParseError; 703 + defer parsed.deinit(); 704 + 705 + const uri_val = parsed.value.object.get("uri") orelse return error.NoUri; 706 + if (uri_val != .string) return error.NoUri; 707 + 708 + // parse rkey from at://did/collection/rkey 709 + var parts = mem.splitScalar(u8, uri_val.string[5..], '/'); 710 + _ = parts.next(); // did 711 + _ = parts.next(); // collection 712 + const rkey = parts.next() orelse return error.InvalidUri; 713 + 714 + return try self.allocator.dupe(u8, rkey); 548 715 } 549 716 }; 550 717
+1 -1
bot/src/config.zig
··· 20 20 .min_phrase_words = parseU32(posix.getenv("MIN_PHRASE_WORDS"), 4), 21 21 .posting_enabled = parseBool(posix.getenv("POSTING_ENABLED")), 22 22 .cooldown_minutes = parseU32(posix.getenv("COOLDOWN_MINUTES"), 120), 23 - .exclude_patterns = posix.getenv("EXCLUDE_PATTERNS") orelse "what-have-you-done,what-have-i-done,sad,crying,cant-take", 23 + .exclude_patterns = posix.getenv("EXCLUDE_PATTERNS") orelse "what-have-you-done,what-have-i-done,sad,crying,cant-take,knife,what-are-you-doing-with-that", 24 24 .stats_port = parseU16(posix.getenv("STATS_PORT"), 8080), 25 25 .backend_url = posix.getenv("BACKEND_URL") orelse "https://find-bufo.com", 26 26 };
+26
bot/src/jetstream.zig
··· 37 37 pub const PostHandler = struct { 38 38 callback: *const fn (Post) void, 39 39 on_connect: ?*const fn ([]const u8) void = null, 40 + on_delete: ?*const fn ([]const u8, []const u8) void = null, 41 + on_block: ?*const fn ([]const u8, []const u8) void = null, 42 + on_detach: ?*const fn ([]const u8, json.Value) void = null, 40 43 41 44 pub fn onEvent(self: *PostHandler, event: zat.JetstreamEvent) void { 42 45 switch (event) { ··· 54 57 } 55 58 56 59 fn handleCommit(self: *PostHandler, c: zat.jetstream.CommitEvent) void { 60 + if (mem.eql(u8, c.collection, "app.bsky.graph.block")) { 61 + if (c.operation == .create) { 62 + const record = c.record orelse return; 63 + const subject = zat.json.getString(record, "subject") orelse return; 64 + if (self.on_block) |cb| cb(c.did, subject); 65 + } 66 + return; 67 + } 68 + 69 + if (mem.eql(u8, c.collection, "app.bsky.feed.postgate")) { 70 + if (c.operation == .create or c.operation == .update) { 71 + const record = c.record orelse return; 72 + if (self.on_detach) |cb| cb(c.did, record); 73 + } 74 + return; 75 + } 76 + 77 + // app.bsky.feed.post 78 + if (c.operation == .delete) { 79 + if (self.on_delete) |cb| cb(c.did, c.rkey); 80 + return; 81 + } 82 + 57 83 if (c.operation != .create) return; 58 84 59 85 const record = c.record orelse return;
+158 -8
bot/src/main.zig
··· 56 56 defer bot_stats.deinit(); 57 57 bot_stats.setBufosLoaded(@intCast(m.count())); 58 58 59 + // prune tracked posts older than 30 days 60 + bot_stats.pruneOldPosts(30 * 86400); 61 + 59 62 // init state 60 63 var state = BotState{ 61 64 .allocator = allocator, ··· 67 70 68 71 global_state = &state; 69 72 73 + // startup scan: bootstrap tracking and clean stale posts 74 + if (cfg.posting_enabled) { 75 + startupScan(&state); 76 + } 77 + 70 78 // start stats server on background thread 71 79 var stats_server = stats.StatsServer.init(allocator, &state.stats, cfg.stats_port); 72 80 const stats_thread = Thread.spawn(.{}, stats.StatsServer.run, .{&stats_server}) catch |err| { ··· 76 84 defer stats_thread.join(); 77 85 78 86 // start jetstream consumer (use zat defaults with optional preferred relay) 79 - var handler = jetstream.PostHandler{ .callback = onPost, .on_connect = onConnect }; 87 + var handler = jetstream.PostHandler{ 88 + .callback = onPost, 89 + .on_connect = onConnect, 90 + .on_delete = onDelete, 91 + .on_block = onBlock, 92 + .on_detach = onDetach, 93 + }; 80 94 81 95 // prepend preferred relay to default host list if set 82 96 var hosts_buf: [1 + zat.jetstream.default_hosts.len][]const u8 = undefined; ··· 92 106 93 107 var client = zat.JetstreamClient.init(allocator, .{ 94 108 .hosts = hosts_buf[0..hosts_len], 95 - .wanted_collections = &.{"app.bsky.feed.post"}, 109 + .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.graph.block", "app.bsky.feed.postgate" }, 96 110 }); 97 111 defer client.deinit(); 98 112 client.subscribe(&handler); ··· 202 216 const cid = try state.bsky_client.getPostCid(post.uri); 203 217 defer state.allocator.free(cid); 204 218 205 - if (is_gif) { 219 + const our_rkey = if (is_gif) blk: { 206 220 // upload as video for animated GIFs 207 221 std.debug.print("uploading {d} bytes as video\n", .{img_data.len}); 208 222 const job_id = try state.bsky_client.uploadVideo(img_data, match.name); ··· 212 226 const blob_json = try state.bsky_client.waitForVideo(job_id); 213 227 defer state.allocator.free(blob_json); 214 228 215 - try state.bsky_client.createVideoQuotePost(post.uri, cid, blob_json, alt_text); 216 - } else { 229 + break :blk try state.bsky_client.createVideoQuotePost(post.uri, cid, blob_json, alt_text); 230 + } else blk: { 217 231 // upload as image 218 232 const content_type = if (mem.endsWith(u8, match.url, ".png")) 219 233 "image/png" ··· 224 238 const blob_json = try state.bsky_client.uploadBlob(img_data, content_type); 225 239 defer state.allocator.free(blob_json); 226 240 227 - try state.bsky_client.createQuotePost(post.uri, cid, blob_json, alt_text); 228 - } 229 - std.debug.print("posted bufo quote: {s}\n", .{match.name}); 241 + break :blk try state.bsky_client.createQuotePost(post.uri, cid, blob_json, alt_text); 242 + }; 243 + defer state.allocator.free(our_rkey); 244 + 245 + std.debug.print("posted bufo quote: {s} (rkey: {s})\n", .{ match.name, our_rkey }); 230 246 state.stats.incPostsCreated(); 231 247 248 + // track our post for cleanup on delete/block 249 + state.stats.addTrackedPost(our_rkey, post.uri, post.did, now); 250 + 232 251 // update cooldown cache (persisted to disk) 233 252 state.stats.setLastPosted(match.name, now); 253 + } 254 + 255 + fn onDelete(did: []const u8, rkey: []const u8) void { 256 + const state = global_state orelse return; 257 + 258 + // construct the original URI and check if we quote-posted it 259 + var uri_buf: [256]u8 = undefined; 260 + const uri = std.fmt.bufPrint(&uri_buf, "at://{s}/app.bsky.feed.post/{s}", .{ did, rkey }) catch return; 261 + 262 + const our_rkey = state.stats.removeByOriginalUri(uri) orelse return; 263 + defer state.allocator.free(our_rkey); 264 + 265 + std.debug.print("original post deleted ({s}), deleting our quote-post {s}\n", .{ uri, our_rkey }); 266 + 267 + state.mutex.lock(); 268 + defer state.mutex.unlock(); 269 + 270 + state.bsky_client.deleteRecord(our_rkey) catch |err| { 271 + std.debug.print("failed to delete our post: {}\n", .{err}); 272 + state.stats.incErrors(); 273 + }; 274 + } 275 + 276 + fn onBlock(blocker_did: []const u8, subject_did: []const u8) void { 277 + const state = global_state orelse return; 278 + 279 + // only care if someone is blocking us 280 + const our_did = state.bsky_client.did orelse return; 281 + if (!mem.eql(u8, subject_did, our_did)) return; 282 + 283 + std.debug.print("blocked by {s}, cleaning up our quote-posts of their content\n", .{blocker_did}); 284 + 285 + state.mutex.lock(); 286 + defer state.mutex.unlock(); 287 + 288 + // collect and remove tracked posts from this DID 289 + var rkey_buf: [64][]const u8 = undefined; 290 + const rkeys = state.stats.removeByOriginalDid(blocker_did, &rkey_buf); 291 + 292 + for (rkeys) |rkey| { 293 + state.bsky_client.deleteRecord(rkey) catch |err| { 294 + std.debug.print("failed to delete post {s}: {}\n", .{ rkey, err }); 295 + state.stats.incErrors(); 296 + }; 297 + state.allocator.free(rkey); 298 + } 299 + 300 + if (rkeys.len > 0) { 301 + std.debug.print("deleted {} quote-posts after block from {s}\n", .{ rkeys.len, blocker_did }); 302 + } 303 + } 304 + 305 + fn onDetach(_: []const u8, record: json.Value) void { 306 + const state = global_state orelse return; 307 + 308 + // postgate record has detachedEmbeddingUris: array of AT-URIs that should no longer embed 309 + const uris = record.object.get("detachedEmbeddingUris") orelse return; 310 + if (uris != .array) return; 311 + 312 + const our_did = state.bsky_client.did orelse return; 313 + 314 + state.mutex.lock(); 315 + defer state.mutex.unlock(); 316 + 317 + for (uris.array.items) |uri_val| { 318 + if (uri_val != .string) continue; 319 + 320 + // check if this detached URI is one of our tracked posts 321 + // the URI in detachedEmbeddingUris points to the embedding post (ours) 322 + // parse rkey from at://did/app.bsky.feed.post/rkey 323 + if (!mem.startsWith(u8, uri_val.string, "at://")) continue; 324 + var parts = mem.splitScalar(u8, uri_val.string[5..], '/'); 325 + const uri_did = parts.next() orelse continue; 326 + 327 + // only care about URIs pointing at our posts 328 + if (!mem.eql(u8, uri_did, our_did)) continue; 329 + 330 + _ = parts.next(); // collection 331 + const rkey = parts.next() orelse continue; 332 + 333 + if (state.stats.removeByOurRkey(rkey)) { 334 + std.debug.print("post detached, deleting our quote-post {s}\n", .{rkey}); 335 + state.bsky_client.deleteRecord(rkey) catch |err| { 336 + std.debug.print("failed to delete detached post {s}: {}\n", .{ rkey, err }); 337 + state.stats.incErrors(); 338 + }; 339 + } 340 + } 341 + } 342 + 343 + fn startupScan(state: *BotState) void { 344 + std.debug.print("running startup scan...\n", .{}); 345 + 346 + var feed_buf: [100]bsky.BskyClient.FeedPost = undefined; 347 + const posts = state.bsky_client.getAuthorFeed(&feed_buf) catch |err| { 348 + std.debug.print("startup scan failed: {}\n", .{err}); 349 + return; 350 + }; 351 + 352 + var bootstrapped: usize = 0; 353 + var cleaned: usize = 0; 354 + const now = std.time.timestamp(); 355 + 356 + for (posts) |post| { 357 + defer { 358 + state.allocator.free(post.original_uri); 359 + state.allocator.free(post.original_did); 360 + } 361 + 362 + if (post.is_stale) { 363 + // delete stale post regardless of whether it was tracked 364 + std.debug.print("startup: cleaning stale post {s}\n", .{post.rkey}); 365 + state.bsky_client.deleteRecord(post.rkey) catch |err| { 366 + std.debug.print("startup: failed to delete {s}: {}\n", .{ post.rkey, err }); 367 + state.stats.incErrors(); 368 + }; 369 + // remove from tracking if present 370 + _ = state.stats.removeByOurRkey(post.rkey); 371 + state.allocator.free(post.rkey); 372 + cleaned += 1; 373 + } else { 374 + // bootstrap tracking for posts we don't know about yet 375 + if (!state.stats.isTracked(post.rkey)) { 376 + state.stats.addTrackedPost(post.rkey, post.original_uri, post.original_did, now); 377 + bootstrapped += 1; 378 + } 379 + state.allocator.free(post.rkey); 380 + } 381 + } 382 + 383 + std.debug.print("startup scan complete: {} bootstrapped, {} cleaned\n", .{ bootstrapped, cleaned }); 234 384 } 235 385 236 386 fn loadBufos(allocator: Allocator, m: *matcher.Matcher, exclude_patterns: []const u8) !void {
+181 -2
bot/src/stats.zig
··· 8 8 9 9 const STATS_PATH = "/data/stats.json"; 10 10 11 + pub const TrackedPost = struct { 12 + our_rkey: []const u8, 13 + original_uri: []const u8, 14 + original_did: []const u8, 15 + timestamp: i64, 16 + }; 17 + 11 18 pub const Stats = struct { 12 19 allocator: Allocator, 13 20 start_time: i64, ··· 27 34 bufo_mutex: Thread.Mutex = .{}, 28 35 // track last post time per bufo (persisted to survive restarts) 29 36 last_posted: std.StringHashMap(i64), 37 + // track our quote-posts for cleanup on delete/block 38 + tracked_posts: std.ArrayList(TrackedPost), 30 39 31 40 const BufoMatchData = struct { 32 41 count: u64, ··· 39 48 .start_time = std.time.timestamp(), 40 49 .bufo_matches = std.StringHashMap(BufoMatchData).init(allocator), 41 50 .last_posted = std.StringHashMap(i64).init(allocator), 51 + .tracked_posts = .{}, 42 52 }; 43 53 self.load(); 44 54 return self; ··· 57 67 self.allocator.free(entry.key_ptr.*); 58 68 } 59 69 self.last_posted.deinit(); 70 + for (self.tracked_posts.items) |tp| { 71 + self.allocator.free(tp.our_rkey); 72 + self.allocator.free(tp.original_uri); 73 + self.allocator.free(tp.original_did); 74 + } 75 + self.tracked_posts.deinit(self.allocator); 60 76 } 61 77 62 78 fn load(self: *Stats) void { ··· 155 171 } 156 172 } 157 173 158 - std.debug.print("loaded stats from {s}\n", .{STATS_PATH}); 174 + // load tracked posts 175 + if (root.get("tracked_posts")) |tp| { 176 + if (tp == .array) { 177 + for (tp.array.items) |item| { 178 + if (item != .object) continue; 179 + const rkey_val = item.object.get("our_rkey") orelse continue; 180 + const uri_val = item.object.get("original_uri") orelse continue; 181 + const did_val = item.object.get("original_did") orelse continue; 182 + const ts_val = item.object.get("timestamp") orelse continue; 183 + if (rkey_val != .string or uri_val != .string or did_val != .string or ts_val != .integer) continue; 184 + 185 + const rkey = self.allocator.dupe(u8, rkey_val.string) catch continue; 186 + const uri = self.allocator.dupe(u8, uri_val.string) catch { 187 + self.allocator.free(rkey); 188 + continue; 189 + }; 190 + const did = self.allocator.dupe(u8, did_val.string) catch { 191 + self.allocator.free(rkey); 192 + self.allocator.free(uri); 193 + continue; 194 + }; 195 + self.tracked_posts.append(self.allocator, .{ 196 + .our_rkey = rkey, 197 + .original_uri = uri, 198 + .original_did = did, 199 + .timestamp = ts_val.integer, 200 + }) catch { 201 + self.allocator.free(rkey); 202 + self.allocator.free(uri); 203 + self.allocator.free(did); 204 + }; 205 + } 206 + } 207 + } 208 + 209 + std.debug.print("loaded stats from {s} ({} tracked posts)\n", .{ STATS_PATH, self.tracked_posts.items.len }); 159 210 } 160 211 161 212 pub fn save(self: *Stats) void { ··· 244 295 lp_first = false; 245 296 std.fmt.format(writer, "\"{s}\":{}", .{ entry.key_ptr.*, entry.value_ptr.* }) catch return; 246 297 } 247 - writer.writeAll("}}") catch return; 298 + writer.writeAll("},") catch return; 299 + 300 + // write tracked posts 301 + writer.writeAll("\"tracked_posts\":[") catch return; 302 + for (self.tracked_posts.items, 0..) |tp, i| { 303 + if (i > 0) writer.writeAll(",") catch return; 304 + std.fmt.format(writer, "{{\"our_rkey\":\"{s}\",\"original_uri\":\"{s}\",\"original_did\":\"{s}\",\"timestamp\":{}}}", .{ tp.our_rkey, tp.original_uri, tp.original_did, tp.timestamp }) catch return; 305 + } 306 + writer.writeAll("]}") catch return; 248 307 249 308 file.writeAll(fbs.getWritten()) catch return; 250 309 } ··· 290 349 }; 291 350 } 292 351 self.saveUnlocked(); 352 + } 353 + 354 + pub fn addTrackedPost(self: *Stats, our_rkey: []const u8, original_uri: []const u8, original_did: []const u8, timestamp: i64) void { 355 + self.bufo_mutex.lock(); 356 + defer self.bufo_mutex.unlock(); 357 + 358 + const rkey = self.allocator.dupe(u8, our_rkey) catch return; 359 + const uri = self.allocator.dupe(u8, original_uri) catch { 360 + self.allocator.free(rkey); 361 + return; 362 + }; 363 + const did = self.allocator.dupe(u8, original_did) catch { 364 + self.allocator.free(rkey); 365 + self.allocator.free(uri); 366 + return; 367 + }; 368 + self.tracked_posts.append(self.allocator, .{ 369 + .our_rkey = rkey, 370 + .original_uri = uri, 371 + .original_did = did, 372 + .timestamp = timestamp, 373 + }) catch { 374 + self.allocator.free(rkey); 375 + self.allocator.free(uri); 376 + self.allocator.free(did); 377 + return; 378 + }; 379 + self.saveUnlocked(); 380 + } 381 + 382 + /// atomically find+remove a tracked post by original URI, returning the caller-owned rkey 383 + pub fn removeByOriginalUri(self: *Stats, uri: []const u8) ?[]const u8 { 384 + self.bufo_mutex.lock(); 385 + defer self.bufo_mutex.unlock(); 386 + 387 + for (self.tracked_posts.items, 0..) |tp, i| { 388 + if (mem.eql(u8, tp.original_uri, uri)) { 389 + const removed = self.tracked_posts.orderedRemove(i); 390 + self.allocator.free(removed.original_uri); 391 + self.allocator.free(removed.original_did); 392 + self.saveUnlocked(); 393 + return removed.our_rkey; // caller owns this 394 + } 395 + } 396 + return null; 397 + } 398 + 399 + /// find+remove a tracked post by our rkey, returning true if found 400 + pub fn removeByOurRkey(self: *Stats, rkey: []const u8) bool { 401 + self.bufo_mutex.lock(); 402 + defer self.bufo_mutex.unlock(); 403 + 404 + for (self.tracked_posts.items, 0..) |tp, i| { 405 + if (mem.eql(u8, tp.our_rkey, rkey)) { 406 + const removed = self.tracked_posts.orderedRemove(i); 407 + self.allocator.free(removed.our_rkey); 408 + self.allocator.free(removed.original_uri); 409 + self.allocator.free(removed.original_did); 410 + self.saveUnlocked(); 411 + return true; 412 + } 413 + } 414 + return false; 415 + } 416 + 417 + /// check if a given rkey is already tracked 418 + pub fn isTracked(self: *Stats, rkey: []const u8) bool { 419 + self.bufo_mutex.lock(); 420 + defer self.bufo_mutex.unlock(); 421 + 422 + for (self.tracked_posts.items) |tp| { 423 + if (mem.eql(u8, tp.our_rkey, rkey)) return true; 424 + } 425 + return false; 426 + } 427 + 428 + /// collect rkeys of tracked posts matching a given original DID, then remove them 429 + pub fn removeByOriginalDid(self: *Stats, did: []const u8, buf: [][]const u8) [][]const u8 { 430 + self.bufo_mutex.lock(); 431 + defer self.bufo_mutex.unlock(); 432 + 433 + var count: usize = 0; 434 + var i: usize = 0; 435 + while (i < self.tracked_posts.items.len and count < buf.len) { 436 + if (mem.eql(u8, self.tracked_posts.items[i].original_did, did)) { 437 + const tp = self.tracked_posts.orderedRemove(i); 438 + buf[count] = tp.our_rkey; 439 + self.allocator.free(tp.original_uri); 440 + self.allocator.free(tp.original_did); 441 + count += 1; 442 + } else { 443 + i += 1; 444 + } 445 + } 446 + if (count > 0) self.saveUnlocked(); 447 + return buf[0..count]; 448 + } 449 + 450 + pub fn pruneOldPosts(self: *Stats, max_age_secs: i64) void { 451 + self.bufo_mutex.lock(); 452 + defer self.bufo_mutex.unlock(); 453 + 454 + const now = std.time.timestamp(); 455 + var i: usize = 0; 456 + var pruned: usize = 0; 457 + while (i < self.tracked_posts.items.len) { 458 + if (now - self.tracked_posts.items[i].timestamp > max_age_secs) { 459 + const tp = self.tracked_posts.orderedRemove(i); 460 + self.allocator.free(tp.our_rkey); 461 + self.allocator.free(tp.original_uri); 462 + self.allocator.free(tp.original_did); 463 + pruned += 1; 464 + } else { 465 + i += 1; 466 + } 467 + } 468 + if (pruned > 0) { 469 + std.debug.print("pruned {} old tracked posts\n", .{pruned}); 470 + self.saveUnlocked(); 471 + } 293 472 } 294 473 295 474 pub fn incCooldownsHit(self: *Stats) void {