this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

initial commit: live AT Protocol labeler built on zat

jetstream consumer, keyword labeling, secp256k1 signing, SQLite storage,
subscribeLabels (WS) + queryLabels (HTTP). deployed to fly.io.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 2cec1702

+2046
+4
.gitignore
··· 1 + .env/ 2 + .zig-cache/ 3 + zig-out/ 4 + *.db
+26
Dockerfile
··· 1 + FROM --platform=linux/amd64 debian:bookworm-slim AS builder 2 + 3 + RUN apt-get update && apt-get install -y --no-install-recommends \ 4 + ca-certificates curl xz-utils libsqlite3-dev \ 5 + && rm -rf /var/lib/apt/lists/* 6 + 7 + RUN curl -fSL https://ziglang.org/download/0.15.2/zig-x86_64-linux-0.15.2.tar.xz \ 8 + | tar xJ -C /opt 9 + ENV PATH=/opt/zig-x86_64-linux-0.15.2:$PATH 10 + 11 + WORKDIR /build 12 + COPY build.zig build.zig.zon ./ 13 + RUN zig build --fetch-only 2>/dev/null || true 14 + 15 + COPY src/ src/ 16 + RUN zig build -Doptimize=ReleaseSafe 17 + 18 + FROM --platform=linux/amd64 debian:bookworm-slim 19 + RUN apt-get update && apt-get install -y --no-install-recommends \ 20 + ca-certificates libsqlite3-0 \ 21 + && rm -rf /var/lib/apt/lists/* 22 + 23 + COPY --from=builder /build/zig-out/bin/labelz /usr/local/bin/labelz 24 + 25 + EXPOSE 4100 26 + ENTRYPOINT ["/usr/local/bin/labelz"]
+94
README.md
··· 1 + # labelz 2 + 3 + a live AT Protocol labeler, built on [zat](https://tangled.org/zat.dev/zat). 4 + 5 + this exists to pressure-test zat as an SDK — jetstream consumption, CBOR encoding, 6 + secp256k1 signing, and XRPC serving all running against real network traffic. 7 + 8 + ## what it does 9 + 10 + connects to the bluesky jetstream firehose, watches for posts matching keyword rules, 11 + signs labels with secp256k1, stores them in SQLite, and serves them over the standard 12 + AT Protocol labeler endpoints: 13 + 14 + - `com.atproto.label.subscribeLabels` — WebSocket event stream 15 + - `com.atproto.label.queryLabels` — HTTP query 16 + 17 + the labeler is registered as a proper AT Protocol identity with a DID, PDS account, 18 + and labeler declaration record. 19 + 20 + **live instance:** [labelz.fly.dev](https://labelz.fly.dev/health) · [DID document](https://plc.directory/did:plc:ugigqebt4sm3n4xpmqvslcyo) · [PDS](https://labelz-pds.nate-8fe.workers.dev/status) 21 + 22 + ## running 23 + 24 + ``` 25 + LABELZ_DID=did:plc:... LABELZ_SECRET_KEY=<64-hex-chars> zig build run 26 + ``` 27 + 28 + <details> 29 + <summary>environment variables</summary> 30 + 31 + | variable | required | description | 32 + |----------|----------|-------------| 33 + | `LABELZ_DID` | yes | labeler DID (src field on emitted labels) | 34 + | `LABELZ_SECRET_KEY` | yes | secp256k1 secret key, 64 hex chars (32 bytes) | 35 + | `LABELZ_PORT` | no | server port (default: 4100) | 36 + | `LABELZ_DB` | no | SQLite database path (default: `/data/labelz.db`) | 37 + 38 + </details> 39 + 40 + <details> 41 + <summary>architecture</summary> 42 + 43 + ``` 44 + jetstream ──→ keyword match ──→ sign (secp256k1) ──→ store (SQLite) 45 + 46 + ┌──────────┴──────────┐ 47 + │ │ 48 + subscribeLabels (WS) queryLabels (HTTP) 49 + ``` 50 + 51 + source files: 52 + 53 + - `main.zig` — config, jetstream consumer, keyword matching 54 + - `label.zig` — label type, CBOR encoding, signing 55 + - `store.zig` — SQLite storage 56 + - `server.zig` — XRPC server (WebSocket + HTTP) 57 + - `identity.zig` — PLC identity operations (unused at runtime, used for setup) 58 + 59 + </details> 60 + 61 + <details> 62 + <summary>identity setup</summary> 63 + 64 + the labeler needs a full AT Protocol identity: a DID registered at plc.directory, 65 + a PDS hosting the account, and a labeler declaration record. this was done once 66 + using [pds.js](https://tangled.org/chadtmiller.com/pds.js) on cloudflare workers 67 + and [goat](https://github.com/bluesky-social/indigo/tree/main/cmd/goat) for PLC operations. 68 + 69 + the DID document includes both a repo signing key (P-256, used by the PDS) and a 70 + label signing key (secp256k1, used by labelz), plus service endpoints for both 71 + the PDS and the labeler. 72 + 73 + see the `justfile` for the operational recipes (`keygen`, `update-did`, `declare`). 74 + 75 + </details> 76 + 77 + <details> 78 + <summary>deployment</summary> 79 + 80 + runs on fly.io with a persistent SQLite volume. the Dockerfile does a multi-stage 81 + build: debian bookworm with zig 0.15.2 for compilation, slim runtime image with 82 + just libsqlite3. 83 + 84 + ``` 85 + fly deploy 86 + ``` 87 + 88 + </details> 89 + 90 + ## dependencies 91 + 92 + - [zat](https://tangled.org/zat.dev/zat) — AT Protocol primitives (jetstream, CBOR, crypto, XRPC) 93 + - [websocket.zig](https://github.com/zzstoatzz/websocket.zig) — WebSocket client/server 94 + - system libsqlite3
+63
build.zig
··· 1 + const std = @import("std"); 2 + 3 + pub fn build(b: *std.Build) void { 4 + const target = b.standardTargetOptions(.{}); 5 + const optimize = b.standardOptimizeOption(.{}); 6 + 7 + const zat = b.dependency("zat", .{ 8 + .target = target, 9 + .optimize = optimize, 10 + }); 11 + const websocket = b.dependency("websocket", .{ 12 + .target = target, 13 + .optimize = optimize, 14 + }); 15 + 16 + const imports: []const std.Build.Module.Import = &.{ 17 + .{ .name = "zat", .module = zat.module("zat") }, 18 + .{ .name = "websocket", .module = websocket.module("websocket") }, 19 + }; 20 + 21 + // labelz executable 22 + const exe_mod = b.createModule(.{ 23 + .root_source_file = b.path("src/main.zig"), 24 + .target = target, 25 + .optimize = optimize, 26 + .imports = imports, 27 + }); 28 + const exe = b.addExecutable(.{ 29 + .name = "labelz", 30 + .root_module = exe_mod, 31 + }); 32 + exe.linkLibC(); 33 + exe.linkSystemLibrary("sqlite3"); 34 + b.installArtifact(exe); 35 + 36 + const run_exe = b.addRunArtifact(exe); 37 + if (b.args) |args| run_exe.addArgs(args); 38 + const run_step = b.step("run", "run the labeler"); 39 + run_step.dependOn(&run_exe.step); 40 + 41 + // tests 42 + const test_step = b.step("test", "run unit tests"); 43 + const test_files = .{ 44 + "src/label.zig", 45 + "src/store.zig", 46 + "src/server.zig", 47 + "src/identity.zig", 48 + }; 49 + inline for (test_files) |file| { 50 + const test_mod = b.createModule(.{ 51 + .root_source_file = b.path(file), 52 + .target = target, 53 + .optimize = optimize, 54 + .imports = imports, 55 + }); 56 + const t = b.addTest(.{ 57 + .root_module = test_mod, 58 + }); 59 + t.linkLibC(); 60 + t.linkSystemLibrary("sqlite3"); 61 + test_step.dependOn(&b.addRunArtifact(t).step); 62 + } 63 + }
+21
build.zig.zon
··· 1 + .{ 2 + .name = .labelz, 3 + .version = "0.0.1", 4 + .fingerprint = 0xcc0dbab526d8712d, 5 + .minimum_zig_version = "0.15.0", 6 + .dependencies = .{ 7 + .zat = .{ 8 + .url = "https://tangled.org/zat.dev/zat/archive/v0.2.16.tar.gz", 9 + .hash = "zat-0.2.16-5PuC7tjwBADbnwV5y8ztKUHhGHMJHh2HouvoYImnZ7y5", 10 + }, 11 + .websocket = .{ 12 + .url = "https://github.com/zzstoatzz/websocket.zig/archive/395d0f4.tar.gz", 13 + .hash = "websocket-0.1.0-ZPISdVJ8AwD7U03ARGgHclzlYSd9GeU91_WDXjRyjYdh", 14 + }, 15 + }, 16 + .paths = .{ 17 + "build.zig", 18 + "build.zig.zon", 19 + "src", 20 + }, 21 + }
+21
fly.toml
··· 1 + app = 'labelz' 2 + primary_region = 'ord' 3 + 4 + [build] 5 + 6 + [http_service] 7 + internal_port = 4100 8 + force_https = true 9 + auto_stop_machines = 'stop' 10 + auto_start_machines = true 11 + min_machines_running = 1 12 + processes = ['app'] 13 + 14 + [mounts] 15 + source = "labelz_data" 16 + destination = "/data" 17 + 18 + [[vm]] 19 + memory = '256mb' 20 + cpu_kind = 'shared' 21 + cpus = 1
+90
justfile
··· 1 + # labelz — AT Protocol labeler 2 + 3 + # --- development --- 4 + 5 + build: 6 + zig build 7 + 8 + test: 9 + zig build test 10 + 11 + fmt: 12 + zig fmt . 13 + 14 + check: 15 + zig fmt --check . 16 + zig build test 17 + 18 + # run locally (requires LABELZ_DID and LABELZ_SECRET_KEY env vars) 19 + run *args: 20 + LABELZ_DB=labelz.db zig build run -- {{args}} 21 + 22 + # --- identity setup (one-time, requires goat CLI) --- 23 + 24 + # step 1: deploy pds.js to cloudflare workers 25 + # cd /tmp && git clone https://tangled.org/chadtmiller.com/pds.js && cd pds.js 26 + # pnpm install 27 + # wrangler secret put PDS_PASSWORD 28 + # wrangler secret put JWT_SECRET 29 + # wrangler deploy 30 + # node scripts/setup.js --pds https://<your-pds>.workers.dev --handle labelz 31 + 32 + # step 2: generate label signing key 33 + keygen: 34 + goat key generate -t secp256k1 35 + 36 + # step 3: add labeler entries to DID doc 37 + # (after pds.js setup creates the base DID, update it with labeler fields) 38 + # usage: just update-did <did> <label-pubkey> <labeler-url> 39 + update-did did label_pubkey labeler_url: 40 + #!/usr/bin/env bash 41 + set -euo pipefail 42 + echo "fetching current PLC data for {{did}}..." 43 + goat plc data {{did}} > /tmp/labelz-plc-current.json 44 + echo "current DID doc:" 45 + cat /tmp/labelz-plc-current.json | python3 -m json.tool 46 + echo "" 47 + echo "to update, edit the operation JSON to add:" 48 + echo ' verificationMethods.atproto_label = "{{label_pubkey}}"' 49 + echo ' services.atproto_labeler = {"type":"AtprotoLabeler","endpoint":"{{labeler_url}}"}' 50 + echo "" 51 + echo "then: goat plc sign --plc-signing-key <rotation-key> updated.json | goat plc submit --genesis -" 52 + 53 + # step 4: write labeler declaration record 54 + # usage: just declare <did> <pds-url> 55 + declare did pds_url: 56 + #!/usr/bin/env bash 57 + set -euo pipefail 58 + echo "writing app.bsky.labeler.service/self to {{did}} on {{pds_url}}" 59 + goat xrpc procedure {{pds_url}} com.atproto.repo.putRecord \ 60 + repo="{{did}}" \ 61 + collection="app.bsky.labeler.service" \ 62 + rkey="self" \ 63 + 'record:={"$type":"app.bsky.labeler.service","policies":{"labelValues":["spam","!warn"],"labelValueDefinitions":[]},"createdAt":"'"$(date -u +%Y-%m-%dT%H:%M:%S.000Z)"'"}' 64 + 65 + # --- deployment (fly.io) --- 66 + 67 + deploy: 68 + fly deploy 69 + 70 + logs: 71 + fly logs 72 + 73 + status: 74 + fly status 75 + 76 + secrets-set did secret_key: 77 + fly secrets set LABELZ_DID={{did}} LABELZ_SECRET_KEY={{secret_key}} 78 + 79 + # --- inspection --- 80 + 81 + # check a DID's current PLC document 82 + resolve did: 83 + goat resolve {{did}} 84 + 85 + # query labels from the running labeler 86 + query-labels url uri: 87 + curl -s "{{url}}/xrpc/com.atproto.label.queryLabels?uriPatterns={{uri}}" | python3 -m json.tool 88 + 89 + health url: 90 + curl -s "{{url}}/health" | python3 -m json.tool
+437
src/identity.zig
··· 1 + //! PLC identity operations for labeler setup 2 + //! 3 + //! creates did:plc genesis operations, signs them, derives the DID, 4 + //! and submits to plc.directory. extends zat's primitives without 5 + //! modifying the zat library itself. 6 + 7 + const std = @import("std"); 8 + const zat = @import("zat"); 9 + const cbor = zat.cbor; 10 + const Keypair = zat.Keypair; 11 + 12 + const Sha256 = std.crypto.hash.sha2.Sha256; 13 + const Allocator = std.mem.Allocator; 14 + const log = std.log.scoped(.identity); 15 + 16 + pub const GenesisParams = struct { 17 + /// handle for alsoKnownAs (without at:// prefix) 18 + handle: []const u8, 19 + /// PDS endpoint URL 20 + pds_endpoint: []const u8, 21 + /// labeler endpoint URL 22 + labeler_endpoint: []const u8, 23 + /// rotation key (controls the DID — keep offline after genesis) 24 + rotation_keypair: *const Keypair, 25 + /// repo signing key (used by PDS) 26 + signing_keypair: *const Keypair, 27 + /// label signing key (used to sign labels) 28 + label_keypair: *const Keypair, 29 + }; 30 + 31 + /// result of creating a PLC genesis operation 32 + pub const GenesisResult = struct { 33 + /// the derived did:plc:... string 34 + did: []const u8, 35 + /// the signed operation as JSON, ready to POST to plc.directory 36 + json: []const u8, 37 + allocator: Allocator, 38 + 39 + pub fn deinit(self: *const GenesisResult) void { 40 + self.allocator.free(self.did); 41 + self.allocator.free(self.json); 42 + } 43 + }; 44 + 45 + /// create a signed PLC genesis operation and derive the DID. 46 + /// 47 + /// the genesis operation establishes a new did:plc identity with: 48 + /// - rotation key for DID control 49 + /// - #atproto verification method (repo signing) 50 + /// - #atproto_label verification method (label signing) 51 + /// - #atproto_pds service endpoint 52 + /// - #atproto_labeler service endpoint 53 + pub fn createGenesis(allocator: Allocator, params: *const GenesisParams) !GenesisResult { 54 + // get did:key strings for all three keys 55 + const rotation_did_key = try params.rotation_keypair.did(allocator); 56 + defer allocator.free(rotation_did_key); 57 + const signing_did_key = try params.signing_keypair.did(allocator); 58 + defer allocator.free(signing_did_key); 59 + const label_did_key = try params.label_keypair.did(allocator); 60 + defer allocator.free(label_did_key); 61 + 62 + // build handle URI: "at://handle" 63 + const handle_uri = try std.fmt.allocPrint(allocator, "at://{s}", .{params.handle}); 64 + defer allocator.free(handle_uri); 65 + 66 + // build unsigned operation as DAG-CBOR 67 + // top-level keys sorted by length then lex: 68 + // prev(4) < type(4) < services(8) < alsoKnownAs(11) < rotationKeys(12) < verificationMethods(19) 69 + 70 + // nested: services map 71 + // atproto_pds(11) < atproto_labeler(15) 72 + // each: type(4) < endpoint(8) 73 + 74 + // nested: verificationMethods map 75 + // atproto(7) < atproto_label(13) 76 + 77 + const unsigned_cbor = try encodeUnsignedOp(allocator, .{ 78 + .rotation_did_key = rotation_did_key, 79 + .signing_did_key = signing_did_key, 80 + .label_did_key = label_did_key, 81 + .handle_uri = handle_uri, 82 + .pds_endpoint = params.pds_endpoint, 83 + .labeler_endpoint = params.labeler_endpoint, 84 + }); 85 + defer allocator.free(unsigned_cbor); 86 + 87 + // sign: the ECDSA scheme handles SHA-256 internally 88 + const sig = try params.rotation_keypair.sign(unsigned_cbor); 89 + 90 + // encode signature as base64url (no padding) 91 + const sig_b64 = encodeBase64Url(&sig.bytes); 92 + 93 + // build signed operation CBOR (with sig field) for DID derivation 94 + const signed_cbor = try encodeSignedOp(allocator, .{ 95 + .rotation_did_key = rotation_did_key, 96 + .signing_did_key = signing_did_key, 97 + .label_did_key = label_did_key, 98 + .handle_uri = handle_uri, 99 + .pds_endpoint = params.pds_endpoint, 100 + .labeler_endpoint = params.labeler_endpoint, 101 + .sig_b64 = sig_b64.constSlice(), 102 + }); 103 + defer allocator.free(signed_cbor); 104 + 105 + // derive DID: SHA-256(signed_cbor) → base32lower → truncate to 24 chars 106 + var hash: [Sha256.digest_length]u8 = undefined; 107 + Sha256.hash(signed_cbor, &hash, .{}); 108 + 109 + const b32 = try zat.multibase.base32lower.encode(allocator, &hash); 110 + defer allocator.free(b32); 111 + 112 + // b32 has 'b' multibase prefix — skip it, take first 24 chars 113 + if (b32.len < 25) return error.Base32TooShort; 114 + const did = try std.fmt.allocPrint(allocator, "did:plc:{s}", .{b32[1..25]}); 115 + errdefer allocator.free(did); 116 + 117 + // build JSON for submission to plc.directory 118 + const json = try buildGenesisJson(allocator, .{ 119 + .rotation_did_key = rotation_did_key, 120 + .signing_did_key = signing_did_key, 121 + .label_did_key = label_did_key, 122 + .handle_uri = handle_uri, 123 + .pds_endpoint = params.pds_endpoint, 124 + .labeler_endpoint = params.labeler_endpoint, 125 + .sig_b64 = sig_b64.constSlice(), 126 + }); 127 + errdefer allocator.free(json); 128 + 129 + return .{ 130 + .did = did, 131 + .json = json, 132 + .allocator = allocator, 133 + }; 134 + } 135 + 136 + // internal params passed between encoding functions 137 + const OpFields = struct { 138 + rotation_did_key: []const u8, 139 + signing_did_key: []const u8, 140 + label_did_key: []const u8, 141 + handle_uri: []const u8, 142 + pds_endpoint: []const u8, 143 + labeler_endpoint: []const u8, 144 + sig_b64: []const u8 = "", 145 + }; 146 + 147 + fn encodeUnsignedOp(allocator: Allocator, f: OpFields) ![]u8 { 148 + var arena = std.heap.ArenaAllocator.init(allocator); 149 + defer arena.deinit(); 150 + const alloc = arena.allocator(); 151 + 152 + const value = try buildOpValue(alloc, f, false); 153 + return cbor.encodeAlloc(allocator, value); 154 + } 155 + 156 + fn encodeSignedOp(allocator: Allocator, f: OpFields) ![]u8 { 157 + var arena = std.heap.ArenaAllocator.init(allocator); 158 + defer arena.deinit(); 159 + const alloc = arena.allocator(); 160 + 161 + const value = try buildOpValue(alloc, f, true); 162 + return cbor.encodeAlloc(allocator, value); 163 + } 164 + 165 + fn buildOpValue(alloc: Allocator, f: OpFields, include_sig: bool) !cbor.Value { 166 + // service entries: type(4) < endpoint(8) 167 + const pds_service = try allocMapEntries(alloc, &.{ 168 + .{ .key = "type", .value = .{ .text = "AtprotoPersonalDataServer" } }, 169 + .{ .key = "endpoint", .value = .{ .text = f.pds_endpoint } }, 170 + }); 171 + 172 + const labeler_service = try allocMapEntries(alloc, &.{ 173 + .{ .key = "type", .value = .{ .text = "AtprotoLabeler" } }, 174 + .{ .key = "endpoint", .value = .{ .text = f.labeler_endpoint } }, 175 + }); 176 + 177 + // services map: atproto_pds(11) < atproto_labeler(15) 178 + const services = try allocMapEntries(alloc, &.{ 179 + .{ .key = "atproto_pds", .value = .{ .map = pds_service } }, 180 + .{ .key = "atproto_labeler", .value = .{ .map = labeler_service } }, 181 + }); 182 + 183 + // verificationMethods: atproto(7) < atproto_label(13) 184 + const ver_methods = try allocMapEntries(alloc, &.{ 185 + .{ .key = "atproto", .value = .{ .text = f.signing_did_key } }, 186 + .{ .key = "atproto_label", .value = .{ .text = f.label_did_key } }, 187 + }); 188 + 189 + // alsoKnownAs array 190 + const aka_items = try alloc.alloc(cbor.Value, 1); 191 + aka_items[0] = .{ .text = f.handle_uri }; 192 + 193 + // rotationKeys array 194 + const rot_items = try alloc.alloc(cbor.Value, 1); 195 + rot_items[0] = .{ .text = f.rotation_did_key }; 196 + 197 + // top-level: sorted by key length then lex 198 + // sig(3) < prev(4) < type(4) < services(8) < alsoKnownAs(11) < rotationKeys(12) < verificationMethods(19) 199 + var entries_buf: [7]cbor.Value.MapEntry = undefined; 200 + var i: usize = 0; 201 + 202 + if (include_sig) { 203 + entries_buf[i] = .{ .key = "sig", .value = .{ .text = f.sig_b64 } }; 204 + i += 1; 205 + } 206 + entries_buf[i] = .{ .key = "prev", .value = .null }; 207 + i += 1; 208 + entries_buf[i] = .{ .key = "type", .value = .{ .text = "plc_operation" } }; 209 + i += 1; 210 + entries_buf[i] = .{ .key = "services", .value = .{ .map = services } }; 211 + i += 1; 212 + entries_buf[i] = .{ .key = "alsoKnownAs", .value = .{ .array = aka_items } }; 213 + i += 1; 214 + entries_buf[i] = .{ .key = "rotationKeys", .value = .{ .array = rot_items } }; 215 + i += 1; 216 + entries_buf[i] = .{ .key = "verificationMethods", .value = .{ .map = ver_methods } }; 217 + i += 1; 218 + 219 + const entries = try alloc.alloc(cbor.Value.MapEntry, i); 220 + @memcpy(entries, entries_buf[0..i]); 221 + 222 + return .{ .map = entries }; 223 + } 224 + 225 + fn allocMapEntries(alloc: Allocator, comptime_entries: []const cbor.Value.MapEntry) ![]const cbor.Value.MapEntry { 226 + const entries = try alloc.alloc(cbor.Value.MapEntry, comptime_entries.len); 227 + @memcpy(entries, comptime_entries); 228 + return entries; 229 + } 230 + 231 + /// encode 64 bytes as base64url (no padding). max output = 86 chars. 232 + const Base64UrlResult = struct { 233 + buf: [88]u8 = undefined, 234 + len: usize = 0, 235 + 236 + fn constSlice(self: *const Base64UrlResult) []const u8 { 237 + return self.buf[0..self.len]; 238 + } 239 + }; 240 + 241 + fn encodeBase64Url(input: []const u8) Base64UrlResult { 242 + const encoder = std.base64.url_safe_no_pad.Encoder; 243 + var result: Base64UrlResult = .{}; 244 + result.len = encoder.calcSize(input.len); 245 + _ = encoder.encode(result.buf[0..result.len], input); 246 + return result; 247 + } 248 + 249 + fn buildGenesisJson(allocator: Allocator, f: OpFields) ![]u8 { 250 + var buf: std.ArrayList(u8) = .{}; 251 + errdefer buf.deinit(allocator); 252 + const w = buf.writer(allocator); 253 + 254 + try w.writeAll("{"); 255 + try w.writeAll("\"type\":\"plc_operation\""); 256 + try w.writeAll(",\"rotationKeys\":[\""); 257 + try w.writeAll(f.rotation_did_key); 258 + try w.writeAll("\"]"); 259 + try w.writeAll(",\"verificationMethods\":{"); 260 + try w.writeAll("\"atproto\":\""); 261 + try w.writeAll(f.signing_did_key); 262 + try w.writeAll("\",\"atproto_label\":\""); 263 + try w.writeAll(f.label_did_key); 264 + try w.writeAll("\"}"); 265 + try w.writeAll(",\"alsoKnownAs\":[\""); 266 + try w.writeAll(f.handle_uri); 267 + try w.writeAll("\"]"); 268 + try w.writeAll(",\"services\":{"); 269 + try w.writeAll("\"atproto_pds\":{\"type\":\"AtprotoPersonalDataServer\",\"endpoint\":\""); 270 + try w.writeAll(f.pds_endpoint); 271 + try w.writeAll("\"}"); 272 + try w.writeAll(",\"atproto_labeler\":{\"type\":\"AtprotoLabeler\",\"endpoint\":\""); 273 + try w.writeAll(f.labeler_endpoint); 274 + try w.writeAll("\"}"); 275 + try w.writeAll("}"); 276 + try w.writeAll(",\"prev\":null"); 277 + try w.writeAll(",\"sig\":\""); 278 + try w.writeAll(f.sig_b64); 279 + try w.writeAll("\"}"); 280 + 281 + return try buf.toOwnedSlice(allocator); 282 + } 283 + 284 + // === tests === 285 + 286 + test "genesis operation round-trip" { 287 + const allocator = std.testing.allocator; 288 + 289 + // use same key for all three roles (fine for testing) 290 + var rotation_kp = try Keypair.fromSecretKey(.secp256k1, .{ 291 + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 292 + 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 293 + 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 294 + 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20, 295 + }); 296 + var signing_kp = rotation_kp; 297 + var label_kp = rotation_kp; 298 + 299 + const params = GenesisParams{ 300 + .handle = "test-labeler.example.com", 301 + .pds_endpoint = "https://pds.example.com", 302 + .labeler_endpoint = "https://labeler.example.com", 303 + .rotation_keypair = &rotation_kp, 304 + .signing_keypair = &signing_kp, 305 + .label_keypair = &label_kp, 306 + }; 307 + 308 + const result = try createGenesis(allocator, &params); 309 + defer result.deinit(); 310 + 311 + // DID should start with "did:plc:" and be 32 chars total 312 + try std.testing.expect(std.mem.startsWith(u8, result.did, "did:plc:")); 313 + try std.testing.expectEqual(@as(usize, 32), result.did.len); 314 + 315 + // DID suffix should only contain base32lower chars (a-z, 2-7) 316 + for (result.did[8..]) |c| { 317 + try std.testing.expect((c >= 'a' and c <= 'z') or (c >= '2' and c <= '7')); 318 + } 319 + 320 + // JSON should contain expected fields 321 + try std.testing.expect(std.mem.indexOf(u8, result.json, "\"type\":\"plc_operation\"") != null); 322 + try std.testing.expect(std.mem.indexOf(u8, result.json, "\"prev\":null") != null); 323 + try std.testing.expect(std.mem.indexOf(u8, result.json, "\"sig\":\"") != null); 324 + try std.testing.expect(std.mem.indexOf(u8, result.json, "AtprotoLabeler") != null); 325 + try std.testing.expect(std.mem.indexOf(u8, result.json, "AtprotoPersonalDataServer") != null); 326 + try std.testing.expect(std.mem.indexOf(u8, result.json, "test-labeler.example.com") != null); 327 + } 328 + 329 + test "genesis is deterministic" { 330 + const allocator = std.testing.allocator; 331 + 332 + var rotation_kp = try Keypair.fromSecretKey(.secp256k1, .{ 333 + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 334 + 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 335 + 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 336 + 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20, 337 + }); 338 + var signing_kp = rotation_kp; 339 + var label_kp = rotation_kp; 340 + 341 + const params = GenesisParams{ 342 + .handle = "test.example.com", 343 + .pds_endpoint = "https://pds.example.com", 344 + .labeler_endpoint = "https://labeler.example.com", 345 + .rotation_keypair = &rotation_kp, 346 + .signing_keypair = &signing_kp, 347 + .label_keypair = &label_kp, 348 + }; 349 + 350 + const r1 = try createGenesis(allocator, &params); 351 + defer r1.deinit(); 352 + const r2 = try createGenesis(allocator, &params); 353 + defer r2.deinit(); 354 + 355 + // same keys + same params = same DID 356 + try std.testing.expectEqualStrings(r1.did, r2.did); 357 + try std.testing.expectEqualStrings(r1.json, r2.json); 358 + } 359 + 360 + test "genesis unsigned CBOR key ordering" { 361 + const allocator = std.testing.allocator; 362 + 363 + const cbor_bytes = try encodeUnsignedOp(allocator, .{ 364 + .rotation_did_key = "did:key:zTest", 365 + .signing_did_key = "did:key:zTest", 366 + .label_did_key = "did:key:zTest", 367 + .handle_uri = "at://test.example.com", 368 + .pds_endpoint = "https://pds.example.com", 369 + .labeler_endpoint = "https://labeler.example.com", 370 + }); 371 + defer allocator.free(cbor_bytes); 372 + 373 + // decode and check key ordering 374 + var arena = std.heap.ArenaAllocator.init(allocator); 375 + defer arena.deinit(); 376 + const decoded = try cbor.decodeAll(arena.allocator(), cbor_bytes); 377 + 378 + const entries = decoded.map; 379 + try std.testing.expectEqual(@as(usize, 6), entries.len); 380 + 381 + // DAG-CBOR: sorted by length then lex 382 + try std.testing.expectEqualStrings("prev", entries[0].key); // 4 383 + try std.testing.expectEqualStrings("type", entries[1].key); // 4 384 + try std.testing.expectEqualStrings("services", entries[2].key); // 8 385 + try std.testing.expectEqualStrings("alsoKnownAs", entries[3].key); // 11 386 + try std.testing.expectEqualStrings("rotationKeys", entries[4].key); // 12 387 + try std.testing.expectEqualStrings("verificationMethods", entries[5].key); // 19 388 + 389 + // prev should be null 390 + try std.testing.expectEqual(cbor.Value.null, entries[0].value); 391 + 392 + // type should be "plc_operation" 393 + try std.testing.expectEqualStrings("plc_operation", entries[1].value.text); 394 + } 395 + 396 + test "base64url encoding" { 397 + // test with known values 398 + const sig = [_]u8{0xaa} ** 64; 399 + const result = encodeBase64Url(&sig); 400 + try std.testing.expectEqual(@as(usize, 86), result.len); 401 + // base64url has no + or / characters 402 + for (result.constSlice()) |c| { 403 + try std.testing.expect(c != '+' and c != '/'); 404 + } 405 + } 406 + 407 + test "genesis JSON structure" { 408 + const allocator = std.testing.allocator; 409 + 410 + const json = try buildGenesisJson(allocator, .{ 411 + .rotation_did_key = "did:key:zRotation", 412 + .signing_did_key = "did:key:zSigning", 413 + .label_did_key = "did:key:zLabel", 414 + .handle_uri = "at://test.example.com", 415 + .pds_endpoint = "https://pds.example.com", 416 + .labeler_endpoint = "https://labeler.example.com", 417 + .sig_b64 = "dGVzdHNpZw", 418 + }); 419 + defer allocator.free(json); 420 + 421 + // parse to verify it's valid JSON 422 + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json, .{}); 423 + defer parsed.deinit(); 424 + 425 + const root = parsed.value.object; 426 + try std.testing.expectEqualStrings("plc_operation", root.get("type").?.string); 427 + try std.testing.expect(root.get("prev").?.null == {}); 428 + try std.testing.expectEqualStrings("dGVzdHNpZw", root.get("sig").?.string); 429 + 430 + const rotation_keys = root.get("rotationKeys").?.array; 431 + try std.testing.expectEqual(@as(usize, 1), rotation_keys.items.len); 432 + try std.testing.expectEqualStrings("did:key:zRotation", rotation_keys.items[0].string); 433 + 434 + const services = root.get("services").?.object; 435 + try std.testing.expectEqualStrings("AtprotoPersonalDataServer", services.get("atproto_pds").?.object.get("type").?.string); 436 + try std.testing.expectEqualStrings("AtprotoLabeler", services.get("atproto_labeler").?.object.get("type").?.string); 437 + }
+325
src/label.zig
··· 1 + //! AT Protocol label type with CBOR encoding and signing 2 + //! 3 + //! implements com.atproto.label.defs#label: 4 + //! encode label (sans sig) as deterministic DAG-CBOR, 5 + //! ECDSA sign (scheme handles SHA-256 internally), attach sig bytes. 6 + 7 + const std = @import("std"); 8 + const zat = @import("zat"); 9 + const cbor = zat.cbor; 10 + const Keypair = zat.Keypair; 11 + 12 + const Allocator = std.mem.Allocator; 13 + 14 + pub const Label = struct { 15 + ver: u8 = 1, 16 + src: []const u8, // labeler DID 17 + uri: []const u8, // subject (at:// URI or DID) 18 + cid: ?[]const u8 = null, // optional, pins to record version 19 + val: []const u8, // label value (max 128 bytes) 20 + neg: bool = false, // negates previous label 21 + cts: []const u8, // created-at ISO 8601 22 + exp: ?[]const u8 = null, // optional expiration 23 + sig: ?[]const u8 = null, // filled after signing 24 + 25 + /// encode the label (without sig) as deterministic DAG-CBOR. 26 + /// caller owns the returned slice. 27 + pub fn encodeUnsigned(self: *const Label, allocator: Allocator) ![]u8 { 28 + var entries: [9]cbor.Value.MapEntry = undefined; 29 + const n = self.fillEntries(&entries, false); 30 + return cbor.encodeAlloc(allocator, .{ .map = entries[0..n] }); 31 + } 32 + 33 + /// sign this label: CBOR-encode (sans sig) → ECDSA sign (scheme hashes internally). 34 + /// returns the raw signature bytes (64 bytes, r||s). 35 + /// does NOT mutate self — caller should set self.sig. 36 + pub fn computeSignature(self: *const Label, allocator: Allocator, keypair: *const Keypair) !zat.jwt.Signature { 37 + const unsigned_bytes = try self.encodeUnsigned(allocator); 38 + defer allocator.free(unsigned_bytes); 39 + 40 + return keypair.sign(unsigned_bytes); 41 + } 42 + 43 + /// encode the full label (with sig) as deterministic DAG-CBOR. 44 + /// self.sig must be set before calling this. 45 + pub fn encodeSigned(self: *const Label, allocator: Allocator) ![]u8 { 46 + std.debug.assert(self.sig != null); 47 + var entries: [9]cbor.Value.MapEntry = undefined; 48 + const n = self.fillEntries(&entries, true); 49 + return cbor.encodeAlloc(allocator, .{ .map = entries[0..n] }); 50 + } 51 + 52 + /// sign and return a fully-encoded signed label. 53 + /// sets self.sig as a side effect. 54 + pub fn signAndEncode(self: *Label, allocator: Allocator, keypair: *const Keypair, sig_buf: *[64]u8) ![]u8 { 55 + const sig = try self.computeSignature(allocator, keypair); 56 + sig_buf.* = sig.bytes; 57 + self.sig = sig_buf; 58 + return self.encodeSigned(allocator); 59 + } 60 + 61 + /// fill entries buffer with CBOR map entries. returns number of entries written. 62 + /// entries buffer must be at least 9 elements. the returned slice is valid 63 + /// as long as the buffer and self are alive. 64 + fn fillEntries(self: *const Label, entries: *[9]cbor.Value.MapEntry, include_sig: bool) usize { 65 + var i: usize = 0; 66 + 67 + if (self.cid) |ci| { 68 + entries[i] = .{ .key = "cid", .value = .{ .text = ci } }; 69 + i += 1; 70 + } 71 + entries[i] = .{ .key = "cts", .value = .{ .text = self.cts } }; 72 + i += 1; 73 + if (self.exp) |e| { 74 + entries[i] = .{ .key = "exp", .value = .{ .text = e } }; 75 + i += 1; 76 + } 77 + if (self.neg) { 78 + entries[i] = .{ .key = "neg", .value = .{ .boolean = true } }; 79 + i += 1; 80 + } 81 + if (include_sig) { 82 + if (self.sig) |s| { 83 + entries[i] = .{ .key = "sig", .value = .{ .bytes = s } }; 84 + i += 1; 85 + } 86 + } 87 + entries[i] = .{ .key = "src", .value = .{ .text = self.src } }; 88 + i += 1; 89 + entries[i] = .{ .key = "uri", .value = .{ .text = self.uri } }; 90 + i += 1; 91 + entries[i] = .{ .key = "val", .value = .{ .text = self.val } }; 92 + i += 1; 93 + entries[i] = .{ .key = "ver", .value = .{ .unsigned = self.ver } }; 94 + i += 1; 95 + 96 + return i; 97 + } 98 + }; 99 + 100 + /// encode an XRPC event stream frame: header CBOR || payload CBOR. 101 + /// used for both subscribeLabels and firehose-style event streams. 102 + pub fn encodeEventFrame(allocator: Allocator, op: i64, frame_type: ?[]const u8, payload: cbor.Value) ![]u8 { 103 + // header: {op: <int>, t?: <string>} 104 + var header_entries: [2]cbor.Value.MapEntry = undefined; 105 + var h_count: usize = 0; 106 + 107 + if (op >= 0) { 108 + header_entries[h_count] = .{ .key = "op", .value = .{ .unsigned = @intCast(op) } }; 109 + } else { 110 + header_entries[h_count] = .{ .key = "op", .value = .{ .negative = op } }; 111 + } 112 + h_count += 1; 113 + 114 + if (frame_type) |t| { 115 + header_entries[h_count] = .{ .key = "t", .value = .{ .text = t } }; 116 + h_count += 1; 117 + } 118 + 119 + const header: cbor.Value = .{ .map = header_entries[0..h_count] }; 120 + 121 + const header_bytes = try cbor.encodeAlloc(allocator, header); 122 + defer allocator.free(header_bytes); 123 + const payload_bytes = try cbor.encodeAlloc(allocator, payload); 124 + defer allocator.free(payload_bytes); 125 + 126 + const frame = try allocator.alloc(u8, header_bytes.len + payload_bytes.len); 127 + @memcpy(frame[0..header_bytes.len], header_bytes); 128 + @memcpy(frame[header_bytes.len..], payload_bytes); 129 + return frame; 130 + } 131 + 132 + // === tests === 133 + 134 + test "label unsigned CBOR round-trip" { 135 + const allocator = std.testing.allocator; 136 + 137 + const label = Label{ 138 + .src = "did:plc:test123", 139 + .uri = "at://did:plc:user/app.bsky.feed.post/abc", 140 + .val = "spam", 141 + .cts = "2024-01-01T00:00:00.000Z", 142 + }; 143 + 144 + const encoded = try label.encodeUnsigned(allocator); 145 + defer allocator.free(encoded); 146 + 147 + // decode and verify fields 148 + var arena = std.heap.ArenaAllocator.init(allocator); 149 + defer arena.deinit(); 150 + const decoded = try cbor.decodeAll(arena.allocator(), encoded); 151 + 152 + try std.testing.expectEqualStrings("did:plc:test123", decoded.getString("src").?); 153 + try std.testing.expectEqualStrings("at://did:plc:user/app.bsky.feed.post/abc", decoded.getString("uri").?); 154 + try std.testing.expectEqualStrings("spam", decoded.getString("val").?); 155 + try std.testing.expectEqualStrings("2024-01-01T00:00:00.000Z", decoded.getString("cts").?); 156 + try std.testing.expectEqual(@as(u64, 1), decoded.getUint("ver").?); 157 + // neg=false should be omitted 158 + try std.testing.expect(decoded.get("neg") == null); 159 + // sig should be omitted in unsigned encoding 160 + try std.testing.expect(decoded.get("sig") == null); 161 + } 162 + 163 + test "label with optional fields" { 164 + const allocator = std.testing.allocator; 165 + 166 + const label = Label{ 167 + .src = "did:plc:labeler", 168 + .uri = "did:plc:subject", 169 + .val = "!warn", 170 + .neg = true, 171 + .cts = "2024-06-15T12:00:00.000Z", 172 + .exp = "2025-06-15T12:00:00.000Z", 173 + .cid = "bafyreitest", 174 + }; 175 + 176 + const encoded = try label.encodeUnsigned(allocator); 177 + defer allocator.free(encoded); 178 + 179 + var arena = std.heap.ArenaAllocator.init(allocator); 180 + defer arena.deinit(); 181 + const decoded = try cbor.decodeAll(arena.allocator(), encoded); 182 + 183 + try std.testing.expectEqual(true, decoded.getBool("neg").?); 184 + try std.testing.expectEqualStrings("2025-06-15T12:00:00.000Z", decoded.getString("exp").?); 185 + try std.testing.expectEqualStrings("bafyreitest", decoded.getString("cid").?); 186 + } 187 + 188 + test "label sign and verify" { 189 + const allocator = std.testing.allocator; 190 + 191 + const keypair = try Keypair.fromSecretKey(.secp256k1, .{ 192 + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 193 + 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 194 + 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 195 + 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20, 196 + }); 197 + 198 + var label = Label{ 199 + .src = "did:plc:test", 200 + .uri = "at://did:plc:user/app.bsky.feed.post/abc", 201 + .val = "spam", 202 + .cts = "2024-01-01T00:00:00.000Z", 203 + }; 204 + 205 + var sig_buf: [64]u8 = undefined; 206 + const encoded = try label.signAndEncode(allocator, &keypair, &sig_buf); 207 + defer allocator.free(encoded); 208 + 209 + // decode and check sig is present 210 + var arena = std.heap.ArenaAllocator.init(allocator); 211 + defer arena.deinit(); 212 + const decoded = try cbor.decodeAll(arena.allocator(), encoded); 213 + const sig_bytes = decoded.getBytes("sig").?; 214 + try std.testing.expectEqual(@as(usize, 64), sig_bytes.len); 215 + 216 + // verify the signature: re-encode unsigned, verify (scheme hashes internally) 217 + const unsigned = try label.encodeUnsigned(allocator); 218 + defer allocator.free(unsigned); 219 + 220 + const pk = try keypair.publicKey(); 221 + try zat.jwt.verifySecp256k1(unsigned, sig_bytes, &pk); 222 + } 223 + 224 + test "label deterministic encoding" { 225 + const allocator = std.testing.allocator; 226 + 227 + const label = Label{ 228 + .src = "did:plc:test", 229 + .uri = "at://did:plc:user/app.bsky.feed.post/abc", 230 + .val = "spam", 231 + .cts = "2024-01-01T00:00:00.000Z", 232 + }; 233 + 234 + const enc1 = try label.encodeUnsigned(allocator); 235 + defer allocator.free(enc1); 236 + const enc2 = try label.encodeUnsigned(allocator); 237 + defer allocator.free(enc2); 238 + 239 + try std.testing.expectEqualSlices(u8, enc1, enc2); 240 + } 241 + 242 + test "label DAG-CBOR key ordering" { 243 + const allocator = std.testing.allocator; 244 + 245 + // with all fields: keys should be sorted by length then lex 246 + // 3-char: cid, cts, exp, neg, sig, src, uri, val, ver 247 + // all same length (3) → sorted lex: cid, cts, exp, neg, sig, src, uri, val, ver 248 + const label = Label{ 249 + .src = "did:plc:labeler", 250 + .uri = "did:plc:subject", 251 + .val = "test", 252 + .neg = true, 253 + .cts = "2024-01-01T00:00:00.000Z", 254 + .exp = "2025-01-01T00:00:00.000Z", 255 + .cid = "bafytest", 256 + .sig = &(.{0xaa} ** 64), 257 + }; 258 + 259 + const encoded = try label.encodeSigned(allocator); 260 + defer allocator.free(encoded); 261 + 262 + var arena = std.heap.ArenaAllocator.init(allocator); 263 + defer arena.deinit(); 264 + const decoded = try cbor.decodeAll(arena.allocator(), encoded); 265 + 266 + // verify map key order 267 + const entries = decoded.map; 268 + try std.testing.expectEqualStrings("cid", entries[0].key); 269 + try std.testing.expectEqualStrings("cts", entries[1].key); 270 + try std.testing.expectEqualStrings("exp", entries[2].key); 271 + try std.testing.expectEqualStrings("neg", entries[3].key); 272 + try std.testing.expectEqualStrings("sig", entries[4].key); 273 + try std.testing.expectEqualStrings("src", entries[5].key); 274 + try std.testing.expectEqualStrings("uri", entries[6].key); 275 + try std.testing.expectEqualStrings("val", entries[7].key); 276 + try std.testing.expectEqualStrings("ver", entries[8].key); 277 + } 278 + 279 + test "encode event frame" { 280 + const allocator = std.testing.allocator; 281 + 282 + // simple labels frame: {op: 1, t: "#labels"} + {seq: 1, labels: [...]} 283 + const payload: cbor.Value = .{ .map = &.{ 284 + .{ .key = "seq", .value = .{ .unsigned = 42 } }, 285 + } }; 286 + 287 + const frame = try encodeEventFrame(allocator, 1, "#labels", payload); 288 + defer allocator.free(frame); 289 + 290 + // decode header 291 + var arena = std.heap.ArenaAllocator.init(allocator); 292 + defer arena.deinit(); 293 + const alloc = arena.allocator(); 294 + 295 + const header_result = try cbor.decode(alloc, frame); 296 + try std.testing.expectEqual(@as(u64, 1), header_result.value.getUint("op").?); 297 + try std.testing.expectEqualStrings("#labels", header_result.value.getString("t").?); 298 + 299 + // decode payload 300 + const payload_decoded = try cbor.decodeAll(alloc, frame[header_result.consumed..]); 301 + try std.testing.expectEqual(@as(u64, 42), payload_decoded.getUint("seq").?); 302 + } 303 + 304 + test "encode error frame" { 305 + const allocator = std.testing.allocator; 306 + 307 + const payload: cbor.Value = .{ .map = &.{ 308 + .{ .key = "error", .value = .{ .text = "FutureCursor" } }, 309 + .{ .key = "message", .value = .{ .text = "cursor is in the future" } }, 310 + } }; 311 + 312 + const frame = try encodeEventFrame(allocator, -1, null, payload); 313 + defer allocator.free(frame); 314 + 315 + var arena = std.heap.ArenaAllocator.init(allocator); 316 + defer arena.deinit(); 317 + const alloc = arena.allocator(); 318 + 319 + const header_result = try cbor.decode(alloc, frame); 320 + try std.testing.expectEqual(@as(i64, -1), header_result.value.getInt("op").?); 321 + try std.testing.expect(header_result.value.getString("t") == null); 322 + 323 + const payload_decoded = try cbor.decodeAll(alloc, frame[header_result.consumed..]); 324 + try std.testing.expectEqualStrings("FutureCursor", payload_decoded.getString("error").?); 325 + }
+256
src/main.zig
··· 1 + //! labelz — AT Protocol labeler built on zat 2 + //! 3 + //! subscribes to jetstream, applies keyword-based labels, 4 + //! signs with secp256k1, stores in SQLite, and serves 5 + //! subscribeLabels (WS) + queryLabels (HTTP). 6 + 7 + const std = @import("std"); 8 + const websocket = @import("websocket"); 9 + const zat = @import("zat"); 10 + const label_mod = @import("label.zig"); 11 + const store_mod = @import("store.zig"); 12 + const server_mod = @import("server.zig"); 13 + 14 + const Label = label_mod.Label; 15 + const Keypair = zat.Keypair; 16 + const JetstreamClient = zat.JetstreamClient; 17 + const Allocator = std.mem.Allocator; 18 + const log = std.log.scoped(.labelz); 19 + 20 + const Config = struct { 21 + /// labeler DID (src field on labels) 22 + did: []const u8, 23 + /// secret key bytes (32 bytes, hex-encoded in env) 24 + secret_key: [32]u8, 25 + /// port for XRPC server 26 + port: u16 = 4100, 27 + /// SQLite database path 28 + db_path: [*:0]const u8 = "labelz.db", 29 + /// collections to watch 30 + collections: []const []const u8 = &.{"app.bsky.feed.post"}, 31 + /// keyword → label value rules 32 + rules: []const Rule = &default_rules, 33 + }; 34 + 35 + pub const Rule = struct { 36 + keyword: []const u8, 37 + label_val: []const u8, 38 + }; 39 + 40 + const default_rules = [_]Rule{ 41 + .{ .keyword = "#labelz-test", .label_val = "spam" }, 42 + .{ .keyword = "zat-labeler-test", .label_val = "!warn" }, 43 + }; 44 + 45 + const ServerType = server_mod.Server; 46 + 47 + const EventHandler = struct { 48 + allocator: Allocator, 49 + config: *const Config, 50 + keypair: *const Keypair, 51 + store: *store_mod.Store, 52 + server: *ServerType, 53 + 54 + pub fn onEvent(self: *EventHandler, event: zat.JetstreamEvent) void { 55 + switch (event) { 56 + .commit => |commit| self.handleCommit(commit), 57 + else => {}, 58 + } 59 + } 60 + 61 + pub fn onError(_: *EventHandler, err: anyerror) void { 62 + log.err("jetstream: {s}", .{@errorName(err)}); 63 + } 64 + 65 + pub fn onConnect(_: *EventHandler, host: []const u8) void { 66 + log.info("connected to {s}", .{host}); 67 + } 68 + 69 + fn handleCommit(self: *EventHandler, commit: zat.jetstream.CommitEvent) void { 70 + if (commit.operation != .create) return; 71 + 72 + const record = commit.record orelse return; 73 + const text = zat.json.getString(record, "text") orelse return; 74 + 75 + for (self.config.rules) |rule| { 76 + if (containsIgnoreCase(text, rule.keyword)) { 77 + self.emitLabel(commit, rule.label_val); 78 + return; // one label per record 79 + } 80 + } 81 + } 82 + 83 + fn emitLabel(self: *EventHandler, commit: zat.jetstream.CommitEvent, val: []const u8) void { 84 + // build subject URI: at://{did}/{collection}/{rkey} 85 + var uri_buf: [512]u8 = undefined; 86 + const uri = std.fmt.bufPrint(&uri_buf, "at://{s}/{s}/{s}", .{ 87 + commit.did, commit.collection, commit.rkey, 88 + }) catch return; 89 + 90 + var ts_buf: [32]u8 = undefined; 91 + const cts = nowIso8601(&ts_buf); 92 + 93 + var label = Label{ 94 + .src = self.config.did, 95 + .uri = uri, 96 + .val = val, 97 + .cts = cts, 98 + }; 99 + 100 + var sig_buf: [64]u8 = undefined; 101 + const encoded = label.signAndEncode(self.allocator, self.keypair, &sig_buf) catch |err| { 102 + log.err("sign failed: {s}", .{@errorName(err)}); 103 + return; 104 + }; 105 + defer self.allocator.free(encoded); 106 + 107 + const seq = self.store.insert(&label, encoded) catch |err| { 108 + log.err("store failed: {s}", .{@errorName(err)}); 109 + return; 110 + }; 111 + 112 + log.info("label seq={d} val={s} uri={s}", .{ seq, val, uri }); 113 + self.server.broadcast(seq, encoded); 114 + } 115 + }; 116 + 117 + pub fn main() !void { 118 + var gpa: std.heap.GeneralPurposeAllocator(.{}) = .init; 119 + defer _ = gpa.deinit(); 120 + const allocator = gpa.allocator(); 121 + 122 + const config = try loadConfig(); 123 + 124 + var keypair = try Keypair.fromSecretKey(.secp256k1, config.secret_key); 125 + var store = try store_mod.Store.init(config.db_path); 126 + defer store.deinit(); 127 + 128 + var server = ServerType.init(allocator, &store); 129 + defer server.deinit(); 130 + 131 + // start XRPC server in background thread 132 + const WsHandler = server_mod.Handler(ServerType); 133 + var ws_server = try websocket.Server(WsHandler).init(allocator, .{ 134 + .port = config.port, 135 + .address = "0.0.0.0", 136 + .max_conn = 256, 137 + .max_message_size = 64 * 1024, 138 + }); 139 + 140 + const server_thread = try ws_server.listenInNewThread(&server); 141 + defer server_thread.detach(); 142 + 143 + log.info("labeler serving on :{d}", .{config.port}); 144 + log.info("DID: {s}", .{config.did}); 145 + log.info("rules: {d} keyword rules active", .{config.rules.len}); 146 + 147 + // start jetstream consumer (blocks forever) 148 + var client = JetstreamClient.init(allocator, .{ 149 + .wanted_collections = config.collections, 150 + }); 151 + 152 + var handler = EventHandler{ 153 + .allocator = allocator, 154 + .config = &config, 155 + .keypair = &keypair, 156 + .store = &store, 157 + .server = &server, 158 + }; 159 + 160 + client.subscribe(&handler); 161 + } 162 + 163 + fn loadConfig() !Config { 164 + const did = std.posix.getenv("LABELZ_DID") orelse { 165 + log.err("LABELZ_DID environment variable required", .{}); 166 + return error.MissingConfig; 167 + }; 168 + 169 + const key_hex = std.posix.getenv("LABELZ_SECRET_KEY") orelse { 170 + log.err("LABELZ_SECRET_KEY environment variable required (64 hex chars)", .{}); 171 + return error.MissingConfig; 172 + }; 173 + 174 + if (key_hex.len != 64) { 175 + log.err("LABELZ_SECRET_KEY must be 64 hex characters (32 bytes)", .{}); 176 + return error.InvalidConfig; 177 + } 178 + 179 + var secret_key: [32]u8 = undefined; 180 + _ = std.fmt.hexToBytes(&secret_key, key_hex) catch { 181 + log.err("LABELZ_SECRET_KEY invalid hex", .{}); 182 + return error.InvalidConfig; 183 + }; 184 + 185 + const port: u16 = if (std.posix.getenv("LABELZ_PORT")) |p| 186 + std.fmt.parseInt(u16, p, 10) catch 4100 187 + else 188 + 4100; 189 + 190 + const db_path: [*:0]const u8 = if (std.posix.getenv("LABELZ_DB")) |p| 191 + @ptrCast(p.ptr) 192 + else 193 + "/data/labelz.db"; 194 + 195 + return .{ 196 + .did = did, 197 + .secret_key = secret_key, 198 + .port = port, 199 + .db_path = db_path, 200 + }; 201 + } 202 + 203 + fn containsIgnoreCase(haystack: []const u8, needle: []const u8) bool { 204 + if (needle.len > haystack.len) return false; 205 + if (needle.len == 0) return true; 206 + 207 + const end = haystack.len - needle.len + 1; 208 + for (0..end) |i| { 209 + var match = true; 210 + for (needle, 0..) |nc, j| { 211 + if (std.ascii.toLower(haystack[i + j]) != std.ascii.toLower(nc)) { 212 + match = false; 213 + break; 214 + } 215 + } 216 + if (match) return true; 217 + } 218 + return false; 219 + } 220 + 221 + fn nowIso8601(buf: *[32]u8) []const u8 { 222 + const ts = std.time.timestamp(); 223 + const epoch: std.time.epoch.EpochSeconds = .{ .secs = @intCast(ts) }; 224 + const day = epoch.getDaySeconds(); 225 + const yd = epoch.getEpochDay().calculateYearDay(); 226 + const md = yd.calculateMonthDay(); 227 + 228 + return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}.000Z", .{ 229 + yd.year, 230 + md.month.numeric(), 231 + md.day_index + 1, 232 + day.getHoursIntoDay(), 233 + day.getMinutesIntoHour(), 234 + day.getSecondsIntoMinute(), 235 + }) catch "1970-01-01T00:00:00.000Z"; 236 + } 237 + 238 + // === tests === 239 + 240 + test "containsIgnoreCase" { 241 + try std.testing.expect(containsIgnoreCase("Hello World", "hello")); 242 + try std.testing.expect(containsIgnoreCase("this is SPAM", "spam")); 243 + try std.testing.expect(containsIgnoreCase("🚨 alert!", "🚨")); 244 + try std.testing.expect(!containsIgnoreCase("nothing here", "spam")); 245 + try std.testing.expect(containsIgnoreCase("", "")); 246 + try std.testing.expect(!containsIgnoreCase("", "x")); 247 + } 248 + 249 + test "nowIso8601 produces valid format" { 250 + var buf: [32]u8 = undefined; 251 + const ts = nowIso8601(&buf); 252 + // should look like 2024-01-01T00:00:00.000Z 253 + try std.testing.expectEqual(@as(usize, 24), ts.len); 254 + try std.testing.expectEqual(@as(u8, 'T'), ts[10]); 255 + try std.testing.expectEqual(@as(u8, 'Z'), ts[23]); 256 + }
+376
src/server.zig
··· 1 + //! XRPC server for AT Protocol labeler 2 + //! 3 + //! serves two endpoints: 4 + //! - com.atproto.label.subscribeLabels (WebSocket, CBOR-framed event stream) 5 + //! - com.atproto.label.queryLabels (HTTP GET, JSON response) 6 + 7 + const std = @import("std"); 8 + const websocket = @import("websocket"); 9 + const zat = @import("zat"); 10 + const cbor = zat.cbor; 11 + const label_mod = @import("label.zig"); 12 + const store_mod = @import("store.zig"); 13 + 14 + const Allocator = std.mem.Allocator; 15 + const log = std.log.scoped(.server); 16 + 17 + pub const Server = struct { 18 + allocator: Allocator, 19 + store: *store_mod.Store, 20 + subscribers: std.ArrayList(*Subscriber), 21 + mutex: std.Thread.Mutex = .{}, 22 + 23 + pub fn init(allocator: Allocator, store: *store_mod.Store) Server { 24 + return .{ 25 + .allocator = allocator, 26 + .store = store, 27 + .subscribers = .{}, 28 + }; 29 + } 30 + 31 + pub fn deinit(self: *Server) void { 32 + self.subscribers.deinit(self.allocator); 33 + } 34 + 35 + /// broadcast a new label to all connected subscribers. 36 + /// called after a label is stored. 37 + pub fn broadcast(self: *Server, seq: i64, encoded_label: []const u8) void { 38 + const frame = self.buildLabelsFrame(seq, encoded_label) catch |err| { 39 + log.err("failed to build frame: {s}", .{@errorName(err)}); 40 + return; 41 + }; 42 + defer self.allocator.free(frame); 43 + 44 + self.mutex.lock(); 45 + defer self.mutex.unlock(); 46 + 47 + var i: usize = 0; 48 + while (i < self.subscribers.items.len) { 49 + const sub = self.subscribers.items[i]; 50 + sub.conn.writeBin(frame) catch { 51 + // subscriber disconnected, remove 52 + log.info("subscriber disconnected, removing", .{}); 53 + _ = self.subscribers.swapRemove(i); 54 + continue; 55 + }; 56 + i += 1; 57 + } 58 + } 59 + 60 + fn buildLabelsFrame(self: *Server, seq: i64, encoded_label: []const u8) ![]u8 { 61 + // payload: {seq: <int>, labels: [<encoded label as bytes>]} 62 + // we re-decode the stored CBOR to embed it as a CBOR value in the frame. 63 + var arena = std.heap.ArenaAllocator.init(self.allocator); 64 + defer arena.deinit(); 65 + const alloc = arena.allocator(); 66 + 67 + const label_value = try cbor.decodeAll(alloc, encoded_label); 68 + 69 + const labels_array = try alloc.alloc(cbor.Value, 1); 70 + labels_array[0] = label_value; 71 + 72 + const entries = try alloc.alloc(cbor.Value.MapEntry, 2); 73 + entries[0] = .{ .key = "labels", .value = .{ .array = labels_array } }; 74 + entries[1] = .{ .key = "seq", .value = .{ .unsigned = @intCast(seq) } }; 75 + 76 + const payload: cbor.Value = .{ .map = entries }; 77 + 78 + return label_mod.encodeEventFrame(self.allocator, 1, "#labels", payload); 79 + } 80 + 81 + /// add a subscriber connection. 82 + pub fn addSubscriber(self: *Server, conn: *websocket.Conn, cursor: ?i64) void { 83 + const sub = self.allocator.create(Subscriber) catch return; 84 + sub.* = .{ .conn = conn }; 85 + 86 + // backfill from cursor 87 + if (cursor) |cur| { 88 + self.backfill(conn, cur) catch |err| { 89 + log.err("backfill failed: {s}", .{@errorName(err)}); 90 + }; 91 + } 92 + 93 + self.mutex.lock(); 94 + defer self.mutex.unlock(); 95 + self.subscribers.append(self.allocator, sub) catch { 96 + self.allocator.destroy(sub); 97 + }; 98 + } 99 + 100 + /// remove a subscriber connection. 101 + pub fn removeSubscriber(self: *Server, conn: *websocket.Conn) void { 102 + self.mutex.lock(); 103 + defer self.mutex.unlock(); 104 + 105 + for (self.subscribers.items, 0..) |sub, i| { 106 + if (sub.conn == conn) { 107 + self.allocator.destroy(sub); 108 + _ = self.subscribers.swapRemove(i); 109 + return; 110 + } 111 + } 112 + } 113 + 114 + fn backfill(self: *Server, conn: *websocket.Conn, cursor: i64) !void { 115 + const latest = self.store.latestSeq(); 116 + 117 + // check if cursor is in the future 118 + if (cursor > latest) { 119 + const frame = try label_mod.encodeEventFrame(self.allocator, -1, null, .{ .map = &.{ 120 + .{ .key = "error", .value = .{ .text = "FutureCursor" } }, 121 + .{ .key = "message", .value = .{ .text = "cursor is in the future" } }, 122 + } }); 123 + defer self.allocator.free(frame); 124 + conn.writeBin(frame) catch return; 125 + return; 126 + } 127 + 128 + // send outdated cursor info if needed (cursor=0 means "from beginning", not outdated) 129 + if (cursor > 0) { 130 + const info_frame = try label_mod.encodeEventFrame(self.allocator, 1, "#info", .{ .map = &.{ 131 + .{ .key = "message", .value = .{ .text = "OutdatedCursor" } }, 132 + .{ .key = "name", .value = .{ .text = "OutdatedCursor" } }, 133 + } }); 134 + defer self.allocator.free(info_frame); 135 + conn.writeBin(info_frame) catch return; 136 + } 137 + 138 + // send stored labels in batches 139 + var cur = cursor; 140 + while (true) { 141 + const labels = self.store.queryByCursor(self.allocator, cur, 100) catch return; 142 + defer { 143 + for (labels) |item| { 144 + self.allocator.free(item.label.src); 145 + self.allocator.free(item.label.uri); 146 + self.allocator.free(item.label.val); 147 + self.allocator.free(item.label.cts); 148 + self.allocator.free(item.encoded); 149 + if (item.label.sig) |s| self.allocator.free(s); 150 + if (item.label.cid) |ci| self.allocator.free(ci); 151 + if (item.label.exp) |e| self.allocator.free(e); 152 + } 153 + self.allocator.free(labels); 154 + } 155 + 156 + if (labels.len == 0) break; 157 + 158 + for (labels) |stored| { 159 + const frame = self.buildLabelsFrame(stored.seq, stored.encoded) catch continue; 160 + defer self.allocator.free(frame); 161 + conn.writeBin(frame) catch return; 162 + } 163 + 164 + cur = labels[labels.len - 1].seq; 165 + } 166 + } 167 + 168 + /// handle an HTTP request (non-WebSocket). 169 + pub fn handleHttp( 170 + self: *Server, 171 + conn: *websocket.Conn, 172 + method: []const u8, 173 + url: []const u8, 174 + ) void { 175 + // split path and query 176 + const qmark = std.mem.indexOfScalar(u8, url, '?'); 177 + const path = url[0..(qmark orelse url.len)]; 178 + const query = if (qmark) |q| url[q + 1 ..] else ""; 179 + 180 + if (std.mem.eql(u8, method, "GET") and 181 + std.mem.eql(u8, path, "/xrpc/com.atproto.label.queryLabels")) 182 + { 183 + self.handleQueryLabels(conn, query); 184 + } else if (std.mem.eql(u8, method, "GET") and std.mem.eql(u8, path, "/health")) { 185 + httpRespond(conn, "200 OK", "application/json", "{\"status\":\"ok\"}"); 186 + } else { 187 + httpRespond(conn, "404 Not Found", "application/json", 188 + \\{"error":"NotFound","message":"endpoint not found"} 189 + ); 190 + } 191 + } 192 + 193 + fn handleQueryLabels(self: *Server, conn: *websocket.Conn, query: []const u8) void { 194 + // parse uriPatterns param 195 + const uri = queryParam(query, "uriPatterns") orelse { 196 + httpRespond(conn, "400 Bad Request", "application/json", 197 + \\{"error":"InvalidRequest","message":"uriPatterns parameter required"} 198 + ); 199 + return; 200 + }; 201 + 202 + const labels = self.store.queryBySubject(self.allocator, uri) catch { 203 + httpRespond(conn, "500 Internal Server Error", "application/json", 204 + \\{"error":"InternalError","message":"database query failed"} 205 + ); 206 + return; 207 + }; 208 + defer { 209 + for (labels) |item| { 210 + self.allocator.free(item.label.src); 211 + self.allocator.free(item.label.uri); 212 + self.allocator.free(item.label.val); 213 + self.allocator.free(item.label.cts); 214 + self.allocator.free(item.encoded); 215 + if (item.label.sig) |s| self.allocator.free(s); 216 + if (item.label.cid) |ci| self.allocator.free(ci); 217 + if (item.label.exp) |e| self.allocator.free(e); 218 + } 219 + self.allocator.free(labels); 220 + } 221 + 222 + // build JSON response 223 + var buf: std.ArrayList(u8) = .{}; 224 + defer buf.deinit(self.allocator); 225 + const w = buf.writer(self.allocator); 226 + 227 + w.writeAll("{\"labels\":[") catch return; 228 + 229 + for (labels, 0..) |stored, i| { 230 + if (i > 0) w.writeByte(',') catch return; 231 + writeJsonLabel(w, &stored) catch return; 232 + } 233 + 234 + w.writeAll("]}") catch return; 235 + 236 + httpRespond(conn, "200 OK", "application/json", buf.items); 237 + } 238 + }; 239 + 240 + const Subscriber = struct { 241 + conn: *websocket.Conn, 242 + }; 243 + 244 + /// websocket handler for subscribeLabels 245 + pub fn Handler(comptime ServerT: type) type { 246 + return struct { 247 + server: *ServerT, 248 + conn: *websocket.Conn, 249 + cursor: ?i64 = null, 250 + 251 + const Self = @This(); 252 + 253 + pub fn init(handshake: *const websocket.Handshake, conn: *websocket.Conn, ctx: *ServerT) !Self { 254 + _ = handshake; 255 + return .{ 256 + .server = ctx, 257 + .conn = conn, 258 + }; 259 + } 260 + 261 + pub fn afterInit(self: *Self, _: *ServerT) !void { 262 + self.server.addSubscriber(self.conn, self.cursor); 263 + } 264 + 265 + pub fn clientMessage(_: *Self, _: []const u8) !void { 266 + // labeler is write-only to subscribers 267 + } 268 + 269 + pub fn close(self: *Self) void { 270 + self.server.removeSubscriber(self.conn); 271 + } 272 + 273 + pub fn httpFallback(conn: *websocket.Conn, method: []const u8, url: []const u8, _: []const u8, _: *const websocket.Handshake.KeyValue, ctx_ptr: ?*anyopaque) void { 274 + const ctx: *ServerT = @ptrCast(@alignCast(ctx_ptr.?)); 275 + ctx.handleHttp(conn, method, url); 276 + } 277 + }; 278 + } 279 + 280 + // === HTTP helpers === 281 + 282 + fn httpRespond(conn: *websocket.Conn, status: []const u8, content_type: []const u8, body: []const u8) void { 283 + var buf: [512]u8 = undefined; 284 + const header = std.fmt.bufPrint(&buf, "HTTP/1.1 {s}\r\nContent-Type: {s}\r\nContent-Length: {d}\r\nAccess-Control-Allow-Origin: *\r\n\r\n", .{ 285 + status, content_type, body.len, 286 + }) catch return; 287 + conn.writeFramed(header) catch return; 288 + if (body.len > 0) conn.writeFramed(body) catch return; 289 + } 290 + 291 + fn queryParam(query: []const u8, name: []const u8) ?[]const u8 { 292 + var it = std.mem.splitScalar(u8, query, '&'); 293 + while (it.next()) |pair| { 294 + const eq = std.mem.indexOfScalar(u8, pair, '=') orelse continue; 295 + if (std.mem.eql(u8, pair[0..eq], name)) { 296 + return pair[eq + 1 ..]; 297 + } 298 + } 299 + return null; 300 + } 301 + 302 + fn writeJsonLabel(w: anytype, stored: *const store_mod.StoredLabel) !void { 303 + const lbl = &stored.label; 304 + try w.writeAll("{"); 305 + try w.print("\"ver\":{d}", .{lbl.ver}); 306 + try w.writeAll(",\"src\":\""); 307 + try w.writeAll(lbl.src); 308 + try w.writeAll("\",\"uri\":\""); 309 + try w.writeAll(lbl.uri); 310 + try w.writeByte('"'); 311 + 312 + if (lbl.cid) |cid| { 313 + try w.writeAll(",\"cid\":\""); 314 + try w.writeAll(cid); 315 + try w.writeByte('"'); 316 + } 317 + 318 + try w.writeAll(",\"val\":\""); 319 + try w.writeAll(lbl.val); 320 + try w.writeAll("\",\"cts\":\""); 321 + try w.writeAll(lbl.cts); 322 + try w.writeByte('"'); 323 + 324 + if (lbl.neg) try w.writeAll(",\"neg\":true"); 325 + if (lbl.exp) |exp| { 326 + try w.writeAll(",\"exp\":\""); 327 + try w.writeAll(exp); 328 + try w.writeByte('"'); 329 + } 330 + 331 + // sig as base64 332 + if (lbl.sig) |sig| { 333 + try w.writeAll(",\"sig\":\""); 334 + const encoder = std.base64.standard.Encoder; 335 + var b64_buf: [88]u8 = undefined; // 64 bytes → 88 chars 336 + const b64_len = encoder.calcSize(sig.len); 337 + const b64 = encoder.encode(b64_buf[0..b64_len], sig); 338 + try w.writeAll(b64); 339 + try w.writeByte('"'); 340 + } 341 + 342 + try w.writeByte('}'); 343 + } 344 + 345 + // === tests === 346 + 347 + test "query param parsing" { 348 + try std.testing.expectEqualStrings("hello", queryParam("foo=hello&bar=world", "foo").?); 349 + try std.testing.expectEqualStrings("world", queryParam("foo=hello&bar=world", "bar").?); 350 + try std.testing.expect(queryParam("foo=hello", "bar") == null); 351 + try std.testing.expect(queryParam("", "foo") == null); 352 + } 353 + 354 + test "event frame structure" { 355 + const allocator = std.testing.allocator; 356 + 357 + const payload: cbor.Value = .{ .map = &.{ 358 + .{ .key = "seq", .value = .{ .unsigned = 1 } }, 359 + .{ .key = "labels", .value = .{ .array = &.{} } }, 360 + } }; 361 + 362 + const frame = try label_mod.encodeEventFrame(allocator, 1, "#labels", payload); 363 + defer allocator.free(frame); 364 + 365 + // verify we can decode header + payload from the frame 366 + var arena = std.heap.ArenaAllocator.init(allocator); 367 + defer arena.deinit(); 368 + const alloc = arena.allocator(); 369 + 370 + const header = try cbor.decode(alloc, frame); 371 + try std.testing.expectEqual(@as(u64, 1), header.value.getUint("op").?); 372 + try std.testing.expectEqualStrings("#labels", header.value.getString("t").?); 373 + 374 + const body = try cbor.decodeAll(alloc, frame[header.consumed..]); 375 + try std.testing.expectEqual(@as(u64, 1), body.getUint("seq").?); 376 + }
+333
src/store.zig
··· 1 + //! SQLite-backed label store 2 + //! 3 + //! stores signed labels with auto-incrementing sequence numbers. 4 + //! supports cursor-based retrieval for subscribeLabels backfill 5 + //! and subject-filtered queries for queryLabels. 6 + 7 + const std = @import("std"); 8 + const c = @cImport(@cInclude("sqlite3.h")); 9 + const label_mod = @import("label.zig"); 10 + const Label = label_mod.Label; 11 + 12 + const Allocator = std.mem.Allocator; 13 + const log = std.log.scoped(.store); 14 + 15 + pub const StoredLabel = struct { 16 + seq: i64, 17 + label: Label, 18 + /// pre-encoded signed CBOR (stored as blob, avoids re-encoding) 19 + encoded: []const u8, 20 + }; 21 + 22 + pub const Store = struct { 23 + db: *c.sqlite3, 24 + insert_stmt: *c.sqlite3_stmt, 25 + query_cursor_stmt: *c.sqlite3_stmt, 26 + query_subject_stmt: *c.sqlite3_stmt, 27 + latest_seq_stmt: *c.sqlite3_stmt, 28 + 29 + pub fn init(path: [*:0]const u8) !Store { 30 + var db: ?*c.sqlite3 = null; 31 + if (c.sqlite3_open(path, &db) != c.SQLITE_OK) { 32 + if (db) |d| log.err("sqlite open: {s}", .{c.sqlite3_errmsg(d)}); 33 + return error.SqliteOpen; 34 + } 35 + 36 + const d = db.?; 37 + 38 + // WAL mode for concurrent reads during writes 39 + if (execSimple(d, "PRAGMA journal_mode=WAL") != c.SQLITE_OK) return error.SqlitePragma; 40 + 41 + // create table 42 + if (execSimple(d, 43 + \\CREATE TABLE IF NOT EXISTS labels ( 44 + \\ seq INTEGER PRIMARY KEY AUTOINCREMENT, 45 + \\ src TEXT NOT NULL, 46 + \\ uri TEXT NOT NULL, 47 + \\ cid TEXT, 48 + \\ val TEXT NOT NULL, 49 + \\ neg INTEGER NOT NULL DEFAULT 0, 50 + \\ cts TEXT NOT NULL, 51 + \\ exp TEXT, 52 + \\ sig BLOB NOT NULL, 53 + \\ encoded BLOB NOT NULL 54 + \\) 55 + ) != c.SQLITE_OK) return error.SqliteSchema; 56 + 57 + // index on uri for queryLabels filtering 58 + if (execSimple(d, 59 + \\CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri) 60 + ) != c.SQLITE_OK) return error.SqliteSchema; 61 + 62 + return .{ 63 + .db = d, 64 + .insert_stmt = try prepare(d, 65 + \\INSERT INTO labels (src, uri, cid, val, neg, cts, exp, sig, encoded) 66 + \\VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) 67 + ), 68 + .query_cursor_stmt = try prepare(d, 69 + \\SELECT seq, src, uri, cid, val, neg, cts, exp, sig, encoded 70 + \\FROM labels WHERE seq > ?1 ORDER BY seq ASC LIMIT ?2 71 + ), 72 + .query_subject_stmt = try prepare(d, 73 + \\SELECT seq, src, uri, cid, val, neg, cts, exp, sig, encoded 74 + \\FROM labels WHERE uri = ?1 ORDER BY seq ASC 75 + ), 76 + .latest_seq_stmt = try prepare(d, 77 + \\SELECT COALESCE(MAX(seq), 0) FROM labels 78 + ), 79 + }; 80 + } 81 + 82 + pub fn deinit(self: *Store) void { 83 + _ = c.sqlite3_finalize(self.insert_stmt); 84 + _ = c.sqlite3_finalize(self.query_cursor_stmt); 85 + _ = c.sqlite3_finalize(self.query_subject_stmt); 86 + _ = c.sqlite3_finalize(self.latest_seq_stmt); 87 + _ = c.sqlite3_close(self.db); 88 + } 89 + 90 + /// insert a signed label, returns the assigned sequence number. 91 + pub fn insert(self: *Store, lbl: *const Label, encoded: []const u8) !i64 { 92 + const stmt = self.insert_stmt; 93 + defer _ = c.sqlite3_reset(stmt); 94 + 95 + bindText(stmt, 1, lbl.src); 96 + bindText(stmt, 2, lbl.uri); 97 + if (lbl.cid) |cid| bindText(stmt, 3, cid) else _ = c.sqlite3_bind_null(stmt, 3); 98 + bindText(stmt, 4, lbl.val); 99 + _ = c.sqlite3_bind_int(stmt, 5, @intFromBool(lbl.neg)); 100 + bindText(stmt, 6, lbl.cts); 101 + if (lbl.exp) |exp| bindText(stmt, 7, exp) else _ = c.sqlite3_bind_null(stmt, 7); 102 + if (lbl.sig) |sig| { 103 + _ = c.sqlite3_bind_blob(stmt, 8, sig.ptr, @intCast(sig.len), c.SQLITE_STATIC); 104 + } else { 105 + return error.UnsignedLabel; 106 + } 107 + _ = c.sqlite3_bind_blob(stmt, 9, encoded.ptr, @intCast(encoded.len), c.SQLITE_STATIC); 108 + 109 + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) { 110 + log.err("sqlite insert: {s}", .{c.sqlite3_errmsg(self.db)}); 111 + return error.SqliteInsert; 112 + } 113 + 114 + return c.sqlite3_last_insert_rowid(self.db); 115 + } 116 + 117 + /// get labels after a cursor (sequence number), up to limit. 118 + pub fn queryByCursor(self: *Store, allocator: Allocator, cursor: i64, limit: i64) ![]StoredLabel { 119 + const stmt = self.query_cursor_stmt; 120 + defer _ = c.sqlite3_reset(stmt); 121 + 122 + _ = c.sqlite3_bind_int64(stmt, 1, cursor); 123 + _ = c.sqlite3_bind_int64(stmt, 2, limit); 124 + 125 + return self.collectRows(allocator, stmt); 126 + } 127 + 128 + /// get labels for a specific subject URI. 129 + pub fn queryBySubject(self: *Store, allocator: Allocator, uri: []const u8) ![]StoredLabel { 130 + const stmt = self.query_subject_stmt; 131 + defer _ = c.sqlite3_reset(stmt); 132 + 133 + bindText(stmt, 1, uri); 134 + 135 + return self.collectRows(allocator, stmt); 136 + } 137 + 138 + /// get the latest sequence number (0 if empty). 139 + pub fn latestSeq(self: *Store) i64 { 140 + defer _ = c.sqlite3_reset(self.latest_seq_stmt); 141 + if (c.sqlite3_step(self.latest_seq_stmt) == c.SQLITE_ROW) { 142 + return c.sqlite3_column_int64(self.latest_seq_stmt, 0); 143 + } 144 + return 0; 145 + } 146 + 147 + fn collectRows(self: *Store, allocator: Allocator, stmt: *c.sqlite3_stmt) ![]StoredLabel { 148 + _ = self; 149 + var results: std.ArrayList(StoredLabel) = .{}; 150 + errdefer { 151 + for (results.items) |item| { 152 + allocator.free(item.encoded); 153 + } 154 + results.deinit(allocator); 155 + } 156 + 157 + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 158 + const encoded_ptr = c.sqlite3_column_blob(stmt, 9); 159 + const encoded_len: usize = @intCast(c.sqlite3_column_bytes(stmt, 9)); 160 + 161 + const encoded = try allocator.alloc(u8, encoded_len); 162 + if (encoded_ptr) |ptr| { 163 + @memcpy(encoded, @as([*]const u8, @ptrCast(ptr))[0..encoded_len]); 164 + } 165 + 166 + const sig_ptr = c.sqlite3_column_blob(stmt, 8); 167 + const sig_len: usize = @intCast(c.sqlite3_column_bytes(stmt, 8)); 168 + var sig_slice: ?[]const u8 = null; 169 + if (sig_ptr != null and sig_len > 0) { 170 + const sig_copy = try allocator.alloc(u8, sig_len); 171 + @memcpy(sig_copy, @as([*]const u8, @ptrCast(sig_ptr.?))[0..sig_len]); 172 + sig_slice = sig_copy; 173 + } 174 + 175 + try results.append(allocator, .{ 176 + .seq = c.sqlite3_column_int64(stmt, 0), 177 + .label = .{ 178 + .src = try dupeColumnText(allocator, stmt, 1), 179 + .uri = try dupeColumnText(allocator, stmt, 2), 180 + .cid = try dupeColumnTextOpt(allocator, stmt, 3), 181 + .val = try dupeColumnText(allocator, stmt, 4), 182 + .neg = c.sqlite3_column_int(stmt, 5) != 0, 183 + .cts = try dupeColumnText(allocator, stmt, 6), 184 + .exp = try dupeColumnTextOpt(allocator, stmt, 7), 185 + .sig = sig_slice, 186 + }, 187 + .encoded = encoded, 188 + }); 189 + } 190 + 191 + return try results.toOwnedSlice(allocator); 192 + } 193 + }; 194 + 195 + fn execSimple(db: *c.sqlite3, sql: [*:0]const u8) c_int { 196 + return c.sqlite3_exec(db, sql, null, null, null); 197 + } 198 + 199 + fn prepare(db: *c.sqlite3, sql: [*:0]const u8) !*c.sqlite3_stmt { 200 + var stmt: ?*c.sqlite3_stmt = null; 201 + if (c.sqlite3_prepare_v2(db, sql, -1, &stmt, null) != c.SQLITE_OK) { 202 + log.err("sqlite prepare: {s}", .{c.sqlite3_errmsg(db)}); 203 + return error.SqlitePrepare; 204 + } 205 + return stmt.?; 206 + } 207 + 208 + fn bindText(stmt: *c.sqlite3_stmt, col: c_int, text: []const u8) void { 209 + _ = c.sqlite3_bind_text(stmt, col, text.ptr, @intCast(text.len), c.SQLITE_STATIC); 210 + } 211 + 212 + fn dupeColumnText(allocator: Allocator, stmt: *c.sqlite3_stmt, col: c_int) ![]const u8 { 213 + const ptr = c.sqlite3_column_text(stmt, col); 214 + const len: usize = @intCast(c.sqlite3_column_bytes(stmt, col)); 215 + if (ptr == null or len == 0) return try allocator.dupe(u8, ""); 216 + const slice = @as([*]const u8, @ptrCast(ptr.?))[0..len]; 217 + return try allocator.dupe(u8, slice); 218 + } 219 + 220 + fn dupeColumnTextOpt(allocator: Allocator, stmt: *c.sqlite3_stmt, col: c_int) !?[]const u8 { 221 + if (c.sqlite3_column_type(stmt, col) == c.SQLITE_NULL) return null; 222 + return try dupeColumnText(allocator, stmt, col); 223 + } 224 + 225 + // === tests === 226 + 227 + test "store insert and query by cursor" { 228 + var store = try Store.init(":memory:"); 229 + defer store.deinit(); 230 + 231 + const allocator = std.testing.allocator; 232 + 233 + var label1 = Label{ 234 + .src = "did:plc:labeler", 235 + .uri = "at://did:plc:user1/app.bsky.feed.post/aaa", 236 + .val = "spam", 237 + .cts = "2024-01-01T00:00:00.000Z", 238 + .sig = &(.{0xaa} ** 64), 239 + }; 240 + const seq1 = try store.insert(&label1, "encoded1"); 241 + 242 + var label2 = Label{ 243 + .src = "did:plc:labeler", 244 + .uri = "at://did:plc:user2/app.bsky.feed.post/bbb", 245 + .val = "impersonation", 246 + .cts = "2024-01-01T00:01:00.000Z", 247 + .sig = &(.{0xbb} ** 64), 248 + }; 249 + const seq2 = try store.insert(&label2, "encoded2"); 250 + 251 + try std.testing.expect(seq2 > seq1); 252 + try std.testing.expectEqual(seq2, store.latestSeq()); 253 + 254 + // query from cursor=0 should return both 255 + const all = try store.queryByCursor(allocator, 0, 100); 256 + defer { 257 + for (all) |item| { 258 + allocator.free(item.label.src); 259 + allocator.free(item.label.uri); 260 + allocator.free(item.label.val); 261 + allocator.free(item.label.cts); 262 + allocator.free(item.encoded); 263 + if (item.label.sig) |s| allocator.free(s); 264 + } 265 + allocator.free(all); 266 + } 267 + try std.testing.expectEqual(@as(usize, 2), all.len); 268 + 269 + // query from cursor=seq1 should return only the second 270 + const after = try store.queryByCursor(allocator, seq1, 100); 271 + defer { 272 + for (after) |item| { 273 + allocator.free(item.label.src); 274 + allocator.free(item.label.uri); 275 + allocator.free(item.label.val); 276 + allocator.free(item.label.cts); 277 + allocator.free(item.encoded); 278 + if (item.label.sig) |s| allocator.free(s); 279 + } 280 + allocator.free(after); 281 + } 282 + try std.testing.expectEqual(@as(usize, 1), after.len); 283 + try std.testing.expectEqualStrings("impersonation", after[0].label.val); 284 + } 285 + 286 + test "store query by subject" { 287 + var store = try Store.init(":memory:"); 288 + defer store.deinit(); 289 + 290 + const allocator = std.testing.allocator; 291 + 292 + const target_uri = "at://did:plc:user1/app.bsky.feed.post/aaa"; 293 + var label1 = Label{ 294 + .src = "did:plc:labeler", 295 + .uri = target_uri, 296 + .val = "spam", 297 + .cts = "2024-01-01T00:00:00.000Z", 298 + .sig = &(.{0xaa} ** 64), 299 + }; 300 + _ = try store.insert(&label1, "e1"); 301 + 302 + var label2 = Label{ 303 + .src = "did:plc:labeler", 304 + .uri = "at://did:plc:other/app.bsky.feed.post/bbb", 305 + .val = "nsfw", 306 + .cts = "2024-01-01T00:00:01.000Z", 307 + .sig = &(.{0xbb} ** 64), 308 + }; 309 + _ = try store.insert(&label2, "e2"); 310 + 311 + const results = try store.queryBySubject(allocator, target_uri); 312 + defer { 313 + for (results) |item| { 314 + allocator.free(item.label.src); 315 + allocator.free(item.label.uri); 316 + allocator.free(item.label.val); 317 + allocator.free(item.label.cts); 318 + allocator.free(item.encoded); 319 + if (item.label.sig) |s| allocator.free(s); 320 + } 321 + allocator.free(results); 322 + } 323 + 324 + try std.testing.expectEqual(@as(usize, 1), results.len); 325 + try std.testing.expectEqualStrings("spam", results[0].label.val); 326 + } 327 + 328 + test "store empty returns zero seq" { 329 + var store = try Store.init(":memory:"); 330 + defer store.deinit(); 331 + 332 + try std.testing.expectEqual(@as(i64, 0), store.latestSeq()); 333 + }