Ramjet is a relay consumer that supports configurable forward and track collections, as well as record reconciliation.
event-stream relay firehose riblt atprotocol
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

release: 0.2.0

+1465 -45
+31
.github/workflows/docker-publish.yml
··· 1 + name: Docker publish 2 + 3 + on: 4 + push: 5 + tags: 6 + - "[0-9]+.[0-9]+.[0-9]+" 7 + 8 + jobs: 9 + docker: 10 + runs-on: ubuntu-latest 11 + 12 + steps: 13 + - uses: actions/checkout@v4 14 + 15 + - name: Set up Docker Buildx 16 + uses: docker/setup-buildx-action@v3 17 + 18 + - name: Log in to Docker Hub 19 + uses: docker/login-action@v3 20 + with: 21 + username: ngerakines 22 + password: ${{ secrets.DOCKERHUB_SECRET }} 23 + 24 + - name: Build and push 25 + uses: docker/build-push-action@v6 26 + with: 27 + context: . 28 + push: true 29 + tags: | 30 + ngerakines/ramjet:${{ github.ref_name }} 31 + ngerakines/ramjet:latest
+7
CHANGELOG.md
··· 1 1 # Changelog 2 2 3 + ## 1.2.0 - 2026-03-17 4 + 5 + ### Added 6 + - QUIC RIBLT set reconciliation server (#10) 7 + - QUIC RIBLT reconciliation consumer binary (#11) 8 + - Docker Hub publish workflow for semver tags (#9) 9 + 3 10 ## 0.1.1 - 2026-03-15 4 11 5 12 ### Added
+260 -34
Cargo.lock
··· 19 19 20 20 [[package]] 21 21 name = "anstream" 22 - version = "0.6.21" 22 + version = "1.0.0" 23 23 source = "registry+https://github.com/rust-lang/crates.io-index" 24 - checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" 24 + checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" 25 25 dependencies = [ 26 26 "anstyle", 27 27 "anstyle-parse", ··· 34 34 35 35 [[package]] 36 36 name = "anstyle" 37 - version = "1.0.13" 37 + version = "1.0.14" 38 38 source = "registry+https://github.com/rust-lang/crates.io-index" 39 - checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" 39 + checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" 40 40 41 41 [[package]] 42 42 name = "anstyle-parse" 43 - version = "0.2.7" 43 + version = "1.0.0" 44 44 source = "registry+https://github.com/rust-lang/crates.io-index" 45 - checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" 45 + checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" 46 46 dependencies = [ 47 47 "utf8parse", 48 48 ] ··· 456 456 457 457 [[package]] 458 458 name = "cc" 459 - version = "1.2.56" 459 + version = "1.2.57" 460 460 source = "registry+https://github.com/rust-lang/crates.io-index" 461 - checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2" 461 + checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" 462 462 dependencies = [ 463 463 "find-msvc-tools", 464 464 "jobserver", 465 465 "libc", 466 466 "shlex", 467 467 ] 468 + 469 + [[package]] 470 + name = "cesu8" 471 + version = "1.1.0" 472 + source = "registry+https://github.com/rust-lang/crates.io-index" 473 + checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" 468 474 469 475 [[package]] 470 476 name = "cfg-if" ··· 513 519 514 520 [[package]] 515 521 name = "clap" 516 - version = "4.5.60" 522 + version = "4.6.0" 517 523 source = "registry+https://github.com/rust-lang/crates.io-index" 518 - checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" 524 + checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" 519 525 dependencies = [ 520 526 "clap_builder", 521 527 "clap_derive", ··· 523 529 524 530 [[package]] 525 531 name = "clap_builder" 526 - version = "4.5.60" 532 + version = "4.6.0" 527 533 source = "registry+https://github.com/rust-lang/crates.io-index" 528 - checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" 534 + checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" 529 535 dependencies = [ 530 536 "anstream", 531 537 "anstyle", ··· 535 541 536 542 [[package]] 537 543 name = "clap_derive" 538 - version = "4.5.55" 544 + version = "4.6.0" 539 545 source = "registry+https://github.com/rust-lang/crates.io-index" 540 - checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" 546 + checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" 541 547 dependencies = [ 542 548 "heck", 543 549 "proc-macro2", ··· 547 553 548 554 [[package]] 549 555 name = "clap_lex" 550 - version = "1.0.0" 556 + version = "1.1.0" 551 557 source = "registry+https://github.com/rust-lang/crates.io-index" 552 - checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" 558 + checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" 553 559 554 560 [[package]] 555 561 name = "colorchoice" 556 - version = "1.0.4" 562 + version = "1.0.5" 563 + source = "registry+https://github.com/rust-lang/crates.io-index" 564 + checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" 565 + 566 + [[package]] 567 + name = "combine" 568 + version = "4.6.7" 557 569 source = "registry+https://github.com/rust-lang/crates.io-index" 558 - checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 570 + checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" 571 + dependencies = [ 572 + "bytes", 573 + "memchr", 574 + ] 559 575 560 576 [[package]] 561 577 name = "compact_str" ··· 1030 1046 ] 1031 1047 1032 1048 [[package]] 1049 + name = "fastbloom" 1050 + version = "0.14.1" 1051 + source = "registry+https://github.com/rust-lang/crates.io-index" 1052 + checksum = "4e7f34442dbe69c60fe8eaf58a8cafff81a1f278816d8ab4db255b3bef4ac3c4" 1053 + dependencies = [ 1054 + "getrandom 0.3.4", 1055 + "libm", 1056 + "rand 0.9.2", 1057 + "siphasher", 1058 + ] 1059 + 1060 + [[package]] 1033 1061 name = "fastrand" 1034 1062 version = "2.3.0" 1035 1063 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1076 1104 1077 1105 [[package]] 1078 1106 name = "fjall" 1079 - version = "3.1.0" 1107 + version = "3.1.1" 1080 1108 source = "registry+https://github.com/rust-lang/crates.io-index" 1081 - checksum = "40cb1eb0cef3792900897b32c8282f6417bc978f6af46400a2f14bf0e649ae30" 1109 + checksum = "48cf071a6f6090e99f3e095aaca9d38f78ad6fcf40bca736dc4cf7cbe15e4438" 1082 1110 dependencies = [ 1083 1111 "byteorder-lite", 1084 1112 "byteview", ··· 1747 1775 checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" 1748 1776 1749 1777 [[package]] 1778 + name = "jni" 1779 + version = "0.21.1" 1780 + source = "registry+https://github.com/rust-lang/crates.io-index" 1781 + checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" 1782 + dependencies = [ 1783 + "cesu8", 1784 + "cfg-if", 1785 + "combine", 1786 + "jni-sys", 1787 + "log", 1788 + "thiserror 1.0.69", 1789 + "walkdir", 1790 + "windows-sys 0.45.0", 1791 + ] 1792 + 1793 + [[package]] 1794 + name = "jni-sys" 1795 + version = "0.3.0" 1796 + source = "registry+https://github.com/rust-lang/crates.io-index" 1797 + checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" 1798 + 1799 + [[package]] 1750 1800 name = "jobserver" 1751 1801 version = "0.1.34" 1752 1802 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1782 1832 1783 1833 [[package]] 1784 1834 name = "kasuari" 1785 - version = "0.4.11" 1835 + version = "0.4.12" 1786 1836 source = "registry+https://github.com/rust-lang/crates.io-index" 1787 - checksum = "8fe90c1150662e858c7d5f945089b7517b0a80d8bf7ba4b1b5ffc984e7230a5b" 1837 + checksum = "bde5057d6143cc94e861d90f591b9303d6716c6b9602309150bd068853c10899" 1788 1838 dependencies = [ 1789 1839 "hashbrown 0.16.1", 1790 1840 "portable-atomic", ··· 1814 1864 version = "0.2.183" 1815 1865 source = "registry+https://github.com/rust-lang/crates.io-index" 1816 1866 checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" 1867 + 1868 + [[package]] 1869 + name = "libm" 1870 + version = "0.2.16" 1871 + source = "registry+https://github.com/rust-lang/crates.io-index" 1872 + checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" 1817 1873 1818 1874 [[package]] 1819 1875 name = "line-clipping" ··· 1883 1939 1884 1940 [[package]] 1885 1941 name = "lsm-tree" 1886 - version = "3.1.0" 1942 + version = "3.1.1" 1887 1943 source = "registry+https://github.com/rust-lang/crates.io-index" 1888 - checksum = "fc5fa40c207eed45c811085aaa1b0a25fead22e298e286081cd4b98785fe759b" 1944 + checksum = "e97484b43ad0b232eaaeb83ee6edbf5d2e3602a389eee4205997b4ad3f6d3ace" 1889 1945 dependencies = [ 1890 1946 "byteorder-lite", 1891 1947 "byteview", ··· 1905 1961 1906 1962 [[package]] 1907 1963 name = "lz4_flex" 1908 - version = "0.11.5" 1964 + version = "0.11.6" 1909 1965 source = "registry+https://github.com/rust-lang/crates.io-index" 1910 - checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" 1966 + checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a" 1911 1967 dependencies = [ 1912 1968 "twox-hash", 1913 1969 ] ··· 2123 2179 2124 2180 [[package]] 2125 2181 name = "once_cell" 2126 - version = "1.21.3" 2182 + version = "1.21.4" 2127 2183 source = "registry+https://github.com/rust-lang/crates.io-index" 2128 - checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" 2184 + checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" 2129 2185 dependencies = [ 2130 2186 "critical-section", 2131 2187 "portable-atomic", ··· 2199 2255 "redox_syscall", 2200 2256 "smallvec", 2201 2257 "windows-link", 2258 + ] 2259 + 2260 + [[package]] 2261 + name = "pem" 2262 + version = "3.0.6" 2263 + source = "registry+https://github.com/rust-lang/crates.io-index" 2264 + checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" 2265 + dependencies = [ 2266 + "base64", 2267 + "serde_core", 2202 2268 ] 2203 2269 2204 2270 [[package]] ··· 2473 2539 checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" 2474 2540 dependencies = [ 2475 2541 "bytes", 2542 + "fastbloom", 2476 2543 "getrandom 0.3.4", 2477 2544 "lru-slab", 2478 2545 "rand 0.9.2", ··· 2480 2547 "rustc-hash", 2481 2548 "rustls", 2482 2549 "rustls-pki-types", 2550 + "rustls-platform-verifier", 2483 2551 "slab", 2484 2552 "thiserror 2.0.18", 2485 2553 "tinyvec", ··· 2524 2592 2525 2593 [[package]] 2526 2594 name = "ramjet" 2527 - version = "0.1.1" 2595 + version = "1.2.0" 2528 2596 dependencies = [ 2529 2597 "anyhow", 2530 2598 "atproto-dasl", ··· 2544 2612 "http", 2545 2613 "multihash", 2546 2614 "prometheus-client", 2615 + "quinn", 2547 2616 "rand 0.10.0", 2548 2617 "ratatui", 2618 + "rcgen", 2549 2619 "reqwest", 2550 2620 "riblt", 2621 + "rustls", 2551 2622 "serde", 2552 2623 "serde_json", 2553 2624 "sha2", ··· 2732 2803 ] 2733 2804 2734 2805 [[package]] 2806 + name = "rcgen" 2807 + version = "0.13.2" 2808 + source = "registry+https://github.com/rust-lang/crates.io-index" 2809 + checksum = "75e669e5202259b5314d1ea5397316ad400819437857b90861765f24c4cf80a2" 2810 + dependencies = [ 2811 + "pem", 2812 + "ring", 2813 + "rustls-pki-types", 2814 + "time", 2815 + "yasna", 2816 + ] 2817 + 2818 + [[package]] 2735 2819 name = "redox_syscall" 2736 2820 version = "0.5.18" 2737 2821 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2859 2943 [[package]] 2860 2944 name = "riblt" 2861 2945 version = "0.1.0" 2862 - source = "git+https://github.com/ngerakines/riblt-rs#1bed49393771f3fc488a49a8b7b4a952273cdd93" 2946 + source = "git+https://github.com/ngerakines/riblt-rs#7b3efb7708dc20193062a4d5b26ca29c7e0bc8f5" 2947 + dependencies = [ 2948 + "siphasher", 2949 + ] 2863 2950 2864 2951 [[package]] 2865 2952 name = "ring" ··· 2940 3027 ] 2941 3028 2942 3029 [[package]] 3030 + name = "rustls-platform-verifier" 3031 + version = "0.6.2" 3032 + source = "registry+https://github.com/rust-lang/crates.io-index" 3033 + checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" 3034 + dependencies = [ 3035 + "core-foundation 0.10.1", 3036 + "core-foundation-sys", 3037 + "jni", 3038 + "log", 3039 + "once_cell", 3040 + "rustls", 3041 + "rustls-native-certs", 3042 + "rustls-platform-verifier-android", 3043 + "rustls-webpki", 3044 + "security-framework", 3045 + "security-framework-sys", 3046 + "webpki-root-certs", 3047 + "windows-sys 0.61.2", 3048 + ] 3049 + 3050 + [[package]] 3051 + name = "rustls-platform-verifier-android" 3052 + version = "0.1.1" 3053 + source = "registry+https://github.com/rust-lang/crates.io-index" 3054 + checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" 3055 + 3056 + [[package]] 2943 3057 name = "rustls-webpki" 2944 3058 version = "0.103.9" 2945 3059 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2961 3075 version = "1.0.23" 2962 3076 source = "registry+https://github.com/rust-lang/crates.io-index" 2963 3077 checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" 3078 + 3079 + [[package]] 3080 + name = "same-file" 3081 + version = "1.0.6" 3082 + source = "registry+https://github.com/rust-lang/crates.io-index" 3083 + checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" 3084 + dependencies = [ 3085 + "winapi-util", 3086 + ] 2964 3087 2965 3088 [[package]] 2966 3089 name = "schannel" ··· 3546 3669 3547 3670 [[package]] 3548 3671 name = "tinyvec" 3549 - version = "1.10.0" 3672 + version = "1.11.0" 3550 3673 source = "registry+https://github.com/rust-lang/crates.io-index" 3551 - checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" 3674 + checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" 3552 3675 dependencies = [ 3553 3676 "tinyvec_macros", 3554 3677 ] ··· 3758 3881 3759 3882 [[package]] 3760 3883 name = "tracing-subscriber" 3761 - version = "0.3.22" 3884 + version = "0.3.23" 3762 3885 source = "registry+https://github.com/rust-lang/crates.io-index" 3763 - checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" 3886 + checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" 3764 3887 dependencies = [ 3765 3888 "matchers", 3766 3889 "nu-ansi-term", ··· 3948 4071 ] 3949 4072 3950 4073 [[package]] 4074 + name = "walkdir" 4075 + version = "2.5.0" 4076 + source = "registry+https://github.com/rust-lang/crates.io-index" 4077 + checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" 4078 + dependencies = [ 4079 + "same-file", 4080 + "winapi-util", 4081 + ] 4082 + 4083 + [[package]] 3951 4084 name = "want" 3952 4085 version = "0.3.1" 3953 4086 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4094 4227 ] 4095 4228 4096 4229 [[package]] 4230 + name = "webpki-root-certs" 4231 + version = "1.0.6" 4232 + source = "registry+https://github.com/rust-lang/crates.io-index" 4233 + checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" 4234 + dependencies = [ 4235 + "rustls-pki-types", 4236 + ] 4237 + 4238 + [[package]] 4097 4239 name = "webpki-roots" 4098 4240 version = "1.0.6" 4099 4241 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4197 4339 checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" 4198 4340 4199 4341 [[package]] 4342 + name = "winapi-util" 4343 + version = "0.1.11" 4344 + source = "registry+https://github.com/rust-lang/crates.io-index" 4345 + checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" 4346 + dependencies = [ 4347 + "windows-sys 0.61.2", 4348 + ] 4349 + 4350 + [[package]] 4200 4351 name = "winapi-x86_64-pc-windows-gnu" 4201 4352 version = "0.4.0" 4202 4353 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4239 4390 4240 4391 [[package]] 4241 4392 name = "windows-sys" 4393 + version = "0.45.0" 4394 + source = "registry+https://github.com/rust-lang/crates.io-index" 4395 + checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" 4396 + dependencies = [ 4397 + "windows-targets 0.42.2", 4398 + ] 4399 + 4400 + [[package]] 4401 + name = "windows-sys" 4242 4402 version = "0.48.0" 4243 4403 source = "registry+https://github.com/rust-lang/crates.io-index" 4244 4404 checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" ··· 4275 4435 4276 4436 [[package]] 4277 4437 name = "windows-targets" 4438 + version = "0.42.2" 4439 + source = "registry+https://github.com/rust-lang/crates.io-index" 4440 + checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" 4441 + dependencies = [ 4442 + "windows_aarch64_gnullvm 0.42.2", 4443 + "windows_aarch64_msvc 0.42.2", 4444 + "windows_i686_gnu 0.42.2", 4445 + "windows_i686_msvc 0.42.2", 4446 + "windows_x86_64_gnu 0.42.2", 4447 + "windows_x86_64_gnullvm 0.42.2", 4448 + "windows_x86_64_msvc 0.42.2", 4449 + ] 4450 + 4451 + [[package]] 4452 + name = "windows-targets" 4278 4453 version = "0.48.5" 4279 4454 source = "registry+https://github.com/rust-lang/crates.io-index" 4280 4455 checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" ··· 4320 4495 "windows_x86_64_gnullvm 0.53.1", 4321 4496 "windows_x86_64_msvc 0.53.1", 4322 4497 ] 4498 + 4499 + [[package]] 4500 + name = "windows_aarch64_gnullvm" 4501 + version = "0.42.2" 4502 + source = "registry+https://github.com/rust-lang/crates.io-index" 4503 + checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" 4323 4504 4324 4505 [[package]] 4325 4506 name = "windows_aarch64_gnullvm" ··· 4341 4522 4342 4523 [[package]] 4343 4524 name = "windows_aarch64_msvc" 4525 + version = "0.42.2" 4526 + source = "registry+https://github.com/rust-lang/crates.io-index" 4527 + checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" 4528 + 4529 + [[package]] 4530 + name = "windows_aarch64_msvc" 4344 4531 version = "0.48.5" 4345 4532 source = "registry+https://github.com/rust-lang/crates.io-index" 4346 4533 checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" ··· 4359 4546 4360 4547 [[package]] 4361 4548 name = "windows_i686_gnu" 4549 + version = "0.42.2" 4550 + source = "registry+https://github.com/rust-lang/crates.io-index" 4551 + checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" 4552 + 4553 + [[package]] 4554 + name = "windows_i686_gnu" 4362 4555 version = "0.48.5" 4363 4556 source = "registry+https://github.com/rust-lang/crates.io-index" 4364 4557 checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" ··· 4389 4582 4390 4583 [[package]] 4391 4584 name = "windows_i686_msvc" 4585 + version = "0.42.2" 4586 + source = "registry+https://github.com/rust-lang/crates.io-index" 4587 + checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" 4588 + 4589 + [[package]] 4590 + name = "windows_i686_msvc" 4392 4591 version = "0.48.5" 4393 4592 source = "registry+https://github.com/rust-lang/crates.io-index" 4394 4593 checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" ··· 4407 4606 4408 4607 [[package]] 4409 4608 name = "windows_x86_64_gnu" 4609 + version = "0.42.2" 4610 + source = "registry+https://github.com/rust-lang/crates.io-index" 4611 + checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" 4612 + 4613 + [[package]] 4614 + name = "windows_x86_64_gnu" 4410 4615 version = "0.48.5" 4411 4616 source = "registry+https://github.com/rust-lang/crates.io-index" 4412 4617 checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" ··· 4422 4627 version = "0.53.1" 4423 4628 source = "registry+https://github.com/rust-lang/crates.io-index" 4424 4629 checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" 4630 + 4631 + [[package]] 4632 + name = "windows_x86_64_gnullvm" 4633 + version = "0.42.2" 4634 + source = "registry+https://github.com/rust-lang/crates.io-index" 4635 + checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" 4425 4636 4426 4637 [[package]] 4427 4638 name = "windows_x86_64_gnullvm" ··· 4440 4651 version = "0.53.1" 4441 4652 source = "registry+https://github.com/rust-lang/crates.io-index" 4442 4653 checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" 4654 + 4655 + [[package]] 4656 + name = "windows_x86_64_msvc" 4657 + version = "0.42.2" 4658 + source = "registry+https://github.com/rust-lang/crates.io-index" 4659 + checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" 4443 4660 4444 4661 [[package]] 4445 4662 name = "windows_x86_64_msvc" ··· 4568 4785 version = "0.8.15" 4569 4786 source = "registry+https://github.com/rust-lang/crates.io-index" 4570 4787 checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 4788 + 4789 + [[package]] 4790 + name = "yasna" 4791 + version = "0.5.2" 4792 + source = "registry+https://github.com/rust-lang/crates.io-index" 4793 + checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" 4794 + dependencies = [ 4795 + "time", 4796 + ] 4571 4797 4572 4798 [[package]] 4573 4799 name = "yoke"
+6 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "ramjet" 3 - version = "0.1.1" 3 + version = "1.2.0" 4 4 edition = "2024" 5 5 rust-version = "1.94" 6 6 description = "ATProtocol event-stream, record, and blob service built on fjall" ··· 64 64 65 65 # Set reconciliation 66 66 riblt = { git = "https://github.com/ngerakines/riblt-rs" } 67 + 68 + # QUIC transport 69 + quinn = "0.11" 70 + rcgen = "0.13" 71 + rustls = { version = "0.23", default-features = false, features = ["std", "ring"] } 67 72 68 73 # Utilities 69 74 cid = "0.11"
+1 -1
Dockerfile
··· 50 50 LABEL org.opencontainers.image.description="ATProtocol event-stream, record, and blob service built on fjall" 51 51 LABEL org.opencontainers.image.authors="Nick Gerakines <nick.gerakines@gmail.com>" 52 52 LABEL org.opencontainers.image.source="https://tangled.org/ngerakines.me/ramjet" 53 - LABEL org.opencontainers.image.version="0.1.1" 53 + LABEL org.opencontainers.image.version="1.2.0" 54 54 LABEL org.opencontainers.image.licenses="MIT" 55 55 56 56 # Document available binaries
+1 -1
README.md
··· 118 118 119 119 ### Health and metrics 120 120 121 - - `GET /_health` — returns `{"status":"ok","version":"0.1.1"}` 121 + - `GET /_health` — returns `{"status":"ok","version":"1.2.0"}` 122 122 - `GET /metrics` — Prometheus text format (includes HTTP metrics, pipeline counters, tokio task metrics, fan-out queue depths) 123 123 124 124 ### XRPC endpoints
+654
src/bin/ramjet_recon.rs
··· 1 + //! QUIC RIBLT reconciliation consumer. 2 + //! 3 + //! Connects to a ramjet instance to discover tracked collections for a DID, 4 + //! then periodically reconciles records via the QUIC RIBLT protocol. 5 + //! 6 + //! On the first cycle for each collection, all records are fetched via 7 + //! HTTP `listRecords` to build the local symbol set. Subsequent cycles 8 + //! use RIBLT set reconciliation to efficiently detect and fetch only 9 + //! the records that changed. 10 + 11 + use std::collections::HashMap; 12 + use std::net::SocketAddr; 13 + use std::sync::Arc; 14 + use std::time::Duration; 15 + 16 + use clap::Parser; 17 + use quinn::crypto::rustls::QuicClientConfig; 18 + use riblt::Decoder; 19 + use riblt::byte_symbol::ByteSymbol; 20 + 21 + use ramjet::server::quic::{ClientMessage, MissingRecord, ServerMessage}; 22 + use ramjet::server::reconciliation::make_record_symbol; 23 + 24 + const RECON_ALPN: &[u8] = b"ramjet-recon/1"; 25 + const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; 26 + const NEED_MORE_BATCH: u32 = 1000; 27 + const MAX_NEED_MORE_ROUNDS: u32 = 100; 28 + 29 + #[derive(Parser, Debug)] 30 + #[command( 31 + name = "ramjet-recon", 32 + about = "QUIC RIBLT reconciliation consumer for ramjet" 33 + )] 34 + struct Args { 35 + /// Ramjet HTTP host for fetching repo metadata. 36 + #[arg(long, default_value = "localhost:8080")] 37 + ramjet_host: String, 38 + 39 + /// Ramjet QUIC server address for RIBLT reconciliation. 40 + #[arg(long, default_value = "127.0.0.1:6379")] 41 + quic_addr: SocketAddr, 42 + 43 + /// DID identity to reconcile. 44 + #[arg(long, default_value = "did:plc:cbkjy5n7bk3ax2wplmtjofq2")] 45 + identity: String, 46 + 47 + /// Seconds to sleep between reconciliation cycles. 48 + #[arg(long, default_value = "30")] 49 + sleep_duration: u64, 50 + } 51 + 52 + /// Local record state: maps (collection, rkey) to raw CID bytes. 53 + type LocalRecords = HashMap<(String, String), Vec<u8>>; 54 + 55 + // --- Wire protocol framing (matches server) --- 56 + 57 + async fn write_message<T: serde::Serialize>( 58 + stream: &mut quinn::SendStream, 59 + msg: &T, 60 + ) -> anyhow::Result<()> { 61 + let data = 62 + atproto_dasl::drisl::to_vec(msg).map_err(|e| anyhow::anyhow!("encode error: {e}"))?; 63 + if data.len() > MAX_MESSAGE_SIZE { 64 + return Err(anyhow::anyhow!("message too large: {} bytes", data.len())); 65 + } 66 + let len = (data.len() as u32).to_be_bytes(); 67 + stream.write_all(&len).await?; 68 + stream.write_all(&data).await?; 69 + Ok(()) 70 + } 71 + 72 + async fn read_message<T: serde::de::DeserializeOwned>( 73 + stream: &mut quinn::RecvStream, 74 + ) -> anyhow::Result<T> { 75 + let mut len_buf = [0u8; 4]; 76 + stream 77 + .read_exact(&mut len_buf) 78 + .await 79 + .map_err(|e| anyhow::anyhow!("read length: {e}"))?; 80 + let len = u32::from_be_bytes(len_buf) as usize; 81 + if len > MAX_MESSAGE_SIZE { 82 + return Err(anyhow::anyhow!("message too large: {len} bytes")); 83 + } 84 + let mut buf = vec![0u8; len]; 85 + stream 86 + .read_exact(&mut buf) 87 + .await 88 + .map_err(|e| anyhow::anyhow!("read payload: {e}"))?; 89 + let msg = 90 + atproto_dasl::drisl::from_slice(&buf).map_err(|e| anyhow::anyhow!("decode error: {e}"))?; 91 + Ok(msg) 92 + } 93 + 94 + // --- QUIC client setup --- 95 + 96 + fn make_client_endpoint() -> anyhow::Result<quinn::Endpoint> { 97 + let mut client_crypto = rustls::ClientConfig::builder() 98 + .dangerous() 99 + .with_custom_certificate_verifier(Arc::new(SkipServerVerification)) 100 + .with_no_client_auth(); 101 + client_crypto.alpn_protocols = vec![RECON_ALPN.to_vec()]; 102 + 103 + let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse()?)?; 104 + let mut transport = quinn::TransportConfig::default(); 105 + transport.max_idle_timeout(Some(Duration::from_secs(60).try_into()?)); 106 + let mut client_config = quinn::ClientConfig::new(Arc::new( 107 + QuicClientConfig::try_from(client_crypto) 108 + .map_err(|e| anyhow::anyhow!("bad quic client config: {e}"))?, 109 + )); 110 + client_config.transport_config(Arc::new(transport)); 111 + endpoint.set_default_client_config(client_config); 112 + Ok(endpoint) 113 + } 114 + 115 + /// Accepts any server certificate (ramjet uses self-signed certs). 116 + #[derive(Debug)] 117 + struct SkipServerVerification; 118 + 119 + impl rustls::client::danger::ServerCertVerifier for SkipServerVerification { 120 + fn verify_server_cert( 121 + &self, 122 + _end_entity: &rustls::pki_types::CertificateDer<'_>, 123 + _intermediates: &[rustls::pki_types::CertificateDer<'_>], 124 + _server_name: &rustls::pki_types::ServerName<'_>, 125 + _ocsp_response: &[u8], 126 + _now: rustls::pki_types::UnixTime, 127 + ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> { 128 + Ok(rustls::client::danger::ServerCertVerified::assertion()) 129 + } 130 + 131 + fn verify_tls12_signature( 132 + &self, 133 + _message: &[u8], 134 + _cert: &rustls::pki_types::CertificateDer<'_>, 135 + _dss: &rustls::DigitallySignedStruct, 136 + ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> { 137 + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) 138 + } 139 + 140 + fn verify_tls13_signature( 141 + &self, 142 + _message: &[u8], 143 + _cert: &rustls::pki_types::CertificateDer<'_>, 144 + _dss: &rustls::DigitallySignedStruct, 145 + ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> { 146 + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) 147 + } 148 + 149 + fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> { 150 + vec![ 151 + rustls::SignatureScheme::ECDSA_NISTP256_SHA256, 152 + rustls::SignatureScheme::ECDSA_NISTP384_SHA384, 153 + rustls::SignatureScheme::ED25519, 154 + rustls::SignatureScheme::RSA_PSS_SHA256, 155 + rustls::SignatureScheme::RSA_PSS_SHA384, 156 + rustls::SignatureScheme::RSA_PSS_SHA512, 157 + rustls::SignatureScheme::RSA_PKCS1_SHA256, 158 + rustls::SignatureScheme::RSA_PKCS1_SHA384, 159 + rustls::SignatureScheme::RSA_PKCS1_SHA512, 160 + ] 161 + } 162 + } 163 + 164 + // --- Fetch collections via HTTP --- 165 + 166 + async fn fetch_collections(host: &str, did: &str) -> anyhow::Result<Vec<String>> { 167 + let url = format!("http://{host}/xrpc/com.atproto.repo.describeRepo?repo={did}"); 168 + tracing::info!(%url, "fetching tracked collections"); 169 + 170 + let resp: serde_json::Value = reqwest::get(&url).await?.json().await?; 171 + 172 + let collections = resp 173 + .get("collections") 174 + .and_then(|v| v.as_array()) 175 + .map(|arr| { 176 + arr.iter() 177 + .filter_map(|v| v.as_str().map(|s| s.to_string())) 178 + .collect::<Vec<_>>() 179 + }) 180 + .unwrap_or_default(); 181 + 182 + Ok(collections) 183 + } 184 + 185 + /// Fetch all records for a collection via paginated HTTP listRecords. 186 + /// Returns a map of (collection, rkey) → raw CID bytes. 187 + async fn fetch_all_records( 188 + host: &str, 189 + did: &str, 190 + collection: &str, 191 + ) -> anyhow::Result<LocalRecords> { 192 + let mut records = LocalRecords::new(); 193 + let mut cursor: Option<String> = None; 194 + 195 + loop { 196 + let url = if let Some(ref c) = cursor { 197 + format!( 198 + "http://{host}/xrpc/com.atproto.repo.listRecords?repo={did}&collection={collection}&limit=100&cursor={c}" 199 + ) 200 + } else { 201 + format!( 202 + "http://{host}/xrpc/com.atproto.repo.listRecords?repo={did}&collection={collection}&limit=100" 203 + ) 204 + }; 205 + 206 + let resp: serde_json::Value = reqwest::get(&url).await?.json().await?; 207 + 208 + let items = resp 209 + .get("records") 210 + .and_then(|v| v.as_array()) 211 + .cloned() 212 + .unwrap_or_default(); 213 + 214 + if items.is_empty() { 215 + break; 216 + } 217 + 218 + for item in &items { 219 + let Some(uri) = item.get("uri").and_then(|v| v.as_str()) else { 220 + continue; 221 + }; 222 + let Some(cid_str) = item.get("cid").and_then(|v| v.as_str()) else { 223 + continue; 224 + }; 225 + 226 + // Parse rkey from AT-URI: at://did/collection/rkey 227 + let rkey = uri.rsplit('/').next().unwrap_or_default().to_string(); 228 + 229 + // Parse CID string back to raw bytes 230 + let Ok(cid) = cid_str.parse::<cid::Cid>() else { 231 + continue; 232 + }; 233 + let cid_bytes = cid.to_bytes(); 234 + 235 + records.insert((collection.to_string(), rkey), cid_bytes); 236 + } 237 + 238 + // Check for pagination cursor 239 + cursor = resp 240 + .get("cursor") 241 + .and_then(|v| v.as_str()) 242 + .map(|s| s.to_string()); 243 + 244 + if cursor.is_none() { 245 + break; 246 + } 247 + } 248 + 249 + Ok(records) 250 + } 251 + 252 + // --- Parse a remote ByteSymbol into (collection, rkey) --- 253 + 254 + fn parse_remote_symbol(symbol: &ByteSymbol) -> Option<(String, String)> { 255 + let bytes = &symbol.0; 256 + // Format: collection\x00rkey\x00cid_bytes 257 + let first_sep = bytes.iter().position(|&b| b == 0x00)?; 258 + let rest = &bytes[first_sep + 1..]; 259 + let second_sep = rest.iter().position(|&b| b == 0x00)?; 260 + 261 + let collection = std::str::from_utf8(&bytes[..first_sep]).ok()?; 262 + let rkey = std::str::from_utf8(&rest[..second_sep]).ok()?; 263 + 264 + Some((collection.to_string(), rkey.to_string())) 265 + } 266 + 267 + // --- Decode record data as JSON --- 268 + 269 + fn decode_record_data(data: &[u8]) -> serde_json::Value { 270 + if let Ok(ipld) = atproto_dasl::drisl::from_slice::<atproto_dasl::Ipld>(data) { 271 + ipld_to_json(&ipld) 272 + } else if let Ok(val) = serde_json::from_slice::<serde_json::Value>(data) { 273 + val 274 + } else { 275 + use base64::Engine; 276 + serde_json::json!({ "$bytes": base64::engine::general_purpose::STANDARD.encode(data) }) 277 + } 278 + } 279 + 280 + fn ipld_to_json(ipld: &atproto_dasl::Ipld) -> serde_json::Value { 281 + use atproto_dasl::Ipld; 282 + use serde_json::json; 283 + match ipld { 284 + Ipld::Null => serde_json::Value::Null, 285 + Ipld::Bool(b) => serde_json::Value::Bool(*b), 286 + Ipld::Integer(i) => json!(*i), 287 + Ipld::Float(f) => json!(*f), 288 + Ipld::String(s) => serde_json::Value::String(s.clone()), 289 + Ipld::Bytes(b) => { 290 + use base64::Engine; 291 + json!({ "$bytes": base64::prelude::BASE64_STANDARD.encode(b) }) 292 + } 293 + Ipld::List(list) => serde_json::Value::Array(list.iter().map(ipld_to_json).collect()), 294 + Ipld::Map(map) => { 295 + let obj: serde_json::Map<String, serde_json::Value> = map 296 + .iter() 297 + .map(|(k, v)| (k.clone(), ipld_to_json(v))) 298 + .collect(); 299 + serde_json::Value::Object(obj) 300 + } 301 + Ipld::Link(cid) => json!({ "$link": cid.to_string() }), 302 + } 303 + } 304 + 305 + fn format_cid(cid_bytes: &[u8]) -> String { 306 + match cid::Cid::try_from(cid_bytes) { 307 + Ok(c) => c.to_string(), 308 + Err(_) => { 309 + use base64::Engine; 310 + base64::engine::general_purpose::STANDARD.encode(cid_bytes) 311 + } 312 + } 313 + } 314 + 315 + #[tokio::main] 316 + async fn main() -> anyhow::Result<()> { 317 + let args = Args::parse(); 318 + 319 + tracing_subscriber::fmt() 320 + .with_env_filter( 321 + tracing_subscriber::EnvFilter::try_from_default_env() 322 + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), 323 + ) 324 + .init(); 325 + 326 + tracing::info!( 327 + identity = %args.identity, 328 + quic_addr = %args.quic_addr, 329 + ramjet_host = %args.ramjet_host, 330 + sleep_duration = args.sleep_duration, 331 + "starting ramjet-recon" 332 + ); 333 + 334 + // Fetch tracked collections from ramjet HTTP API 335 + let collections = fetch_collections(&args.ramjet_host, &args.identity).await?; 336 + 337 + if collections.is_empty() { 338 + tracing::warn!( 339 + "no collections found for identity, will reconcile without collection filter" 340 + ); 341 + } else { 342 + tracing::info!(count = collections.len(), "tracked collections"); 343 + for coll in &collections { 344 + tracing::info!(collection = %coll, " tracked"); 345 + } 346 + } 347 + 348 + // Create QUIC client endpoint 349 + let endpoint = make_client_endpoint()?; 350 + let sleep_duration = Duration::from_secs(args.sleep_duration); 351 + 352 + // Local record state — persists across cycles for RIBLT delta sync 353 + let mut local_records = LocalRecords::new(); 354 + 355 + // Reconciliation loop 356 + loop { 357 + if collections.is_empty() { 358 + // No collection filter — reconcile all at once 359 + tracing::info!(collection = "(all)", "reconciling"); 360 + match reconcile_one( 361 + &endpoint, 362 + &args.ramjet_host, 363 + args.quic_addr, 364 + &args.identity, 365 + None, 366 + &mut local_records, 367 + ) 368 + .await 369 + { 370 + Ok(count) => { 371 + tracing::info!( 372 + collection = "(all)", 373 + missing_records = count, 374 + local_records = local_records.len(), 375 + "reconciliation complete" 376 + ); 377 + } 378 + Err(e) => { 379 + tracing::warn!( 380 + collection = "(all)", 381 + error = %e, 382 + "reconciliation failed" 383 + ); 384 + } 385 + } 386 + } else { 387 + for collection in &collections { 388 + tracing::info!(collection = %collection, "reconciling"); 389 + 390 + match reconcile_one( 391 + &endpoint, 392 + &args.ramjet_host, 393 + args.quic_addr, 394 + &args.identity, 395 + Some(collection.as_str()), 396 + &mut local_records, 397 + ) 398 + .await 399 + { 400 + Ok(count) => { 401 + tracing::info!( 402 + collection = %collection, 403 + missing_records = count, 404 + local_records = local_records.len(), 405 + "reconciliation complete" 406 + ); 407 + } 408 + Err(e) => { 409 + tracing::warn!( 410 + collection = %collection, 411 + error = %e, 412 + "reconciliation failed" 413 + ); 414 + } 415 + } 416 + } 417 + } 418 + 419 + tracing::info!( 420 + sleep_secs = args.sleep_duration, 421 + total_local_records = local_records.len(), 422 + "sleeping until next cycle" 423 + ); 424 + tokio::time::sleep(sleep_duration).await; 425 + } 426 + } 427 + 428 + async fn reconcile_one( 429 + endpoint: &quinn::Endpoint, 430 + ramjet_host: &str, 431 + quic_addr: SocketAddr, 432 + did: &str, 433 + collection: Option<&str>, 434 + local_records: &mut LocalRecords, 435 + ) -> anyhow::Result<usize> { 436 + // If we have no local records for this collection, bootstrap via HTTP 437 + let has_local = if let Some(coll) = collection { 438 + local_records.keys().any(|(c, _)| c == coll) 439 + } else { 440 + !local_records.is_empty() 441 + }; 442 + 443 + if !has_local { 444 + if let Some(coll) = collection { 445 + tracing::info!( 446 + collection = %coll, 447 + "no local records, bootstrapping via listRecords" 448 + ); 449 + let fetched = fetch_all_records(ramjet_host, did, coll).await?; 450 + tracing::info!( 451 + collection = %coll, 452 + records = fetched.len(), 453 + "bootstrap complete" 454 + ); 455 + local_records.extend(fetched); 456 + } else { 457 + tracing::info!("no local records and no collection filter, skipping bootstrap"); 458 + } 459 + } 460 + 461 + // Connect via QUIC 462 + let connecting = endpoint.connect(quic_addr, "localhost")?; 463 + let connection = connecting.await?; 464 + 465 + let (send, recv) = connection.open_bi().await?; 466 + 467 + let result = run_session(send, recv, did, collection, local_records).await; 468 + 469 + connection.close(0u32.into(), b"done"); 470 + 471 + result 472 + } 473 + 474 + async fn run_session( 475 + mut send: quinn::SendStream, 476 + mut recv: quinn::RecvStream, 477 + did: &str, 478 + collection: Option<&str>, 479 + local_records: &mut LocalRecords, 480 + ) -> anyhow::Result<usize> { 481 + let display_coll = collection.unwrap_or("(all)"); 482 + 483 + // Phase 1: Init 484 + write_message( 485 + &mut send, 486 + &ClientMessage::Init { 487 + did: did.to_string(), 488 + collection: collection.map(|s| s.to_string()), 489 + }, 490 + ) 491 + .await?; 492 + 493 + let init_resp: ServerMessage = read_message(&mut recv).await?; 494 + let rev = match init_resp { 495 + ServerMessage::InitOk { rev } => rev, 496 + ServerMessage::Error { code, message } => { 497 + anyhow::bail!("server error ({code}): {message}"); 498 + } 499 + other => { 500 + anyhow::bail!("unexpected response: {other:?}"); 501 + } 502 + }; 503 + 504 + tracing::debug!(collection = %display_coll, %rev, "session started"); 505 + 506 + // Phase 2: Build decoder with local symbols 507 + let mut decoder = Decoder::<ByteSymbol>::new(); 508 + let local_count = { 509 + let mut count = 0usize; 510 + for ((coll, rkey), cid_bytes) in local_records.iter() { 511 + let matches = match collection { 512 + Some(filter) => coll == filter, 513 + None => true, 514 + }; 515 + if matches { 516 + let symbol = make_record_symbol(coll, rkey, cid_bytes); 517 + decoder.add_symbol(symbol); 518 + count += 1; 519 + } 520 + } 521 + count 522 + }; 523 + tracing::debug!( 524 + collection = %display_coll, 525 + local_symbols = local_count, 526 + "decoder initialized with local symbols" 527 + ); 528 + 529 + // Phase 3: Receive coded symbols from server and decode 530 + let mut total_symbols = 0u64; 531 + let mut need_more_rounds = 0u32; 532 + loop { 533 + let msg: ServerMessage = read_message(&mut recv).await?; 534 + match msg { 535 + ServerMessage::Symbols { symbols } => { 536 + total_symbols += symbols.len() as u64; 537 + for wire_sym in symbols { 538 + let coded: riblt::CodedSymbol<ByteSymbol> = wire_sym.into(); 539 + decoder.add_coded_symbol(coded); 540 + decoder.try_decode(); 541 + } 542 + if decoder.decoded() { 543 + break; 544 + } 545 + need_more_rounds += 1; 546 + if need_more_rounds > MAX_NEED_MORE_ROUNDS { 547 + anyhow::bail!( 548 + "decoding did not converge after {total_symbols} symbols \ 549 + ({need_more_rounds} rounds), local={local_count}" 550 + ); 551 + } 552 + write_message( 553 + &mut send, 554 + &ClientMessage::NeedMore { 555 + count: NEED_MORE_BATCH, 556 + }, 557 + ) 558 + .await?; 559 + } 560 + ServerMessage::Error { code, message } => { 561 + anyhow::bail!("server error during symbols ({code}): {message}"); 562 + } 563 + _ => { 564 + anyhow::bail!("unexpected message during symbol exchange"); 565 + } 566 + } 567 + } 568 + 569 + let remote_syms = decoder.remote(); 570 + let local_syms = decoder.local(); 571 + tracing::info!( 572 + collection = %display_coll, 573 + %rev, 574 + total_symbols, 575 + remote_only = remote_syms.len(), 576 + local_only = local_syms.len(), 577 + "decoding complete" 578 + ); 579 + 580 + // Remove records that are local-only (deleted on server or changed CID) 581 + for hs in local_syms { 582 + if let Some((coll, rkey)) = parse_remote_symbol(&hs.symbol) { 583 + local_records.remove(&(coll, rkey)); 584 + } 585 + } 586 + 587 + // Phase 4: Request missing records (remote-only = server has, we don't) 588 + let missing: Vec<MissingRecord> = remote_syms 589 + .iter() 590 + .filter_map(|hs| { 591 + let (coll, rkey) = parse_remote_symbol(&hs.symbol)?; 592 + Some(MissingRecord { 593 + collection: coll, 594 + rkey, 595 + }) 596 + }) 597 + .collect(); 598 + 599 + let missing_count = missing.len(); 600 + 601 + if missing.is_empty() { 602 + write_message(&mut send, &ClientMessage::Done).await?; 603 + return Ok(0); 604 + } 605 + 606 + tracing::info!( 607 + collection = %display_coll, 608 + missing = missing_count, 609 + "requesting missing records" 610 + ); 611 + write_message(&mut send, &ClientMessage::Decoded { missing }).await?; 612 + 613 + let msg: ServerMessage = read_message(&mut recv).await?; 614 + match msg { 615 + ServerMessage::Records { records } => { 616 + for record in &records { 617 + let cid_str = format_cid(&record.cid); 618 + let value = decode_record_data(&record.data); 619 + 620 + // Update local state 621 + local_records.insert( 622 + (record.collection.clone(), record.rkey.clone()), 623 + record.cid.clone(), 624 + ); 625 + 626 + tracing::info!( 627 + collection = %record.collection, 628 + rkey = %record.rkey, 629 + cid = %cid_str, 630 + "record" 631 + ); 632 + println!( 633 + "{}", 634 + serde_json::json!({ 635 + "collection": record.collection, 636 + "rkey": record.rkey, 637 + "cid": cid_str, 638 + "value": value, 639 + }) 640 + ); 641 + } 642 + } 643 + ServerMessage::Error { code, message } => { 644 + anyhow::bail!("server error fetching records ({code}): {message}"); 645 + } 646 + _ => { 647 + anyhow::bail!("unexpected message when expecting records"); 648 + } 649 + } 650 + 651 + write_message(&mut send, &ClientMessage::Done).await?; 652 + 653 + Ok(missing_count) 654 + }
+1
src/bin/ramjet_writer.rs
··· 138 138 zstd_dict_path: None, 139 139 backfill_dids: Vec::new(), 140 140 consumer_groups: Vec::new(), 141 + quic_listen_addr: None, 141 142 }); 142 143 143 144 let (tx, rx) = mpsc::channel(4096);
+8
src/config.rs
··· 154 154 /// Example: `--consumer-group indexers:3 --consumer-group notifiers:2` 155 155 #[arg(long, env = "RAMJET_CONSUMER_GROUPS", value_delimiter = ',')] 156 156 pub consumer_group: Vec<String>, 157 + 158 + /// QUIC bind address for RIBLT set reconciliation. Server only starts if provided. 159 + #[arg(long, env = "RAMJET_QUIC_LISTEN_ADDR")] 160 + pub quic_listen_addr: Option<SocketAddr>, 157 161 } 158 162 159 163 /// A pre-defined consumer group with a fixed number of partitions. ··· 207 211 pub backfill_dids: Vec<String>, 208 212 /// Pre-defined consumer groups. 209 213 pub consumer_groups: Vec<ConsumerGroup>, 214 + /// QUIC bind address for RIBLT reconciliation (None = disabled). 215 + pub quic_listen_addr: Option<SocketAddr>, 210 216 } 211 217 212 218 impl From<CliArgs> for ServiceConfig { ··· 250 256 zstd_dict_path: args.zstd_dict_path, 251 257 backfill_dids, 252 258 consumer_groups, 259 + quic_listen_addr: args.quic_listen_addr, 253 260 } 254 261 } 255 262 } ··· 337 344 zstd_dict_path: None, 338 345 backfill: String::new(), 339 346 consumer_group: Vec::new(), 347 + quic_listen_addr: None, 340 348 }; 341 349 let config = ServiceConfig::from(args); 342 350 assert!(config.admin_dids.contains("did:plc:abc"));
+7
src/errors.rs
··· 56 56 /// Underlying I/O error. 57 57 source: std::io::Error, 58 58 }, 59 + 60 + /// QUIC transport error. 61 + #[error("error-ramjet-quic-1 QUIC error: {reason}")] 62 + Quic { 63 + /// Description of the QUIC error. 64 + reason: String, 65 + }, 59 66 }
+19
src/main.rs
··· 186 186 } 187 187 })); 188 188 189 + // Spawn QUIC reconciliation server (if configured) 190 + let quic_handle = config.quic_listen_addr.map(|addr| { 191 + tokio::spawn(task_monitor.instrument({ 192 + let db = db.clone(); 193 + let config = config.clone(); 194 + let cancel = cancel.clone(); 195 + async move { 196 + if let Err(e) = 197 + ramjet::server::quic::run_quic_server(addr, db, config, cancel).await 198 + { 199 + tracing::error!(error = %e, "QUIC reconciliation server failed"); 200 + } 201 + } 202 + })) 203 + }); 204 + 189 205 // Build router and bind 190 206 let router = build_router(state); 191 207 let listener = TcpListener::bind(config.listen_addr).await?; ··· 205 221 backfill_handle, 206 222 tokio_metrics_handle, 207 223 ); 224 + if let Some(handle) = quic_handle { 225 + let _ = handle.await; 226 + } 208 227 209 228 tracing::info!("ramjet shut down"); 210 229 Ok(())
+1
src/server/mod.rs
··· 4 4 pub mod dictionary; 5 5 pub mod health; 6 6 pub mod metrics; 7 + pub mod quic; 7 8 pub mod reconciliation; 8 9 pub mod tokio_metrics; 9 10 pub mod websocket;
+447
src/server/quic.rs
··· 1 + //! QUIC-based RIBLT set reconciliation server. 2 + //! 3 + //! Exposes a QUIC endpoint where clients connect to reconcile their 4 + //! understanding of a DID's records using streaming RIBLT coded symbols. 5 + //! Messages are framed as length-prefixed DAG-CBOR using `atproto_dasl`. 6 + 7 + use std::net::SocketAddr; 8 + use std::sync::Arc; 9 + 10 + use quinn::crypto::rustls::QuicServerConfig; 11 + use riblt::CodedSymbol; 12 + use riblt::Encoder; 13 + use riblt::byte_symbol::ByteSymbol; 14 + use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; 15 + use serde::{Deserialize, Serialize}; 16 + use tokio_util::sync::CancellationToken; 17 + 18 + use crate::config::ServiceConfig; 19 + use crate::storage::FjallDb; 20 + use crate::storage::keys; 21 + use crate::types::RecordValue; 22 + 23 + use super::reconciliation::collect_symbols; 24 + 25 + const RECON_ALPN: &[u8] = b"ramjet-recon/1"; 26 + const DEFAULT_BATCH_SIZE: u32 = 1000; 27 + const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; // 16 MB 28 + 29 + // --- Wire protocol message types --- 30 + 31 + #[derive(Serialize, Deserialize, Debug)] 32 + pub enum ClientMessage { 33 + Init { 34 + did: String, 35 + collection: Option<String>, 36 + }, 37 + NeedMore { 38 + count: u32, 39 + }, 40 + Decoded { 41 + missing: Vec<MissingRecord>, 42 + }, 43 + Done, 44 + } 45 + 46 + #[derive(Serialize, Deserialize, Debug)] 47 + pub struct MissingRecord { 48 + pub collection: String, 49 + pub rkey: String, 50 + } 51 + 52 + #[derive(Serialize, Deserialize, Debug)] 53 + pub enum ServerMessage { 54 + InitOk { rev: String }, 55 + Symbols { symbols: Vec<WireCodedSymbol> }, 56 + Records { records: Vec<RecordEntry> }, 57 + Error { code: String, message: String }, 58 + } 59 + 60 + #[derive(Serialize, Deserialize, Debug, Clone)] 61 + pub struct WireCodedSymbol { 62 + pub count: i64, 63 + pub hash: u64, 64 + pub symbol: Vec<u8>, 65 + } 66 + 67 + #[derive(Serialize, Deserialize, Debug)] 68 + pub struct RecordEntry { 69 + pub collection: String, 70 + pub rkey: String, 71 + pub cid: Vec<u8>, 72 + pub data: Vec<u8>, 73 + } 74 + 75 + // --- Conversions between riblt types and wire types --- 76 + 77 + impl From<&CodedSymbol<ByteSymbol>> for WireCodedSymbol { 78 + fn from(cs: &CodedSymbol<ByteSymbol>) -> Self { 79 + Self { 80 + count: cs.count, 81 + hash: cs.hash, 82 + symbol: cs.symbol.0.clone(), 83 + } 84 + } 85 + } 86 + 87 + impl From<WireCodedSymbol> for CodedSymbol<ByteSymbol> { 88 + fn from(w: WireCodedSymbol) -> Self { 89 + Self { 90 + count: w.count, 91 + hash: w.hash, 92 + symbol: ByteSymbol(w.symbol), 93 + } 94 + } 95 + } 96 + 97 + // --- Message framing: 4-byte BE length + DAG-CBOR payload --- 98 + 99 + async fn write_message<T: Serialize>( 100 + stream: &mut quinn::SendStream, 101 + msg: &T, 102 + ) -> anyhow::Result<()> { 103 + let data = 104 + atproto_dasl::drisl::to_vec(msg).map_err(|e| anyhow::anyhow!("encode error: {e}"))?; 105 + if data.len() > MAX_MESSAGE_SIZE { 106 + return Err(anyhow::anyhow!("message too large: {} bytes", data.len())); 107 + } 108 + let len = (data.len() as u32).to_be_bytes(); 109 + stream.write_all(&len).await?; 110 + stream.write_all(&data).await?; 111 + Ok(()) 112 + } 113 + 114 + async fn read_message<T: serde::de::DeserializeOwned>( 115 + stream: &mut quinn::RecvStream, 116 + ) -> anyhow::Result<T> { 117 + let mut len_buf = [0u8; 4]; 118 + stream 119 + .read_exact(&mut len_buf) 120 + .await 121 + .map_err(|e| anyhow::anyhow!("read length: {e}"))?; 122 + let len = u32::from_be_bytes(len_buf) as usize; 123 + if len > MAX_MESSAGE_SIZE { 124 + return Err(anyhow::anyhow!("message too large: {len} bytes")); 125 + } 126 + let mut buf = vec![0u8; len]; 127 + stream 128 + .read_exact(&mut buf) 129 + .await 130 + .map_err(|e| anyhow::anyhow!("read payload: {e}"))?; 131 + let msg = 132 + atproto_dasl::drisl::from_slice(&buf).map_err(|e| anyhow::anyhow!("decode error: {e}"))?; 133 + Ok(msg) 134 + } 135 + 136 + // --- TLS setup --- 137 + 138 + fn generate_self_signed() -> (CertificateDer<'static>, PrivatePkcs8KeyDer<'static>) { 139 + let cert = 140 + rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).expect("cert gen failed"); 141 + let cert_der = CertificateDer::from(cert.cert); 142 + let key_der = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); 143 + (cert_der, key_der) 144 + } 145 + 146 + fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<quinn::Endpoint> { 147 + let (cert_der, key_der) = generate_self_signed(); 148 + let mut server_crypto = rustls::ServerConfig::builder() 149 + .with_no_client_auth() 150 + .with_single_cert(vec![cert_der], key_der.into()) 151 + .map_err(|e| anyhow::anyhow!("bad server cert/key: {e}"))?; 152 + server_crypto.alpn_protocols = vec![RECON_ALPN.to_vec()]; 153 + 154 + let server_config = quinn::ServerConfig::with_crypto(Arc::new( 155 + QuicServerConfig::try_from(server_crypto) 156 + .map_err(|e| anyhow::anyhow!("bad quic server config: {e}"))?, 157 + )); 158 + 159 + let endpoint = quinn::Endpoint::server(server_config, bind_addr) 160 + .map_err(|e| anyhow::anyhow!("failed to bind QUIC endpoint: {e}"))?; 161 + Ok(endpoint) 162 + } 163 + 164 + // --- Helpers --- 165 + 166 + /// Check if an error is a normal connection close (client disconnected gracefully). 167 + fn is_connection_closed_error(e: &anyhow::Error) -> bool { 168 + let msg = e.to_string(); 169 + msg.contains("connection lost") || msg.contains("closed by peer") 170 + } 171 + 172 + // --- Server entry point --- 173 + 174 + pub async fn run_quic_server( 175 + listen_addr: SocketAddr, 176 + db: Arc<FjallDb>, 177 + _config: Arc<ServiceConfig>, 178 + cancel: CancellationToken, 179 + ) -> anyhow::Result<()> { 180 + let endpoint = make_server_endpoint(listen_addr)?; 181 + tracing::info!(addr = %listen_addr, "QUIC reconciliation server listening"); 182 + 183 + loop { 184 + tokio::select! { 185 + incoming = endpoint.accept() => { 186 + let Some(incoming) = incoming else { 187 + tracing::info!("QUIC endpoint closed"); 188 + break; 189 + }; 190 + let db = db.clone(); 191 + tokio::spawn(async move { 192 + match incoming.await { 193 + Ok(connection) => { 194 + let remote = connection.remote_address(); 195 + tracing::debug!(%remote, "QUIC client connected"); 196 + if let Err(e) = handle_connection(connection, db).await { 197 + if is_connection_closed_error(&e) { 198 + tracing::debug!(%remote, "QUIC client disconnected"); 199 + } else { 200 + tracing::warn!(%remote, error = %e, "QUIC session error"); 201 + } 202 + } 203 + } 204 + Err(e) => { 205 + tracing::warn!(error = %e, "QUIC connection failed"); 206 + } 207 + } 208 + }); 209 + } 210 + () = cancel.cancelled() => { 211 + tracing::info!("QUIC server shutting down"); 212 + break; 213 + } 214 + } 215 + } 216 + 217 + endpoint.close(0u32.into(), b"shutdown"); 218 + endpoint.wait_idle().await; 219 + Ok(()) 220 + } 221 + 222 + // --- Per-connection handler --- 223 + 224 + async fn handle_connection(connection: quinn::Connection, db: Arc<FjallDb>) -> anyhow::Result<()> { 225 + let (mut send, mut recv) = connection.accept_bi().await?; 226 + handle_session(&mut send, &mut recv, db).await 227 + } 228 + 229 + // --- Session protocol --- 230 + 231 + async fn handle_session( 232 + send: &mut quinn::SendStream, 233 + recv: &mut quinn::RecvStream, 234 + db: Arc<FjallDb>, 235 + ) -> anyhow::Result<()> { 236 + // Phase 1: Read Init message 237 + let init: ClientMessage = read_message(recv).await?; 238 + let (did, collection_filter) = match init { 239 + ClientMessage::Init { did, collection } => (did, collection), 240 + _ => { 241 + write_message( 242 + send, 243 + &ServerMessage::Error { 244 + code: "InvalidRequest".to_string(), 245 + message: "expected Init message".to_string(), 246 + }, 247 + ) 248 + .await?; 249 + return Ok(()); 250 + } 251 + }; 252 + 253 + // Validate DID exists 254 + let repo_exists = db.get_repo_state(&did)?.is_some(); 255 + if !repo_exists { 256 + write_message( 257 + send, 258 + &ServerMessage::Error { 259 + code: "RepoNotFound".to_string(), 260 + message: format!("repo not found: {did}"), 261 + }, 262 + ) 263 + .await?; 264 + return Ok(()); 265 + } 266 + 267 + // Phase 2: Collect symbols (blocking scan) 268 + let db_clone = db.clone(); 269 + let did_clone = did.clone(); 270 + let coll_clone = collection_filter.clone(); 271 + let (symbols, rev) = tokio::task::spawn_blocking(move || { 272 + collect_symbols(&db_clone, &did_clone, coll_clone.as_deref()) 273 + }) 274 + .await??; 275 + 276 + // Build encoder from collected symbols 277 + let mut encoder = Encoder::<ByteSymbol>::new(); 278 + for sym in &symbols { 279 + encoder.add_symbol(sym.clone()); 280 + } 281 + 282 + // Send InitOk 283 + write_message(send, &ServerMessage::InitOk { rev }).await?; 284 + 285 + // Send initial batch of coded symbols 286 + let initial_batch = produce_batch(&mut encoder, DEFAULT_BATCH_SIZE); 287 + write_message( 288 + send, 289 + &ServerMessage::Symbols { 290 + symbols: initial_batch, 291 + }, 292 + ) 293 + .await?; 294 + 295 + // Phase 3: Message loop 296 + loop { 297 + let msg: ClientMessage = read_message(recv).await?; 298 + match msg { 299 + ClientMessage::NeedMore { count } => { 300 + let count = count.min(10_000); // cap per-request 301 + let batch = produce_batch(&mut encoder, count); 302 + write_message(send, &ServerMessage::Symbols { symbols: batch }).await?; 303 + } 304 + ClientMessage::Decoded { missing } => { 305 + let records = fetch_records(&db, &did, &missing)?; 306 + write_message(send, &ServerMessage::Records { records }).await?; 307 + } 308 + ClientMessage::Done => { 309 + break; 310 + } 311 + ClientMessage::Init { .. } => { 312 + write_message( 313 + send, 314 + &ServerMessage::Error { 315 + code: "InvalidRequest".to_string(), 316 + message: "unexpected Init message".to_string(), 317 + }, 318 + ) 319 + .await?; 320 + break; 321 + } 322 + } 323 + } 324 + 325 + Ok(()) 326 + } 327 + 328 + // --- Helpers --- 329 + 330 + fn produce_batch(encoder: &mut Encoder<ByteSymbol>, count: u32) -> Vec<WireCodedSymbol> { 331 + (0..count) 332 + .map(|_| WireCodedSymbol::from(&encoder.produce_next_coded_symbol())) 333 + .collect() 334 + } 335 + 336 + fn fetch_records( 337 + db: &FjallDb, 338 + did: &str, 339 + missing: &[MissingRecord], 340 + ) -> anyhow::Result<Vec<RecordEntry>> { 341 + let mut records = Vec::with_capacity(missing.len()); 342 + 343 + for req in missing { 344 + let prefix = keys::encode_record_prefix(did, &req.collection, &req.rkey); 345 + 346 + // Find the latest revision for this (collection, rkey) 347 + let mut latest_value: Option<Vec<u8>> = None; 348 + for guard in db.records.prefix(&prefix) { 349 + let Ok((_key, value)) = guard.into_inner() else { 350 + continue; 351 + }; 352 + let decompressed = db 353 + .decompress_event(&value) 354 + .unwrap_or_else(|_| value.to_vec()); 355 + latest_value = Some(decompressed); 356 + } 357 + 358 + if let Some(value) = latest_value { 359 + if !RecordValue::is_tombstone(&value) { 360 + if let Ok(rv) = RecordValue::decode(&value) { 361 + records.push(RecordEntry { 362 + collection: req.collection.clone(), 363 + rkey: req.rkey.clone(), 364 + cid: rv.cid, 365 + data: rv.data, 366 + }); 367 + } 368 + } 369 + } 370 + } 371 + 372 + Ok(records) 373 + } 374 + 375 + #[cfg(test)] 376 + mod tests { 377 + use super::*; 378 + 379 + #[test] 380 + fn test_wire_coded_symbol_roundtrip() { 381 + let original = WireCodedSymbol { 382 + count: -3, 383 + hash: 0xDEADBEEF, 384 + symbol: vec![1, 2, 3, 4], 385 + }; 386 + 387 + let encoded = atproto_dasl::drisl::to_vec(&original).expect("encode"); 388 + let decoded: WireCodedSymbol = atproto_dasl::drisl::from_slice(&encoded).expect("decode"); 389 + 390 + assert_eq!(decoded.count, original.count); 391 + assert_eq!(decoded.hash, original.hash); 392 + assert_eq!(decoded.symbol, original.symbol); 393 + } 394 + 395 + #[test] 396 + fn test_client_message_roundtrip() { 397 + let msg = ClientMessage::Init { 398 + did: "did:plc:abc123".to_string(), 399 + collection: Some("app.bsky.feed.post".to_string()), 400 + }; 401 + let encoded = atproto_dasl::drisl::to_vec(&msg).expect("encode"); 402 + let decoded: ClientMessage = atproto_dasl::drisl::from_slice(&encoded).expect("decode"); 403 + 404 + match decoded { 405 + ClientMessage::Init { did, collection } => { 406 + assert_eq!(did, "did:plc:abc123"); 407 + assert_eq!(collection.as_deref(), Some("app.bsky.feed.post")); 408 + } 409 + _ => panic!("wrong variant"), 410 + } 411 + } 412 + 413 + #[test] 414 + fn test_server_message_roundtrip() { 415 + let msg = ServerMessage::InitOk { 416 + rev: "3jzfcijpj2z2b".to_string(), 417 + }; 418 + let encoded = atproto_dasl::drisl::to_vec(&msg).expect("encode"); 419 + let decoded: ServerMessage = atproto_dasl::drisl::from_slice(&encoded).expect("decode"); 420 + 421 + match decoded { 422 + ServerMessage::InitOk { rev } => { 423 + assert_eq!(rev, "3jzfcijpj2z2b"); 424 + } 425 + _ => panic!("wrong variant"), 426 + } 427 + } 428 + 429 + #[test] 430 + fn test_coded_symbol_conversion() { 431 + let cs = CodedSymbol { 432 + count: 5, 433 + hash: 42, 434 + symbol: ByteSymbol(vec![10, 20, 30]), 435 + }; 436 + 437 + let wire = WireCodedSymbol::from(&cs); 438 + assert_eq!(wire.count, 5); 439 + assert_eq!(wire.hash, 42); 440 + assert_eq!(wire.symbol, vec![10, 20, 30]); 441 + 442 + let back: CodedSymbol<ByteSymbol> = wire.into(); 443 + assert_eq!(back.count, 5); 444 + assert_eq!(back.hash, 42); 445 + assert_eq!(back.symbol.0, vec![10, 20, 30]); 446 + } 447 + }
+22 -8
src/server/reconciliation.rs
··· 32 32 /// 33 33 /// Sizes are multiples of 25,000. The sketch stays at the current tier 34 34 /// until the record count crosses 50% into the next increment, then bumps. 35 - fn sketch_size_for_records(record_count: usize) -> usize { 35 + pub fn sketch_size_for_records(record_count: usize) -> usize { 36 36 let mut size = SKETCH_INCREMENT; 37 37 while record_count >= size + SKETCH_INCREMENT / 2 { 38 38 size += SKETCH_INCREMENT; ··· 41 41 } 42 42 43 43 /// Build a ByteSymbol for a record identified by (collection, rkey, cid). 44 - fn make_record_symbol(collection: &str, rkey: &str, cid: &[u8]) -> ByteSymbol { 44 + pub fn make_record_symbol(collection: &str, rkey: &str, cid: &[u8]) -> ByteSymbol { 45 45 let mut bytes = Vec::with_capacity(collection.len() + 1 + rkey.len() + 1 + cid.len()); 46 46 bytes.extend_from_slice(collection.as_bytes()); 47 47 bytes.push(0x00); ··· 146 146 } 147 147 } 148 148 149 - /// Build an RIBLT sketch from the records keyspace for a DID. 149 + /// Collect ByteSymbols for a DID's tracked records. 150 150 /// 151 151 /// Scans all records, groups by (collection, rkey), takes the latest 152 - /// non-tombstone revision of each, and produces coded symbols. 153 - fn build_sketch( 152 + /// non-tombstone revision of each, and returns the symbols plus the 153 + /// current repo revision. 154 + pub fn collect_symbols( 154 155 db: &FjallDb, 155 156 did: &str, 156 157 collection_filter: Option<&str>, 157 - ) -> anyhow::Result<(Vec<u8>, u64, String)> { 158 + ) -> anyhow::Result<(Vec<ByteSymbol>, String)> { 158 159 let prefix = if let Some(coll) = collection_filter { 159 160 keys::encode_collection_prefix(did, coll) 160 161 } else { ··· 231 232 } 232 233 } 233 234 235 + let rev = db.get_repo_state(did)?.map(|rs| rs.rev).unwrap_or_default(); 236 + 237 + Ok((symbols, rev)) 238 + } 239 + 240 + /// Build an RIBLT sketch from the records keyspace for a DID. 241 + /// 242 + /// Collects symbols and encodes them into an RibltFile. 243 + fn build_sketch( 244 + db: &FjallDb, 245 + did: &str, 246 + collection_filter: Option<&str>, 247 + ) -> anyhow::Result<(Vec<u8>, u64, String)> { 248 + let (symbols, rev) = collect_symbols(db, did, collection_filter)?; 249 + 234 250 let record_count = symbols.len() as u64; 235 251 let sketch_size = sketch_size_for_records(symbols.len()); 236 252 ··· 250 266 251 267 let mut buf = Vec::new(); 252 268 file.write_to(&mut buf)?; 253 - 254 - let rev = db.get_repo_state(did)?.map(|rs| rs.rev).unwrap_or_default(); 255 269 256 270 Ok((buf, record_count, rev)) 257 271 }