this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove Rust appview crate, replaced by Elixir appview [CL-290]

Delete crates/opake-appview/ (~3100 LOC Rust), Containerfile.appview,
and workspace member. Update CONTRIBUTING.md to reflect the new Elixir
appview at appview/.

+10 -3244
+8 -8
CONTRIBUTING.md
··· 15 15 - `cargo fmt` before every commit (enforced by CI and pre-commit hook) 16 16 - `cargo clippy -- -D warnings` must pass 17 17 - No `unwrap()` in library code — use `?` and proper error types 18 - - `opake-core` uses `thiserror` for typed errors; `opake-cli` and `opake-appview` use `anyhow` for application errors 18 + - `opake-core` uses `thiserror` for typed errors; `opake-cli` uses `anyhow` for application errors 19 19 - Prefer `&str` parameters, `String` for owned data 20 20 - Avoid `.clone()` unless necessary 21 21 ··· 35 35 - clap command definitions 36 36 - FileStorage (impl Storage over filesystem, TOML + JSON) 37 37 - user interaction (prompts, formatting) 38 - 39 - opake-appview indexer + REST API for grant/keyring discovery 40 - - Jetstream firehose consumer 41 - - SQLite storage (WAL mode) 42 - - Axum API with DID-scoped Ed25519 auth 43 - - rate limiting via tower_governor 44 38 45 39 opake-derive proc-macro crate 46 40 - #[derive(RedactedDebug)] with #[redact] field attribute 47 41 - generates Debug impls showing byte length instead of content 48 42 - used by opake-core (ContentKey, Session) and opake-cli (Identity) 49 43 44 + appview/ Elixir/Phoenix indexer + REST API for grant/keyring discovery 45 + - Jetstream firehose consumer (WebSockex) 46 + - PostgreSQL storage (Ecto) 47 + - Phoenix API with DID-scoped Ed25519 auth (Erlang :crypto) 48 + - rate limiting via Hammer 49 + 50 50 web/ React SPA (Vite + TanStack Router + Tailwind/daisyUI) 51 51 - opake-core via WASM (wasm-pack build) 52 52 - IndexedDbStorage (impl Storage over Dexie.js/IndexedDB) ··· 71 71 cargo test # all Rust tests 72 72 cargo test -p opake-core # core only 73 73 cargo test -p opake-cli # CLI only 74 - cargo test -p opake-appview # appview only 75 74 cargo test -- --test-output # show println output 76 75 76 + cd appview && mix test # appview tests (Elixir/ExUnit) 77 77 cd web && bun run test # web frontend tests (Vitest + fake-indexeddb) 78 78 ``` 79 79
+1 -561
Cargo.lock
··· 56 56 ] 57 57 58 58 [[package]] 59 - name = "allocator-api2" 60 - version = "0.2.21" 61 - source = "registry+https://github.com/rust-lang/crates.io-index" 62 - checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" 63 - 64 - [[package]] 65 59 name = "android_system_properties" 66 60 version = "0.1.5" 67 61 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 172 166 ] 173 167 174 168 [[package]] 175 - name = "axum" 176 - version = "0.8.8" 177 - source = "registry+https://github.com/rust-lang/crates.io-index" 178 - checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" 179 - dependencies = [ 180 - "axum-core", 181 - "bytes", 182 - "form_urlencoded", 183 - "futures-util", 184 - "http", 185 - "http-body", 186 - "http-body-util", 187 - "hyper", 188 - "hyper-util", 189 - "itoa", 190 - "matchit", 191 - "memchr", 192 - "mime", 193 - "percent-encoding", 194 - "pin-project-lite", 195 - "serde_core", 196 - "serde_json", 197 - "serde_path_to_error", 198 - "serde_urlencoded", 199 - "sync_wrapper", 200 - "tokio", 201 - "tower", 202 - "tower-layer", 203 - "tower-service", 204 - "tracing", 205 - ] 206 - 207 - [[package]] 208 - name = "axum-core" 209 - version = "0.5.6" 210 - source = "registry+https://github.com/rust-lang/crates.io-index" 211 - checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" 212 - dependencies = [ 213 - "bytes", 214 - "futures-core", 215 - "http", 216 - "http-body", 217 - "http-body-util", 218 - "mime", 219 - "pin-project-lite", 220 - "sync_wrapper", 221 - "tower-layer", 222 - "tower-service", 223 - "tracing", 224 - ] 225 - 226 - [[package]] 227 169 name = "base16ct" 228 170 version = "0.2.0" 229 171 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 438 380 ] 439 381 440 382 [[package]] 441 - name = "crossbeam-utils" 442 - version = "0.8.21" 443 - source = "registry+https://github.com/rust-lang/crates.io-index" 444 - checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" 445 - 446 - [[package]] 447 383 name = "crypto-bigint" 448 384 version = "0.5.5" 449 385 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 503 439 ] 504 440 505 441 [[package]] 506 - name = "dashmap" 507 - version = "6.1.0" 508 - source = "registry+https://github.com/rust-lang/crates.io-index" 509 - checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 510 - dependencies = [ 511 - "cfg-if", 512 - "crossbeam-utils", 513 - "hashbrown 0.14.5", 514 - "lock_api", 515 - "once_cell", 516 - "parking_lot_core", 517 - ] 518 - 519 - [[package]] 520 442 name = "data-encoding" 521 443 version = "2.10.0" 522 444 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 676 598 ] 677 599 678 600 [[package]] 679 - name = "fallible-iterator" 680 - version = "0.3.0" 681 - source = "registry+https://github.com/rust-lang/crates.io-index" 682 - checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 683 - 684 - [[package]] 685 - name = "fallible-streaming-iterator" 686 - version = "0.1.9" 687 - source = "registry+https://github.com/rust-lang/crates.io-index" 688 - checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 689 - 690 - [[package]] 691 601 name = "fastrand" 692 602 version = "2.3.0" 693 603 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 716 626 checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" 717 627 718 628 [[package]] 719 - name = "fnv" 720 - version = "1.0.7" 721 - source = "registry+https://github.com/rust-lang/crates.io-index" 722 - checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" 723 - 724 - [[package]] 725 - name = "foldhash" 726 - version = "0.2.0" 727 - source = "registry+https://github.com/rust-lang/crates.io-index" 728 - checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" 729 - 730 - [[package]] 731 629 name = "form_urlencoded" 732 630 version = "1.2.2" 733 631 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 737 635 ] 738 636 739 637 [[package]] 740 - name = "forwarded-header-value" 741 - version = "0.1.1" 742 - source = "registry+https://github.com/rust-lang/crates.io-index" 743 - checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" 744 - dependencies = [ 745 - "nonempty", 746 - "thiserror 1.0.69", 747 - ] 748 - 749 - [[package]] 750 638 name = "fs_extra" 751 639 version = "1.3.0" 752 640 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 774 662 checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" 775 663 776 664 [[package]] 777 - name = "futures-macro" 778 - version = "0.3.32" 779 - source = "registry+https://github.com/rust-lang/crates.io-index" 780 - checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" 781 - dependencies = [ 782 - "proc-macro2", 783 - "quote", 784 - "syn", 785 - ] 786 - 787 - [[package]] 788 - name = "futures-sink" 789 - version = "0.3.32" 790 - source = "registry+https://github.com/rust-lang/crates.io-index" 791 - checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" 792 - 793 - [[package]] 794 665 name = "futures-task" 795 666 version = "0.3.32" 796 667 source = "registry+https://github.com/rust-lang/crates.io-index" 797 668 checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" 798 669 799 670 [[package]] 800 - name = "futures-timer" 801 - version = "3.0.3" 802 - source = "registry+https://github.com/rust-lang/crates.io-index" 803 - checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" 804 - 805 - [[package]] 806 671 name = "futures-util" 807 672 version = "0.3.32" 808 673 source = "registry+https://github.com/rust-lang/crates.io-index" 809 674 checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" 810 675 dependencies = [ 811 676 "futures-core", 812 - "futures-macro", 813 - "futures-sink", 814 677 "futures-task", 815 678 "pin-project-lite", 816 679 "slab", ··· 865 728 ] 866 729 867 730 [[package]] 868 - name = "governor" 869 - version = "0.10.4" 870 - source = "registry+https://github.com/rust-lang/crates.io-index" 871 - checksum = "9efcab3c1958580ff1f25a2a41be1668f7603d849bb63af523b208a3cc1223b8" 872 - dependencies = [ 873 - "cfg-if", 874 - "dashmap", 875 - "futures-sink", 876 - "futures-timer", 877 - "futures-util", 878 - "getrandom 0.3.4", 879 - "hashbrown 0.16.1", 880 - "nonzero_ext", 881 - "parking_lot", 882 - "portable-atomic", 883 - "quanta", 884 - "rand 0.9.2", 885 - "smallvec", 886 - "spinning_top", 887 - "web-time", 888 - ] 889 - 890 - [[package]] 891 731 name = "group" 892 732 version = "0.13.0" 893 733 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 899 739 ] 900 740 901 741 [[package]] 902 - name = "h2" 903 - version = "0.4.13" 904 - source = "registry+https://github.com/rust-lang/crates.io-index" 905 - checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" 906 - dependencies = [ 907 - "atomic-waker", 908 - "bytes", 909 - "fnv", 910 - "futures-core", 911 - "futures-sink", 912 - "http", 913 - "indexmap", 914 - "slab", 915 - "tokio", 916 - "tokio-util", 917 - "tracing", 918 - ] 919 - 920 - [[package]] 921 - name = "hashbrown" 922 - version = "0.14.5" 923 - source = "registry+https://github.com/rust-lang/crates.io-index" 924 - checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 925 - 926 - [[package]] 927 742 name = "hashbrown" 928 743 version = "0.16.1" 929 744 source = "registry+https://github.com/rust-lang/crates.io-index" 930 745 checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 931 - dependencies = [ 932 - "allocator-api2", 933 - "equivalent", 934 - "foldhash", 935 - ] 936 - 937 - [[package]] 938 - name = "hashlink" 939 - version = "0.11.0" 940 - source = "registry+https://github.com/rust-lang/crates.io-index" 941 - checksum = "ea0b22561a9c04a7cb1a302c013e0259cd3b4bb619f145b32f72b8b4bcbed230" 942 - dependencies = [ 943 - "hashbrown 0.16.1", 944 - ] 945 746 946 747 [[package]] 947 748 name = "heck" ··· 1052 853 checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" 1053 854 1054 855 [[package]] 1055 - name = "httpdate" 1056 - version = "1.0.3" 1057 - source = "registry+https://github.com/rust-lang/crates.io-index" 1058 - checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" 1059 - 1060 - [[package]] 1061 856 name = "hyper" 1062 857 version = "1.8.1" 1063 858 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1067 862 "bytes", 1068 863 "futures-channel", 1069 864 "futures-core", 1070 - "h2", 1071 865 "http", 1072 866 "http-body", 1073 867 "httparse", 1074 - "httpdate", 1075 868 "itoa", 1076 869 "pin-project-lite", 1077 870 "pin-utils", ··· 1097 890 ] 1098 891 1099 892 [[package]] 1100 - name = "hyper-timeout" 1101 - version = "0.5.2" 1102 - source = "registry+https://github.com/rust-lang/crates.io-index" 1103 - checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" 1104 - dependencies = [ 1105 - "hyper", 1106 - "hyper-util", 1107 - "pin-project-lite", 1108 - "tokio", 1109 - "tower-service", 1110 - ] 1111 - 1112 - [[package]] 1113 893 name = "hyper-util" 1114 894 version = "0.1.20" 1115 895 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1265 1045 checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" 1266 1046 dependencies = [ 1267 1047 "equivalent", 1268 - "hashbrown 0.16.1", 1048 + "hashbrown", 1269 1049 ] 1270 1050 1271 1051 [[package]] ··· 1390 1170 checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" 1391 1171 1392 1172 [[package]] 1393 - name = "libsqlite3-sys" 1394 - version = "0.36.0" 1395 - source = "registry+https://github.com/rust-lang/crates.io-index" 1396 - checksum = "95b4103cffefa72eb8428cb6b47d6627161e51c2739fc5e3b734584157bc642a" 1397 - dependencies = [ 1398 - "cc", 1399 - "pkg-config", 1400 - "vcpkg", 1401 - ] 1402 - 1403 - [[package]] 1404 1173 name = "linked-hash-map" 1405 1174 version = "0.5.6" 1406 1175 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1449 1218 checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 1450 1219 1451 1220 [[package]] 1452 - name = "matchit" 1453 - version = "0.8.4" 1454 - source = "registry+https://github.com/rust-lang/crates.io-index" 1455 - checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" 1456 - 1457 - [[package]] 1458 1221 name = "memchr" 1459 1222 version = "2.8.0" 1460 1223 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1488 1251 ] 1489 1252 1490 1253 [[package]] 1491 - name = "nonempty" 1492 - version = "0.7.0" 1493 - source = "registry+https://github.com/rust-lang/crates.io-index" 1494 - checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" 1495 - 1496 - [[package]] 1497 - name = "nonzero_ext" 1498 - version = "0.3.0" 1499 - source = "registry+https://github.com/rust-lang/crates.io-index" 1500 - checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" 1501 - 1502 - [[package]] 1503 1254 name = "num-traits" 1504 1255 version = "0.2.19" 1505 1256 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1519 1270 version = "1.70.2" 1520 1271 source = "registry+https://github.com/rust-lang/crates.io-index" 1521 1272 checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" 1522 - 1523 - [[package]] 1524 - name = "opake-appview" 1525 - version = "0.1.0" 1526 - dependencies = [ 1527 - "anyhow", 1528 - "axum", 1529 - "base64", 1530 - "chrono", 1531 - "clap", 1532 - "ed25519-dalek", 1533 - "env_logger", 1534 - "futures-util", 1535 - "http-body-util", 1536 - "log", 1537 - "opake-core", 1538 - "reqwest", 1539 - "rusqlite", 1540 - "rustls", 1541 - "serde", 1542 - "serde_json", 1543 - "tempfile", 1544 - "thiserror 2.0.18", 1545 - "tokio", 1546 - "tokio-tungstenite", 1547 - "toml", 1548 - "tower", 1549 - "tower_governor", 1550 - ] 1551 1273 1552 1274 [[package]] 1553 1275 name = "opake-cli" ··· 1681 1403 checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" 1682 1404 1683 1405 [[package]] 1684 - name = "pin-project" 1685 - version = "1.1.11" 1686 - source = "registry+https://github.com/rust-lang/crates.io-index" 1687 - checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" 1688 - dependencies = [ 1689 - "pin-project-internal", 1690 - ] 1691 - 1692 - [[package]] 1693 - name = "pin-project-internal" 1694 - version = "1.1.11" 1695 - source = "registry+https://github.com/rust-lang/crates.io-index" 1696 - checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" 1697 - dependencies = [ 1698 - "proc-macro2", 1699 - "quote", 1700 - "syn", 1701 - ] 1702 - 1703 - [[package]] 1704 1406 name = "pin-project-lite" 1705 1407 version = "0.2.17" 1706 1408 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1721 1423 "der", 1722 1424 "spki", 1723 1425 ] 1724 - 1725 - [[package]] 1726 - name = "pkg-config" 1727 - version = "0.3.32" 1728 - source = "registry+https://github.com/rust-lang/crates.io-index" 1729 - checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 1730 1426 1731 1427 [[package]] 1732 1428 name = "polyval" ··· 1792 1488 ] 1793 1489 1794 1490 [[package]] 1795 - name = "quanta" 1796 - version = "0.12.6" 1797 - source = "registry+https://github.com/rust-lang/crates.io-index" 1798 - checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" 1799 - dependencies = [ 1800 - "crossbeam-utils", 1801 - "libc", 1802 - "once_cell", 1803 - "raw-cpuid", 1804 - "wasi", 1805 - "web-sys", 1806 - "winapi", 1807 - ] 1808 - 1809 - [[package]] 1810 1491 name = "quinn" 1811 1492 version = "0.11.9" 1812 1493 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1937 1618 ] 1938 1619 1939 1620 [[package]] 1940 - name = "raw-cpuid" 1941 - version = "11.6.0" 1942 - source = "registry+https://github.com/rust-lang/crates.io-index" 1943 - checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" 1944 - dependencies = [ 1945 - "bitflags", 1946 - ] 1947 - 1948 - [[package]] 1949 1621 name = "redox_syscall" 1950 1622 version = "0.5.18" 1951 1623 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2051 1723 ] 2052 1724 2053 1725 [[package]] 2054 - name = "rsqlite-vfs" 2055 - version = "0.1.0" 2056 - source = "registry+https://github.com/rust-lang/crates.io-index" 2057 - checksum = "a8a1f2315036ef6b1fbacd1972e8ee7688030b0a2121edfc2a6550febd41574d" 2058 - dependencies = [ 2059 - "hashbrown 0.16.1", 2060 - "thiserror 2.0.18", 2061 - ] 2062 - 2063 - [[package]] 2064 - name = "rusqlite" 2065 - version = "0.38.0" 2066 - source = "registry+https://github.com/rust-lang/crates.io-index" 2067 - checksum = "f1c93dd1c9683b438c392c492109cb702b8090b2bfc8fed6f6e4eb4523f17af3" 2068 - dependencies = [ 2069 - "bitflags", 2070 - "fallible-iterator", 2071 - "fallible-streaming-iterator", 2072 - "hashlink", 2073 - "libsqlite3-sys", 2074 - "smallvec", 2075 - "sqlite-wasm-rs", 2076 - ] 2077 - 2078 - [[package]] 2079 1726 name = "rustc-hash" 2080 1727 version = "2.1.1" 2081 1728 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2110 1757 checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" 2111 1758 dependencies = [ 2112 1759 "aws-lc-rs", 2113 - "log", 2114 1760 "once_cell", 2115 - "ring", 2116 1761 "rustls-pki-types", 2117 1762 "rustls-webpki", 2118 1763 "subtle", ··· 2187 1832 checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" 2188 1833 2189 1834 [[package]] 2190 - name = "ryu" 2191 - version = "1.0.23" 2192 - source = "registry+https://github.com/rust-lang/crates.io-index" 2193 - checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" 2194 - 2195 - [[package]] 2196 1835 name = "same-file" 2197 1836 version = "1.0.6" 2198 1837 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2315 1954 ] 2316 1955 2317 1956 [[package]] 2318 - name = "serde_path_to_error" 2319 - version = "0.1.20" 2320 - source = "registry+https://github.com/rust-lang/crates.io-index" 2321 - checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" 2322 - dependencies = [ 2323 - "itoa", 2324 - "serde", 2325 - "serde_core", 2326 - ] 2327 - 2328 - [[package]] 2329 1957 name = "serde_spanned" 2330 1958 version = "1.0.4" 2331 1959 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2335 1963 ] 2336 1964 2337 1965 [[package]] 2338 - name = "serde_urlencoded" 2339 - version = "0.7.1" 2340 - source = "registry+https://github.com/rust-lang/crates.io-index" 2341 - checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" 2342 - dependencies = [ 2343 - "form_urlencoded", 2344 - "itoa", 2345 - "ryu", 2346 - "serde", 2347 - ] 2348 - 2349 - [[package]] 2350 1966 name = "serdect" 2351 1967 version = "0.2.0" 2352 1968 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2357 1973 ] 2358 1974 2359 1975 [[package]] 2360 - name = "sha1" 2361 - version = "0.10.6" 2362 - source = "registry+https://github.com/rust-lang/crates.io-index" 2363 - checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 2364 - dependencies = [ 2365 - "cfg-if", 2366 - "cpufeatures", 2367 - "digest", 2368 - ] 2369 - 2370 - [[package]] 2371 1976 name = "sha2" 2372 1977 version = "0.10.9" 2373 1978 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2437 2042 ] 2438 2043 2439 2044 [[package]] 2440 - name = "spinning_top" 2441 - version = "0.3.0" 2442 - source = "registry+https://github.com/rust-lang/crates.io-index" 2443 - checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" 2444 - dependencies = [ 2445 - "lock_api", 2446 - ] 2447 - 2448 - [[package]] 2449 2045 name = "spki" 2450 2046 version = "0.7.3" 2451 2047 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2453 2049 dependencies = [ 2454 2050 "base64ct", 2455 2051 "der", 2456 - ] 2457 - 2458 - [[package]] 2459 - name = "sqlite-wasm-rs" 2460 - version = "0.5.2" 2461 - source = "registry+https://github.com/rust-lang/crates.io-index" 2462 - checksum = "2f4206ed3a67690b9c29b77d728f6acc3ce78f16bf846d83c94f76400320181b" 2463 - dependencies = [ 2464 - "cc", 2465 - "js-sys", 2466 - "rsqlite-vfs", 2467 - "wasm-bindgen", 2468 2052 ] 2469 2053 2470 2054 [[package]] ··· 2633 2217 ] 2634 2218 2635 2219 [[package]] 2636 - name = "tokio-stream" 2637 - version = "0.1.18" 2638 - source = "registry+https://github.com/rust-lang/crates.io-index" 2639 - checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" 2640 - dependencies = [ 2641 - "futures-core", 2642 - "pin-project-lite", 2643 - "tokio", 2644 - ] 2645 - 2646 - [[package]] 2647 - name = "tokio-tungstenite" 2648 - version = "0.28.0" 2649 - source = "registry+https://github.com/rust-lang/crates.io-index" 2650 - checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" 2651 - dependencies = [ 2652 - "futures-util", 2653 - "log", 2654 - "rustls", 2655 - "rustls-native-certs", 2656 - "rustls-pki-types", 2657 - "tokio", 2658 - "tokio-rustls", 2659 - "tungstenite", 2660 - ] 2661 - 2662 - [[package]] 2663 - name = "tokio-util" 2664 - version = "0.7.18" 2665 - source = "registry+https://github.com/rust-lang/crates.io-index" 2666 - checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" 2667 - dependencies = [ 2668 - "bytes", 2669 - "futures-core", 2670 - "futures-sink", 2671 - "pin-project-lite", 2672 - "tokio", 2673 - ] 2674 - 2675 - [[package]] 2676 2220 name = "toml" 2677 2221 version = "1.0.3+spec-1.1.0" 2678 2222 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2712 2256 checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" 2713 2257 2714 2258 [[package]] 2715 - name = "tonic" 2716 - version = "0.14.5" 2717 - source = "registry+https://github.com/rust-lang/crates.io-index" 2718 - checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec" 2719 - dependencies = [ 2720 - "async-trait", 2721 - "axum", 2722 - "base64", 2723 - "bytes", 2724 - "h2", 2725 - "http", 2726 - "http-body", 2727 - "http-body-util", 2728 - "hyper", 2729 - "hyper-timeout", 2730 - "hyper-util", 2731 - "percent-encoding", 2732 - "pin-project", 2733 - "socket2 0.6.2", 2734 - "sync_wrapper", 2735 - "tokio", 2736 - "tokio-stream", 2737 - "tower", 2738 - "tower-layer", 2739 - "tower-service", 2740 - "tracing", 2741 - ] 2742 - 2743 - [[package]] 2744 2259 name = "tower" 2745 2260 version = "0.5.3" 2746 2261 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2748 2263 dependencies = [ 2749 2264 "futures-core", 2750 2265 "futures-util", 2751 - "indexmap", 2752 2266 "pin-project-lite", 2753 - "slab", 2754 2267 "sync_wrapper", 2755 2268 "tokio", 2756 - "tokio-util", 2757 2269 "tower-layer", 2758 2270 "tower-service", 2759 - "tracing", 2760 2271 ] 2761 2272 2762 2273 [[package]] ··· 2790 2301 checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" 2791 2302 2792 2303 [[package]] 2793 - name = "tower_governor" 2794 - version = "0.8.0" 2795 - source = "registry+https://github.com/rust-lang/crates.io-index" 2796 - checksum = "44de9b94d849d3c46e06a883d72d408c2de6403367b39df2b1c9d9e7b6736fe6" 2797 - dependencies = [ 2798 - "axum", 2799 - "forwarded-header-value", 2800 - "governor", 2801 - "http", 2802 - "pin-project", 2803 - "thiserror 2.0.18", 2804 - "tonic", 2805 - "tower", 2806 - "tracing", 2807 - ] 2808 - 2809 - [[package]] 2810 2304 name = "tracing" 2811 2305 version = "0.1.44" 2812 2306 source = "registry+https://github.com/rust-lang/crates.io-index" 2813 2307 checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" 2814 2308 dependencies = [ 2815 - "log", 2816 2309 "pin-project-lite", 2817 2310 "tracing-attributes", 2818 2311 "tracing-core", ··· 2845 2338 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 2846 2339 2847 2340 [[package]] 2848 - name = "tungstenite" 2849 - version = "0.28.0" 2850 - source = "registry+https://github.com/rust-lang/crates.io-index" 2851 - checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" 2852 - dependencies = [ 2853 - "bytes", 2854 - "data-encoding", 2855 - "http", 2856 - "httparse", 2857 - "log", 2858 - "rand 0.9.2", 2859 - "rustls", 2860 - "rustls-pki-types", 2861 - "sha1", 2862 - "thiserror 2.0.18", 2863 - "utf-8", 2864 - ] 2865 - 2866 - [[package]] 2867 2341 name = "typenum" 2868 2342 version = "1.19.0" 2869 2343 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2916 2390 checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" 2917 2391 2918 2392 [[package]] 2919 - name = "utf-8" 2920 - version = "0.7.6" 2921 - source = "registry+https://github.com/rust-lang/crates.io-index" 2922 - checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 2923 - 2924 - [[package]] 2925 2393 name = "utf8_iter" 2926 2394 version = "1.0.4" 2927 2395 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2932 2400 version = "0.2.2" 2933 2401 source = "registry+https://github.com/rust-lang/crates.io-index" 2934 2402 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 2935 - 2936 - [[package]] 2937 - name = "vcpkg" 2938 - version = "0.2.15" 2939 - source = "registry+https://github.com/rust-lang/crates.io-index" 2940 - checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 2941 2403 2942 2404 [[package]] 2943 2405 name = "version_check" ··· 3074 2536 checksum = "72069c3113ab32ab29e5584db3c6ec55d416895e60715417b5b883a357c3e471" 3075 2537 3076 2538 [[package]] 3077 - name = "winapi" 3078 - version = "0.3.9" 3079 - source = "registry+https://github.com/rust-lang/crates.io-index" 3080 - checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" 3081 - dependencies = [ 3082 - "winapi-i686-pc-windows-gnu", 3083 - "winapi-x86_64-pc-windows-gnu", 3084 - ] 3085 - 3086 - [[package]] 3087 - name = "winapi-i686-pc-windows-gnu" 3088 - version = "0.4.0" 3089 - source = "registry+https://github.com/rust-lang/crates.io-index" 3090 - checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" 3091 - 3092 - [[package]] 3093 2539 name = "winapi-util" 3094 2540 version = "0.1.11" 3095 2541 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3097 2543 dependencies = [ 3098 2544 "windows-sys 0.61.2", 3099 2545 ] 3100 - 3101 - [[package]] 3102 - name = "winapi-x86_64-pc-windows-gnu" 3103 - version = "0.4.0" 3104 - source = "registry+https://github.com/rust-lang/crates.io-index" 3105 - checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" 3106 2546 3107 2547 [[package]] 3108 2548 name = "windows-core"
+1 -1
Cargo.toml
··· 1 1 [workspace] 2 - members = ["crates/opake-core", "crates/opake-cli", "crates/opake-appview", "crates/opake-derive", "crates/opake-wasm"] 2 + members = ["crates/opake-core", "crates/opake-cli", "crates/opake-derive", "crates/opake-wasm"] 3 3 resolver = "2" 4 4 5 5 [workspace.package]
-22
Containerfile.appview
··· 1 - FROM rust:1.86-slim-bookworm AS builder 2 - 3 - RUN apt-get update && apt-get install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/* 4 - 5 - WORKDIR /build 6 - COPY Cargo.toml Cargo.lock ./ 7 - COPY crates/ crates/ 8 - 9 - RUN cargo build --release --package opake-appview 10 - 11 - FROM debian:bookworm-slim 12 - 13 - RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* 14 - RUN groupadd -g 1000 opake && useradd -u 1000 -g opake -m opake 15 - 16 - COPY --from=builder /build/target/release/opake-appview /usr/local/bin/opake-appview 17 - 18 - USER 1000:1000 19 - EXPOSE 6100 20 - 21 - ENTRYPOINT ["opake-appview"] 22 - CMD ["run"]
-38
crates/opake-appview/Cargo.toml
··· 1 - [package] 2 - name = "opake-appview" 3 - description = "AppView indexer for Opake — indexes grants and keyrings from the AT Protocol firehose" 4 - edition.workspace = true 5 - version.workspace = true 6 - license.workspace = true 7 - 8 - [[bin]] 9 - name = "opake-appview" 10 - path = "src/main.rs" 11 - 12 - [dependencies] 13 - opake-core = { path = "../opake-core", features = ["native"] } 14 - axum = "0.8" 15 - base64.workspace = true 16 - chrono.workspace = true 17 - clap.workspace = true 18 - ed25519-dalek.workspace = true 19 - env_logger.workspace = true 20 - futures-util = "0.3" 21 - log.workspace = true 22 - reqwest.workspace = true 23 - rusqlite = { version = "0.38", features = ["bundled"] } 24 - rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } 25 - serde.workspace = true 26 - serde_json.workspace = true 27 - thiserror.workspace = true 28 - tokio.workspace = true 29 - tokio-tungstenite = { version = "0.28", features = ["rustls-tls-native-roots"] } 30 - toml.workspace = true 31 - tower_governor = "0.8" 32 - anyhow.workspace = true 33 - 34 - [dev-dependencies] 35 - http-body-util = "0.1" 36 - tempfile.workspace = true 37 - tokio = { workspace = true, features = ["test-util"] } 38 - tower = { version = "0.5", features = ["util"] }
-233
crates/opake-appview/src/api/api_tests.rs
··· 1 - use std::sync::Arc; 2 - 3 - use axum::body::Body; 4 - use axum::http::{Request, StatusCode}; 5 - use axum::routing::get; 6 - use axum::Router; 7 - use http_body_util::BodyExt; 8 - use tower::ServiceExt; 9 - 10 - use crate::api; 11 - use crate::api::{health, inbox, keyrings}; 12 - use crate::db::grants::IndexedGrant; 13 - use crate::db::Database; 14 - use crate::db::{grants, keyrings as db_keyrings}; 15 - use crate::state::AppState; 16 - 17 - fn test_state() -> Arc<AppState> { 18 - let db = Database::open_in_memory().unwrap(); 19 - Arc::new(AppState::new(db)) 20 - } 21 - 22 - /// Router WITHOUT auth middleware — for testing handler logic in isolation. 23 - fn handler_router(state: Arc<AppState>) -> Router { 24 - Router::new() 25 - .route("/api/health", get(health::handle_health)) 26 - .route("/api/inbox", get(inbox::handle_inbox)) 27 - .route("/api/keyrings", get(keyrings::handle_keyrings)) 28 - .with_state(state) 29 - } 30 - 31 - /// Router WITH auth middleware but WITHOUT rate limiting — for testing auth rejection. 32 - fn auth_router(state: Arc<AppState>) -> Router { 33 - api::base_router(state) 34 - } 35 - 36 - fn get_request(uri: &str) -> Request<Body> { 37 - Request::builder().uri(uri).body(Body::empty()).unwrap() 38 - } 39 - 40 - async fn response_json(app: Router, req: Request<Body>) -> (StatusCode, serde_json::Value) { 41 - let resp = app.oneshot(req).await.unwrap(); 42 - let status = resp.status(); 43 - let body = resp.into_body().collect().await.unwrap().to_bytes(); 44 - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); 45 - (status, json) 46 - } 47 - 48 - // -- auth middleware tests (use auth_router) -- 49 - 50 - #[tokio::test] 51 - async fn protected_routes_require_auth() { 52 - let state = test_state(); 53 - let app = auth_router(state); 54 - let (status, json) = response_json(app, get_request("/api/inbox?did=test")).await; 55 - assert_eq!(status, StatusCode::UNAUTHORIZED); 56 - assert!(json["error"].as_str().unwrap().contains("authorization")); 57 - } 58 - 59 - #[tokio::test] 60 - async fn rejects_bearer_token() { 61 - let state = test_state(); 62 - let app = auth_router(state); 63 - let req = Request::builder() 64 - .uri("/api/inbox?did=test") 65 - .header("authorization", "Bearer some-token") 66 - .body(Body::empty()) 67 - .unwrap(); 68 - let (status, json) = response_json(app, req).await; 69 - assert_eq!(status, StatusCode::UNAUTHORIZED); 70 - assert!(json["error"].as_str().unwrap().contains("unsupported")); 71 - } 72 - 73 - #[tokio::test] 74 - async fn rejects_basic_auth() { 75 - let state = test_state(); 76 - let app = auth_router(state); 77 - let req = Request::builder() 78 - .uri("/api/inbox?did=test") 79 - .header("authorization", "Basic dXNlcjpwYXNz") 80 - .body(Body::empty()) 81 - .unwrap(); 82 - let (status, json) = response_json(app, req).await; 83 - assert_eq!(status, StatusCode::UNAUTHORIZED); 84 - assert!(json["error"].as_str().unwrap().contains("unsupported")); 85 - } 86 - 87 - #[tokio::test] 88 - async fn health_works_without_auth() { 89 - let state = test_state(); 90 - let app = auth_router(state); 91 - let (status, json) = response_json(app, get_request("/api/health")).await; 92 - assert_eq!(status, StatusCode::OK); 93 - assert_eq!(json["indexerConnected"], false); 94 - assert!(json["cursorTime"].is_null()); 95 - assert!(json.get("grantCount").is_none()); 96 - assert!(json.get("keyringCount").is_none()); 97 - } 98 - 99 - // -- handler logic tests (use handler_router, no auth) -- 100 - 101 - #[tokio::test] 102 - async fn inbox_requires_did() { 103 - let state = test_state(); 104 - let app = handler_router(state); 105 - let (status, json) = response_json(app, get_request("/api/inbox")).await; 106 - assert_eq!(status, StatusCode::BAD_REQUEST); 107 - assert!(json["error"].as_str().unwrap().contains("did")); 108 - } 109 - 110 - #[tokio::test] 111 - async fn inbox_returns_empty_for_unknown_did() { 112 - let state = test_state(); 113 - let app = handler_router(state); 114 - let (status, json) = response_json(app, get_request("/api/inbox?did=did:plc:nobody")).await; 115 - assert_eq!(status, StatusCode::OK); 116 - assert_eq!(json["grants"].as_array().unwrap().len(), 0); 117 - assert!(json.get("cursor").is_none() || json["cursor"].is_null()); 118 - } 119 - 120 - #[tokio::test] 121 - async fn inbox_returns_grants() { 122 - let state = test_state(); 123 - 124 - let grant = IndexedGrant { 125 - uri: "at://did:plc:owner/app.opake.grant/3abc".into(), 126 - owner_did: "did:plc:owner".into(), 127 - recipient_did: "did:plc:me".into(), 128 - document_uri: "at://did:plc:owner/app.opake.document/3xyz".into(), 129 - created_at: "2026-03-01T12:00:00Z".into(), 130 - indexed_at: "2026-03-01T12:00:01Z".into(), 131 - }; 132 - state 133 - .db 134 - .with_conn(|c| grants::upsert_grant(c, &grant)) 135 - .unwrap(); 136 - 137 - let app = handler_router(state); 138 - let (status, json) = response_json(app, get_request("/api/inbox?did=did:plc:me")).await; 139 - assert_eq!(status, StatusCode::OK); 140 - 141 - let items = json["grants"].as_array().unwrap(); 142 - assert_eq!(items.len(), 1); 143 - assert_eq!(items[0]["ownerDid"], "did:plc:owner"); 144 - assert_eq!( 145 - items[0]["documentUri"], 146 - "at://did:plc:owner/app.opake.document/3xyz" 147 - ); 148 - } 149 - 150 - #[tokio::test] 151 - async fn inbox_pagination() { 152 - let state = test_state(); 153 - 154 - for i in 0..5 { 155 - let grant = IndexedGrant { 156 - uri: format!("at://did:plc:owner/app.opake.grant/{i}"), 157 - owner_did: "did:plc:owner".into(), 158 - recipient_did: "did:plc:me".into(), 159 - document_uri: format!("at://did:plc:owner/app.opake.document/{i}"), 160 - created_at: "2026-03-01T12:00:00Z".into(), 161 - indexed_at: format!("2026-03-01T12:00:0{i}Z"), 162 - }; 163 - state 164 - .db 165 - .with_conn(|c| grants::upsert_grant(c, &grant)) 166 - .unwrap(); 167 - } 168 - 169 - let app = handler_router(state); 170 - let (status, json) = response_json(app, get_request("/api/inbox?did=did:plc:me&limit=3")).await; 171 - assert_eq!(status, StatusCode::OK); 172 - assert_eq!(json["grants"].as_array().unwrap().len(), 3); 173 - assert!(json["cursor"].is_string()); 174 - } 175 - 176 - #[tokio::test] 177 - async fn keyrings_requires_did() { 178 - let state = test_state(); 179 - let app = handler_router(state); 180 - let (status, json) = response_json(app, get_request("/api/keyrings")).await; 181 - assert_eq!(status, StatusCode::BAD_REQUEST); 182 - assert!(json["error"].as_str().unwrap().contains("did")); 183 - } 184 - 185 - #[tokio::test] 186 - async fn keyrings_returns_memberships() { 187 - let state = test_state(); 188 - 189 - state 190 - .db 191 - .with_conn(|c| { 192 - db_keyrings::upsert_keyring_members( 193 - c, 194 - "at://did:plc:owner/app.opake.keyring/3def", 195 - "did:plc:owner", 196 - &["did:plc:me".into(), "did:plc:other".into()], 197 - "2026-03-01T12:00:00Z", 198 - ) 199 - }) 200 - .unwrap(); 201 - 202 - let app = handler_router(state); 203 - let (status, json) = response_json(app, get_request("/api/keyrings?did=did:plc:me")).await; 204 - assert_eq!(status, StatusCode::OK); 205 - 206 - let items = json["keyrings"].as_array().unwrap(); 207 - assert_eq!(items.len(), 1); 208 - assert_eq!(items[0]["ownerDid"], "did:plc:owner"); 209 - } 210 - 211 - #[tokio::test] 212 - async fn health_omits_counts() { 213 - let state = test_state(); 214 - 215 - let grant = IndexedGrant { 216 - uri: "at://did:plc:owner/app.opake.grant/3abc".into(), 217 - owner_did: "did:plc:owner".into(), 218 - recipient_did: "did:plc:me".into(), 219 - document_uri: "at://did:plc:owner/app.opake.document/3xyz".into(), 220 - created_at: "2026-03-01T12:00:00Z".into(), 221 - indexed_at: "2026-03-01T12:00:01Z".into(), 222 - }; 223 - state 224 - .db 225 - .with_conn(|c| grants::upsert_grant(c, &grant)) 226 - .unwrap(); 227 - 228 - let app = handler_router(state); 229 - let (status, json) = response_json(app, get_request("/api/health")).await; 230 - assert_eq!(status, StatusCode::OK); 231 - assert!(json.get("grantCount").is_none()); 232 - assert!(json.get("keyringCount").is_none()); 233 - }
-188
crates/opake-appview/src/api/auth.rs
··· 1 - use std::sync::Arc; 2 - 3 - use axum::extract::{Request, State}; 4 - use axum::http::StatusCode; 5 - use axum::middleware::Next; 6 - use axum::response::{IntoResponse, Json, Response}; 7 - use base64::engine::general_purpose::STANDARD as BASE64; 8 - use base64::Engine; 9 - use ed25519_dalek::{Signature, Verifier}; 10 - 11 - use crate::api::types::ErrorResponse; 12 - use crate::state::AppState; 13 - 14 - /// Maximum age of a signed request timestamp (replay protection). 15 - const MAX_TIMESTAMP_DRIFT_SECS: i64 = 60; 16 - 17 - /// Auth middleware: validates `Opake-Ed25519` DID-scoped signatures. 18 - /// 19 - /// Header format: 20 - /// Authorization: Opake-Ed25519 <did>:<unix-timestamp>:<base64(signature)> 21 - /// Signature covers: <method>:<path>:<timestamp>:<did> 22 - pub async fn require_auth( 23 - State(state): State<Arc<AppState>>, 24 - req: Request, 25 - next: Next, 26 - ) -> Response { 27 - let auth_header = req 28 - .headers() 29 - .get("authorization") 30 - .and_then(|v| v.to_str().ok()) 31 - .map(|s| s.to_string()); 32 - 33 - let method = req.method().as_str().to_string(); 34 - let path = req.uri().path().to_string(); 35 - let query_did = extract_did_param(req.uri().query()); 36 - 37 - match auth_header.as_deref() { 38 - Some(h) if h.starts_with("Opake-Ed25519 ") => { 39 - match verify_did_auth(&state, h, &method, &path, query_did.as_deref()).await { 40 - Ok(()) => next.run(req).await, 41 - Err(msg) => unauthorized(&msg), 42 - } 43 - } 44 - Some(_) => unauthorized("unsupported authorization scheme — use Opake-Ed25519"), 45 - None => unauthorized("missing authorization header"), 46 - } 47 - } 48 - 49 - fn unauthorized(message: &str) -> Response { 50 - ( 51 - StatusCode::UNAUTHORIZED, 52 - Json(ErrorResponse { 53 - error: message.to_string(), 54 - }), 55 - ) 56 - .into_response() 57 - } 58 - 59 - /// Parse and verify an Opake-Ed25519 authorization header. 60 - async fn verify_did_auth( 61 - state: &AppState, 62 - header: &str, 63 - method: &str, 64 - path: &str, 65 - query_did: Option<&str>, 66 - ) -> Result<(), String> { 67 - let payload = header 68 - .strip_prefix("Opake-Ed25519 ") 69 - .ok_or("malformed auth header")?; 70 - 71 - let (did, timestamp_str, sig_b64) = parse_auth_payload(payload)?; 72 - 73 - // Enforce: authenticated DID must match the ?did= query parameter 74 - if let Some(qd) = query_did { 75 - if qd != did { 76 - return Err(format!( 77 - "authenticated as {did} but requesting data for {qd}" 78 - )); 79 - } 80 - } 81 - 82 - // Validate timestamp (replay protection) 83 - let timestamp: i64 = timestamp_str 84 - .parse() 85 - .map_err(|_| "invalid timestamp in auth header")?; 86 - let now = chrono::Utc::now().timestamp(); 87 - let drift = (now - timestamp).abs(); 88 - if drift > MAX_TIMESTAMP_DRIFT_SECS { 89 - return Err(format!( 90 - "timestamp too far from current time ({drift}s drift, max {MAX_TIMESTAMP_DRIFT_SECS}s)" 91 - )); 92 - } 93 - 94 - // Decode signature 95 - let sig_bytes = BASE64 96 - .decode(sig_b64) 97 - .or_else(|_| { 98 - use base64::engine::general_purpose::STANDARD_NO_PAD; 99 - STANDARD_NO_PAD.decode(sig_b64) 100 - }) 101 - .map_err(|_| "invalid base64 in signature")?; 102 - let signature = 103 - Signature::from_slice(&sig_bytes).map_err(|_| "invalid Ed25519 signature format")?; 104 - 105 - // Fetch/cache the signing key 106 - let verifying_key = state 107 - .key_cache 108 - .lock() 109 - .await 110 - .get_or_fetch(did) 111 - .await 112 - .map_err(|e| format!("failed to fetch signing key: {e}"))?; 113 - 114 - // Verify signature over: <method>:<path>:<timestamp>:<did> 115 - let message = format!("{method}:{path}:{timestamp_str}:{did}"); 116 - verifying_key 117 - .verify(message.as_bytes(), &signature) 118 - .map_err(|_| "signature verification failed".to_string()) 119 - } 120 - 121 - /// Parse the auth payload. DIDs contain colons, so we split from the right: 122 - /// last segment = signature, second-to-last = timestamp, rest = DID. 123 - fn parse_auth_payload(payload: &str) -> Result<(&str, &str, &str), String> { 124 - let last_colon = payload.rfind(':').ok_or("malformed auth payload")?; 125 - let sig = &payload[last_colon + 1..]; 126 - let rest = &payload[..last_colon]; 127 - 128 - let second_last = rest.rfind(':').ok_or("malformed auth payload")?; 129 - let timestamp = &rest[second_last + 1..]; 130 - let did = &rest[..second_last]; 131 - 132 - if did.is_empty() || timestamp.is_empty() || sig.is_empty() { 133 - return Err("malformed auth payload: empty component".into()); 134 - } 135 - 136 - Ok((did, timestamp, sig)) 137 - } 138 - 139 - /// Extract the `did` query parameter from a raw query string. 140 - fn extract_did_param(query: Option<&str>) -> Option<String> { 141 - query.and_then(|q| { 142 - q.split('&') 143 - .find_map(|pair| pair.strip_prefix("did=").map(|v| v.to_string())) 144 - }) 145 - } 146 - 147 - #[cfg(test)] 148 - mod tests { 149 - use super::*; 150 - 151 - #[test] 152 - fn parse_auth_payload_valid() { 153 - let (did, ts, sig) = parse_auth_payload("did:plc:abc123:1709330400:c2lnbmF0dXJl").unwrap(); 154 - assert_eq!(did, "did:plc:abc123"); 155 - assert_eq!(ts, "1709330400"); 156 - assert_eq!(sig, "c2lnbmF0dXJl"); 157 - } 158 - 159 - #[test] 160 - fn parse_auth_payload_did_web() { 161 - let (did, ts, sig) = parse_auth_payload("did:web:example.com:1709330400:c2ln").unwrap(); 162 - assert_eq!(did, "did:web:example.com"); 163 - assert_eq!(ts, "1709330400"); 164 - assert_eq!(sig, "c2ln"); 165 - } 166 - 167 - #[test] 168 - fn parse_auth_payload_rejects_garbage() { 169 - assert!(parse_auth_payload("nocolons").is_err()); 170 - assert!(parse_auth_payload("one:colon").is_err()); 171 - } 172 - 173 - #[test] 174 - fn parse_auth_payload_rejects_empty_components() { 175 - assert!(parse_auth_payload("::sig").is_err()); 176 - assert!(parse_auth_payload("did::sig").is_err()); 177 - } 178 - 179 - #[test] 180 - fn extract_did_from_query() { 181 - assert_eq!( 182 - extract_did_param(Some("did=did:plc:abc&limit=10")), 183 - Some("did:plc:abc".into()) 184 - ); 185 - assert_eq!(extract_did_param(Some("limit=10")), None); 186 - assert_eq!(extract_did_param(None), None); 187 - } 188 - }
-39
crates/opake-appview/src/api/health.rs
··· 1 - use std::sync::atomic::Ordering; 2 - use std::sync::Arc; 3 - 4 - use axum::extract::State; 5 - use axum::response::{IntoResponse, Json}; 6 - use serde::Serialize; 7 - 8 - use crate::db::cursor::{self, MICROS_PER_SECOND}; 9 - use crate::state::AppState; 10 - 11 - #[derive(Serialize)] 12 - #[serde(rename_all = "camelCase")] 13 - struct HealthResponse { 14 - indexer_connected: bool, 15 - cursor_time: Option<String>, 16 - cursor_age_secs: Option<i64>, 17 - } 18 - 19 - pub async fn handle_health(State(state): State<Arc<AppState>>) -> impl IntoResponse { 20 - let indexer_connected = state.indexer_connected.load(Ordering::Relaxed); 21 - 22 - let cursor_us = state.db.with_conn(cursor::load_cursor).unwrap_or(None); 23 - 24 - let (cursor_time, cursor_age_secs) = match cursor_us { 25 - Some(us) => { 26 - let secs = us / MICROS_PER_SECOND; 27 - let now = chrono::Utc::now().timestamp(); 28 - let time = chrono::DateTime::from_timestamp(secs, 0).map(|dt| dt.to_rfc3339()); 29 - (time, Some(now - secs)) 30 - } 31 - None => (None, None), 32 - }; 33 - 34 - Json(HealthResponse { 35 - indexer_connected, 36 - cursor_time, 37 - cursor_age_secs, 38 - }) 39 - }
-78
crates/opake-appview/src/api/inbox.rs
··· 1 - use std::sync::Arc; 2 - 3 - use axum::extract::{Query, State}; 4 - use axum::http::StatusCode; 5 - use axum::response::{IntoResponse, Json}; 6 - use serde::Deserialize; 7 - 8 - use crate::api::types::{ErrorResponse, GrantItem, InboxResponse}; 9 - use crate::db::grants; 10 - use crate::state::AppState; 11 - 12 - const DEFAULT_LIMIT: u32 = 50; 13 - const MAX_LIMIT: u32 = 100; 14 - 15 - #[derive(Debug, Deserialize)] 16 - pub struct InboxParams { 17 - pub did: Option<String>, 18 - pub limit: Option<u32>, 19 - pub cursor: Option<String>, 20 - } 21 - 22 - pub async fn handle_inbox( 23 - State(state): State<Arc<AppState>>, 24 - Query(params): Query<InboxParams>, 25 - ) -> impl IntoResponse { 26 - let did = match &params.did { 27 - Some(d) if !d.is_empty() => d.as_str(), 28 - _ => { 29 - return ( 30 - StatusCode::BAD_REQUEST, 31 - Json( 32 - serde_json::to_value(ErrorResponse { 33 - error: "missing required parameter: did".into(), 34 - }) 35 - .expect("ErrorResponse serializes"), 36 - ), 37 - ) 38 - .into_response(); 39 - } 40 - }; 41 - 42 - let limit = params.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT); 43 - let cursor_ref = params.cursor.as_deref(); 44 - 45 - let result = state 46 - .db 47 - .with_conn(|conn| grants::list_inbox(conn, did, limit, cursor_ref)); 48 - 49 - match result { 50 - Ok(indexed_grants) => { 51 - let next_cursor = indexed_grants.last().map(grants::encode_cursor); 52 - let items: Vec<GrantItem> = indexed_grants.iter().map(GrantItem::from).collect(); 53 - 54 - let response = InboxResponse { 55 - grants: items, 56 - cursor: next_cursor, 57 - }; 58 - ( 59 - StatusCode::OK, 60 - Json(serde_json::to_value(response).expect("InboxResponse serializes")), 61 - ) 62 - .into_response() 63 - } 64 - Err(e) => { 65 - log::error!("inbox query failed: {e}"); 66 - ( 67 - StatusCode::INTERNAL_SERVER_ERROR, 68 - Json( 69 - serde_json::to_value(ErrorResponse { 70 - error: "internal server error".into(), 71 - }) 72 - .expect("ErrorResponse serializes"), 73 - ), 74 - ) 75 - .into_response() 76 - } 77 - } 78 - }
-142
crates/opake-appview/src/api/key_cache.rs
··· 1 - use std::collections::HashMap; 2 - use std::time::{Duration, Instant}; 3 - 4 - use ed25519_dalek::VerifyingKey; 5 - 6 - use crate::error::{Error, Result}; 7 - 8 - const TTL: Duration = Duration::from_secs(300); // 5 minutes 9 - 10 - /// Caches Ed25519 verifying keys fetched from users' PDS public key records. 11 - pub struct KeyCache { 12 - entries: HashMap<String, CacheEntry>, 13 - } 14 - 15 - struct CacheEntry { 16 - key: VerifyingKey, 17 - fetched_at: Instant, 18 - } 19 - 20 - impl KeyCache { 21 - pub fn new() -> Self { 22 - Self { 23 - entries: HashMap::new(), 24 - } 25 - } 26 - 27 - /// Return cached key if fresh, otherwise fetch from the user's PDS. 28 - pub async fn get_or_fetch(&mut self, did: &str) -> Result<VerifyingKey> { 29 - if let Some(entry) = self.entries.get(did) { 30 - if entry.fetched_at.elapsed() < TTL { 31 - return Ok(entry.key); 32 - } 33 - } 34 - 35 - let key = fetch_signing_key(did).await?; 36 - self.entries.insert( 37 - did.to_string(), 38 - CacheEntry { 39 - key, 40 - fetched_at: Instant::now(), 41 - }, 42 - ); 43 - Ok(key) 44 - } 45 - } 46 - 47 - /// Fetch the Ed25519 signing key from a user's `app.opake.publicKey/self` record. 48 - /// 49 - /// Resolution: DID → DID document → PDS URL → getRecord → signingKey field. 50 - async fn fetch_signing_key(did: &str) -> Result<VerifyingKey> { 51 - let client = reqwest::Client::new(); 52 - 53 - // Step 1: Resolve DID document to find PDS URL 54 - let did_doc_url = if did.starts_with("did:plc:") { 55 - format!("https://plc.directory/{did}") 56 - } else if did.starts_with("did:web:") { 57 - let host = did.strip_prefix("did:web:").unwrap_or(""); 58 - format!("https://{host}/.well-known/did.json") 59 - } else { 60 - return Err(Error::Auth(format!("unsupported DID method: {did}"))); 61 - }; 62 - 63 - let did_doc: serde_json::Value = client 64 - .get(&did_doc_url) 65 - .send() 66 - .await 67 - .map_err(|e| Error::Auth(format!("failed to fetch DID document for {did}: {e}")))? 68 - .json() 69 - .await 70 - .map_err(|e| Error::Auth(format!("invalid DID document for {did}: {e}")))?; 71 - 72 - let pds_url = did_doc["service"] 73 - .as_array() 74 - .and_then(|services| { 75 - services.iter().find_map(|s| { 76 - if s["id"].as_str() == Some("#atproto_pds") { 77 - s["serviceEndpoint"].as_str().map(|u| u.to_string()) 78 - } else { 79 - None 80 - } 81 - }) 82 - }) 83 - .ok_or_else(|| Error::Auth(format!("no PDS service in DID document for {did}")))?; 84 - 85 - // Step 2: Fetch public key record from the user's PDS 86 - let record_url = format!( 87 - "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.opake.publicKey&rkey=self", 88 - pds_url.trim_end_matches('/'), 89 - did 90 - ); 91 - 92 - let record_resp: serde_json::Value = client 93 - .get(&record_url) 94 - .send() 95 - .await 96 - .map_err(|e| Error::Auth(format!("failed to fetch public key record for {did}: {e}")))? 97 - .json() 98 - .await 99 - .map_err(|e| Error::Auth(format!("invalid public key record for {did}: {e}")))?; 100 - 101 - // Step 3: Extract signing key from the record 102 - let signing_key_b64 = record_resp["value"]["signingKey"]["$bytes"] 103 - .as_str() 104 - .ok_or_else(|| { 105 - Error::Auth(format!( 106 - "no signingKey in public key record for {did} — user needs to re-login to publish signing key" 107 - )) 108 - })?; 109 - 110 - use base64::engine::general_purpose::STANDARD as BASE64; 111 - use base64::Engine; 112 - 113 - let key_bytes = BASE64 114 - .decode(signing_key_b64) 115 - .or_else(|_| { 116 - // PDS may strip padding 117 - use base64::engine::general_purpose::STANDARD_NO_PAD; 118 - STANDARD_NO_PAD.decode(signing_key_b64) 119 - }) 120 - .map_err(|e| Error::Auth(format!("invalid base64 in signing key for {did}: {e}")))?; 121 - 122 - let key_array: [u8; 32] = key_bytes.try_into().map_err(|v: Vec<u8>| { 123 - Error::Auth(format!( 124 - "signing key for {did} is {} bytes, expected 32", 125 - v.len() 126 - )) 127 - })?; 128 - 129 - VerifyingKey::from_bytes(&key_array) 130 - .map_err(|e| Error::Auth(format!("invalid Ed25519 key for {did}: {e}"))) 131 - } 132 - 133 - #[cfg(test)] 134 - mod tests { 135 - use super::*; 136 - 137 - #[test] 138 - fn cache_returns_none_for_missing_key() { 139 - let cache = KeyCache::new(); 140 - assert!(!cache.entries.contains_key("did:plc:unknown")); 141 - } 142 - }
-78
crates/opake-appview/src/api/keyrings.rs
··· 1 - use std::sync::Arc; 2 - 3 - use axum::extract::{Query, State}; 4 - use axum::http::StatusCode; 5 - use axum::response::{IntoResponse, Json}; 6 - use serde::Deserialize; 7 - 8 - use crate::api::types::{ErrorResponse, KeyringItem, KeyringsResponse}; 9 - use crate::db::keyrings; 10 - use crate::state::AppState; 11 - 12 - const DEFAULT_LIMIT: u32 = 50; 13 - const MAX_LIMIT: u32 = 100; 14 - 15 - #[derive(Debug, Deserialize)] 16 - pub struct KeyringsParams { 17 - pub did: Option<String>, 18 - pub limit: Option<u32>, 19 - pub cursor: Option<String>, 20 - } 21 - 22 - pub async fn handle_keyrings( 23 - State(state): State<Arc<AppState>>, 24 - Query(params): Query<KeyringsParams>, 25 - ) -> impl IntoResponse { 26 - let did = match &params.did { 27 - Some(d) if !d.is_empty() => d.as_str(), 28 - _ => { 29 - return ( 30 - StatusCode::BAD_REQUEST, 31 - Json( 32 - serde_json::to_value(ErrorResponse { 33 - error: "missing required parameter: did".into(), 34 - }) 35 - .expect("ErrorResponse serializes"), 36 - ), 37 - ) 38 - .into_response(); 39 - } 40 - }; 41 - 42 - let limit = params.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT); 43 - let cursor_ref = params.cursor.as_deref(); 44 - 45 - let result = state 46 - .db 47 - .with_conn(|conn| keyrings::list_keyrings_for_member(conn, did, limit, cursor_ref)); 48 - 49 - match result { 50 - Ok(members) => { 51 - let next_cursor = members.last().map(keyrings::encode_cursor); 52 - let items: Vec<KeyringItem> = members.iter().map(KeyringItem::from).collect(); 53 - 54 - let response = KeyringsResponse { 55 - keyrings: items, 56 - cursor: next_cursor, 57 - }; 58 - ( 59 - StatusCode::OK, 60 - Json(serde_json::to_value(response).expect("KeyringsResponse serializes")), 61 - ) 62 - .into_response() 63 - } 64 - Err(e) => { 65 - log::error!("keyrings query failed: {e}"); 66 - ( 67 - StatusCode::INTERNAL_SERVER_ERROR, 68 - Json( 69 - serde_json::to_value(ErrorResponse { 70 - error: "internal server error".into(), 71 - }) 72 - .expect("ErrorResponse serializes"), 73 - ), 74 - ) 75 - .into_response() 76 - } 77 - } 78 - }
-62
crates/opake-appview/src/api/mod.rs
··· 1 - pub mod auth; 2 - pub mod health; 3 - pub mod inbox; 4 - pub mod key_cache; 5 - pub mod keyrings; 6 - pub mod types; 7 - 8 - use std::sync::Arc; 9 - 10 - use axum::middleware; 11 - use axum::routing::get; 12 - use axum::Router; 13 - use tower_governor::governor::GovernorConfigBuilder; 14 - use tower_governor::key_extractor::SmartIpKeyExtractor; 15 - use tower_governor::GovernorLayer; 16 - 17 - use crate::state::AppState; 18 - 19 - /// Routes + auth middleware, without rate limiting. Used by tests. 20 - #[cfg(test)] 21 - pub(crate) fn base_router(state: Arc<AppState>) -> Router { 22 - let protected = Router::new() 23 - .route("/api/inbox", get(inbox::handle_inbox)) 24 - .route("/api/keyrings", get(keyrings::handle_keyrings)) 25 - .layer(middleware::from_fn_with_state( 26 - state.clone(), 27 - auth::require_auth, 28 - )); 29 - 30 - Router::new() 31 - .route("/api/health", get(health::handle_health)) 32 - .merge(protected) 33 - .with_state(state) 34 - } 35 - 36 - /// Build the Axum router with all API routes, auth middleware, and rate limiting. 37 - pub fn router(state: Arc<AppState>) -> Router { 38 - let protected = Router::new() 39 - .route("/api/inbox", get(inbox::handle_inbox)) 40 - .route("/api/keyrings", get(keyrings::handle_keyrings)) 41 - .layer(middleware::from_fn_with_state( 42 - state.clone(), 43 - auth::require_auth, 44 - )); 45 - 46 - let governor_config = GovernorConfigBuilder::default() 47 - .per_second(10) 48 - .burst_size(30) 49 - .key_extractor(SmartIpKeyExtractor) 50 - .finish() 51 - .expect("governor config"); 52 - 53 - Router::new() 54 - .route("/api/health", get(health::handle_health)) 55 - .merge(protected) 56 - .layer(GovernorLayer::new(governor_config)) 57 - .with_state(state) 58 - } 59 - 60 - #[cfg(test)] 61 - #[path = "api_tests.rs"] 62 - mod tests;
-78
crates/opake-appview/src/api/resolve.rs
··· 1 - // Handle resolution endpoint — server-side DNS TXT + core resolution. 2 - // 3 - // The browser can't do DNS lookups, so this endpoint does it on behalf of 4 - // authenticated web clients. Resolution order: 5 - // 1. DNS TXT `_atproto.{handle}` (via opake-core dns feature) 6 - // 2. .well-known/atproto-did → DID doc (via opake-core) 7 - // 3. Bluesky public API resolveHandle → DID doc (via opake-core) 8 - 9 - use std::sync::Arc; 10 - 11 - use axum::extract::{Query, State}; 12 - use axum::http::StatusCode; 13 - use axum::response::{IntoResponse, Json}; 14 - use serde::{Deserialize, Serialize}; 15 - 16 - use crate::state::AppState; 17 - 18 - #[derive(Debug, Deserialize)] 19 - pub struct ResolveParams { 20 - pub handle: String, 21 - } 22 - 23 - #[derive(Debug, Serialize)] 24 - #[serde(rename_all = "camelCase")] 25 - pub struct ResolveResponse { 26 - pub did: String, 27 - pub pds_url: String, 28 - #[serde(skip_serializing_if = "Option::is_none")] 29 - pub handle: Option<String>, 30 - } 31 - 32 - pub async fn handle_resolve( 33 - State(_state): State<Arc<AppState>>, 34 - Query(params): Query<ResolveParams>, 35 - ) -> impl IntoResponse { 36 - use crate::api::types::ErrorResponse; 37 - 38 - if params.handle.is_empty() { 39 - return ( 40 - StatusCode::BAD_REQUEST, 41 - Json( 42 - serde_json::to_value(ErrorResponse { 43 - error: "missing required parameter: handle".into(), 44 - }) 45 - .expect("ErrorResponse serializes"), 46 - ), 47 - ) 48 - .into_response(); 49 - } 50 - 51 - // Try DNS first (fastest), then fall through to core's resolution chain 52 - // which handles .well-known and bsky public API fallback. 53 - let transport = opake_core::client::ReqwestTransport::new(); 54 - match opake_core::resolve::resolve_pds_for_login_with_dns(&transport, &params.handle).await { 55 - Ok((did, pds_url, handle)) => ( 56 - StatusCode::OK, 57 - Json( 58 - serde_json::to_value(ResolveResponse { 59 - did, 60 - pds_url, 61 - handle, 62 - }) 63 - .expect("ResolveResponse serializes"), 64 - ), 65 - ) 66 - .into_response(), 67 - Err(e) => ( 68 - StatusCode::NOT_FOUND, 69 - Json( 70 - serde_json::to_value(ErrorResponse { 71 - error: format!("could not resolve handle: {e}"), 72 - }) 73 - .expect("ErrorResponse serializes"), 74 - ), 75 - ) 76 - .into_response(), 77 - } 78 - }
-64
crates/opake-appview/src/api/types.rs
··· 1 - use serde::Serialize; 2 - 3 - use crate::db::grants::IndexedGrant; 4 - use crate::db::keyrings::IndexedKeyringMember; 5 - 6 - #[derive(Debug, Serialize)] 7 - #[serde(rename_all = "camelCase")] 8 - pub struct InboxResponse { 9 - pub grants: Vec<GrantItem>, 10 - #[serde(skip_serializing_if = "Option::is_none")] 11 - pub cursor: Option<String>, 12 - } 13 - 14 - #[derive(Debug, Serialize)] 15 - #[serde(rename_all = "camelCase")] 16 - pub struct GrantItem { 17 - pub uri: String, 18 - pub owner_did: String, 19 - pub document_uri: String, 20 - pub created_at: String, 21 - } 22 - 23 - impl From<&IndexedGrant> for GrantItem { 24 - fn from(g: &IndexedGrant) -> Self { 25 - Self { 26 - uri: g.uri.clone(), 27 - owner_did: g.owner_did.clone(), 28 - document_uri: g.document_uri.clone(), 29 - created_at: g.created_at.clone(), 30 - } 31 - } 32 - } 33 - 34 - #[derive(Debug, Serialize)] 35 - #[serde(rename_all = "camelCase")] 36 - pub struct KeyringsResponse { 37 - pub keyrings: Vec<KeyringItem>, 38 - #[serde(skip_serializing_if = "Option::is_none")] 39 - pub cursor: Option<String>, 40 - } 41 - 42 - #[derive(Debug, Serialize)] 43 - #[serde(rename_all = "camelCase")] 44 - pub struct KeyringItem { 45 - pub uri: String, 46 - pub owner_did: String, 47 - pub indexed_at: String, 48 - } 49 - 50 - impl From<&IndexedKeyringMember> for KeyringItem { 51 - fn from(m: &IndexedKeyringMember) -> Self { 52 - Self { 53 - uri: m.keyring_uri.clone(), 54 - owner_did: m.owner_did.clone(), 55 - indexed_at: m.indexed_at.clone(), 56 - } 57 - } 58 - } 59 - 60 - #[derive(Debug, Serialize)] 61 - #[serde(rename_all = "camelCase")] 62 - pub struct ErrorResponse { 63 - pub error: String, 64 - }
-23
crates/opake-appview/src/commands/index.rs
··· 1 - use clap::Args; 2 - 3 - use crate::config::Config; 4 - use crate::indexer; 5 - 6 - use super::build_state; 7 - 8 - #[derive(Args)] 9 - pub struct IndexCommand {} 10 - 11 - impl IndexCommand { 12 - pub async fn execute(self, config: &Config) -> anyhow::Result<()> { 13 - let state = build_state(config)?; 14 - 15 - log::info!( 16 - "opake-appview indexer (db: {})", 17 - config.resolved_db_path().display() 18 - ); 19 - 20 - indexer::run(state, config.jetstream_url.clone()).await; 21 - Ok(()) 22 - } 23 - }
-56
crates/opake-appview/src/commands/mod.rs
··· 1 - pub mod index; 2 - pub mod run; 3 - pub mod serve; 4 - pub mod status; 5 - 6 - use std::sync::Arc; 7 - 8 - use anyhow::Context; 9 - use clap::Subcommand; 10 - 11 - use crate::config::Config; 12 - use crate::db::Database; 13 - use crate::state::AppState; 14 - 15 - #[derive(Subcommand)] 16 - pub enum Command { 17 - /// Run both indexer and API server (default) 18 - Run(run::RunCommand), 19 - /// Run indexer only (write-only, no HTTP server) 20 - Index(index::IndexCommand), 21 - /// Run API server only (read-only, no Jetstream connection) 22 - Serve(serve::ServeCommand), 23 - /// Print cursor position, lag, and stats, then exit 24 - Status(status::StatusCommand), 25 - } 26 - 27 - pub fn build_state(config: &Config) -> anyhow::Result<Arc<AppState>> { 28 - let db = Database::open(&config.resolved_db_path()).context("failed to open database")?; 29 - Ok(Arc::new(AppState::new(db))) 30 - } 31 - 32 - pub async fn serve_http(listen: &str, app: axum::Router, config: &Config) -> anyhow::Result<()> { 33 - let listener = tokio::net::TcpListener::bind(listen) 34 - .await 35 - .with_context(|| format!("failed to bind to {listen}"))?; 36 - 37 - log::info!( 38 - "opake-appview listening on {} (db: {})", 39 - listen, 40 - config.resolved_db_path().display() 41 - ); 42 - 43 - axum::serve(listener, app) 44 - .with_graceful_shutdown(shutdown_signal()) 45 - .await 46 - .context("server error")?; 47 - 48 - log::info!("shutting down"); 49 - Ok(()) 50 - } 51 - 52 - async fn shutdown_signal() { 53 - tokio::signal::ctrl_c() 54 - .await 55 - .expect("failed to listen for ctrl-c"); 56 - }
-25
crates/opake-appview/src/commands/run.rs
··· 1 - use clap::Args; 2 - 3 - use crate::api; 4 - use crate::config::Config; 5 - use crate::indexer; 6 - 7 - use super::{build_state, serve_http}; 8 - 9 - #[derive(Args)] 10 - pub struct RunCommand {} 11 - 12 - impl RunCommand { 13 - pub async fn execute(self, config: &Config) -> anyhow::Result<()> { 14 - let state = build_state(config)?; 15 - 16 - tokio::spawn({ 17 - let state = state.clone(); 18 - let url = config.jetstream_url.clone(); 19 - async move { indexer::run(state, url).await } 20 - }); 21 - 22 - let app = api::router(state); 23 - serve_http(&config.listen, app, config).await 24 - } 25 - }
-18
crates/opake-appview/src/commands/serve.rs
··· 1 - use clap::Args; 2 - 3 - use crate::api; 4 - use crate::config::Config; 5 - 6 - use super::{build_state, serve_http}; 7 - 8 - #[derive(Args)] 9 - pub struct ServeCommand {} 10 - 11 - impl ServeCommand { 12 - pub async fn execute(self, config: &Config) -> anyhow::Result<()> { 13 - let state = build_state(config)?; 14 - 15 - let app = api::router(state); 16 - serve_http(&config.listen, app, config).await 17 - } 18 - }
-47
crates/opake-appview/src/commands/status.rs
··· 1 - use clap::Args; 2 - 3 - use crate::config::Config; 4 - use crate::db; 5 - use crate::db::cursor::MICROS_PER_SECOND; 6 - 7 - use super::build_state; 8 - 9 - #[derive(Args)] 10 - pub struct StatusCommand {} 11 - 12 - impl StatusCommand { 13 - pub fn execute(self, config: &Config) -> anyhow::Result<()> { 14 - let state = build_state(config)?; 15 - 16 - let cursor_us = state.db.with_conn(db::cursor::load_cursor).unwrap_or(None); 17 - 18 - let grant_count = state.db.with_conn(db::grants::count_grants).unwrap_or(0); 19 - 20 - let keyring_count = state 21 - .db 22 - .with_conn(db::keyrings::count_unique_keyrings) 23 - .unwrap_or(0); 24 - 25 - match cursor_us { 26 - Some(us) => { 27 - let cursor_secs = us / MICROS_PER_SECOND; 28 - let now_secs = chrono::Utc::now().timestamp(); 29 - let lag_secs = now_secs - cursor_secs; 30 - 31 - let cursor_time = chrono::DateTime::from_timestamp(cursor_secs, 0) 32 - .map(|dt| dt.to_rfc3339()) 33 - .unwrap_or_else(|| format!("{us}µs")); 34 - 35 - println!("Cursor: {cursor_time}"); 36 - println!("Lag: {lag_secs}s"); 37 - } 38 - None => { 39 - println!("Cursor: (none — indexer has not run)"); 40 - } 41 - } 42 - 43 - println!("Grants: {grant_count}"); 44 - println!("Keyrings: {keyring_count}"); 45 - Ok(()) 46 - } 47 - }
-53
crates/opake-appview/src/config.rs
··· 1 - use std::fs; 2 - use std::path::{Path, PathBuf}; 3 - 4 - use serde::Deserialize; 5 - 6 - use crate::error::{Error, Result}; 7 - 8 - #[derive(Debug, Deserialize)] 9 - pub struct Config { 10 - pub jetstream_url: String, 11 - pub listen: String, 12 - pub db_path: String, 13 - } 14 - 15 - impl Config { 16 - /// Load config from the `appview.toml` inside the resolved data directory. 17 - /// Priority: override_dir > OPAKE_DATA_DIR env > XDG_CONFIG_HOME/opake > ~/.config/opake 18 - pub fn load(override_dir: Option<PathBuf>) -> Result<Self> { 19 - let dir = opake_core::paths::resolve_data_dir(override_dir); 20 - let path = dir.join("appview.toml"); 21 - Self::load_from(&path) 22 - } 23 - 24 - /// Load config from a specific path. 25 - pub fn load_from(path: &Path) -> Result<Self> { 26 - let content = fs::read_to_string(path).map_err(|e| { 27 - Error::Config(format!("failed to read config at {}: {e}", path.display())) 28 - })?; 29 - let config: Config = toml::from_str(&content) 30 - .map_err(|e| Error::Config(format!("failed to parse config: {e}")))?; 31 - config.validate()?; 32 - Ok(config) 33 - } 34 - 35 - fn validate(&self) -> Result<()> { 36 - if !self.jetstream_url.starts_with("ws://") && !self.jetstream_url.starts_with("wss://") { 37 - return Err(Error::Config(format!( 38 - "jetstream_url must start with ws:// or wss://, got: {}", 39 - self.jetstream_url 40 - ))); 41 - } 42 - Ok(()) 43 - } 44 - 45 - /// Resolve db_path with `~` expansion. 46 - pub fn resolved_db_path(&self) -> PathBuf { 47 - opake_core::paths::expand_tilde(&self.db_path) 48 - } 49 - } 50 - 51 - #[cfg(test)] 52 - #[path = "config_tests.rs"] 53 - mod tests;
-78
crates/opake-appview/src/config_tests.rs
··· 1 - use std::io::Write; 2 - 3 - use tempfile::NamedTempFile; 4 - 5 - use super::*; 6 - 7 - fn minimal_config_toml() -> &'static str { 8 - r#" 9 - jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe" 10 - listen = "127.0.0.1:6100" 11 - db_path = "/tmp/test.db" 12 - "# 13 - } 14 - 15 - #[test] 16 - fn loads_valid_config() { 17 - let mut f = NamedTempFile::new().unwrap(); 18 - write!(f, "{}", minimal_config_toml()).unwrap(); 19 - 20 - let config = Config::load_from(f.path()).unwrap(); 21 - assert_eq!(config.listen, "127.0.0.1:6100"); 22 - assert_eq!(config.db_path, "/tmp/test.db"); 23 - } 24 - 25 - #[test] 26 - fn ignores_unknown_fields() { 27 - let mut f = NamedTempFile::new().unwrap(); 28 - write!( 29 - f, 30 - r#" 31 - jetstream_url = "wss://example.com/subscribe" 32 - listen = "127.0.0.1:6100" 33 - db_path = "/tmp/test.db" 34 - auth_token = "leftover-from-old-config" 35 - "# 36 - ) 37 - .unwrap(); 38 - 39 - // Old configs with auth_token should still parse fine (toml ignores unknown keys) 40 - let config = Config::load_from(f.path()).unwrap(); 41 - assert_eq!(config.listen, "127.0.0.1:6100"); 42 - } 43 - 44 - #[test] 45 - fn rejects_invalid_jetstream_url() { 46 - let mut f = NamedTempFile::new().unwrap(); 47 - write!( 48 - f, 49 - r#" 50 - jetstream_url = "https://example.com/subscribe" 51 - listen = "127.0.0.1:6100" 52 - db_path = "/tmp/test.db" 53 - "# 54 - ) 55 - .unwrap(); 56 - 57 - let err = Config::load_from(f.path()).unwrap_err(); 58 - assert!(err.to_string().contains("ws:// or wss://")); 59 - } 60 - 61 - #[test] 62 - fn expands_tilde_in_db_path() { 63 - let mut f = NamedTempFile::new().unwrap(); 64 - write!( 65 - f, 66 - r#" 67 - jetstream_url = "wss://example.com/subscribe" 68 - listen = "127.0.0.1:6100" 69 - db_path = "~/opake/appview.db" 70 - "# 71 - ) 72 - .unwrap(); 73 - 74 - let config = Config::load_from(f.path()).unwrap(); 75 - let resolved = config.resolved_db_path(); 76 - assert!(!resolved.to_string_lossy().contains('~')); 77 - assert!(resolved.to_string_lossy().ends_with("opake/appview.db")); 78 - }
-32
crates/opake-appview/src/db/cursor.rs
··· 1 - use rusqlite::{params, Connection}; 2 - 3 - use crate::error::Result; 4 - 5 - /// Jetstream cursors are unix microsecond timestamps. 6 - pub const MICROS_PER_SECOND: i64 = 1_000_000; 7 - 8 - /// Save the Jetstream cursor (unix microseconds timestamp). 9 - /// Uses upsert into the singleton row (id = 1). 10 - pub fn save_cursor(conn: &Connection, time_us: i64) -> Result<()> { 11 - let now = chrono::Utc::now().to_rfc3339(); 12 - conn.execute( 13 - "INSERT INTO cursor (id, time_us, updated_at) 14 - VALUES (1, ?1, ?2) 15 - ON CONFLICT(id) DO UPDATE SET 16 - time_us = excluded.time_us, 17 - updated_at = excluded.updated_at", 18 - params![time_us, now], 19 - )?; 20 - Ok(()) 21 - } 22 - 23 - /// Load the last saved Jetstream cursor, if any. 24 - pub fn load_cursor(conn: &Connection) -> Result<Option<i64>> { 25 - let mut stmt = conn.prepare("SELECT time_us FROM cursor WHERE id = 1")?; 26 - let result = stmt.query_row([], |row| row.get::<_, i64>(0)); 27 - match result { 28 - Ok(time_us) => Ok(Some(time_us)), 29 - Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), 30 - Err(e) => Err(e.into()), 31 - } 32 - }
-233
crates/opake-appview/src/db/db_tests.rs
··· 1 - use super::*; 2 - use grants::IndexedGrant; 3 - 4 - fn test_db() -> Database { 5 - Database::open_in_memory().unwrap() 6 - } 7 - 8 - fn make_grant(uri: &str, recipient: &str, owner: &str, doc_uri: &str) -> IndexedGrant { 9 - IndexedGrant { 10 - uri: uri.into(), 11 - owner_did: owner.into(), 12 - recipient_did: recipient.into(), 13 - document_uri: doc_uri.into(), 14 - created_at: "2026-03-01T12:00:00Z".into(), 15 - indexed_at: "2026-03-01T12:00:01Z".into(), 16 - } 17 - } 18 - 19 - #[test] 20 - fn grant_upsert_and_query() { 21 - let db = test_db(); 22 - let grant = make_grant( 23 - "at://did:plc:owner/app.opake.grant/3abc", 24 - "did:plc:recipient", 25 - "did:plc:owner", 26 - "at://did:plc:owner/app.opake.document/3xyz", 27 - ); 28 - 29 - db.with_conn(|c| grants::upsert_grant(c, &grant)).unwrap(); 30 - 31 - let inbox = db 32 - .with_conn(|c| grants::list_inbox(c, "did:plc:recipient", 50, None)) 33 - .unwrap(); 34 - assert_eq!(inbox.len(), 1); 35 - assert_eq!(inbox[0].uri, grant.uri); 36 - assert_eq!(inbox[0].document_uri, grant.document_uri); 37 - } 38 - 39 - #[test] 40 - fn grant_upsert_overwrites() { 41 - let db = test_db(); 42 - let mut grant = make_grant( 43 - "at://did:plc:owner/app.opake.grant/3abc", 44 - "did:plc:recipient", 45 - "did:plc:owner", 46 - "at://did:plc:owner/app.opake.document/3xyz", 47 - ); 48 - db.with_conn(|c| grants::upsert_grant(c, &grant)).unwrap(); 49 - 50 - grant.document_uri = "at://did:plc:owner/app.opake.document/updated".into(); 51 - db.with_conn(|c| grants::upsert_grant(c, &grant)).unwrap(); 52 - 53 - let inbox = db 54 - .with_conn(|c| grants::list_inbox(c, "did:plc:recipient", 50, None)) 55 - .unwrap(); 56 - assert_eq!(inbox.len(), 1); 57 - assert_eq!( 58 - inbox[0].document_uri, 59 - "at://did:plc:owner/app.opake.document/updated" 60 - ); 61 - } 62 - 63 - #[test] 64 - fn grant_delete() { 65 - let db = test_db(); 66 - let grant = make_grant( 67 - "at://did:plc:owner/app.opake.grant/3abc", 68 - "did:plc:recipient", 69 - "did:plc:owner", 70 - "at://did:plc:owner/app.opake.document/3xyz", 71 - ); 72 - db.with_conn(|c| grants::upsert_grant(c, &grant)).unwrap(); 73 - db.with_conn(|c| grants::delete_grant(c, &grant.uri)) 74 - .unwrap(); 75 - 76 - let inbox = db 77 - .with_conn(|c| grants::list_inbox(c, "did:plc:recipient", 50, None)) 78 - .unwrap(); 79 - assert!(inbox.is_empty()); 80 - } 81 - 82 - #[test] 83 - fn grant_pagination() { 84 - let db = test_db(); 85 - 86 - for i in 0..5 { 87 - let grant = IndexedGrant { 88 - uri: format!("at://did:plc:owner/app.opake.grant/{i}"), 89 - owner_did: "did:plc:owner".into(), 90 - recipient_did: "did:plc:me".into(), 91 - document_uri: format!("at://did:plc:owner/app.opake.document/{i}"), 92 - created_at: "2026-03-01T12:00:00Z".into(), 93 - indexed_at: format!("2026-03-01T12:00:0{i}Z"), 94 - }; 95 - db.with_conn(|c| grants::upsert_grant(c, &grant)).unwrap(); 96 - } 97 - 98 - // First page: 2 items 99 - let page1 = db 100 - .with_conn(|c| grants::list_inbox(c, "did:plc:me", 2, None)) 101 - .unwrap(); 102 - assert_eq!(page1.len(), 2); 103 - // Newest first 104 - assert!(page1[0].indexed_at > page1[1].indexed_at); 105 - 106 - // Second page using cursor from last item of page 1 107 - let cursor = grants::encode_cursor(&page1[1]); 108 - let page2 = db 109 - .with_conn(|c| grants::list_inbox(c, "did:plc:me", 2, Some(&cursor))) 110 - .unwrap(); 111 - assert_eq!(page2.len(), 2); 112 - assert!(page2[0].indexed_at < page1[1].indexed_at); 113 - } 114 - 115 - #[test] 116 - fn keyring_upsert_and_query() { 117 - let db = test_db(); 118 - let members = vec!["did:plc:alice".to_string(), "did:plc:bob".to_string()]; 119 - 120 - db.with_conn(|c| { 121 - keyrings::upsert_keyring_members( 122 - c, 123 - "at://did:plc:owner/app.opake.keyring/3def", 124 - "did:plc:owner", 125 - &members, 126 - "2026-03-01T12:00:00Z", 127 - ) 128 - }) 129 - .unwrap(); 130 - 131 - let alice_keyrings = db 132 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:alice", 50, None)) 133 - .unwrap(); 134 - assert_eq!(alice_keyrings.len(), 1); 135 - 136 - let bob_keyrings = db 137 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:bob", 50, None)) 138 - .unwrap(); 139 - assert_eq!(bob_keyrings.len(), 1); 140 - 141 - // Charlie is not a member 142 - let charlie_keyrings = db 143 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:charlie", 50, None)) 144 - .unwrap(); 145 - assert!(charlie_keyrings.is_empty()); 146 - } 147 - 148 - #[test] 149 - fn keyring_update_replaces_members() { 150 - let db = test_db(); 151 - let uri = "at://did:plc:owner/app.opake.keyring/3def"; 152 - 153 - // Initially: alice + bob 154 - db.with_conn(|c| { 155 - keyrings::upsert_keyring_members( 156 - c, 157 - uri, 158 - "did:plc:owner", 159 - &["did:plc:alice".into(), "did:plc:bob".into()], 160 - "2026-03-01T12:00:00Z", 161 - ) 162 - }) 163 - .unwrap(); 164 - 165 - // Update: bob removed, charlie added 166 - db.with_conn(|c| { 167 - keyrings::upsert_keyring_members( 168 - c, 169 - uri, 170 - "did:plc:owner", 171 - &["did:plc:alice".into(), "did:plc:charlie".into()], 172 - "2026-03-01T13:00:00Z", 173 - ) 174 - }) 175 - .unwrap(); 176 - 177 - // Bob should no longer see it 178 - let bob = db 179 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:bob", 50, None)) 180 - .unwrap(); 181 - assert!(bob.is_empty()); 182 - 183 - // Charlie should see it 184 - let charlie = db 185 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:charlie", 50, None)) 186 - .unwrap(); 187 - assert_eq!(charlie.len(), 1); 188 - } 189 - 190 - #[test] 191 - fn keyring_delete() { 192 - let db = test_db(); 193 - let uri = "at://did:plc:owner/app.opake.keyring/3def"; 194 - 195 - db.with_conn(|c| { 196 - keyrings::upsert_keyring_members( 197 - c, 198 - uri, 199 - "did:plc:owner", 200 - &["did:plc:alice".into()], 201 - "2026-03-01T12:00:00Z", 202 - ) 203 - }) 204 - .unwrap(); 205 - 206 - db.with_conn(|c| keyrings::delete_keyring(c, uri)).unwrap(); 207 - 208 - let alice = db 209 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:alice", 50, None)) 210 - .unwrap(); 211 - assert!(alice.is_empty()); 212 - } 213 - 214 - #[test] 215 - fn cursor_roundtrip() { 216 - let db = test_db(); 217 - 218 - // No cursor initially 219 - let initial = db.with_conn(cursor::load_cursor).unwrap(); 220 - assert!(initial.is_none()); 221 - 222 - // Save and load 223 - db.with_conn(|c| cursor::save_cursor(c, 1709330400000000)) 224 - .unwrap(); 225 - let loaded = db.with_conn(cursor::load_cursor).unwrap(); 226 - assert_eq!(loaded, Some(1709330400000000)); 227 - 228 - // Update 229 - db.with_conn(|c| cursor::save_cursor(c, 1709330500000000)) 230 - .unwrap(); 231 - let updated = db.with_conn(cursor::load_cursor).unwrap(); 232 - assert_eq!(updated, Some(1709330500000000)); 233 - }
-158
crates/opake-appview/src/db/grants.rs
··· 1 - use rusqlite::{params, Connection}; 2 - 3 - use crate::error::Result; 4 - 5 - /// A grant row as stored in the index. 6 - #[derive(Debug, Clone)] 7 - pub struct IndexedGrant { 8 - pub uri: String, 9 - pub owner_did: String, 10 - pub recipient_did: String, 11 - pub document_uri: String, 12 - pub created_at: String, 13 - pub indexed_at: String, 14 - } 15 - 16 - pub fn upsert_grant(conn: &Connection, grant: &IndexedGrant) -> Result<()> { 17 - conn.execute( 18 - "INSERT INTO grants (uri, owner_did, recipient_did, document_uri, created_at, indexed_at) 19 - VALUES (?1, ?2, ?3, ?4, ?5, ?6) 20 - ON CONFLICT(uri) DO UPDATE SET 21 - owner_did = excluded.owner_did, 22 - recipient_did = excluded.recipient_did, 23 - document_uri = excluded.document_uri, 24 - created_at = excluded.created_at, 25 - indexed_at = excluded.indexed_at", 26 - params![ 27 - grant.uri, 28 - grant.owner_did, 29 - grant.recipient_did, 30 - grant.document_uri, 31 - grant.created_at, 32 - grant.indexed_at, 33 - ], 34 - )?; 35 - Ok(()) 36 - } 37 - 38 - pub fn delete_grant(conn: &Connection, uri: &str) -> Result<()> { 39 - conn.execute("DELETE FROM grants WHERE uri = ?1", params![uri])?; 40 - Ok(()) 41 - } 42 - 43 - /// Paginated inbox query: grants for a recipient DID, newest first. 44 - /// Cursor is a composite `indexed_at::uri` string. 45 - pub fn list_inbox( 46 - conn: &Connection, 47 - recipient_did: &str, 48 - limit: u32, 49 - cursor: Option<&str>, 50 - ) -> Result<Vec<IndexedGrant>> { 51 - let mut grants = Vec::new(); 52 - 53 - if let Some(cursor) = cursor { 54 - let (cursor_time, cursor_uri) = parse_cursor(cursor); 55 - let mut stmt = conn.prepare( 56 - "SELECT uri, owner_did, recipient_did, document_uri, created_at, indexed_at 57 - FROM grants 58 - WHERE recipient_did = ?1 59 - AND (indexed_at < ?2 OR (indexed_at = ?2 AND uri < ?3)) 60 - ORDER BY indexed_at DESC, uri DESC 61 - LIMIT ?4", 62 - )?; 63 - let rows = stmt.query_map( 64 - params![recipient_did, cursor_time, cursor_uri, limit], 65 - row_to_grant, 66 - )?; 67 - for row in rows { 68 - grants.push(row?); 69 - } 70 - } else { 71 - let mut stmt = conn.prepare( 72 - "SELECT uri, owner_did, recipient_did, document_uri, created_at, indexed_at 73 - FROM grants 74 - WHERE recipient_did = ?1 75 - ORDER BY indexed_at DESC, uri DESC 76 - LIMIT ?2", 77 - )?; 78 - let rows = stmt.query_map(params![recipient_did, limit], row_to_grant)?; 79 - for row in rows { 80 - grants.push(row?); 81 - } 82 - } 83 - 84 - Ok(grants) 85 - } 86 - 87 - /// List grants created by an owner DID, newest first. 88 - #[allow(dead_code)] 89 - pub fn list_grants_by_owner( 90 - conn: &Connection, 91 - owner_did: &str, 92 - limit: u32, 93 - cursor: Option<&str>, 94 - ) -> Result<Vec<IndexedGrant>> { 95 - let mut grants = Vec::new(); 96 - 97 - if let Some(cursor) = cursor { 98 - let (cursor_time, cursor_uri) = parse_cursor(cursor); 99 - let mut stmt = conn.prepare( 100 - "SELECT uri, owner_did, recipient_did, document_uri, created_at, indexed_at 101 - FROM grants 102 - WHERE owner_did = ?1 103 - AND (indexed_at < ?2 OR (indexed_at = ?2 AND uri < ?3)) 104 - ORDER BY indexed_at DESC, uri DESC 105 - LIMIT ?4", 106 - )?; 107 - let rows = stmt.query_map( 108 - params![owner_did, cursor_time, cursor_uri, limit], 109 - row_to_grant, 110 - )?; 111 - for row in rows { 112 - grants.push(row?); 113 - } 114 - } else { 115 - let mut stmt = conn.prepare( 116 - "SELECT uri, owner_did, recipient_did, document_uri, created_at, indexed_at 117 - FROM grants 118 - WHERE owner_did = ?1 119 - ORDER BY indexed_at DESC, uri DESC 120 - LIMIT ?2", 121 - )?; 122 - let rows = stmt.query_map(params![owner_did, limit], row_to_grant)?; 123 - for row in rows { 124 - grants.push(row?); 125 - } 126 - } 127 - 128 - Ok(grants) 129 - } 130 - 131 - fn row_to_grant(row: &rusqlite::Row) -> rusqlite::Result<IndexedGrant> { 132 - Ok(IndexedGrant { 133 - uri: row.get(0)?, 134 - owner_did: row.get(1)?, 135 - recipient_did: row.get(2)?, 136 - document_uri: row.get(3)?, 137 - created_at: row.get(4)?, 138 - indexed_at: row.get(5)?, 139 - }) 140 - } 141 - 142 - /// Build a cursor string from an indexed grant. 143 - pub fn encode_cursor(grant: &IndexedGrant) -> String { 144 - format!("{}::{}", grant.indexed_at, grant.uri) 145 - } 146 - 147 - /// Count total grants in the index. 148 - pub fn count_grants(conn: &Connection) -> Result<i64> { 149 - let count = conn.query_row("SELECT COUNT(*) FROM grants", [], |row| row.get(0))?; 150 - Ok(count) 151 - } 152 - 153 - fn parse_cursor(cursor: &str) -> (&str, &str) { 154 - match cursor.split_once("::") { 155 - Some((time, uri)) => (time, uri), 156 - None => (cursor, ""), 157 - } 158 - }
-124
crates/opake-appview/src/db/keyrings.rs
··· 1 - use rusqlite::{params, Connection}; 2 - 3 - use crate::error::Result; 4 - 5 - /// A keyring membership row as stored in the index. 6 - #[derive(Debug, Clone)] 7 - #[allow(dead_code)] 8 - pub struct IndexedKeyringMember { 9 - pub keyring_uri: String, 10 - pub member_did: String, 11 - pub owner_did: String, 12 - pub indexed_at: String, 13 - } 14 - 15 - /// Replace all members for a keyring (delete-and-reinsert). 16 - /// This handles both create and update events correctly — on update, 17 - /// the member list may have changed, so we wipe and rewrite. 18 - pub fn upsert_keyring_members( 19 - conn: &Connection, 20 - keyring_uri: &str, 21 - owner_did: &str, 22 - member_dids: &[String], 23 - indexed_at: &str, 24 - ) -> Result<()> { 25 - conn.execute( 26 - "DELETE FROM keyring_members WHERE keyring_uri = ?1", 27 - params![keyring_uri], 28 - )?; 29 - 30 - let mut stmt = conn.prepare( 31 - "INSERT INTO keyring_members (keyring_uri, member_did, owner_did, indexed_at) 32 - VALUES (?1, ?2, ?3, ?4)", 33 - )?; 34 - 35 - for did in member_dids { 36 - stmt.execute(params![keyring_uri, did, owner_did, indexed_at])?; 37 - } 38 - 39 - Ok(()) 40 - } 41 - 42 - /// Delete all member rows for a keyring (when the record is deleted). 43 - pub fn delete_keyring(conn: &Connection, keyring_uri: &str) -> Result<()> { 44 - conn.execute( 45 - "DELETE FROM keyring_members WHERE keyring_uri = ?1", 46 - params![keyring_uri], 47 - )?; 48 - Ok(()) 49 - } 50 - 51 - /// Paginated query: keyrings where a DID is a member, newest first. 52 - /// Returns one row per unique keyring (not per member). 53 - pub fn list_keyrings_for_member( 54 - conn: &Connection, 55 - member_did: &str, 56 - limit: u32, 57 - cursor: Option<&str>, 58 - ) -> Result<Vec<IndexedKeyringMember>> { 59 - let mut keyrings = Vec::new(); 60 - 61 - if let Some(cursor) = cursor { 62 - let (cursor_time, cursor_uri) = parse_cursor(cursor); 63 - let mut stmt = conn.prepare( 64 - "SELECT keyring_uri, member_did, owner_did, indexed_at 65 - FROM keyring_members 66 - WHERE member_did = ?1 67 - AND (indexed_at < ?2 OR (indexed_at = ?2 AND keyring_uri < ?3)) 68 - ORDER BY indexed_at DESC, keyring_uri DESC 69 - LIMIT ?4", 70 - )?; 71 - let rows = stmt.query_map( 72 - params![member_did, cursor_time, cursor_uri, limit], 73 - row_to_member, 74 - )?; 75 - for row in rows { 76 - keyrings.push(row?); 77 - } 78 - } else { 79 - let mut stmt = conn.prepare( 80 - "SELECT keyring_uri, member_did, owner_did, indexed_at 81 - FROM keyring_members 82 - WHERE member_did = ?1 83 - ORDER BY indexed_at DESC, keyring_uri DESC 84 - LIMIT ?2", 85 - )?; 86 - let rows = stmt.query_map(params![member_did, limit], row_to_member)?; 87 - for row in rows { 88 - keyrings.push(row?); 89 - } 90 - } 91 - 92 - Ok(keyrings) 93 - } 94 - 95 - fn row_to_member(row: &rusqlite::Row) -> rusqlite::Result<IndexedKeyringMember> { 96 - Ok(IndexedKeyringMember { 97 - keyring_uri: row.get(0)?, 98 - member_did: row.get(1)?, 99 - owner_did: row.get(2)?, 100 - indexed_at: row.get(3)?, 101 - }) 102 - } 103 - 104 - /// Build a cursor string from an indexed keyring member. 105 - pub fn encode_cursor(member: &IndexedKeyringMember) -> String { 106 - format!("{}::{}", member.indexed_at, member.keyring_uri) 107 - } 108 - 109 - /// Count unique keyrings in the index. 110 - pub fn count_unique_keyrings(conn: &Connection) -> Result<i64> { 111 - let count = conn.query_row( 112 - "SELECT COUNT(DISTINCT keyring_uri) FROM keyring_members", 113 - [], 114 - |row| row.get(0), 115 - )?; 116 - Ok(count) 117 - } 118 - 119 - fn parse_cursor(cursor: &str) -> (&str, &str) { 120 - match cursor.split_once("::") { 121 - Some((time, uri)) => (time, uri), 122 - None => (cursor, ""), 123 - } 124 - }
-57
crates/opake-appview/src/db/mod.rs
··· 1 - pub mod cursor; 2 - pub mod grants; 3 - pub mod keyrings; 4 - mod schema; 5 - 6 - use std::path::Path; 7 - use std::sync::Mutex; 8 - 9 - use rusqlite::Connection; 10 - 11 - use crate::error::Result; 12 - 13 - /// Thread-safe database handle. Axum handlers and the indexer share this. 14 - pub struct Database { 15 - conn: Mutex<Connection>, 16 - } 17 - 18 - impl Database { 19 - /// Open (or create) the SQLite database at the given path and run migrations. 20 - pub fn open(path: &Path) -> Result<Self> { 21 - if let Some(parent) = path.parent() { 22 - if !parent.exists() { 23 - std::fs::create_dir_all(parent)?; 24 - } 25 - } 26 - let conn = Connection::open(path)?; 27 - conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?; 28 - conn.execute_batch(schema::SCHEMA)?; 29 - Ok(Self { 30 - conn: Mutex::new(conn), 31 - }) 32 - } 33 - 34 - /// Create an in-memory database for testing. 35 - #[cfg(test)] 36 - pub fn open_in_memory() -> Result<Self> { 37 - let conn = Connection::open_in_memory()?; 38 - conn.execute_batch(schema::SCHEMA)?; 39 - Ok(Self { 40 - conn: Mutex::new(conn), 41 - }) 42 - } 43 - 44 - /// Run a closure with a reference to the connection. 45 - /// Panics if the mutex is poisoned (unrecoverable). 46 - pub fn with_conn<F, T>(&self, f: F) -> T 47 - where 48 - F: FnOnce(&Connection) -> T, 49 - { 50 - let conn = self.conn.lock().expect("database mutex poisoned"); 51 - f(&conn) 52 - } 53 - } 54 - 55 - #[cfg(test)] 56 - #[path = "db_tests.rs"] 57 - mod tests;
-28
crates/opake-appview/src/db/schema.rs
··· 1 - /// SQL statements to initialize the database schema. 2 - pub const SCHEMA: &str = " 3 - CREATE TABLE IF NOT EXISTS cursor ( 4 - id INTEGER PRIMARY KEY CHECK (id = 1), 5 - time_us INTEGER NOT NULL, 6 - updated_at TEXT NOT NULL 7 - ); 8 - 9 - CREATE TABLE IF NOT EXISTS grants ( 10 - uri TEXT PRIMARY KEY, 11 - owner_did TEXT NOT NULL, 12 - recipient_did TEXT NOT NULL, 13 - document_uri TEXT NOT NULL, 14 - created_at TEXT NOT NULL, 15 - indexed_at TEXT NOT NULL 16 - ); 17 - CREATE INDEX IF NOT EXISTS idx_grants_recipient ON grants (recipient_did); 18 - CREATE INDEX IF NOT EXISTS idx_grants_owner ON grants (owner_did); 19 - 20 - CREATE TABLE IF NOT EXISTS keyring_members ( 21 - keyring_uri TEXT NOT NULL, 22 - member_did TEXT NOT NULL, 23 - owner_did TEXT NOT NULL, 24 - indexed_at TEXT NOT NULL, 25 - PRIMARY KEY (keyring_uri, member_did) 26 - ); 27 - CREATE INDEX IF NOT EXISTS idx_keyring_members_did ON keyring_members (member_did); 28 - ";
-24
crates/opake-appview/src/error.rs
··· 1 - use thiserror::Error; 2 - 3 - #[derive(Debug, Error)] 4 - pub enum Error { 5 - #[error("database error: {0}")] 6 - Database(#[from] rusqlite::Error), 7 - 8 - #[error("config error: {0}")] 9 - Config(String), 10 - 11 - #[error("auth error: {0}")] 12 - Auth(String), 13 - 14 - #[error("firehose error: {0}")] 15 - Firehose(String), 16 - 17 - #[error("IO error: {0}")] 18 - Io(#[from] std::io::Error), 19 - 20 - #[error("JSON error: {0}")] 21 - Json(#[from] serde_json::Error), 22 - } 23 - 24 - pub type Result<T> = std::result::Result<T, Error>;
-100
crates/opake-appview/src/firehose/events.rs
··· 1 - use serde::Deserialize; 2 - 3 - /// Top-level Jetstream WebSocket message. 4 - #[derive(Debug, Deserialize)] 5 - pub struct JetstreamEvent { 6 - pub did: String, 7 - pub time_us: i64, 8 - pub kind: String, 9 - pub commit: Option<CommitEvent>, 10 - } 11 - 12 - /// A commit event within a Jetstream message. 13 - #[derive(Debug, Deserialize)] 14 - pub struct CommitEvent { 15 - pub operation: String, 16 - pub collection: String, 17 - pub rkey: String, 18 - /// Present for create/update, absent for delete. 19 - pub record: Option<serde_json::Value>, 20 - } 21 - 22 - /// Collections we index. 23 - pub const COLLECTION_GRANT: &str = "app.opake.grant"; 24 - pub const COLLECTION_KEYRING: &str = "app.opake.keyring"; 25 - 26 - /// Parsed event ready for indexing. 27 - #[derive(Debug)] 28 - pub enum IndexableEvent { 29 - UpsertGrant { 30 - uri: String, 31 - owner_did: String, 32 - recipient_did: String, 33 - document_uri: String, 34 - created_at: String, 35 - }, 36 - DeleteGrant { 37 - uri: String, 38 - }, 39 - UpsertKeyring { 40 - uri: String, 41 - owner_did: String, 42 - member_dids: Vec<String>, 43 - }, 44 - DeleteKeyring { 45 - uri: String, 46 - }, 47 - } 48 - 49 - /// Try to parse a Jetstream JSON message into an indexable event. 50 - /// Returns None for events we don't care about (wrong collection, identity events, etc). 51 - pub fn parse_event(raw: &str) -> Option<(IndexableEvent, i64)> { 52 - let event: JetstreamEvent = serde_json::from_str(raw).ok()?; 53 - 54 - if event.kind != "commit" { 55 - return None; 56 - } 57 - 58 - let commit = event.commit.as_ref()?; 59 - let uri = format!("at://{}/{}/{}", event.did, commit.collection, commit.rkey); 60 - 61 - match (commit.collection.as_str(), commit.operation.as_str()) { 62 - (COLLECTION_GRANT, "create" | "update") => { 63 - let record = commit.record.as_ref()?; 64 - let grant: opake_core::records::Grant = serde_json::from_value(record.clone()).ok()?; 65 - Some(( 66 - IndexableEvent::UpsertGrant { 67 - uri, 68 - owner_did: event.did, 69 - recipient_did: grant.recipient, 70 - document_uri: grant.document, 71 - created_at: grant.created_at, 72 - }, 73 - event.time_us, 74 - )) 75 - } 76 - (COLLECTION_GRANT, "delete") => Some((IndexableEvent::DeleteGrant { uri }, event.time_us)), 77 - (COLLECTION_KEYRING, "create" | "update") => { 78 - let record = commit.record.as_ref()?; 79 - let keyring: opake_core::records::Keyring = 80 - serde_json::from_value(record.clone()).ok()?; 81 - let member_dids: Vec<String> = keyring.members.iter().map(|m| m.did.clone()).collect(); 82 - Some(( 83 - IndexableEvent::UpsertKeyring { 84 - uri, 85 - owner_did: event.did, 86 - member_dids, 87 - }, 88 - event.time_us, 89 - )) 90 - } 91 - (COLLECTION_KEYRING, "delete") => { 92 - Some((IndexableEvent::DeleteKeyring { uri }, event.time_us)) 93 - } 94 - _ => None, 95 - } 96 - } 97 - 98 - #[cfg(test)] 99 - #[path = "events_tests.rs"] 100 - mod tests;
-208
crates/opake-appview/src/firehose/events_tests.rs
··· 1 - use super::*; 2 - 3 - fn grant_event_json(operation: &str) -> String { 4 - format!( 5 - r#"{{ 6 - "did": "did:plc:owner123", 7 - "time_us": 1709330400000000, 8 - "kind": "commit", 9 - "commit": {{ 10 - "rev": "3l3qo2vutsw2b", 11 - "operation": "{operation}", 12 - "collection": "app.opake.grant", 13 - "rkey": "3abc", 14 - "record": {{ 15 - "opakeVersion": 1, 16 - "document": "at://did:plc:owner123/app.opake.document/3xyz", 17 - "recipient": "did:plc:recipient456", 18 - "wrappedKey": {{ 19 - "did": "did:plc:recipient456", 20 - "ciphertext": {{ "$bytes": "AAAA" }}, 21 - "algo": "x25519-hkdf-a256kw" 22 - }}, 23 - "encryptedMetadata": {{ 24 - "ciphertext": {{ "$bytes": "AAAA" }}, 25 - "nonce": {{ "$bytes": "AAAAAAAAAAAAAAAA" }} 26 - }}, 27 - "createdAt": "2026-03-01T12:00:00Z" 28 - }}, 29 - "cid": "bafyabc" 30 - }} 31 - }}"# 32 - ) 33 - } 34 - 35 - fn keyring_event_json(operation: &str) -> String { 36 - format!( 37 - r#"{{ 38 - "did": "did:plc:owner123", 39 - "time_us": 1709330500000000, 40 - "kind": "commit", 41 - "commit": {{ 42 - "rev": "3l3qo2vutsw2b", 43 - "operation": "{operation}", 44 - "collection": "app.opake.keyring", 45 - "rkey": "3def", 46 - "record": {{ 47 - "opakeVersion": 1, 48 - "algo": "aes-256-gcm", 49 - "members": [ 50 - {{ 51 - "did": "did:plc:alice", 52 - "ciphertext": {{ "$bytes": "AAAA" }}, 53 - "algo": "x25519-hkdf-a256kw" 54 - }}, 55 - {{ 56 - "did": "did:plc:bob", 57 - "ciphertext": {{ "$bytes": "BBBB" }}, 58 - "algo": "x25519-hkdf-a256kw" 59 - }} 60 - ], 61 - "rotation": 0, 62 - "encryptedMetadata": {{ 63 - "ciphertext": {{ "$bytes": "AAAA" }}, 64 - "nonce": {{ "$bytes": "AAAAAAAAAAAAAAAA" }} 65 - }}, 66 - "createdAt": "2026-03-01T12:00:00Z" 67 - }}, 68 - "cid": "bafydef" 69 - }} 70 - }}"# 71 - ) 72 - } 73 - 74 - fn delete_event_json(collection: &str, rkey: &str) -> String { 75 - format!( 76 - r#"{{ 77 - "did": "did:plc:owner123", 78 - "time_us": 1709330600000000, 79 - "kind": "commit", 80 - "commit": {{ 81 - "rev": "3l3qo2vutsw2b", 82 - "operation": "delete", 83 - "collection": "{collection}", 84 - "rkey": "{rkey}" 85 - }} 86 - }}"# 87 - ) 88 - } 89 - 90 - #[test] 91 - fn parses_grant_create() { 92 - let json = grant_event_json("create"); 93 - let (event, time_us) = parse_event(&json).unwrap(); 94 - assert_eq!(time_us, 1709330400000000); 95 - 96 - match event { 97 - IndexableEvent::UpsertGrant { 98 - uri, 99 - owner_did, 100 - recipient_did, 101 - document_uri, 102 - .. 103 - } => { 104 - assert_eq!(uri, "at://did:plc:owner123/app.opake.grant/3abc"); 105 - assert_eq!(owner_did, "did:plc:owner123"); 106 - assert_eq!(recipient_did, "did:plc:recipient456"); 107 - assert_eq!( 108 - document_uri, 109 - "at://did:plc:owner123/app.opake.document/3xyz" 110 - ); 111 - } 112 - other => panic!("expected UpsertGrant, got {other:?}"), 113 - } 114 - } 115 - 116 - #[test] 117 - fn parses_grant_update() { 118 - let json = grant_event_json("update"); 119 - let (event, _) = parse_event(&json).unwrap(); 120 - assert!(matches!(event, IndexableEvent::UpsertGrant { .. })); 121 - } 122 - 123 - #[test] 124 - fn parses_grant_delete() { 125 - let json = delete_event_json("app.opake.grant", "3abc"); 126 - let (event, _) = parse_event(&json).unwrap(); 127 - match event { 128 - IndexableEvent::DeleteGrant { uri } => { 129 - assert_eq!(uri, "at://did:plc:owner123/app.opake.grant/3abc"); 130 - } 131 - other => panic!("expected DeleteGrant, got {other:?}"), 132 - } 133 - } 134 - 135 - #[test] 136 - fn parses_keyring_create() { 137 - let json = keyring_event_json("create"); 138 - let (event, time_us) = parse_event(&json).unwrap(); 139 - assert_eq!(time_us, 1709330500000000); 140 - 141 - match event { 142 - IndexableEvent::UpsertKeyring { 143 - uri, 144 - owner_did, 145 - member_dids, 146 - } => { 147 - assert_eq!(uri, "at://did:plc:owner123/app.opake.keyring/3def"); 148 - assert_eq!(owner_did, "did:plc:owner123"); 149 - assert_eq!(member_dids, vec!["did:plc:alice", "did:plc:bob"]); 150 - } 151 - other => panic!("expected UpsertKeyring, got {other:?}"), 152 - } 153 - } 154 - 155 - #[test] 156 - fn parses_keyring_delete() { 157 - let json = delete_event_json("app.opake.keyring", "3def"); 158 - let (event, _) = parse_event(&json).unwrap(); 159 - assert!(matches!(event, IndexableEvent::DeleteKeyring { .. })); 160 - } 161 - 162 - #[test] 163 - fn ignores_identity_events() { 164 - let json = r#"{"did":"did:plc:abc","time_us":123,"kind":"identity"}"#; 165 - assert!(parse_event(json).is_none()); 166 - } 167 - 168 - #[test] 169 - fn ignores_unknown_collections() { 170 - let json = r#"{ 171 - "did": "did:plc:abc", 172 - "time_us": 123, 173 - "kind": "commit", 174 - "commit": { 175 - "rev": "abc", 176 - "operation": "create", 177 - "collection": "app.bsky.feed.post", 178 - "rkey": "3abc", 179 - "record": {"text": "hello"}, 180 - "cid": "bafyabc" 181 - } 182 - }"#; 183 - assert!(parse_event(json).is_none()); 184 - } 185 - 186 - #[test] 187 - fn ignores_malformed_json() { 188 - assert!(parse_event("not json at all").is_none()); 189 - } 190 - 191 - #[test] 192 - fn ignores_grant_with_invalid_record() { 193 - // record is present but doesn't match Grant schema 194 - let json = r#"{ 195 - "did": "did:plc:owner", 196 - "time_us": 123, 197 - "kind": "commit", 198 - "commit": { 199 - "rev": "abc", 200 - "operation": "create", 201 - "collection": "app.opake.grant", 202 - "rkey": "3abc", 203 - "record": {"garbage": true}, 204 - "cid": "bafyabc" 205 - } 206 - }"#; 207 - assert!(parse_event(json).is_none()); 208 - }
-2
crates/opake-appview/src/firehose/mod.rs
··· 1 - pub mod events; 2 - pub mod subscribe;
-46
crates/opake-appview/src/firehose/subscribe.rs
··· 1 - use futures_util::stream::SplitStream; 2 - use futures_util::StreamExt; 3 - use tokio::net::TcpStream; 4 - use tokio_tungstenite::tungstenite::Message; 5 - use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; 6 - 7 - use crate::error::{Error, Result}; 8 - 9 - type WsStream = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>; 10 - 11 - /// Build the Jetstream subscription URL with collection filters and optional cursor. 12 - pub fn subscription_url(base_url: &str, cursor: Option<i64>) -> String { 13 - let mut url = 14 - format!("{base_url}?wantedCollections=app.opake.grant&wantedCollections=app.opake.keyring"); 15 - if let Some(cursor_us) = cursor { 16 - url.push_str(&format!("&cursor={cursor_us}")); 17 - } 18 - url 19 - } 20 - 21 - /// Connect to the Jetstream WebSocket. Returns the read half of the stream. 22 - pub async fn connect(url: &str) -> Result<WsStream> { 23 - log::info!("connecting to jetstream: {url}"); 24 - let (ws, _response) = connect_async(url) 25 - .await 26 - .map_err(|e| Error::Firehose(format!("WebSocket connection failed: {e}")))?; 27 - let (_, read) = ws.split(); 28 - Ok(read) 29 - } 30 - 31 - /// Read the next text message from the WebSocket stream. 32 - /// Returns None if the stream is closed. 33 - pub async fn next_message(stream: &mut WsStream) -> Result<Option<String>> { 34 - loop { 35 - match stream.next().await { 36 - Some(Ok(Message::Text(text))) => return Ok(Some(text.to_string())), 37 - Some(Ok(Message::Ping(_) | Message::Pong(_))) => continue, 38 - Some(Ok(Message::Close(_))) => return Ok(None), 39 - Some(Ok(_)) => continue, 40 - Some(Err(e)) => { 41 - return Err(Error::Firehose(format!("WebSocket error: {e}"))); 42 - } 43 - None => return Ok(None), 44 - } 45 - } 46 - }
-126
crates/opake-appview/src/indexer.rs
··· 1 - use std::sync::atomic::Ordering; 2 - use std::sync::Arc; 3 - 4 - use crate::db::cursor; 5 - use crate::db::grants::{self, IndexedGrant}; 6 - use crate::db::keyrings; 7 - use crate::firehose::events::{self, IndexableEvent}; 8 - use crate::firehose::subscribe; 9 - use crate::state::AppState; 10 - 11 - const CURSOR_SAVE_INTERVAL: u64 = 100; 12 - const MAX_BACKOFF_SECS: u64 = 60; 13 - 14 - /// Run the indexer loop. Connects to Jetstream, processes events, writes to DB. 15 - /// Reconnects with exponential backoff on failure. Runs until the task is cancelled. 16 - pub async fn run(state: Arc<AppState>, jetstream_url: String) { 17 - let mut backoff_secs: u64 = 1; 18 - 19 - loop { 20 - let cursor_us = state.db.with_conn(cursor::load_cursor).unwrap_or(None); 21 - 22 - let url = subscribe::subscription_url(&jetstream_url, cursor_us); 23 - 24 - match subscribe::connect(&url).await { 25 - Ok(mut stream) => { 26 - backoff_secs = 1; 27 - state.indexer_connected.store(true, Ordering::Relaxed); 28 - log::info!("connected to jetstream, indexing events"); 29 - 30 - let mut events_since_cursor_save: u64 = 0; 31 - 32 - loop { 33 - match subscribe::next_message(&mut stream).await { 34 - Ok(Some(text)) => { 35 - if let Some((event, time_us)) = events::parse_event(&text) { 36 - if let Err(e) = process_event(&state, &event, time_us) { 37 - log::error!("failed to process event: {e}"); 38 - continue; 39 - } 40 - events_since_cursor_save += 1; 41 - if events_since_cursor_save >= CURSOR_SAVE_INTERVAL { 42 - if let Err(e) = 43 - state.db.with_conn(|c| cursor::save_cursor(c, time_us)) 44 - { 45 - log::error!("failed to save cursor: {e}"); 46 - } 47 - events_since_cursor_save = 0; 48 - } 49 - } 50 - } 51 - Ok(None) => { 52 - log::warn!("jetstream stream closed, reconnecting"); 53 - break; 54 - } 55 - Err(e) => { 56 - log::error!("jetstream read error: {e}"); 57 - break; 58 - } 59 - } 60 - } 61 - 62 - state.indexer_connected.store(false, Ordering::Relaxed); 63 - } 64 - Err(e) => { 65 - log::error!("jetstream connection failed: {e}"); 66 - } 67 - } 68 - 69 - log::info!("reconnecting in {backoff_secs}s"); 70 - tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await; 71 - backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS); 72 - } 73 - } 74 - 75 - fn process_event( 76 - state: &AppState, 77 - event: &IndexableEvent, 78 - _time_us: i64, 79 - ) -> crate::error::Result<()> { 80 - let now = chrono::Utc::now().to_rfc3339(); 81 - 82 - state.db.with_conn(|conn| match event { 83 - IndexableEvent::UpsertGrant { 84 - uri, 85 - owner_did, 86 - recipient_did, 87 - document_uri, 88 - created_at, 89 - } => { 90 - let grant = IndexedGrant { 91 - uri: uri.clone(), 92 - owner_did: owner_did.clone(), 93 - recipient_did: recipient_did.clone(), 94 - document_uri: document_uri.clone(), 95 - created_at: created_at.clone(), 96 - indexed_at: now.clone(), 97 - }; 98 - grants::upsert_grant(conn, &grant)?; 99 - log::debug!("indexed grant: {uri}"); 100 - Ok(()) 101 - } 102 - IndexableEvent::DeleteGrant { uri } => { 103 - grants::delete_grant(conn, uri)?; 104 - log::debug!("deleted grant: {uri}"); 105 - Ok(()) 106 - } 107 - IndexableEvent::UpsertKeyring { 108 - uri, 109 - owner_did, 110 - member_dids, 111 - } => { 112 - keyrings::upsert_keyring_members(conn, uri, owner_did, member_dids, &now)?; 113 - log::debug!("indexed keyring: {uri} ({} members)", member_dids.len()); 114 - Ok(()) 115 - } 116 - IndexableEvent::DeleteKeyring { uri } => { 117 - keyrings::delete_keyring(conn, uri)?; 118 - log::debug!("deleted keyring: {uri}"); 119 - Ok(()) 120 - } 121 - }) 122 - } 123 - 124 - #[cfg(test)] 125 - #[path = "indexer_tests.rs"] 126 - mod tests;
-133
crates/opake-appview/src/indexer_tests.rs
··· 1 - use std::sync::Arc; 2 - 3 - use super::*; 4 - use crate::db::Database; 5 - use crate::firehose::events::IndexableEvent; 6 - use crate::state::AppState; 7 - 8 - fn test_state() -> Arc<AppState> { 9 - let db = Database::open_in_memory().unwrap(); 10 - Arc::new(AppState::new(db)) 11 - } 12 - 13 - #[test] 14 - fn indexes_grant_create() { 15 - let state = test_state(); 16 - let event = IndexableEvent::UpsertGrant { 17 - uri: "at://did:plc:owner/app.opake.grant/3abc".into(), 18 - owner_did: "did:plc:owner".into(), 19 - recipient_did: "did:plc:recipient".into(), 20 - document_uri: "at://did:plc:owner/app.opake.document/3xyz".into(), 21 - created_at: "2026-03-01T12:00:00Z".into(), 22 - }; 23 - process_event(&state, &event, 1709330400000000).unwrap(); 24 - 25 - let inbox = state 26 - .db 27 - .with_conn(|c| grants::list_inbox(c, "did:plc:recipient", 50, None)) 28 - .unwrap(); 29 - assert_eq!(inbox.len(), 1); 30 - assert_eq!(inbox[0].owner_did, "did:plc:owner"); 31 - } 32 - 33 - #[test] 34 - fn indexes_grant_delete() { 35 - let state = test_state(); 36 - let uri = "at://did:plc:owner/app.opake.grant/3abc"; 37 - 38 - let create = IndexableEvent::UpsertGrant { 39 - uri: uri.into(), 40 - owner_did: "did:plc:owner".into(), 41 - recipient_did: "did:plc:recipient".into(), 42 - document_uri: "at://did:plc:owner/app.opake.document/3xyz".into(), 43 - created_at: "2026-03-01T12:00:00Z".into(), 44 - }; 45 - process_event(&state, &create, 1709330400000000).unwrap(); 46 - 47 - let delete = IndexableEvent::DeleteGrant { uri: uri.into() }; 48 - process_event(&state, &delete, 1709330500000000).unwrap(); 49 - 50 - let inbox = state 51 - .db 52 - .with_conn(|c| grants::list_inbox(c, "did:plc:recipient", 50, None)) 53 - .unwrap(); 54 - assert!(inbox.is_empty()); 55 - } 56 - 57 - #[test] 58 - fn indexes_keyring_create() { 59 - let state = test_state(); 60 - let event = IndexableEvent::UpsertKeyring { 61 - uri: "at://did:plc:owner/app.opake.keyring/3def".into(), 62 - owner_did: "did:plc:owner".into(), 63 - member_dids: vec!["did:plc:alice".into(), "did:plc:bob".into()], 64 - }; 65 - process_event(&state, &event, 1709330400000000).unwrap(); 66 - 67 - let alice = state 68 - .db 69 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:alice", 50, None)) 70 - .unwrap(); 71 - assert_eq!(alice.len(), 1); 72 - 73 - let bob = state 74 - .db 75 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:bob", 50, None)) 76 - .unwrap(); 77 - assert_eq!(bob.len(), 1); 78 - } 79 - 80 - #[test] 81 - fn indexes_keyring_update_replaces_members() { 82 - let state = test_state(); 83 - let uri = "at://did:plc:owner/app.opake.keyring/3def"; 84 - 85 - let create = IndexableEvent::UpsertKeyring { 86 - uri: uri.into(), 87 - owner_did: "did:plc:owner".into(), 88 - member_dids: vec!["did:plc:alice".into(), "did:plc:bob".into()], 89 - }; 90 - process_event(&state, &create, 1709330400000000).unwrap(); 91 - 92 - // Bob removed, charlie added 93 - let update = IndexableEvent::UpsertKeyring { 94 - uri: uri.into(), 95 - owner_did: "did:plc:owner".into(), 96 - member_dids: vec!["did:plc:alice".into(), "did:plc:charlie".into()], 97 - }; 98 - process_event(&state, &update, 1709330500000000).unwrap(); 99 - 100 - let bob = state 101 - .db 102 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:bob", 50, None)) 103 - .unwrap(); 104 - assert!(bob.is_empty()); 105 - 106 - let charlie = state 107 - .db 108 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:charlie", 50, None)) 109 - .unwrap(); 110 - assert_eq!(charlie.len(), 1); 111 - } 112 - 113 - #[test] 114 - fn indexes_keyring_delete() { 115 - let state = test_state(); 116 - let uri = "at://did:plc:owner/app.opake.keyring/3def"; 117 - 118 - let create = IndexableEvent::UpsertKeyring { 119 - uri: uri.into(), 120 - owner_did: "did:plc:owner".into(), 121 - member_dids: vec!["did:plc:alice".into()], 122 - }; 123 - process_event(&state, &create, 1709330400000000).unwrap(); 124 - 125 - let delete = IndexableEvent::DeleteKeyring { uri: uri.into() }; 126 - process_event(&state, &delete, 1709330500000000).unwrap(); 127 - 128 - let alice = state 129 - .db 130 - .with_conn(|c| keyrings::list_keyrings_for_member(c, "did:plc:alice", 50, None)) 131 - .unwrap(); 132 - assert!(alice.is_empty()); 133 - }
-58
crates/opake-appview/src/main.rs
··· 1 - mod api; 2 - mod commands; 3 - mod config; 4 - mod db; 5 - mod error; 6 - mod firehose; 7 - mod indexer; 8 - mod state; 9 - 10 - use std::path::PathBuf; 11 - 12 - use clap::Parser; 13 - 14 - use commands::Command; 15 - 16 - #[derive(Parser)] 17 - #[command(name = "opake-appview", about = "AppView indexer and API for Opake")] 18 - struct Cli { 19 - /// Increase output verbosity (-v info, -vv debug, -vvv trace) 20 - #[arg(short, long, action = clap::ArgAction::Count, global = true)] 21 - verbose: u8, 22 - 23 - /// Override config directory (where appview.toml lives) 24 - #[arg(long, global = true)] 25 - config_dir: Option<String>, 26 - 27 - #[command(subcommand)] 28 - command: Option<Command>, 29 - } 30 - 31 - #[tokio::main] 32 - async fn main() -> anyhow::Result<()> { 33 - let cli = Cli::parse(); 34 - 35 - let log_level = match cli.verbose { 36 - 0 => log::LevelFilter::Warn, 37 - 1 => log::LevelFilter::Info, 38 - 2 => log::LevelFilter::Debug, 39 - _ => log::LevelFilter::Trace, 40 - }; 41 - 42 - env_logger::Builder::new() 43 - .filter_level(log_level) 44 - .parse_default_env() 45 - .init(); 46 - 47 - let config = config::Config::load(cli.config_dir.map(PathBuf::from))?; 48 - 49 - match cli 50 - .command 51 - .unwrap_or(Command::Run(commands::run::RunCommand {})) 52 - { 53 - Command::Run(cmd) => cmd.execute(&config).await, 54 - Command::Index(cmd) => cmd.execute(&config).await, 55 - Command::Serve(cmd) => cmd.execute(&config).await, 56 - Command::Status(cmd) => cmd.execute(&config), 57 - } 58 - }
-23
crates/opake-appview/src/state.rs
··· 1 - use std::sync::atomic::AtomicBool; 2 - 3 - use tokio::sync::Mutex; 4 - 5 - use crate::api::key_cache::KeyCache; 6 - use crate::db::Database; 7 - 8 - /// Shared application state for Axum handlers and the indexer. 9 - pub struct AppState { 10 - pub db: Database, 11 - pub indexer_connected: AtomicBool, 12 - pub key_cache: Mutex<KeyCache>, 13 - } 14 - 15 - impl AppState { 16 - pub fn new(db: Database) -> Self { 17 - Self { 18 - db, 19 - indexer_connected: AtomicBool::new(false), 20 - key_cache: Mutex::new(KeyCache::new()), 21 - } 22 - } 23 - }