atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

getRepo: 302 redirect to PDS for indigo compatibility

getRepo is a PDS endpoint per the sync spec, but indigo's relay
implements it as a 302 redirect. adds the same behavior here —
looks up the account's PDS hostname and redirects the client.

also fixes README: distinguishes spec-mandated relay endpoints
from indigo-compat endpoints, corrects LRU eviction description.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz a931853b f851c30d

+47 -3
+4 -2
README.md
··· 8 8 9 9 - **direct PDS crawl** — the bootstrap relay is called once at startup for the host list via `listHosts`, then all data flows directly from each PDS. 10 10 11 - - **optimistic signature validation** — on signing key cache miss, the frame passes through immediately and the DID is queued for background resolution. all subsequent commits are verified against the cached key. the cache caps at a configurable size and evicts the oldest 10% by resolve time when full. 11 + - **optimistic signature validation** — on signing key cache miss, the frame passes through immediately and the DID is queued for background resolution. all subsequent commits are verified against the cached key. the cache caps at a configurable size and evicts the least recently used entry when full. 12 12 13 13 - **inline collection index** — indexes `(DID, collection)` pairs in the event processing pipeline using RocksDB. serves `listReposByCollection` from the relay process — no sidecar. the index design draws on [fig](https://tangled.org/microcosm.blue)'s work on [lightrail](https://tangled.org/microcosm.blue/lightrail). 14 14 ··· 16 16 17 17 ## spec compliance 18 18 19 - implements the [AT Protocol sync spec](https://atproto.com/specs/sync) — `subscribeRepos`, `listRepos`, `getRepoStatus`, `getLatestCommit`, `listReposByCollection`, `listHosts`, `getHostStatus`, and `requestCrawl`. 19 + implements the relay endpoints from the [AT Protocol sync spec](https://atproto.com/specs/sync): `subscribeRepos`, `listRepos`, `getRepoStatus`, `listHosts`, `getHostStatus`, and `requestCrawl`. 20 + 21 + also implements `getLatestCommit`, `listReposByCollection`, and `getRepo` (302 redirect to PDS) for compatibility with the [indigo relay](https://github.com/bluesky-social/indigo) reference implementation. 20 22 21 23 ## dependencies 22 24
+6
src/api/http.zig
··· 28 28 httpRespond(conn, status, "text/plain", body); 29 29 } 30 30 31 + pub fn respondRedirect(conn: *Conn, location: []const u8) void { 32 + var buf: [1024]u8 = undefined; 33 + const header = std.fmt.bufPrint(&buf, "HTTP/1.1 302 Found\r\nLocation: {s}\r\nContent-Length: 0\r\nConnection: close\r\nServer: zlay\r\n\r\n", .{location}) catch return; 34 + conn.writeFramed(header) catch return; 35 + } 36 + 31 37 pub fn httpStatusLine(status: http.Status) []const u8 { 32 38 return switch (status) { 33 39 .ok => "200 OK",
+2
src/api/router.zig
··· 73 73 xrpc.handleListRepos(conn, query, ctx.persist); 74 74 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { 75 75 xrpc.handleGetRepoStatus(conn, query, ctx.persist); 76 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepo")) { 77 + xrpc.handleGetRepo(conn, query, ctx.persist); 76 78 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { 77 79 xrpc.handleGetLatestCommit(conn, query, ctx.persist); 78 80 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) {
+35 -1
src/api/xrpc.zig
··· 1 1 //! XRPC endpoint handlers for the AT Protocol sync API. 2 2 //! 3 3 //! implements com.atproto.sync.* lexicon endpoints: 4 - //! listRepos, getRepoStatus, getLatestCommit, listReposByCollection, 4 + //! listRepos, getRepo, getRepoStatus, getLatestCommit, listReposByCollection, 5 5 //! listHosts, getHostStatus, requestCrawl 6 6 7 7 const std = @import("std"); ··· 167 167 168 168 w.writeByte('}') catch return; 169 169 h.respondJson(conn, .ok, fbs.getWritten()); 170 + } 171 + 172 + pub fn handleGetRepo(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 173 + var did_buf: [256]u8 = undefined; 174 + const did = h.queryParamDecoded(query, "did", &did_buf) orelse { 175 + h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 176 + return; 177 + }; 178 + 179 + if (!std.mem.startsWith(u8, did, "did:")) { 180 + h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 181 + return; 182 + } 183 + 184 + // look up the PDS hostname for this account 185 + var row = (persist.db.rowUnsafe( 186 + "SELECT h.hostname FROM account a JOIN host h ON a.host_id = h.id WHERE a.did = $1 AND a.host_id > 0", 187 + .{did}, 188 + ) catch { 189 + h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 190 + return; 191 + }) orelse { 192 + h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 193 + return; 194 + }; 195 + defer row.deinit() catch {}; 196 + 197 + const hostname = row.get([]const u8, 0); 198 + 199 + // build redirect URL: https://{hostname}/xrpc/com.atproto.sync.getRepo?did={did} 200 + var url_buf: [512]u8 = undefined; 201 + const url = std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.getRepo?did={s}", .{ hostname, did }) catch return; 202 + 203 + h.respondRedirect(conn, url); 170 204 } 171 205 172 206 pub fn handleGetLatestCommit(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void {