Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

horrible debug stuff everywhere

phil 2d734985 5aa2c368

+2187 -216
+144 -108
Cargo.lock
··· 274 274 275 275 [[package]] 276 276 name = "axum" 277 - version = "0.8.1" 277 + version = "0.8.3" 278 278 source = "registry+https://github.com/rust-lang/crates.io-index" 279 - checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8" 279 + checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288" 280 280 dependencies = [ 281 281 "axum-core", 282 282 "bytes", ··· 308 308 309 309 [[package]] 310 310 name = "axum-core" 311 - version = "0.5.0" 311 + version = "0.5.2" 312 312 source = "registry+https://github.com/rust-lang/crates.io-index" 313 - checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733" 313 + checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" 314 314 dependencies = [ 315 315 "bytes", 316 - "futures-util", 316 + "futures-core", 317 317 "http", 318 318 "http-body", 319 319 "http-body-util", ··· 328 328 329 329 [[package]] 330 330 name = "axum-extra" 331 - version = "0.10.0" 331 + version = "0.10.1" 332 332 source = "registry+https://github.com/rust-lang/crates.io-index" 333 - checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b" 333 + checksum = "45bf463831f5131b7d3c756525b305d40f1185b688565648a92e1392ca35713d" 334 334 dependencies = [ 335 335 "axum", 336 336 "axum-core", ··· 342 342 "http-body-util", 343 343 "mime", 344 344 "pin-project-lite", 345 + "rustversion", 345 346 "serde", 346 347 "tower", 347 348 "tower-layer", ··· 567 568 568 569 [[package]] 569 570 name = "cc" 570 - version = "1.2.17" 571 + version = "1.2.18" 571 572 source = "registry+https://github.com/rust-lang/crates.io-index" 572 - checksum = "1fcb57c740ae1daf453ae85f16e37396f672b039e00d9d866e07ddb24e328e3a" 573 + checksum = "525046617d8376e3db1deffb079e91cef90a89fc3ca5c185bbf8c9ecdd15cd5c" 573 574 dependencies = [ 574 575 "jobserver", 575 576 "libc", ··· 639 640 640 641 [[package]] 641 642 name = "clap" 642 - version = "4.5.33" 643 + version = "4.5.35" 643 644 source = "registry+https://github.com/rust-lang/crates.io-index" 644 - checksum = "e2c80cae4c3350dd8f1272c73e83baff9a6ba550b8bfbe651b3c45b78cd1751e" 645 + checksum = "d8aa86934b44c19c50f87cc2790e19f54f7a67aedb64101c2e1a2e5ecfb73944" 645 646 dependencies = [ 646 647 "clap_builder", 647 648 "clap_derive", ··· 649 650 650 651 [[package]] 651 652 name = "clap_builder" 652 - version = "4.5.33" 653 + version = "4.5.35" 653 654 source = "registry+https://github.com/rust-lang/crates.io-index" 654 - checksum = "0123e386f691c90aa228219b5b1ee72d465e8e231c79e9c82324f016a62a741c" 655 + checksum = "2414dbb2dd0695280da6ea9261e327479e9d37b0630f6b53ba2a11c60c679fd9" 655 656 dependencies = [ 656 657 "anstream", 657 658 "anstyle", ··· 780 781 781 782 [[package]] 782 783 name = "crossbeam-channel" 783 - version = "0.5.14" 784 + version = "0.5.15" 784 785 source = "registry+https://github.com/rust-lang/crates.io-index" 785 - checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" 786 + checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" 786 787 dependencies = [ 787 788 "crossbeam-utils", 788 789 ] ··· 824 825 825 826 [[package]] 826 827 name = "ctrlc" 827 - version = "3.4.5" 828 + version = "3.4.6" 828 829 source = "registry+https://github.com/rust-lang/crates.io-index" 829 - checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3" 830 + checksum = "697b5419f348fd5ae2478e8018cb016c00a5881c7f46c717de98ffd135a5651c" 830 831 dependencies = [ 831 832 "nix", 832 833 "windows-sys 0.59.0", ··· 834 835 835 836 [[package]] 836 837 name = "darling" 837 - version = "0.20.10" 838 + version = "0.20.11" 838 839 source = "registry+https://github.com/rust-lang/crates.io-index" 839 - checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" 840 + checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" 840 841 dependencies = [ 841 842 "darling_core", 842 843 "darling_macro", ··· 844 845 845 846 [[package]] 846 847 name = "darling_core" 847 - version = "0.20.10" 848 + version = "0.20.11" 848 849 source = "registry+https://github.com/rust-lang/crates.io-index" 849 - checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" 850 + checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" 850 851 dependencies = [ 851 852 "fnv", 852 853 "ident_case", ··· 858 859 859 860 [[package]] 860 861 name = "darling_macro" 861 - version = "0.20.10" 862 + version = "0.20.11" 862 863 source = "registry+https://github.com/rust-lang/crates.io-index" 863 - checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" 864 + checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" 864 865 dependencies = [ 865 866 "darling_core", 866 867 "quote", ··· 915 916 916 917 [[package]] 917 918 name = "deranged" 918 - version = "0.4.1" 919 + version = "0.4.0" 919 920 source = "registry+https://github.com/rust-lang/crates.io-index" 920 - checksum = "28cfac68e08048ae1883171632c2aef3ebc555621ae56fbccce1cbf22dd7f058" 921 + checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" 921 922 dependencies = [ 922 923 "powerfmt", 923 924 "serde", ··· 987 988 "dropshot_endpoint", 988 989 "form_urlencoded", 989 990 "futures", 990 - "hostname 0.4.0", 991 + "hostname 0.4.1", 991 992 "http", 992 993 "http-body-util", 993 994 "hyper", 994 995 "hyper-util", 995 - "indexmap 2.8.0", 996 + "indexmap 2.9.0", 996 997 "multer", 997 998 "openapiv3", 998 999 "paste", ··· 1081 1082 1082 1083 [[package]] 1083 1084 name = "env_logger" 1084 - version = "0.11.7" 1085 + version = "0.11.8" 1085 1086 source = "registry+https://github.com/rust-lang/crates.io-index" 1086 - checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697" 1087 + checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" 1087 1088 dependencies = [ 1088 1089 "anstream", 1089 1090 "anstyle", ··· 1100 1101 1101 1102 [[package]] 1102 1103 name = "errno" 1103 - version = "0.3.10" 1104 + version = "0.3.11" 1104 1105 source = "registry+https://github.com/rust-lang/crates.io-index" 1105 - checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" 1106 + checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" 1106 1107 dependencies = [ 1107 1108 "libc", 1108 - "windows-sys 0.59.0", 1109 + "windows-sys 0.52.0", 1109 1110 ] 1110 1111 1111 1112 [[package]] ··· 1121 1122 1122 1123 [[package]] 1123 1124 name = "event-listener-strategy" 1124 - version = "0.5.3" 1125 + version = "0.5.4" 1125 1126 source = "registry+https://github.com/rust-lang/crates.io-index" 1126 - checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" 1127 + checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" 1127 1128 dependencies = [ 1128 1129 "event-listener", 1129 1130 "pin-project-lite", ··· 1316 1317 "libc", 1317 1318 "log", 1318 1319 "rustversion", 1319 - "windows 0.58.0", 1320 + "windows", 1320 1321 ] 1321 1322 1322 1323 [[package]] ··· 1382 1383 "futures-core", 1383 1384 "futures-sink", 1384 1385 "http", 1385 - "indexmap 2.8.0", 1386 + "indexmap 2.9.0", 1386 1387 "slab", 1387 1388 "tokio", 1388 1389 "tokio-util", ··· 1478 1479 1479 1480 [[package]] 1480 1481 name = "hostname" 1481 - version = "0.4.0" 1482 + version = "0.4.1" 1482 1483 source = "registry+https://github.com/rust-lang/crates.io-index" 1483 - checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" 1484 + checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" 1484 1485 dependencies = [ 1485 1486 "cfg-if", 1486 1487 "libc", 1487 - "windows 0.52.0", 1488 + "windows-link", 1488 1489 ] 1489 1490 1490 1491 [[package]] ··· 1565 1566 1566 1567 [[package]] 1567 1568 name = "hyper-util" 1568 - version = "0.1.10" 1569 + version = "0.1.11" 1569 1570 source = "registry+https://github.com/rust-lang/crates.io-index" 1570 - checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" 1571 + checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" 1571 1572 dependencies = [ 1572 1573 "bytes", 1573 1574 "futures-channel", ··· 1575 1576 "http", 1576 1577 "http-body", 1577 1578 "hyper", 1579 + "libc", 1578 1580 "pin-project-lite", 1579 1581 "socket2", 1580 1582 "tokio", ··· 1584 1586 1585 1587 [[package]] 1586 1588 name = "iana-time-zone" 1587 - version = "0.1.62" 1589 + version = "0.1.63" 1588 1590 source = "registry+https://github.com/rust-lang/crates.io-index" 1589 - checksum = "b2fd658b06e56721792c5df4475705b6cda790e9298d19d2f8af083457bcd127" 1591 + checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" 1590 1592 dependencies = [ 1591 1593 "android_system_properties", 1592 1594 "core-foundation-sys", ··· 1594 1596 "js-sys", 1595 1597 "log", 1596 1598 "wasm-bindgen", 1597 - "windows-core 0.52.0", 1599 + "windows-core 0.61.0", 1598 1600 ] 1599 1601 1600 1602 [[package]] ··· 1764 1766 1765 1767 [[package]] 1766 1768 name = "indexmap" 1767 - version = "2.8.0" 1769 + version = "2.9.0" 1768 1770 source = "registry+https://github.com/rust-lang/crates.io-index" 1769 - checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" 1771 + checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" 1770 1772 dependencies = [ 1771 1773 "equivalent", 1772 1774 "hashbrown 0.15.2", ··· 1807 1809 dependencies = [ 1808 1810 "hermit-abi", 1809 1811 "libc", 1810 - "windows-sys 0.59.0", 1812 + "windows-sys 0.52.0", 1811 1813 ] 1812 1814 1813 1815 [[package]] ··· 1862 1864 1863 1865 [[package]] 1864 1866 name = "jiff" 1865 - version = "0.2.5" 1867 + version = "0.2.6" 1866 1868 source = "registry+https://github.com/rust-lang/crates.io-index" 1867 - checksum = "c102670231191d07d37a35af3eb77f1f0dbf7a71be51a962dcd57ea607be7260" 1869 + checksum = "1f33145a5cbea837164362c7bd596106eb7c5198f97d1ba6f6ebb3223952e488" 1868 1870 dependencies = [ 1869 1871 "jiff-static", 1870 1872 "log", ··· 1875 1877 1876 1878 [[package]] 1877 1879 name = "jiff-static" 1878 - version = "0.2.5" 1880 + version = "0.2.6" 1879 1881 source = "registry+https://github.com/rust-lang/crates.io-index" 1880 - checksum = "4cdde31a9d349f1b1f51a0b3714a5940ac022976f4b49485fc04be052b183b4c" 1882 + checksum = "43ce13c40ec6956157a3635d97a1ee2df323b263f09ea14165131289cb0f5c19" 1881 1883 dependencies = [ 1882 1884 "proc-macro2", 1883 1885 "quote", ··· 1886 1888 1887 1889 [[package]] 1888 1890 name = "jobserver" 1889 - version = "0.1.32" 1891 + version = "0.1.33" 1890 1892 source = "registry+https://github.com/rust-lang/crates.io-index" 1891 - checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" 1893 + checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" 1892 1894 dependencies = [ 1895 + "getrandom 0.3.2", 1893 1896 "libc", 1894 1897 ] 1895 1898 ··· 2012 2015 2013 2016 [[package]] 2014 2017 name = "linux-raw-sys" 2015 - version = "0.9.3" 2018 + version = "0.9.4" 2016 2019 source = "registry+https://github.com/rust-lang/crates.io-index" 2017 - checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" 2020 + checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" 2018 2021 2019 2022 [[package]] 2020 2023 name = "litemap" ··· 2162 2165 "http-body-util", 2163 2166 "hyper", 2164 2167 "hyper-util", 2165 - "indexmap 2.8.0", 2168 + "indexmap 2.9.0", 2166 2169 "ipnet", 2167 2170 "metrics", 2168 2171 "metrics-util", ··· 2185 2188 "once_cell", 2186 2189 "procfs", 2187 2190 "rlimit", 2188 - "windows 0.58.0", 2191 + "windows", 2189 2192 ] 2190 2193 2191 2194 [[package]] ··· 2228 2231 2229 2232 [[package]] 2230 2233 name = "miniz_oxide" 2231 - version = "0.8.5" 2234 + version = "0.8.8" 2232 2235 source = "registry+https://github.com/rust-lang/crates.io-index" 2233 - checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" 2236 + checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" 2234 2237 dependencies = [ 2235 2238 "adler2", 2236 2239 ] ··· 2401 2404 2402 2405 [[package]] 2403 2406 name = "once_cell" 2404 - version = "1.21.1" 2407 + version = "1.21.3" 2405 2408 source = "registry+https://github.com/rust-lang/crates.io-index" 2406 - checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc" 2409 + checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" 2407 2410 2408 2411 [[package]] 2409 2412 name = "openapiv3" ··· 2411 2414 source = "registry+https://github.com/rust-lang/crates.io-index" 2412 2415 checksum = "cc02deea53ffe807708244e5914f6b099ad7015a207ee24317c22112e17d9c5c" 2413 2416 dependencies = [ 2414 - "indexmap 2.8.0", 2417 + "indexmap 2.9.0", 2415 2418 "serde", 2416 2419 "serde_json", 2417 2420 ] 2418 2421 2419 2422 [[package]] 2420 2423 name = "openssl" 2421 - version = "0.10.71" 2424 + version = "0.10.72" 2422 2425 source = "registry+https://github.com/rust-lang/crates.io-index" 2423 - checksum = "5e14130c6a98cd258fdcb0fb6d744152343ff729cbfcb28c656a9d12b999fbcd" 2426 + checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" 2424 2427 dependencies = [ 2425 2428 "bitflags", 2426 2429 "cfg-if", ··· 2450 2453 2451 2454 [[package]] 2452 2455 name = "openssl-src" 2453 - version = "300.4.2+3.4.1" 2456 + version = "300.5.0+3.5.0" 2454 2457 source = "registry+https://github.com/rust-lang/crates.io-index" 2455 - checksum = "168ce4e058f975fe43e89d9ccf78ca668601887ae736090aacc23ae353c298e2" 2458 + checksum = "e8ce546f549326b0e6052b649198487d91320875da901e7bd11a06d1ee3f9c2f" 2456 2459 dependencies = [ 2457 2460 "cc", 2458 2461 ] 2459 2462 2460 2463 [[package]] 2461 2464 name = "openssl-sys" 2462 - version = "0.9.106" 2465 + version = "0.9.107" 2463 2466 source = "registry+https://github.com/rust-lang/crates.io-index" 2464 - checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd" 2467 + checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07" 2465 2468 dependencies = [ 2466 2469 "cc", 2467 2470 "libc", ··· 2745 2748 2746 2749 [[package]] 2747 2750 name = "redox_syscall" 2748 - version = "0.5.10" 2751 + version = "0.5.11" 2749 2752 source = "registry+https://github.com/rust-lang/crates.io-index" 2750 - checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" 2753 + checksum = "d2f103c6d277498fbceb16e84d317e2a400f160f46904d5f5410848c829511a3" 2751 2754 dependencies = [ 2752 2755 "bitflags", 2753 2756 ] ··· 2897 2900 "errno", 2898 2901 "libc", 2899 2902 "linux-raw-sys 0.4.15", 2900 - "windows-sys 0.59.0", 2903 + "windows-sys 0.52.0", 2901 2904 ] 2902 2905 2903 2906 [[package]] 2904 2907 name = "rustix" 2905 - version = "1.0.3" 2908 + version = "1.0.5" 2906 2909 source = "registry+https://github.com/rust-lang/crates.io-index" 2907 - checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96" 2910 + checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" 2908 2911 dependencies = [ 2909 2912 "bitflags", 2910 2913 "errno", 2911 2914 "libc", 2912 - "linux-raw-sys 0.9.3", 2913 - "windows-sys 0.59.0", 2915 + "linux-raw-sys 0.9.4", 2916 + "windows-sys 0.52.0", 2914 2917 ] 2915 2918 2916 2919 [[package]] ··· 3093 3096 checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4" 3094 3097 dependencies = [ 3095 3098 "form_urlencoded", 3096 - "indexmap 2.8.0", 3099 + "indexmap 2.9.0", 3097 3100 "itoa", 3098 3101 "ryu", 3099 3102 "serde", ··· 3164 3167 "chrono", 3165 3168 "hex", 3166 3169 "indexmap 1.9.3", 3167 - "indexmap 2.8.0", 3170 + "indexmap 2.9.0", 3168 3171 "serde", 3169 3172 "serde_derive", 3170 3173 "serde_json", ··· 3291 3294 3292 3295 [[package]] 3293 3296 name = "smallvec" 3294 - version = "1.14.0" 3297 + version = "1.15.0" 3295 3298 source = "registry+https://github.com/rust-lang/crates.io-index" 3296 - checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" 3299 + checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" 3297 3300 3298 3301 [[package]] 3299 3302 name = "socket2" 3300 - version = "0.5.8" 3303 + version = "0.5.9" 3301 3304 source = "registry+https://github.com/rust-lang/crates.io-index" 3302 - checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" 3305 + checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" 3303 3306 dependencies = [ 3304 3307 "libc", 3305 3308 "windows-sys 0.52.0", ··· 3387 3390 "fastrand", 3388 3391 "getrandom 0.3.2", 3389 3392 "once_cell", 3390 - "rustix 1.0.3", 3391 - "windows-sys 0.59.0", 3393 + "rustix 1.0.5", 3394 + "windows-sys 0.52.0", 3392 3395 ] 3393 3396 3394 3397 [[package]] ··· 3523 3526 3524 3527 [[package]] 3525 3528 name = "tokio" 3526 - version = "1.44.1" 3529 + version = "1.44.2" 3527 3530 source = "registry+https://github.com/rust-lang/crates.io-index" 3528 - checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" 3531 + checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" 3529 3532 dependencies = [ 3530 3533 "backtrace", 3531 3534 "bytes", ··· 3625 3628 source = "registry+https://github.com/rust-lang/crates.io-index" 3626 3629 checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" 3627 3630 dependencies = [ 3628 - "indexmap 2.8.0", 3631 + "indexmap 2.9.0", 3629 3632 "serde", 3630 3633 "serde_spanned", 3631 3634 "toml_datetime", ··· 3879 3882 checksum = "fd29b17c041f94e0885179637289815cd038f0c9fc19c4549d5a97017404fb7d" 3880 3883 dependencies = [ 3881 3884 "byteorder", 3882 - "bytes", 3883 3885 "byteview", 3884 3886 "interval-heap", 3885 3887 "log", ··· 4049 4051 4050 4052 [[package]] 4051 4053 name = "windows" 4052 - version = "0.52.0" 4054 + version = "0.58.0" 4053 4055 source = "registry+https://github.com/rust-lang/crates.io-index" 4054 - checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" 4056 + checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" 4055 4057 dependencies = [ 4056 - "windows-core 0.52.0", 4058 + "windows-core 0.58.0", 4057 4059 "windows-targets", 4058 4060 ] 4059 4061 4060 4062 [[package]] 4061 - name = "windows" 4063 + name = "windows-core" 4062 4064 version = "0.58.0" 4063 4065 source = "registry+https://github.com/rust-lang/crates.io-index" 4064 - checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" 4066 + checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" 4065 4067 dependencies = [ 4066 - "windows-core 0.58.0", 4068 + "windows-implement 0.58.0", 4069 + "windows-interface 0.58.0", 4070 + "windows-result 0.2.0", 4071 + "windows-strings 0.1.0", 4067 4072 "windows-targets", 4068 4073 ] 4069 4074 4070 4075 [[package]] 4071 4076 name = "windows-core" 4072 - version = "0.52.0" 4077 + version = "0.61.0" 4073 4078 source = "registry+https://github.com/rust-lang/crates.io-index" 4074 - checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" 4079 + checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" 4075 4080 dependencies = [ 4076 - "windows-targets", 4081 + "windows-implement 0.60.0", 4082 + "windows-interface 0.59.1", 4083 + "windows-link", 4084 + "windows-result 0.3.2", 4085 + "windows-strings 0.4.0", 4077 4086 ] 4078 4087 4079 4088 [[package]] 4080 - name = "windows-core" 4089 + name = "windows-implement" 4081 4090 version = "0.58.0" 4082 4091 source = "registry+https://github.com/rust-lang/crates.io-index" 4083 - checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" 4092 + checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" 4084 4093 dependencies = [ 4085 - "windows-implement", 4086 - "windows-interface", 4087 - "windows-result", 4088 - "windows-strings", 4089 - "windows-targets", 4094 + "proc-macro2", 4095 + "quote", 4096 + "syn", 4090 4097 ] 4091 4098 4092 4099 [[package]] 4093 4100 name = "windows-implement" 4094 - version = "0.58.0" 4101 + version = "0.60.0" 4095 4102 source = "registry+https://github.com/rust-lang/crates.io-index" 4096 - checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" 4103 + checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" 4097 4104 dependencies = [ 4098 4105 "proc-macro2", 4099 4106 "quote", ··· 4112 4119 ] 4113 4120 4114 4121 [[package]] 4122 + name = "windows-interface" 4123 + version = "0.59.1" 4124 + source = "registry+https://github.com/rust-lang/crates.io-index" 4125 + checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" 4126 + dependencies = [ 4127 + "proc-macro2", 4128 + "quote", 4129 + "syn", 4130 + ] 4131 + 4132 + [[package]] 4115 4133 name = "windows-link" 4116 4134 version = "0.1.1" 4117 4135 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4127 4145 ] 4128 4146 4129 4147 [[package]] 4148 + name = "windows-result" 4149 + version = "0.3.2" 4150 + source = "registry+https://github.com/rust-lang/crates.io-index" 4151 + checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252" 4152 + dependencies = [ 4153 + "windows-link", 4154 + ] 4155 + 4156 + [[package]] 4130 4157 name = "windows-strings" 4131 4158 version = "0.1.0" 4132 4159 source = "registry+https://github.com/rust-lang/crates.io-index" 4133 4160 checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" 4134 4161 dependencies = [ 4135 - "windows-result", 4162 + "windows-result 0.2.0", 4136 4163 "windows-targets", 4164 + ] 4165 + 4166 + [[package]] 4167 + name = "windows-strings" 4168 + version = "0.4.0" 4169 + source = "registry+https://github.com/rust-lang/crates.io-index" 4170 + checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" 4171 + dependencies = [ 4172 + "windows-link", 4137 4173 ] 4138 4174 4139 4175 [[package]] ··· 4220 4256 4221 4257 [[package]] 4222 4258 name = "winnow" 4223 - version = "0.7.4" 4259 + version = "0.7.6" 4224 4260 source = "registry+https://github.com/rust-lang/crates.io-index" 4225 - checksum = "0e97b544156e9bebe1a0ffbc03484fc1ffe3100cbce3ffb17eac35f7cdd7ab36" 4261 + checksum = "63d3fcd9bba44b03821e7d699eeee959f3126dcc4aa8e4ae18ec617c2a5cea10" 4226 4262 dependencies = [ 4227 4263 "memchr", 4228 4264 ]
+1 -1
ufos/Cargo.toml
··· 11 11 clap = { version = "4.5.31", features = ["derive"] } 12 12 dropshot = "0.16.0" 13 13 env_logger = "0.11.7" 14 - fjall = { version = "2.8.0", features = ["lz4", "bytes"] } 14 + fjall = { version = "2.8.0", features = ["lz4"] } 15 15 jetstream = { path = "../jetstream" } 16 16 log = "0.4.26" 17 17 lsm-tree = "2.6.6"
+16 -3
ufos/src/db_types.rs
··· 1 + use std::fmt; 1 2 use crate::{Cursor, Did, Nsid, RecordKey}; 2 3 use bincode::{ 3 4 config::{standard, Config}, ··· 8 9 error::{DecodeError, EncodeError}, 9 10 }; 10 11 use lsm_tree::range::prefix_to_range; 11 - use std::fmt; 12 12 use std::marker::PhantomData; 13 13 use std::ops::{Bound, Range}; 14 14 use thiserror::Error; ··· 49 49 } 50 50 51 51 fn bincode_conf() -> impl Config { 52 + log::trace!("bincode conf"); 52 53 standard() 53 54 .with_big_endian() 54 55 .with_fixed_int_encoding() ··· 184 185 185 186 impl<T> DbBytes for T 186 187 where 187 - T: BincodeEncode + BincodeDecode<()> + UseBincodePlz + Sized, 188 + T: BincodeEncode + BincodeDecode<()> + UseBincodePlz + Sized + std::fmt::Debug, 188 189 { 189 190 fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 191 + log::info!("bincode to_db_bytes: {self:?}"); 190 192 Ok(encode_to_vec(self, bincode_conf())?) 191 193 } 192 194 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 195 + log::info!("bincode from_db_bytes..."); 193 196 Ok(decode_from_slice(bytes, bincode_conf())?) 194 197 } 195 198 } 196 199 197 200 /// helper trait: impl on a type to get helpers to implement DbBytes 198 201 pub trait SerdeBytes: serde::Serialize + for<'a> serde::Deserialize<'a> { 199 - fn to_bytes(&self) -> Result<Vec<u8>, EncodingError> { 202 + fn to_bytes(&self) -> Result<Vec<u8>, EncodingError> where Self: std::fmt::Debug { 203 + log::info!("bincode serde to_db_bytes: {self:?}"); 200 204 Ok(bincode::serde::encode_to_vec(self, bincode_conf())?) 201 205 } 202 206 fn from_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 207 + log::info!("bincode serde from_db_bytes..."); 203 208 Ok(bincode::serde::decode_from_slice(bytes, bincode_conf())?) 204 209 } 205 210 } ··· 208 213 209 214 impl DbBytes for Vec<u8> { 210 215 fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 216 + log::info!("bincode vec to_db_bytes"); 211 217 Ok(self.to_vec()) 212 218 } 213 219 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 220 + log::info!("bincode vec from_db_bytes..."); 214 221 Ok((bytes.to_owned(), bytes.len())) 215 222 } 216 223 } ··· 247 254 248 255 impl DbBytes for Did { 249 256 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 257 + log::info!("bincode did dbbytes from_db_bytes..."); 250 258 let (s, n) = decode_from_slice(bytes, bincode_conf())?; 251 259 let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?; 252 260 Ok((me, n)) 253 261 } 254 262 fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 263 + log::info!("bincode did dbbytes to_db_bytes {self:?}"); 255 264 Ok(encode_to_vec(self.as_ref(), bincode_conf())?) 256 265 } 257 266 } 258 267 259 268 impl DbBytes for Nsid { 260 269 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 270 + log::info!("bincode nsid dbbytes from_db_bytes..."); 261 271 let (s, n) = decode_from_slice(bytes, bincode_conf())?; 262 272 let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?; 263 273 Ok((me, n)) 264 274 } 265 275 fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 276 + log::info!("bincode nsid dbbytes to_db_bytes {self:?}"); 266 277 Ok(encode_to_vec(self.as_ref(), bincode_conf())?) 267 278 } 268 279 } 269 280 270 281 impl DbBytes for RecordKey { 271 282 fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 283 + log::info!("bincode rkey dbbytes from_db_bytes..."); 272 284 let (s, n) = decode_from_slice(bytes, bincode_conf())?; 273 285 let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?; 274 286 Ok((me, n)) 275 287 } 276 288 fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 289 + log::info!("bincode rkey dbbytes to_db_bytes {self:?}"); 277 290 Ok(encode_to_vec(self.as_ref(), bincode_conf())?) 278 291 } 279 292 }
+1
ufos/src/lib.rs
··· 4 4 pub mod server; 5 5 pub mod storage; 6 6 pub mod storage_fjall; 7 + pub mod storage_mem; 7 8 pub mod store_types; 8 9 9 10 use crate::error::BatchInsertError;
+57 -17
ufos/src/main.rs
··· 3 3 use ufos::consumer; 4 4 use ufos::error::StorageError; 5 5 use ufos::server; 6 - use ufos::storage::{StorageWhatever, StoreWriter}; 6 + use jetstream::events::Cursor; 7 + use ufos::storage::{StorageWhatever, StoreReader, StoreWriter}; 7 8 use ufos::storage_fjall::FjallStorage; 9 + use ufos::storage_mem::MemStorage; 8 10 9 11 #[cfg(not(target_env = "msvc"))] 10 12 use tikv_jemallocator::Jemalloc; ··· 33 35 #[arg(long)] 34 36 data: PathBuf, 35 37 /// DEBUG: don't start the jetstream consumer or its write loop 38 + /// todo: restore this 36 39 #[arg(long, action)] 37 40 pause_writer: bool, 38 41 /// DEBUG: force the rw loop to fall behind by pausing it 42 + /// todo: restore this 39 43 #[arg(long, action)] 40 44 pause_rw: bool, 45 + /// DEBUG: use an in-memory store instead of fjall 46 + #[arg(long, action)] 47 + in_mem: bool, 41 48 } 42 49 43 50 // #[tokio::main(flavor = "current_thread")] // TODO: move this to config via args ··· 47 54 48 55 let args = Args::parse(); 49 56 let jetstream = args.jetstream.clone(); 50 - let (read_store, mut write_store, cursor) = FjallStorage::init( 51 - args.data, 52 - jetstream, 53 - args.jetstream_force, 54 - Default::default(), 55 - )?; 57 + if args.in_mem { 58 + let (read_store, write_store, cursor) = MemStorage::init( 59 + args.data, 60 + jetstream, 61 + args.jetstream_force, 62 + Default::default(), 63 + )?; 64 + go(args.jetstream, args.pause_writer, read_store, write_store, cursor).await?; 65 + } else { 66 + let (read_store, write_store, cursor) = FjallStorage::init( 67 + args.data, 68 + jetstream, 69 + args.jetstream_force, 70 + Default::default(), 71 + )?; 72 + go(args.jetstream, args.pause_writer, read_store, write_store, cursor).await?; 73 + } 56 74 75 + Ok(()) 76 + } 77 + 78 + async fn go( 79 + jetstream: String, 80 + pause_writer: bool, 81 + read_store: impl StoreReader + 'static, 82 + mut write_store: impl StoreWriter + 'static, 83 + cursor: Option<Cursor>, 84 + ) -> anyhow::Result<()> { 57 85 println!("starting server with storage..."); 58 86 let serving = server::serve(read_store); 59 87 ··· 64 92 65 93 let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({ 66 94 async move { 67 - if !args.pause_writer { 95 + if !pause_writer { 68 96 println!( 69 97 "starting consumer with cursor: {cursor:?} from {:?} ago", 70 98 cursor.map(|c| c.elapsed()) 71 99 ); 72 100 let mut batches = 73 - consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?; 101 + consumer::consume(&jetstream, cursor, false).await?; 102 + 103 + log::info!("started consumer, got chan etc..."); 74 104 75 105 tokio::task::spawn_blocking(move || { 76 106 while let Some(event_batch) = batches.blocking_recv() { 107 + log::info!("got batch, putting to storage..."); 77 108 write_store.insert_batch(event_batch)?; 78 - write_store.step_rollup()?; 109 + log::info!("inserted batch..."); 110 + write_store.step_rollup() 111 + .inspect_err(|e| log::error!("laksjdfl: {e:?}"))?; 112 + log::info!("inserted and stepped rollup. ready for next..."); 79 113 } 114 + log::warn!("??????????????????????"); 80 115 Ok::<(), StorageError>(()) 81 116 }) 82 117 .await??; ··· 105 140 // v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"), 106 141 // }; 107 142 108 - // log::trace!("tasks running. waiting."); 109 - t1.await?; 110 - log::trace!("serve task ended."); 111 - t2.await??; 112 - log::trace!("storage receive task ended."); 113 - // t3.await?; 114 - // log::trace!("storage rw task ended."); 143 + log::trace!("tasks running. waiting."); 144 + tokio::select! { 145 + z = t1 => log::warn!("serve task ended: {z:?}"), 146 + z = t2 => log::warn!("storage task ended: {z:?}"), 147 + }; 148 + 149 + // t1.await?; 150 + // log::trace!("serve task ended."); 151 + // t2.await??; 152 + // log::trace!("storage receive task ended."); 153 + // // t3.await?; 154 + // // log::trace!("storage rw task ended."); 115 155 116 156 println!("bye!"); 117 157
+3 -1
ufos/src/storage.rs
··· 17 17 Self: Sized; 18 18 } 19 19 20 - pub trait StoreWriter { 20 + pub trait StoreWriter: Send + Sync { 21 21 fn insert_batch<const LIMIT: usize>( 22 22 &mut self, 23 23 event_batch: EventBatch<LIMIT>, 24 24 ) -> StorageResult<()>; 25 + 26 + fn step_rollup(&mut self) -> StorageResult<usize>; 25 27 26 28 fn trim_collection(&mut self, collection: &Nsid, limit: usize) -> StorageResult<()>; 27 29
+138 -83
ufos/src/storage_fjall.rs
··· 13 13 use async_trait::async_trait; 14 14 use fjall::{Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle}; 15 15 use jetstream::events::Cursor; 16 - use schemars::JsonSchema; 17 - use serde::Serialize; 18 16 use std::collections::HashMap; 19 17 use std::path::Path; 20 18 use std::time::SystemTime; ··· 99 97 /// this is only meant for tests 100 98 #[cfg(test)] 101 99 pub temp: bool, 102 - } 103 - 104 - #[derive(Debug, Serialize, JsonSchema)] 105 - pub struct FjallStats { 106 - pub keyspace_disk_space: u64, 107 - pub keyspace_journal_count: usize, 108 - pub keyspace_sequence: u64, 109 - pub rollup_cursor: Option<u64>, 110 100 } 111 101 112 102 impl StorageWhatever<FjallReader, FjallWriter, FjallConfig> for FjallStorage { ··· 482 472 } 483 473 484 474 impl FjallWriter { 485 - pub fn step_rollup(&mut self) -> StorageResult<usize> { 486 - let rollup_cursor = 487 - get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or( 488 - StorageError::BadStateError("Could not find current rollup cursor".to_string()), 489 - )?; 490 - 491 - // timelies 492 - let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor)?; 493 - let mut timely_iter = self.rollups.range(live_counts_range).peekable(); 494 - 495 - let timely_next_cursor = timely_iter 496 - .peek_mut() 497 - .map(|kv| -> StorageResult<Cursor> { 498 - match kv { 499 - Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?, 500 - Ok((key_bytes, _)) => { 501 - let key = db_complete::<LiveCountsKey>(key_bytes)?; 502 - Ok(key.cursor()) 503 - } 504 - } 505 - }) 506 - .transpose()?; 507 - 508 - // delete accounts 509 - let delete_accounts_range = 510 - DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?; 511 - 512 - let next_delete = self 513 - .queues 514 - .range(delete_accounts_range) 515 - .next() 516 - .transpose()? 517 - .map(|(key_bytes, val_bytes)| { 518 - db_complete::<DeleteAccountQueueKey>(&key_bytes) 519 - .map(|k| (k.suffix, key_bytes, val_bytes)) 520 - }) 521 - .transpose()?; 522 - 523 - let cursors_stepped = match (timely_next_cursor, next_delete) { 524 - ( 525 - Some(timely_next_cursor), 526 - Some((delete_cursor, delete_key_bytes, delete_val_bytes)), 527 - ) => { 528 - if timely_next_cursor < delete_cursor { 529 - self.rollup_live_counts( 530 - timely_iter, 531 - Some(delete_cursor), 532 - MAX_BATCHED_ROLLUP_COUNTS, 533 - )? 534 - } else { 535 - self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 536 - } 537 - } 538 - (Some(_), None) => { 539 - self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS)? 540 - } 541 - (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 542 - self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes)? 543 - } 544 - (None, None) => 0, 545 - }; 546 - 547 - Ok(cursors_stepped) 548 - } 549 - 550 475 fn rollup_delete_account( 551 476 &mut self, 552 477 cursor: Cursor, ··· 572 497 // we *could* read+write every single batch to rollup.. but their merge is associative so 573 498 // ...so save the db some work up front? is this worth it? who knows... 574 499 500 + log::warn!("sup!!!"); 501 + 575 502 #[derive(Eq, Hash, PartialEq)] 576 503 enum Rollup { 577 504 Hourly(HourTruncatedCursor), ··· 584 511 let mut last_cursor = Cursor::from_start(); 585 512 let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new(); 586 513 514 + log::warn!("about to loop...."); 587 515 for (i, kv) in timelies.enumerate() { 516 + // log::warn!("loop {i}..."); 588 517 if i >= rollup_limit { 589 518 break; 590 519 } 591 520 592 521 let (key_bytes, val_bytes) = kv?; 593 - let key = db_complete::<LiveCountsKey>(&key_bytes)?; 522 + let key = db_complete::<LiveCountsKey>(&key_bytes) 523 + .inspect_err(|e| log::warn!("rlc: key: {e:?}"))?; 594 524 595 525 if cursor_exclusive_limit 596 526 .map(|limit| key.cursor() > limit) ··· 600 530 } 601 531 602 532 batch.remove(&self.rollups, key_bytes); 603 - let val = db_complete::<CountsValue>(&val_bytes)?; 533 + let val = db_complete::<CountsValue>(&val_bytes) 534 + .inspect_err(|e| log::warn!("rlc: val: {e:?}"))?; 604 535 counts_by_rollup 605 536 .entry(( 606 537 key.collection().clone(), ··· 623 554 cursors_advanced += 1; 624 555 last_cursor = key.cursor(); 625 556 } 557 + log::warn!("done looping. looping cbr counts(?).."); 626 558 627 559 for ((nsid, rollup), counts) in counts_by_rollup { 560 + log::warn!("######################## cbr loop {nsid:?} {counts:?} ########################"); 628 561 let key_bytes = match rollup { 629 562 Rollup::Hourly(hourly_cursor) => { 630 - HourlyRollupKey::new(hourly_cursor, &nsid).to_db_bytes()? 563 + let k = HourlyRollupKey::new(hourly_cursor, &nsid); 564 + log::info!("hrly k: {k:?}"); 565 + k.to_db_bytes()? 631 566 } 632 567 Rollup::Weekly(weekly_cursor) => { 633 - WeeklyRollupKey::new(weekly_cursor, &nsid).to_db_bytes()? 568 + let k = WeeklyRollupKey::new(weekly_cursor, &nsid); 569 + log::info!("weekly k: {k:?}"); 570 + k.to_db_bytes()? 571 + } 572 + Rollup::AllTime => { 573 + let k = AllTimeRollupKey::new(&nsid); 574 + log::info!("alltime k: {k:?}"); 575 + k.to_db_bytes()? 634 576 } 635 - Rollup::AllTime => AllTimeRollupKey::new(&nsid).to_db_bytes()?, 636 577 }; 637 - let mut rolled = self 578 + // log::info!("key bytes: {key_bytes:?}"); 579 + let mut rolled: CountsValue = self 638 580 .rollups 639 581 .get(&key_bytes)? 582 + .inspect(|v| { 583 + let lax = CountsValue::from_db_bytes(v); 584 + log::info!("val: len={}, lax={lax:?} first32={:?}", v.len(), v.get(..32)); 585 + }) 640 586 .as_deref() 641 587 .map(db_complete::<CountsValue>) 642 - .transpose()? 588 + .transpose() 589 + .inspect_err(|e| log::warn!("oooh did we break on the rolled thing? {e:?}"))? 643 590 .unwrap_or_default(); 591 + 592 + 593 + // try to round-trip before inserting, for funsies 594 + let tripppin = counts.to_db_bytes()?; 595 + let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?; 596 + assert_eq!(n, tripppin.len()); 597 + assert_eq!(counts.prefix, and_back.prefix); 598 + assert_eq!(counts.dids().estimate(), and_back.dids().estimate()); 599 + if counts.records() > 20_000_000 { 600 + panic!("COUNTS maybe wtf? {counts:?}") 601 + } 602 + // assert_eq!(rolled, and_back); 603 + 604 + 644 605 rolled.merge(&counts); 606 + 607 + // try to round-trip before inserting, for funsies 608 + let tripppin = rolled.to_db_bytes()?; 609 + let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?; 610 + assert_eq!(n, tripppin.len()); 611 + assert_eq!(rolled.prefix, and_back.prefix); 612 + assert_eq!(rolled.dids().estimate(), and_back.dids().estimate()); 613 + if rolled.records() > 20_000_000 { 614 + panic!("maybe wtf? {rolled:?}") 615 + } 616 + // assert_eq!(rolled, and_back); 617 + 645 618 batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?); 646 619 } 647 620 648 - insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?; 621 + log::warn!("done cbr loop."); 622 + 623 + insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor) 624 + .inspect_err(|e| log::warn!("insert neu: {e:?}"))?; 649 625 650 626 batch.commit()?; 627 + 628 + log::warn!("ok finished rlc stuff. huh."); 651 629 Ok(cursors_advanced) 652 630 } 653 631 } ··· 683 661 feed_key.to_db_bytes()?, 684 662 feed_val.to_db_bytes()?, 685 663 ); 664 + 686 665 687 666 let location_val: RecordLocationVal = 688 667 (commit.cursor, commit.rev.as_str(), put_action).into(); ··· 721 700 722 701 batch.commit()?; 723 702 Ok(()) 703 + } 704 + 705 + fn step_rollup(&mut self) -> StorageResult<usize> { 706 + let rollup_cursor = 707 + get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or( 708 + StorageError::BadStateError("Could not find current rollup cursor".to_string()), 709 + ) 710 + .inspect_err(|e| log::warn!("failed getting rollup cursor: {e:?}"))?; 711 + 712 + // timelies 713 + let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor) 714 + .inspect_err(|e| log::warn!("live counts range: {e:?}"))?; 715 + let mut timely_iter = self.rollups.range(live_counts_range).peekable(); 716 + 717 + let timely_next_cursor = timely_iter 718 + .peek_mut() 719 + .map(|kv| -> StorageResult<Cursor> { 720 + match kv { 721 + Err(e) => Err(std::mem::replace(e, fjall::Error::Poisoned))?, 722 + Ok((key_bytes, _)) => { 723 + let key = db_complete::<LiveCountsKey>(key_bytes) 724 + .inspect_err(|e| log::warn!("failed getting key for next timely: {e:?}"))?; 725 + Ok(key.cursor()) 726 + } 727 + } 728 + }) 729 + .transpose() 730 + .inspect_err(|e| log::warn!("something about timely: {e:?}"))?; 731 + 732 + // delete accounts 733 + let delete_accounts_range = 734 + DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?; 735 + 736 + let next_delete = self 737 + .queues 738 + .range(delete_accounts_range) 739 + .next() 740 + .transpose() 741 + .inspect_err(|e| log::warn!("range for next delete: {e:?}"))? 742 + .map(|(key_bytes, val_bytes)| { 743 + db_complete::<DeleteAccountQueueKey>(&key_bytes) 744 + .inspect_err(|e| log::warn!("failed inside next delete thing????: {e:?}")) 745 + .map(|k| (k.suffix, key_bytes, val_bytes)) 746 + }) 747 + .transpose() 748 + .inspect_err(|e| log::warn!("failed getting next delete: {e:?}"))?; 749 + 750 + let cursors_stepped = match (timely_next_cursor, next_delete) { 751 + ( 752 + Some(timely_next_cursor), 753 + Some((delete_cursor, delete_key_bytes, delete_val_bytes)), 754 + ) => { 755 + if timely_next_cursor < delete_cursor { 756 + self.rollup_live_counts( 757 + timely_iter, 758 + Some(delete_cursor), 759 + MAX_BATCHED_ROLLUP_COUNTS, 760 + ) 761 + .inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))? 762 + } else { 763 + self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 764 + .inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))? 765 + } 766 + } 767 + (Some(_), None) => { 768 + self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS) 769 + .inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))? 770 + } 771 + (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 772 + self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 773 + .inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))? 774 + } 775 + (None, None) => 0, 776 + }; 777 + 778 + Ok(cursors_stepped) 724 779 } 725 780 726 781 fn trim_collection(
+1813
ufos/src/storage_mem.rs
··· 1 + use std::sync::Arc; 2 + use std::ops::Bound; 3 + 4 + use std::sync::RwLock; 5 + use std::sync::Mutex; 6 + use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr}; 7 + use crate::error::StorageError; 8 + use crate::storage::{StorageResult, StorageWhatever, StoreReader, StoreWriter}; 9 + use crate::store_types::{ 10 + AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal, 11 + HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue, 12 + JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey, 13 + NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, 14 + RecordLocationMeta, RecordLocationVal, RecordRawValue, TakeoffKey, TakeoffValue, 15 + WeekTruncatedCursor, WeeklyRollupKey, 16 + }; 17 + use crate::{CommitAction, ConsumerInfo, Did, EventBatch, Nsid, TopCollections, UFOsRecord}; 18 + use async_trait::async_trait; 19 + use jetstream::events::Cursor; 20 + use std::collections::HashMap; 21 + use std::collections::BTreeMap; 22 + use std::path::Path; 23 + use std::time::SystemTime; 24 + use lsm_tree::range::prefix_to_range; 25 + 26 + 27 + const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds 28 + const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024; 29 + const MAX_BATCHED_ROLLUP_COUNTS: usize = 256; 30 + 31 + /// 32 + /// new data format, roughly: 33 + /// 34 + /// Partion: 'global' 35 + /// 36 + /// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps) 37 + /// - key: "js_cursor" (literal) 38 + /// - val: u64 39 + /// 40 + /// - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss) 41 + /// - key: "js_endpoint" (literal) 42 + /// - val: string (URL of the instance) 43 + /// 44 + /// - Launch date 45 + /// - key: "takeoff" (literal) 46 + /// - val: u64 (micros timestamp, not from jetstream for now so not precise) 47 + /// 48 + /// - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes) 49 + /// - key: "rollup_cursor" (literal) 50 + /// - val: u64 (tracks behind js_cursor) 51 + /// 52 + /// 53 + /// Partition: 'feed' 54 + /// 55 + /// - Per-collection list of record references ordered by jetstream cursor 56 + /// - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor) 57 + /// - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.) 58 + /// 59 + /// 60 + /// Partition: 'records' 61 + /// 62 + /// - Actual records by their atproto location 63 + /// - key: nullstr || nullstr || nullstr (did, collection, rkey) 64 + /// - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record) 65 + /// 66 + /// 67 + /// Partition: 'rollups' 68 + /// 69 + /// - Live (batched) records counts and dids estimate per collection 70 + /// - key: "live_counts" || u64 || nullstr (js_cursor, nsid) 71 + /// - val: u64 || HLL (count (not cursor), estimator) 72 + /// 73 + /// - Hourly total record counts and dids estimate per collection 74 + /// - key: "hourly_counts" || u64 || nullstr (hour, nsid) 75 + /// - val: u64 || HLL (count (not cursor), estimator) 76 + /// 77 + /// - Weekly total record counts and dids estimate per collection 78 + /// - key: "weekly_counts" || u64 || nullstr (hour, nsid) 79 + /// - val: u64 || HLL (count (not cursor), estimator) 80 + /// 81 + /// - All-time total record counts and dids estimate per collection 82 + /// - key: "ever_counts" || nullstr (nsid) 83 + /// - val: u64 || HLL (count (not cursor), estimator) 84 + /// 85 + /// - TODO: sorted indexes for all-times? 86 + /// 87 + /// 88 + /// Partition: 'queues' 89 + /// 90 + /// - Delete account queue 91 + /// - key: "delete_acount" || u64 (js_cursor) 92 + /// - val: nullstr (did) 93 + /// 94 + /// 95 + /// TODO: moderation actions 96 + /// TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read. 97 + #[derive(Debug)] 98 + pub struct MemStorage {} 99 + 100 + #[derive(Debug, Default)] 101 + pub struct MemConfig { 102 + /// drop the db when the storage is dropped 103 + /// 104 + /// this is only meant for tests 105 + #[cfg(test)] 106 + pub temp: bool, 107 + } 108 + 109 + //////////// 110 + //////////// 111 + //////////// 112 + //////////// 113 + //////////// 114 + //////////// 115 + 116 + struct BatchSentinel {} 117 + 118 + #[derive(Clone)] 119 + struct MemKeyspace { 120 + keyspace_guard: Arc<RwLock<BatchSentinel>>, 121 + } 122 + 123 + impl MemKeyspace { 124 + pub fn open() -> Self { 125 + Self { keyspace_guard: Arc::new(RwLock::new(BatchSentinel {})) } 126 + } 127 + pub fn open_partition(&self, _name: &str) -> StorageResult<MemPartion> { 128 + Ok(MemPartion { 129 + // name: name.to_string(), 130 + keyspace_guard: self.keyspace_guard.clone(), 131 + contents: Default::default(), 132 + }) 133 + } 134 + pub fn batch(&self) -> MemBatch { 135 + MemBatch { 136 + keyspace_guard: self.keyspace_guard.clone(), 137 + tasks: Vec::new(), 138 + } 139 + } 140 + pub fn instant(&self) -> () {} 141 + } 142 + 143 + enum BatchTask { 144 + Insert { 145 + p: MemPartion, 146 + key: Vec<u8>, 147 + val: Vec<u8>, 148 + }, 149 + Remove { 150 + p: MemPartion, 151 + key: Vec<u8>, 152 + }, 153 + } 154 + struct MemBatch { 155 + keyspace_guard: Arc<RwLock<BatchSentinel>>, 156 + tasks: Vec<BatchTask>, 157 + } 158 + impl MemBatch { 159 + pub fn insert(&mut self, p: &MemPartion, key: &[u8], val: &[u8]) { 160 + self.tasks.push(BatchTask::Insert { 161 + p: p.clone(), 162 + key: key.to_vec(), 163 + val: val.to_vec(), 164 + }); 165 + } 166 + pub fn remove(&mut self, p: &MemPartion, key: &[u8]) { 167 + self.tasks.push(BatchTask::Remove { 168 + p: p.clone(), 169 + key: key.to_vec(), 170 + }); 171 + } 172 + pub fn len(&self) -> usize { 173 + self.tasks.len() 174 + } 175 + pub fn commit(&mut self) -> StorageResult<()> { 176 + let _guard = self.keyspace_guard.write().unwrap(); 177 + for task in &mut self.tasks { 178 + match task { 179 + BatchTask::Insert { p, key, val } => 180 + p.contents 181 + .try_lock() 182 + .unwrap() 183 + .insert(key.to_vec(), val.to_vec()), 184 + BatchTask::Remove { p, key } => 185 + p.contents 186 + .try_lock() 187 + .unwrap() 188 + .remove(key), 189 + }; 190 + } 191 + Ok(()) 192 + } 193 + } 194 + 195 + #[derive(Clone)] 196 + struct MemPartion { 197 + // name: String, 198 + keyspace_guard: Arc<RwLock<BatchSentinel>>, 199 + contents: Arc<Mutex<BTreeMap<Vec<u8>, Vec<u8>>>>, 200 + } 201 + impl MemPartion { 202 + pub fn get(&self, key: &[u8]) -> StorageResult<Option<Vec<u8>>> { 203 + let _guard = self.keyspace_guard.read().unwrap(); 204 + Ok(self.contents 205 + .lock() 206 + .unwrap() 207 + .get(key) 208 + .cloned()) 209 + } 210 + pub fn prefix(&self, pre: &[u8]) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> { 211 + // let prefix_bytes = prefix.to_db_bytes()?; 212 + let (_, Bound::Excluded(range_end)) = prefix_to_range(&pre) else { 213 + panic!("bad range thing"); 214 + }; 215 + 216 + return self.range(pre.to_vec()..range_end.to_vec()).into() 217 + } 218 + pub fn range(&self, r: std::ops::Range<Vec<u8>>) -> Vec<StorageResult<(Vec<u8>, Vec<u8>)>> { 219 + let _guard = self.keyspace_guard.read().unwrap(); 220 + self.contents 221 + .lock() 222 + .unwrap() 223 + .range(r) 224 + .map(|(k, v)| Ok((k.clone(), v.clone()))) 225 + .collect() 226 + } 227 + pub fn insert(&self, key: &[u8], val: &[u8]) -> StorageResult<()> { 228 + let _guard = self.keyspace_guard.read().unwrap(); 229 + self.contents 230 + .lock() 231 + .unwrap() 232 + .insert(key.to_vec(), val.to_vec()); 233 + Ok(()) 234 + } 235 + // pub fn remove(&self, key: &[u8]) -> StorageResult<()> { 236 + // let _guard = self.keyspace_guard.read().unwrap(); 237 + // self.contents 238 + // .lock() 239 + // .unwrap() 240 + // .remove(key); 241 + // Ok(()) 242 + // } 243 + pub fn snapshot_at(&self, _instant: ()) -> Self { 244 + self.clone() 245 + } 246 + pub fn snapshot(&self) -> Self { 247 + self.clone() 248 + } 249 + } 250 + 251 + //////////// 252 + //////////// 253 + //////////// 254 + //////////// 255 + //////////// 256 + //////////// 257 + 258 + impl StorageWhatever<MemReader, MemWriter, MemConfig> for MemStorage { 259 + fn init( 260 + _path: impl AsRef<Path>, 261 + endpoint: String, 262 + force_endpoint: bool, 263 + _config: MemConfig, 264 + ) -> StorageResult<(MemReader, MemWriter, Option<Cursor>)> { 265 + let keyspace = MemKeyspace::open(); 266 + 267 + let global = keyspace.open_partition("global")?; 268 + let feeds = keyspace.open_partition("feeds")?; 269 + let records = keyspace.open_partition("records")?; 270 + let rollups = keyspace.open_partition("rollups")?; 271 + let queues = keyspace.open_partition("queues")?; 272 + 273 + let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?; 274 + 275 + if js_cursor.is_some() { 276 + let stored_endpoint = 277 + get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?; 278 + 279 + let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError( 280 + "found cursor but missing js_endpoint, refusing to start.".to_string(), 281 + ))?; 282 + 283 + if stored != endpoint { 284 + if force_endpoint { 285 + log::warn!("forcing a jetstream switch from {stored:?} to {endpoint:?}"); 286 + insert_static_neu::<JetstreamEndpointKey>( 287 + &global, 288 + JetstreamEndpointValue(endpoint.to_string()), 289 + )?; 290 + } else { 291 + return Err(StorageError::InitError(format!( 292 + "stored js_endpoint {stored:?} differs from provided {endpoint:?}, refusing to start."))); 293 + } 294 + } 295 + } else { 296 + insert_static_neu::<JetstreamEndpointKey>( 297 + &global, 298 + JetstreamEndpointValue(endpoint.to_string()), 299 + )?; 300 + insert_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?; 301 + insert_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?; 302 + } 303 + 304 + let reader = MemReader { 305 + keyspace: keyspace.clone(), 306 + global: global.clone(), 307 + feeds: feeds.clone(), 308 + records: records.clone(), 309 + rollups: rollups.clone(), 310 + }; 311 + let writer = MemWriter { 312 + keyspace, 313 + global, 314 + feeds, 315 + records, 316 + rollups, 317 + queues, 318 + }; 319 + Ok((reader, writer, js_cursor)) 320 + } 321 + } 322 + 323 + type MemRKV = StorageResult<(Vec<u8>, Vec<u8>)>; 324 + 325 + #[derive(Clone)] 326 + pub struct MemReader { 327 + keyspace: MemKeyspace, 328 + global: MemPartion, 329 + feeds: MemPartion, 330 + records: MemPartion, 331 + rollups: MemPartion, 332 + } 333 + 334 + /// An iterator that knows how to skip over deleted/invalidated records 335 + struct RecordIterator { 336 + db_iter: Box<dyn Iterator<Item = MemRKV>>, 337 + records: MemPartion, 338 + limit: usize, 339 + fetched: usize, 340 + } 341 + impl RecordIterator { 342 + pub fn new( 343 + feeds: &MemPartion, 344 + records: MemPartion, 345 + collection: &Nsid, 346 + limit: usize, 347 + ) -> StorageResult<Self> { 348 + let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 349 + let db_iter = feeds.prefix(&prefix).into_iter().rev(); 350 + Ok(Self { 351 + db_iter: Box::new(db_iter), 352 + records, 353 + limit, 354 + fetched: 0, 355 + }) 356 + } 357 + fn get_record(&self, db_next: MemRKV) -> StorageResult<Option<UFOsRecord>> { 358 + let (key_bytes, val_bytes) = db_next?; 359 + let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 360 + let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 361 + let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 362 + 363 + let Some(location_val_bytes) = self.records.get(&location_key.to_db_bytes()?)? else { 364 + // record was deleted (hopefully) 365 + return Ok(None); 366 + }; 367 + 368 + let (meta, n) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 369 + 370 + if meta.cursor() != feed_key.cursor() { 371 + // older/different version 372 + return Ok(None); 373 + } 374 + if meta.rev != feed_val.rev() { 375 + // weird... 376 + log::warn!("record lookup: cursor match but rev did not...? excluding."); 377 + return Ok(None); 378 + } 379 + let Some(raw_value_bytes) = location_val_bytes.get(n..) else { 380 + log::warn!( 381 + "record lookup: found record but could not get bytes to decode the record??" 382 + ); 383 + return Ok(None); 384 + }; 385 + let rawval = db_complete::<RecordRawValue>(raw_value_bytes)?; 386 + Ok(Some(UFOsRecord { 387 + collection: feed_key.collection().clone(), 388 + cursor: feed_key.cursor(), 389 + did: feed_val.did().clone(), 390 + rkey: feed_val.rkey().clone(), 391 + rev: meta.rev.to_string(), 392 + record: rawval.try_into()?, 393 + is_update: meta.is_update, 394 + })) 395 + } 396 + } 397 + impl Iterator for RecordIterator { 398 + type Item = StorageResult<Option<UFOsRecord>>; 399 + fn next(&mut self) -> Option<Self::Item> { 400 + if self.fetched == self.limit { 401 + return Some(Ok(None)); 402 + } 403 + let record = loop { 404 + let db_next = self.db_iter.next()?; // None short-circuits here 405 + match self.get_record(db_next) { 406 + Err(e) => return Some(Err(e)), 407 + Ok(Some(record)) => break record, 408 + Ok(None) => continue, 409 + } 410 + }; 411 + self.fetched += 1; 412 + Some(Ok(Some(record))) 413 + } 414 + } 415 + 416 + impl MemReader { 417 + fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 418 + let rollup_cursor = 419 + get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? 420 + .map(|c| c.to_raw_u64()); 421 + 422 + Ok(serde_json::json!({ 423 + "rollup_cursor": rollup_cursor, 424 + })) 425 + } 426 + 427 + fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { 428 + let global = self.global.snapshot(); 429 + 430 + let endpoint = 431 + get_snapshot_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)? 432 + .ok_or(StorageError::BadStateError( 433 + "Could not find jetstream endpoint".to_string(), 434 + ))? 435 + .0; 436 + 437 + let started_at = get_snapshot_static_neu::<TakeoffKey, TakeoffValue>(&global)? 438 + .ok_or(StorageError::BadStateError( 439 + "Could not find jetstream takeoff time".to_string(), 440 + ))? 441 + .to_raw_u64(); 442 + 443 + let latest_cursor = 444 + get_snapshot_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)? 445 + .map(|c| c.to_raw_u64()); 446 + 447 + Ok(ConsumerInfo::Jetstream { 448 + endpoint, 449 + started_at, 450 + latest_cursor, 451 + }) 452 + } 453 + 454 + fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 455 + // TODO: limit nsid traversal depth 456 + // TODO: limit nsid traversal breadth 457 + // TODO: be serious about anything 458 + 459 + // TODO: probably use a stack of segments to reduce to ~log-n merges 460 + 461 + #[derive(Default)] 462 + struct Blah { 463 + counts: CountsValue, 464 + children: HashMap<String, Blah>, 465 + } 466 + impl From<&Blah> for TopCollections { 467 + fn from(bla: &Blah) -> Self { 468 + Self { 469 + total_records: bla.counts.records(), 470 + dids_estimate: bla.counts.dids().estimate() as u64, 471 + nsid_child_segments: HashMap::from_iter( 472 + bla.children.iter().map(|(k, v)| (k.to_string(), v.into())), 473 + ), 474 + } 475 + } 476 + } 477 + 478 + let mut b = Blah::default(); 479 + let prefix = AllTimeRollupKey::from_prefix_to_db_bytes(&Default::default())?; 480 + for kv in self.rollups.prefix(&prefix.to_db_bytes()?) { 481 + let (key_bytes, val_bytes) = kv?; 482 + let key = db_complete::<AllTimeRollupKey>(&key_bytes)?; 483 + let val = db_complete::<CountsValue>(&val_bytes)?; 484 + 485 + let mut node = &mut b; 486 + node.counts.merge(&val); 487 + for segment in key.collection().split('.') { 488 + node = node.children.entry(segment.to_string()).or_default(); 489 + node.counts.merge(&val); 490 + } 491 + } 492 + 493 + Ok((&b).into()) 494 + } 495 + 496 + fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 497 + // 0. grab a snapshot in case rollups happen while we're working 498 + let instant = self.keyspace.instant(); 499 + let global = self.global.snapshot_at(instant); 500 + let rollups = self.rollups.snapshot_at(instant); 501 + 502 + // 1. all-time counts 503 + let all_time_key = AllTimeRollupKey::new(collection).to_db_bytes()?; 504 + let mut total_counts = rollups 505 + .get(&all_time_key)? 506 + .as_deref() 507 + .map(db_complete::<CountsValue>) 508 + .transpose()? 509 + .unwrap_or_default(); 510 + 511 + // 2. live counts that haven't been rolled into all-time yet. 512 + let rollup_cursor = 513 + get_snapshot_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&global)?.ok_or( 514 + StorageError::BadStateError("Could not find current rollup cursor".to_string()), 515 + )?; 516 + 517 + let full_range = LiveCountsKey::range_from_cursor(rollup_cursor)?; 518 + for kv in rollups.range(full_range) { 519 + let (key_bytes, val_bytes) = kv?; 520 + let key = db_complete::<LiveCountsKey>(&key_bytes)?; 521 + if key.collection() == collection { 522 + let counts = db_complete::<CountsValue>(&val_bytes)?; 523 + total_counts.merge(&counts); 524 + } 525 + } 526 + Ok(( 527 + total_counts.records(), 528 + total_counts.dids().estimate() as u64, 529 + )) 530 + } 531 + 532 + fn get_records_by_collections( 533 + &self, 534 + collections: &[Nsid], 535 + limit: usize, 536 + ) -> StorageResult<Vec<UFOsRecord>> { 537 + if collections.is_empty() { 538 + return Ok(vec![]); 539 + } 540 + let mut record_iterators = Vec::new(); 541 + for collection in collections { 542 + let iter = RecordIterator::new(&self.feeds, self.records.clone(), collection, limit)?; 543 + record_iterators.push(iter.peekable()); 544 + } 545 + let mut merged = Vec::new(); 546 + loop { 547 + let mut latest: Option<(Cursor, usize)> = None; // ugh 548 + for (i, iter) in record_iterators.iter_mut().enumerate() { 549 + let Some(it) = iter.peek_mut() else { 550 + continue; 551 + }; 552 + let it = match it { 553 + Ok(v) => v, 554 + Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 555 + }; 556 + let Some(rec) = it else { 557 + break; 558 + }; 559 + if let Some((cursor, _)) = latest { 560 + if rec.cursor > cursor { 561 + latest = Some((rec.cursor, i)) 562 + } 563 + } else { 564 + latest = Some((rec.cursor, i)); 565 + } 566 + } 567 + let Some((_, idx)) = latest else { 568 + break; 569 + }; 570 + // yeah yeah whateverrrrrrrrrrrrrrrr 571 + merged.push(record_iterators[idx].next().unwrap().unwrap().unwrap()); 572 + } 573 + Ok(merged) 574 + } 575 + } 576 + 577 + #[async_trait] 578 + impl StoreReader for MemReader { 579 + async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> { 580 + let s = self.clone(); 581 + tokio::task::spawn_blocking(move || MemReader::get_storage_stats(&s)).await? 582 + } 583 + async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo> { 584 + let s = self.clone(); 585 + tokio::task::spawn_blocking(move || MemReader::get_consumer_info(&s)).await? 586 + } 587 + async fn get_top_collections(&self) -> Result<TopCollections, StorageError> { 588 + let s = self.clone(); 589 + tokio::task::spawn_blocking(move || MemReader::get_top_collections(&s)).await? 590 + } 591 + async fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> { 592 + let s = self.clone(); 593 + let collection = collection.clone(); 594 + tokio::task::spawn_blocking(move || MemReader::get_counts_by_collection(&s, &collection)) 595 + .await? 596 + } 597 + async fn get_records_by_collections( 598 + &self, 599 + collections: &[Nsid], 600 + limit: usize, 601 + ) -> StorageResult<Vec<UFOsRecord>> { 602 + let s = self.clone(); 603 + let collections = collections.to_vec(); 604 + tokio::task::spawn_blocking(move || { 605 + MemReader::get_records_by_collections(&s, &collections, limit) 606 + }) 607 + .await? 608 + } 609 + } 610 + 611 + pub struct MemWriter { 612 + keyspace: MemKeyspace, 613 + global: MemPartion, 614 + feeds: MemPartion, 615 + records: MemPartion, 616 + rollups: MemPartion, 617 + queues: MemPartion, 618 + } 619 + 620 + impl MemWriter { 621 + fn rollup_delete_account( 622 + &mut self, 623 + cursor: Cursor, 624 + key_bytes: &[u8], 625 + val_bytes: &[u8], 626 + ) -> StorageResult<usize> { 627 + let did = db_complete::<DeleteAccountQueueVal>(val_bytes)?; 628 + self.delete_account(&did)?; 629 + let mut batch = self.keyspace.batch(); 630 + batch.remove(&self.queues, key_bytes); 631 + insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, cursor)?; 632 + batch.commit()?; 633 + Ok(1) 634 + } 635 + 636 + fn rollup_live_counts( 637 + &mut self, 638 + timelies: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), StorageError>>, 639 + cursor_exclusive_limit: Option<Cursor>, 640 + rollup_limit: usize, 641 + ) -> StorageResult<usize> { 642 + // current strategy is to buffer counts in mem before writing the rollups 643 + // we *could* read+write every single batch to rollup.. but their merge is associative so 644 + // ...so save the db some work up front? is this worth it? who knows... 645 + 646 + log::warn!("sup!!!"); 647 + 648 + #[derive(Eq, Hash, PartialEq)] 649 + enum Rollup { 650 + Hourly(HourTruncatedCursor), 651 + Weekly(WeekTruncatedCursor), 652 + AllTime, 653 + } 654 + 655 + let mut batch = self.keyspace.batch(); 656 + let mut cursors_advanced = 0; 657 + let mut last_cursor = Cursor::from_start(); 658 + let mut counts_by_rollup: HashMap<(Nsid, Rollup), CountsValue> = HashMap::new(); 659 + 660 + log::warn!("about to loop...."); 661 + for (i, kv) in timelies.enumerate() { 662 + // log::warn!("loop {i}..."); 663 + if i >= rollup_limit { 664 + break; 665 + } 666 + 667 + let (key_bytes, val_bytes) = kv?; 668 + let key = db_complete::<LiveCountsKey>(&key_bytes) 669 + .inspect_err(|e| log::warn!("rlc: key: {e:?}"))?; 670 + 671 + if cursor_exclusive_limit 672 + .map(|limit| key.cursor() > limit) 673 + .unwrap_or(false) 674 + { 675 + break; 676 + } 677 + 678 + batch.remove(&self.rollups, &key_bytes); 679 + let val = db_complete::<CountsValue>(&val_bytes) 680 + .inspect_err(|e| log::warn!("rlc: val: {e:?}"))?; 681 + counts_by_rollup 682 + .entry(( 683 + key.collection().clone(), 684 + Rollup::Hourly(key.cursor().into()), 685 + )) 686 + .or_default() 687 + .merge(&val); 688 + counts_by_rollup 689 + .entry(( 690 + key.collection().clone(), 691 + Rollup::Weekly(key.cursor().into()), 692 + )) 693 + .or_default() 694 + .merge(&val); 695 + counts_by_rollup 696 + .entry((key.collection().clone(), Rollup::AllTime)) 697 + .or_default() 698 + .merge(&val); 699 + 700 + cursors_advanced += 1; 701 + last_cursor = key.cursor(); 702 + } 703 + log::warn!("done looping. looping cbr counts(?).."); 704 + 705 + for ((nsid, rollup), counts) in counts_by_rollup { 706 + log::warn!("######################## cbr loop {nsid:?} {counts:?} ########################"); 707 + let key_bytes = match rollup { 708 + Rollup::Hourly(hourly_cursor) => { 709 + let k = HourlyRollupKey::new(hourly_cursor, &nsid); 710 + log::info!("hrly k: {k:?}"); 711 + k.to_db_bytes()? 712 + } 713 + Rollup::Weekly(weekly_cursor) => { 714 + let k = WeeklyRollupKey::new(weekly_cursor, &nsid); 715 + log::info!("weekly k: {k:?}"); 716 + k.to_db_bytes()? 717 + } 718 + Rollup::AllTime => { 719 + let k = AllTimeRollupKey::new(&nsid); 720 + log::info!("alltime k: {k:?}"); 721 + k.to_db_bytes()? 722 + } 723 + }; 724 + // log::info!("key bytes: {key_bytes:?}"); 725 + let mut rolled: CountsValue = self 726 + .rollups 727 + .get(&key_bytes)? 728 + .inspect(|v| { 729 + let lax = CountsValue::from_db_bytes(v); 730 + log::info!("val: len={}, lax={lax:?} first32={:?}", v.len(), v.get(..32)); 731 + }) 732 + .as_deref() 733 + .map(db_complete::<CountsValue>) 734 + .transpose() 735 + .inspect_err(|e| log::warn!("oooh did we break on the rolled thing? {e:?}"))? 736 + .unwrap_or_default(); 737 + 738 + 739 + // try to round-trip before inserting, for funsies 740 + let tripppin = counts.to_db_bytes()?; 741 + let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?; 742 + assert_eq!(n, tripppin.len()); 743 + assert_eq!(counts.prefix, and_back.prefix); 744 + assert_eq!(counts.dids().estimate(), and_back.dids().estimate()); 745 + if counts.records() > 20000000 { 746 + panic!("COUNTS maybe wtf? {counts:?}") 747 + } 748 + // assert_eq!(rolled, and_back); 749 + 750 + 751 + rolled.merge(&counts); 752 + 753 + // try to round-trip before inserting, for funsies 754 + let tripppin = rolled.to_db_bytes()?; 755 + let (and_back, n) = CountsValue::from_db_bytes(&tripppin)?; 756 + assert_eq!(n, tripppin.len()); 757 + assert_eq!(rolled.prefix, and_back.prefix); 758 + assert_eq!(rolled.dids().estimate(), and_back.dids().estimate()); 759 + if rolled.records() > 20000000 { 760 + panic!("maybe wtf? {rolled:?}") 761 + } 762 + // assert_eq!(rolled, and_back); 763 + 764 + batch.insert(&self.rollups, &key_bytes, &rolled.to_db_bytes()?); 765 + } 766 + 767 + log::warn!("done cbr loop."); 768 + 769 + insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor) 770 + .inspect_err(|e| log::warn!("insert neu: {e:?}"))?; 771 + 772 + batch.commit()?; 773 + 774 + log::warn!("ok finished rlc stuff. huh."); 775 + Ok(cursors_advanced) 776 + } 777 + } 778 + 779 + impl StoreWriter for MemWriter { 780 + fn insert_batch<const LIMIT: usize>( 781 + &mut self, 782 + event_batch: EventBatch<LIMIT>, 783 + ) -> StorageResult<()> { 784 + if event_batch.is_empty() { 785 + return Ok(()); 786 + } 787 + 788 + let mut batch = self.keyspace.batch(); 789 + 790 + // would be nice not to have to iterate everything at once here 791 + let latest = event_batch.latest_cursor().unwrap(); 792 + 793 + for (nsid, commits) in event_batch.commits_by_nsid { 794 + for commit in commits.commits { 795 + let location_key: RecordLocationKey = (&commit, &nsid).into(); 796 + 797 + match commit.action { 798 + CommitAction::Cut => { 799 + batch.remove(&self.records, &location_key.to_db_bytes()?); 800 + } 801 + CommitAction::Put(put_action) => { 802 + let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor); 803 + let feed_val: NsidRecordFeedVal = 804 + (&commit.did, &commit.rkey, commit.rev.as_str()).into(); 805 + batch.insert( 806 + &self.feeds, 807 + &feed_key.to_db_bytes()?, 808 + &feed_val.to_db_bytes()?, 809 + ); 810 + 811 + 812 + let location_val: RecordLocationVal = 813 + (commit.cursor, commit.rev.as_str(), put_action).into(); 814 + batch.insert( 815 + &self.records, 816 + &location_key.to_db_bytes()?, 817 + &location_val.to_db_bytes()?, 818 + ); 819 + } 820 + } 821 + } 822 + let live_counts_key: LiveCountsKey = (latest, &nsid).into(); 823 + let counts_value = CountsValue::new(commits.total_seen as u64, commits.dids_estimate); 824 + batch.insert( 825 + &self.rollups, 826 + &live_counts_key.to_db_bytes()?, 827 + &counts_value.to_db_bytes()?, 828 + ); 829 + } 830 + 831 + for remove in event_batch.account_removes { 832 + let queue_key = DeleteAccountQueueKey::new(remove.cursor); 833 + let queue_val: DeleteAccountQueueVal = remove.did; 834 + batch.insert( 835 + &self.queues, 836 + &queue_key.to_db_bytes()?, 837 + &queue_val.to_db_bytes()?, 838 + ); 839 + } 840 + 841 + batch.insert( 842 + &self.global, 843 + &DbStaticStr::<JetstreamCursorKey>::default().to_db_bytes()?, 844 + &latest.to_db_bytes()?, 845 + ); 846 + 847 + batch.commit()?; 848 + Ok(()) 849 + } 850 + 851 + fn step_rollup(&mut self) -> StorageResult<usize> { 852 + let rollup_cursor = 853 + get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or( 854 + StorageError::BadStateError("Could not find current rollup cursor".to_string()), 855 + ) 856 + .inspect_err(|e| log::warn!("failed getting rollup cursor: {e:?}"))?; 857 + 858 + // timelies 859 + let live_counts_range = LiveCountsKey::range_from_cursor(rollup_cursor) 860 + .inspect_err(|e| log::warn!("live counts range: {e:?}"))?; 861 + let mut timely_iter = self.rollups.range(live_counts_range).into_iter().peekable(); 862 + 863 + let timely_next_cursor = timely_iter 864 + .peek_mut() 865 + .map(|kv| -> StorageResult<Cursor> { 866 + match kv { 867 + Err(e) => Err(std::mem::replace(e, StorageError::Stolen))?, 868 + Ok((key_bytes, _)) => { 869 + let key = db_complete::<LiveCountsKey>(key_bytes) 870 + .inspect_err(|e| log::warn!("failed getting key for next timely: {e:?}"))?; 871 + Ok(key.cursor()) 872 + } 873 + } 874 + }) 875 + .transpose() 876 + .inspect_err(|e| log::warn!("something about timely: {e:?}"))?; 877 + 878 + // delete accounts 879 + let delete_accounts_range = 880 + DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?; 881 + 882 + let next_delete = self 883 + .queues 884 + .range(delete_accounts_range) 885 + .into_iter() 886 + .next() 887 + .transpose() 888 + .inspect_err(|e| log::warn!("range for next delete: {e:?}"))? 889 + .map(|(key_bytes, val_bytes)| { 890 + db_complete::<DeleteAccountQueueKey>(&key_bytes) 891 + .inspect_err(|e| log::warn!("failed inside next delete thing????: {e:?}")) 892 + .map(|k| (k.suffix, key_bytes, val_bytes)) 893 + }) 894 + .transpose() 895 + .inspect_err(|e| log::warn!("failed getting next delete: {e:?}"))?; 896 + 897 + let cursors_stepped = match (timely_next_cursor, next_delete) { 898 + ( 899 + Some(timely_next_cursor), 900 + Some((delete_cursor, delete_key_bytes, delete_val_bytes)), 901 + ) => { 902 + if timely_next_cursor < delete_cursor { 903 + self.rollup_live_counts( 904 + timely_iter, 905 + Some(delete_cursor), 906 + MAX_BATCHED_ROLLUP_COUNTS, 907 + ) 908 + .inspect_err(|e| log::warn!("rolling up live counts: {e:?}"))? 909 + } else { 910 + self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 911 + .inspect_err(|e| log::warn!("deleting acocunt: {e:?}"))? 912 + } 913 + } 914 + (Some(_), None) => { 915 + self.rollup_live_counts(timely_iter, None, MAX_BATCHED_ROLLUP_COUNTS) 916 + .inspect_err(|e| log::warn!("rolling up (lasjdflkajs): {e:?}"))? 917 + } 918 + (None, Some((delete_cursor, delete_key_bytes, delete_val_bytes))) => { 919 + self.rollup_delete_account(delete_cursor, &delete_key_bytes, &delete_val_bytes) 920 + .inspect_err(|e| log::warn!("deleting acocunt other branch: {e:?}"))? 921 + } 922 + (None, None) => 0, 923 + }; 924 + 925 + Ok(cursors_stepped) 926 + } 927 + 928 + fn trim_collection( 929 + &mut self, 930 + collection: &Nsid, 931 + limit: usize, 932 + // TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end) 933 + ) -> StorageResult<()> { 934 + let mut dangling_feed_keys_cleaned = 0; 935 + let mut records_deleted = 0; 936 + 937 + let mut batch = self.keyspace.batch(); 938 + 939 + let prefix = NsidRecordFeedKey::from_prefix_to_db_bytes(collection)?; 940 + let mut found = 0; 941 + for kv in self.feeds.prefix(&prefix).into_iter().rev() { 942 + let (key_bytes, val_bytes) = kv?; 943 + let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; 944 + let feed_val = db_complete::<NsidRecordFeedVal>(&val_bytes)?; 945 + let location_key: RecordLocationKey = (&feed_key, &feed_val).into(); 946 + let location_key_bytes = location_key.to_db_bytes()?; 947 + 948 + let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else { 949 + // record was deleted (hopefully) 950 + batch.remove(&self.feeds, &location_key_bytes); 951 + dangling_feed_keys_cleaned += 1; 952 + continue; 953 + }; 954 + 955 + let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 956 + 957 + if meta.cursor() != feed_key.cursor() { 958 + // older/different version 959 + batch.remove(&self.feeds, &location_key_bytes); 960 + dangling_feed_keys_cleaned += 1; 961 + continue; 962 + } 963 + if meta.rev != feed_val.rev() { 964 + // weird... 965 + log::warn!("record lookup: cursor match but rev did not...? removing."); 966 + batch.remove(&self.feeds, &location_key_bytes); 967 + dangling_feed_keys_cleaned += 1; 968 + continue; 969 + } 970 + 971 + if batch.len() >= MAX_BATCHED_CLEANUP_SIZE { 972 + batch.commit()?; 973 + batch = self.keyspace.batch(); 974 + } 975 + 976 + found += 1; 977 + if found <= limit { 978 + continue; 979 + } 980 + 981 + batch.remove(&self.feeds, &location_key_bytes); 982 + batch.remove(&self.records, &location_key_bytes); 983 + records_deleted += 1; 984 + } 985 + 986 + batch.commit()?; 987 + 988 + log::info!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records"); 989 + Ok(()) 990 + } 991 + 992 + fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> { 993 + let mut records_deleted = 0; 994 + let mut batch = self.keyspace.batch(); 995 + let prefix = RecordLocationKey::from_prefix_to_db_bytes(did)?; 996 + for kv in self.records.prefix(&prefix) { 997 + let (key_bytes, _) = kv?; 998 + batch.remove(&self.records, &key_bytes); 999 + records_deleted += 1; 1000 + if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS { 1001 + batch.commit()?; 1002 + batch = self.keyspace.batch(); 1003 + } 1004 + } 1005 + batch.commit()?; 1006 + Ok(records_deleted) 1007 + } 1008 + } 1009 + 1010 + /// Get a value from a fixed key 1011 + fn get_static_neu<K: StaticStr, V: DbBytes>(global: &MemPartion) -> StorageResult<Option<V>> { 1012 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1013 + let value = global 1014 + .get(&key_bytes)? 1015 + .map(|value_bytes| db_complete(&value_bytes)) 1016 + .transpose()?; 1017 + Ok(value) 1018 + } 1019 + 1020 + /// Get a value from a fixed key 1021 + fn get_snapshot_static_neu<K: StaticStr, V: DbBytes>( 1022 + global: &MemPartion, 1023 + ) -> StorageResult<Option<V>> { 1024 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1025 + let value = global 1026 + .get(&key_bytes)? 1027 + .map(|value_bytes| db_complete(&value_bytes)) 1028 + .transpose()?; 1029 + Ok(value) 1030 + } 1031 + 1032 + /// Set a value to a fixed key 1033 + fn insert_static_neu<K: StaticStr>( 1034 + global: &MemPartion, 1035 + value: impl DbBytes, 1036 + ) -> StorageResult<()> { 1037 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1038 + let value_bytes = value.to_db_bytes()?; 1039 + global.insert(&key_bytes, &value_bytes)?; 1040 + Ok(()) 1041 + } 1042 + 1043 + /// Set a value to a fixed key 1044 + fn insert_batch_static_neu<K: StaticStr>( 1045 + batch: &mut MemBatch, 1046 + global: &MemPartion, 1047 + value: impl DbBytes, 1048 + ) -> StorageResult<()> { 1049 + let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?; 1050 + let value_bytes = value.to_db_bytes()?; 1051 + batch.insert(global, &key_bytes, &value_bytes); 1052 + Ok(()) 1053 + } 1054 + 1055 + #[derive(Debug, serde::Serialize, schemars::JsonSchema)] 1056 + pub struct StorageInfo { 1057 + pub keyspace_disk_space: u64, 1058 + pub keyspace_journal_count: usize, 1059 + pub keyspace_sequence: u64, 1060 + pub global_approximate_len: usize, 1061 + } 1062 + 1063 + #[cfg(test)] 1064 + mod tests { 1065 + use super::*; 1066 + use crate::{DeleteAccount, RecordKey, UFOsCommit}; 1067 + use jetstream::events::{CommitEvent, CommitOp}; 1068 + use jetstream::exports::Cid; 1069 + use serde_json::value::RawValue; 1070 + 1071 + fn fjall_db() -> (MemReader, MemWriter) { 1072 + let (read, write, _) = MemStorage::init( 1073 + tempfile::tempdir().unwrap(), 1074 + "offline test (no real jetstream endpoint)".to_string(), 1075 + false, 1076 + MemConfig { temp: true }, 1077 + ) 1078 + .unwrap(); 1079 + (read, write) 1080 + } 1081 + 1082 + const TEST_BATCH_LIMIT: usize = 16; 1083 + 1084 + #[derive(Debug, Default)] 1085 + struct TestBatch { 1086 + pub batch: EventBatch<TEST_BATCH_LIMIT>, 1087 + } 1088 + 1089 + impl TestBatch { 1090 + #[allow(clippy::too_many_arguments)] 1091 + pub fn create( 1092 + &mut self, 1093 + did: &str, 1094 + collection: &str, 1095 + rkey: &str, 1096 + record: &str, 1097 + rev: Option<&str>, 1098 + cid: Option<Cid>, 1099 + cursor: u64, 1100 + ) -> Nsid { 1101 + let did = Did::new(did.to_string()).unwrap(); 1102 + let collection = Nsid::new(collection.to_string()).unwrap(); 1103 + let record = RawValue::from_string(record.to_string()).unwrap(); 1104 + let cid = cid.unwrap_or( 1105 + "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1106 + .parse() 1107 + .unwrap(), 1108 + ); 1109 + 1110 + let event = CommitEvent { 1111 + collection, 1112 + rkey: RecordKey::new(rkey.to_string()).unwrap(), 1113 + rev: rev.unwrap_or("asdf").to_string(), 1114 + operation: CommitOp::Create, 1115 + record: Some(record), 1116 + cid: Some(cid), 1117 + }; 1118 + 1119 + let (commit, collection) = 1120 + UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1121 + .unwrap(); 1122 + 1123 + self.batch 1124 + .commits_by_nsid 1125 + .entry(collection.clone()) 1126 + .or_default() 1127 + .truncating_insert(commit) 1128 + .unwrap(); 1129 + 1130 + collection 1131 + } 1132 + #[allow(clippy::too_many_arguments)] 1133 + pub fn update( 1134 + &mut self, 1135 + did: &str, 1136 + collection: &str, 1137 + rkey: &str, 1138 + record: &str, 1139 + rev: Option<&str>, 1140 + cid: Option<Cid>, 1141 + cursor: u64, 1142 + ) -> Nsid { 1143 + let did = Did::new(did.to_string()).unwrap(); 1144 + let collection = Nsid::new(collection.to_string()).unwrap(); 1145 + let record = RawValue::from_string(record.to_string()).unwrap(); 1146 + let cid = cid.unwrap_or( 1147 + "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy" 1148 + .parse() 1149 + .unwrap(), 1150 + ); 1151 + 1152 + let event = CommitEvent { 1153 + collection, 1154 + rkey: RecordKey::new(rkey.to_string()).unwrap(), 1155 + rev: rev.unwrap_or("asdf").to_string(), 1156 + operation: CommitOp::Update, 1157 + record: Some(record), 1158 + cid: Some(cid), 1159 + }; 1160 + 1161 + let (commit, collection) = 1162 + UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(cursor)) 1163 + .unwrap(); 1164 + 1165 + self.batch 1166 + .commits_by_nsid 1167 + .entry(collection.clone()) 1168 + .or_default() 1169 + .truncating_insert(commit) 1170 + .unwrap(); 1171 + 1172 + collection 1173 + } 1174 + #[allow(clippy::too_many_arguments)] 1175 + pub fn delete( 1176 + &mut self, 1177 + did: &str, 1178 + collection: &str, 1179 + rkey: &str, 1180 + rev: Option<&str>, 1181 + cursor: u64, 1182 + ) -> Nsid { 1183 + let did = Did::new(did.to_string()).unwrap(); 1184 + let collection = Nsid::new(collection.to_string()).unwrap(); 1185 + let event = CommitEvent { 1186 + collection, 1187 + rkey: RecordKey::new(rkey.to_string()).unwrap(), 1188 + rev: rev.unwrap_or("asdf").to_string(), 1189 + operation: CommitOp::Delete, 1190 + record: None, 1191 + cid: None, 1192 + }; 1193 + 1194 + let (commit, collection) = 1195 + UFOsCommit::from_commit_info(event, did, Cursor::from_raw_u64(cursor)).unwrap(); 1196 + 1197 + self.batch 1198 + .commits_by_nsid 1199 + .entry(collection.clone()) 1200 + .or_default() 1201 + .truncating_insert(commit) 1202 + .unwrap(); 1203 + 1204 + collection 1205 + } 1206 + pub fn delete_account(&mut self, did: &str, cursor: u64) -> Did { 1207 + let did = Did::new(did.to_string()).unwrap(); 1208 + self.batch.account_removes.push(DeleteAccount { 1209 + did: did.clone(), 1210 + cursor: Cursor::from_raw_u64(cursor), 1211 + }); 1212 + did 1213 + } 1214 + } 1215 + 1216 + #[test] 1217 + fn test_hello() -> anyhow::Result<()> { 1218 + let (read, mut write) = fjall_db(); 1219 + write.insert_batch::<TEST_BATCH_LIMIT>(EventBatch::default())?; 1220 + let (records, dids) = 1221 + read.get_counts_by_collection(&Nsid::new("a.b.c".to_string()).unwrap())?; 1222 + assert_eq!(records, 0); 1223 + assert_eq!(dids, 0); 1224 + Ok(()) 1225 + } 1226 + 1227 + #[test] 1228 + fn test_insert_one() -> anyhow::Result<()> { 1229 + let (read, mut write) = fjall_db(); 1230 + 1231 + let mut batch = TestBatch::default(); 1232 + let collection = batch.create( 1233 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1234 + "a.b.c", 1235 + "asdf", 1236 + "{}", 1237 + Some("rev-z"), 1238 + None, 1239 + 100, 1240 + ); 1241 + write.insert_batch(batch.batch)?; 1242 + 1243 + let (records, dids) = read.get_counts_by_collection(&collection)?; 1244 + assert_eq!(records, 1); 1245 + assert_eq!(dids, 1); 1246 + let (records, dids) = 1247 + read.get_counts_by_collection(&Nsid::new("d.e.f".to_string()).unwrap())?; 1248 + assert_eq!(records, 0); 1249 + assert_eq!(dids, 0); 1250 + 1251 + let records = read.get_records_by_collections(&[collection], 2)?; 1252 + assert_eq!(records.len(), 1); 1253 + let rec = &records[0]; 1254 + assert_eq!(rec.record.get(), "{}"); 1255 + assert!(!rec.is_update); 1256 + 1257 + let records = 1258 + read.get_records_by_collections(&[Nsid::new("d.e.f".to_string()).unwrap()], 2)?; 1259 + assert_eq!(records.len(), 0); 1260 + 1261 + Ok(()) 1262 + } 1263 + 1264 + #[test] 1265 + fn test_get_multi_collection() -> anyhow::Result<()> { 1266 + let (read, mut write) = fjall_db(); 1267 + 1268 + let mut batch = TestBatch::default(); 1269 + batch.create( 1270 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1271 + "a.a.a", 1272 + "aaa", 1273 + r#""earliest""#, 1274 + Some("rev-a"), 1275 + None, 1276 + 100, 1277 + ); 1278 + batch.create( 1279 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1280 + "a.a.b", 1281 + "aab", 1282 + r#""in between""#, 1283 + Some("rev-ab"), 1284 + None, 1285 + 101, 1286 + ); 1287 + batch.create( 1288 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1289 + "a.a.a", 1290 + "aaa-2", 1291 + r#""last""#, 1292 + Some("rev-a-2"), 1293 + None, 1294 + 102, 1295 + ); 1296 + write.insert_batch(batch.batch)?; 1297 + 1298 + let records = read.get_records_by_collections( 1299 + &[ 1300 + Nsid::new("a.a.a".to_string()).unwrap(), 1301 + Nsid::new("a.a.b".to_string()).unwrap(), 1302 + Nsid::new("a.a.c".to_string()).unwrap(), 1303 + ], 1304 + 100, 1305 + )?; 1306 + assert_eq!(records.len(), 3); 1307 + assert_eq!(records[0].record.get(), r#""last""#); 1308 + assert_eq!( 1309 + records[0].collection, 1310 + Nsid::new("a.a.a".to_string()).unwrap() 1311 + ); 1312 + assert_eq!(records[1].record.get(), r#""in between""#); 1313 + assert_eq!( 1314 + records[1].collection, 1315 + Nsid::new("a.a.b".to_string()).unwrap() 1316 + ); 1317 + assert_eq!(records[2].record.get(), r#""earliest""#); 1318 + assert_eq!( 1319 + records[2].collection, 1320 + Nsid::new("a.a.a".to_string()).unwrap() 1321 + ); 1322 + 1323 + Ok(()) 1324 + } 1325 + 1326 + #[test] 1327 + fn test_update_one() -> anyhow::Result<()> { 1328 + let (read, mut write) = fjall_db(); 1329 + 1330 + let mut batch = TestBatch::default(); 1331 + let collection = batch.create( 1332 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1333 + "a.b.c", 1334 + "rkey-asdf", 1335 + "{}", 1336 + Some("rev-a"), 1337 + None, 1338 + 100, 1339 + ); 1340 + write.insert_batch(batch.batch)?; 1341 + 1342 + let mut batch = TestBatch::default(); 1343 + batch.update( 1344 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1345 + "a.b.c", 1346 + "rkey-asdf", 1347 + r#"{"ch": "ch-ch-ch-changes"}"#, 1348 + Some("rev-z"), 1349 + None, 1350 + 101, 1351 + ); 1352 + write.insert_batch(batch.batch)?; 1353 + 1354 + let (records, dids) = read.get_counts_by_collection(&collection)?; 1355 + assert_eq!(records, 1); 1356 + assert_eq!(dids, 1); 1357 + 1358 + let records = read.get_records_by_collections(&[collection], 2)?; 1359 + assert_eq!(records.len(), 1); 1360 + let rec = &records[0]; 1361 + assert_eq!(rec.record.get(), r#"{"ch": "ch-ch-ch-changes"}"#); 1362 + assert!(rec.is_update); 1363 + Ok(()) 1364 + } 1365 + 1366 + #[test] 1367 + fn test_delete_one() -> anyhow::Result<()> { 1368 + let (read, mut write) = fjall_db(); 1369 + 1370 + let mut batch = TestBatch::default(); 1371 + let collection = batch.create( 1372 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1373 + "a.b.c", 1374 + "rkey-asdf", 1375 + "{}", 1376 + Some("rev-a"), 1377 + None, 1378 + 100, 1379 + ); 1380 + write.insert_batch(batch.batch)?; 1381 + 1382 + let mut batch = TestBatch::default(); 1383 + batch.delete( 1384 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1385 + "a.b.c", 1386 + "rkey-asdf", 1387 + Some("rev-z"), 1388 + 101, 1389 + ); 1390 + write.insert_batch(batch.batch)?; 1391 + 1392 + let (records, dids) = read.get_counts_by_collection(&collection)?; 1393 + assert_eq!(records, 1); 1394 + assert_eq!(dids, 1); 1395 + 1396 + let records = read.get_records_by_collections(&[collection], 2)?; 1397 + assert_eq!(records.len(), 0); 1398 + 1399 + Ok(()) 1400 + } 1401 + 1402 + #[test] 1403 + fn test_collection_trim() -> anyhow::Result<()> { 1404 + let (read, mut write) = fjall_db(); 1405 + 1406 + let mut batch = TestBatch::default(); 1407 + batch.create( 1408 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1409 + "a.a.a", 1410 + "rkey-aaa", 1411 + "{}", 1412 + Some("rev-aaa"), 1413 + None, 1414 + 10_000, 1415 + ); 1416 + let mut last_b_cursor; 1417 + for i in 1..=10 { 1418 + last_b_cursor = 11_000 + i; 1419 + batch.create( 1420 + &format!("did:plc:inze6wrmsm7pjl7yta3oig7{}", i % 3), 1421 + "a.a.b", 1422 + &format!("rkey-bbb-{i}"), 1423 + &format!(r#"{{"n": {i}}}"#), 1424 + Some(&format!("rev-bbb-{i}")), 1425 + None, 1426 + last_b_cursor, 1427 + ); 1428 + } 1429 + batch.create( 1430 + "did:plc:inze6wrmsm7pjl7yta3oig77", 1431 + "a.a.c", 1432 + "rkey-ccc", 1433 + "{}", 1434 + Some("rev-ccc"), 1435 + None, 1436 + 12_000, 1437 + ); 1438 + 1439 + write.insert_batch(batch.batch)?; 1440 + 1441 + let records = 1442 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1443 + assert_eq!(records.len(), 1); 1444 + let records = 1445 + read.get_records_by_collections(&[Nsid::new("a.a.b".to_string()).unwrap()], 100)?; 1446 + assert_eq!(records.len(), 10); 1447 + let records = 1448 + read.get_records_by_collections(&[Nsid::new("a.a.c".to_string()).unwrap()], 100)?; 1449 + assert_eq!(records.len(), 1); 1450 + let records = 1451 + read.get_records_by_collections(&[Nsid::new("a.a.d".to_string()).unwrap()], 100)?; 1452 + assert_eq!(records.len(), 0); 1453 + 1454 + write.trim_collection(&Nsid::new("a.a.a".to_string()).unwrap(), 6)?; 1455 + write.trim_collection(&Nsid::new("a.a.b".to_string()).unwrap(), 6)?; 1456 + write.trim_collection(&Nsid::new("a.a.c".to_string()).unwrap(), 6)?; 1457 + write.trim_collection(&Nsid::new("a.a.d".to_string()).unwrap(), 6)?; 1458 + 1459 + let records = 1460 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1461 + assert_eq!(records.len(), 1); 1462 + let records = 1463 + read.get_records_by_collections(&[Nsid::new("a.a.b".to_string()).unwrap()], 100)?; 1464 + assert_eq!(records.len(), 6); 1465 + let records = 1466 + read.get_records_by_collections(&[Nsid::new("a.a.c".to_string()).unwrap()], 100)?; 1467 + assert_eq!(records.len(), 1); 1468 + let records = 1469 + read.get_records_by_collections(&[Nsid::new("a.a.d".to_string()).unwrap()], 100)?; 1470 + assert_eq!(records.len(), 0); 1471 + 1472 + Ok(()) 1473 + } 1474 + 1475 + #[test] 1476 + fn test_delete_account() -> anyhow::Result<()> { 1477 + let (read, mut write) = fjall_db(); 1478 + 1479 + let mut batch = TestBatch::default(); 1480 + batch.create( 1481 + "did:plc:person-a", 1482 + "a.a.a", 1483 + "rkey-aaa", 1484 + "{}", 1485 + Some("rev-aaa"), 1486 + None, 1487 + 10_000, 1488 + ); 1489 + for i in 1..=2 { 1490 + batch.create( 1491 + "did:plc:person-b", 1492 + "a.a.a", 1493 + &format!("rkey-bbb-{i}"), 1494 + &format!(r#"{{"n": {i}}}"#), 1495 + Some(&format!("rev-bbb-{i}")), 1496 + None, 1497 + 11_000 + i, 1498 + ); 1499 + } 1500 + write.insert_batch(batch.batch)?; 1501 + 1502 + let records = 1503 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1504 + assert_eq!(records.len(), 3); 1505 + 1506 + let records_deleted = 1507 + write.delete_account(&Did::new("did:plc:person-b".to_string()).unwrap())?; 1508 + assert_eq!(records_deleted, 2); 1509 + 1510 + let records = 1511 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 100)?; 1512 + assert_eq!(records.len(), 1); 1513 + 1514 + Ok(()) 1515 + } 1516 + 1517 + #[test] 1518 + fn rollup_delete_account_removes_record() -> anyhow::Result<()> { 1519 + let (read, mut write) = fjall_db(); 1520 + 1521 + let mut batch = TestBatch::default(); 1522 + batch.create( 1523 + "did:plc:person-a", 1524 + "a.a.a", 1525 + "rkey-aaa", 1526 + "{}", 1527 + Some("rev-aaa"), 1528 + None, 1529 + 10_000, 1530 + ); 1531 + write.insert_batch(batch.batch)?; 1532 + 1533 + let mut batch = TestBatch::default(); 1534 + batch.delete_account("did:plc:person-a", 9_999); // queue it before the rollup 1535 + write.insert_batch(batch.batch)?; 1536 + 1537 + write.step_rollup()?; 1538 + 1539 + let records = 1540 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1541 + assert_eq!(records.len(), 0); 1542 + 1543 + Ok(()) 1544 + } 1545 + 1546 + #[test] 1547 + fn rollup_delete_live_count_step() -> anyhow::Result<()> { 1548 + let (read, mut write) = fjall_db(); 1549 + 1550 + let mut batch = TestBatch::default(); 1551 + batch.create( 1552 + "did:plc:person-a", 1553 + "a.a.a", 1554 + "rkey-aaa", 1555 + "{}", 1556 + Some("rev-aaa"), 1557 + None, 1558 + 10_000, 1559 + ); 1560 + write.insert_batch(batch.batch)?; 1561 + 1562 + let n = write.step_rollup()?; 1563 + assert_eq!(n, 1); 1564 + 1565 + let mut batch = TestBatch::default(); 1566 + batch.delete_account("did:plc:person-a", 10_001); 1567 + write.insert_batch(batch.batch)?; 1568 + 1569 + let records = 1570 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1571 + assert_eq!(records.len(), 1); 1572 + 1573 + let n = write.step_rollup()?; 1574 + assert_eq!(n, 1); 1575 + 1576 + let records = 1577 + read.get_records_by_collections(&[Nsid::new("a.a.a".to_string()).unwrap()], 1)?; 1578 + assert_eq!(records.len(), 0); 1579 + 1580 + let mut batch = TestBatch::default(); 1581 + batch.delete_account("did:plc:person-a", 9_999); 1582 + write.insert_batch(batch.batch)?; 1583 + 1584 + let n = write.step_rollup()?; 1585 + assert_eq!(n, 0); 1586 + 1587 + Ok(()) 1588 + } 1589 + 1590 + #[test] 1591 + fn rollup_multiple_count_batches() -> anyhow::Result<()> { 1592 + let (_read, mut write) = fjall_db(); 1593 + 1594 + let mut batch = TestBatch::default(); 1595 + batch.create( 1596 + "did:plc:person-a", 1597 + "a.a.a", 1598 + "rkey-aaa", 1599 + "{}", 1600 + Some("rev-aaa"), 1601 + None, 1602 + 10_000, 1603 + ); 1604 + write.insert_batch(batch.batch)?; 1605 + 1606 + let mut batch = TestBatch::default(); 1607 + batch.create( 1608 + "did:plc:person-a", 1609 + "a.a.a", 1610 + "rkey-aab", 1611 + "{}", 1612 + Some("rev-aab"), 1613 + None, 1614 + 10_001, 1615 + ); 1616 + write.insert_batch(batch.batch)?; 1617 + 1618 + let n = write.step_rollup()?; 1619 + assert_eq!(n, 2); 1620 + 1621 + let n = write.step_rollup()?; 1622 + assert_eq!(n, 0); 1623 + 1624 + Ok(()) 1625 + } 1626 + 1627 + #[test] 1628 + fn counts_before_and_after_rollup() -> anyhow::Result<()> { 1629 + let (read, mut write) = fjall_db(); 1630 + 1631 + let mut batch = TestBatch::default(); 1632 + batch.create( 1633 + "did:plc:person-a", 1634 + "a.a.a", 1635 + "rkey-aaa", 1636 + "{}", 1637 + Some("rev-aaa"), 1638 + None, 1639 + 10_000, 1640 + ); 1641 + batch.create( 1642 + "did:plc:person-b", 1643 + "a.a.a", 1644 + "rkey-bbb", 1645 + "{}", 1646 + Some("rev-bbb"), 1647 + None, 1648 + 10_001, 1649 + ); 1650 + write.insert_batch(batch.batch)?; 1651 + 1652 + let mut batch = TestBatch::default(); 1653 + batch.delete_account("did:plc:person-a", 11_000); 1654 + write.insert_batch(batch.batch)?; 1655 + 1656 + let mut batch = TestBatch::default(); 1657 + batch.create( 1658 + "did:plc:person-a", 1659 + "a.a.a", 1660 + "rkey-aac", 1661 + "{}", 1662 + Some("rev-aac"), 1663 + None, 1664 + 12_000, 1665 + ); 1666 + write.insert_batch(batch.batch)?; 1667 + 1668 + // before any rollup 1669 + let (records, dids) = 1670 + read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1671 + assert_eq!(records, 3); 1672 + assert_eq!(dids, 2); 1673 + 1674 + // first batch rolled up 1675 + let n = write.step_rollup()?; 1676 + assert_eq!(n, 1); 1677 + 1678 + let (records, dids) = 1679 + read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1680 + assert_eq!(records, 3); 1681 + assert_eq!(dids, 2); 1682 + 1683 + // delete account rolled up 1684 + let n = write.step_rollup()?; 1685 + assert_eq!(n, 1); 1686 + 1687 + let (records, dids) = 1688 + read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1689 + assert_eq!(records, 3); 1690 + assert_eq!(dids, 2); 1691 + 1692 + // second batch rolled up 1693 + let n = write.step_rollup()?; 1694 + assert_eq!(n, 1); 1695 + 1696 + let (records, dids) = 1697 + read.get_counts_by_collection(&Nsid::new("a.a.a".to_string()).unwrap())?; 1698 + assert_eq!(records, 3); 1699 + assert_eq!(dids, 2); 1700 + 1701 + // no more rollups left 1702 + let n = write.step_rollup()?; 1703 + assert_eq!(n, 0); 1704 + 1705 + Ok(()) 1706 + } 1707 + 1708 + #[test] 1709 + fn get_top_collections() -> anyhow::Result<()> { 1710 + let (read, mut write) = fjall_db(); 1711 + 1712 + let mut batch = TestBatch::default(); 1713 + batch.create( 1714 + "did:plc:person-a", 1715 + "a.a.a", 1716 + "rkey-aaa", 1717 + "{}", 1718 + Some("rev-aaa"), 1719 + None, 1720 + 10_000, 1721 + ); 1722 + batch.create( 1723 + "did:plc:person-b", 1724 + "a.a.b", 1725 + "rkey-bbb", 1726 + "{}", 1727 + Some("rev-bbb"), 1728 + None, 1729 + 10_001, 1730 + ); 1731 + batch.create( 1732 + "did:plc:person-c", 1733 + "a.b.c", 1734 + "rkey-ccc", 1735 + "{}", 1736 + Some("rev-ccc"), 1737 + None, 1738 + 10_002, 1739 + ); 1740 + batch.create( 1741 + "did:plc:person-a", 1742 + "a.a.a", 1743 + "rkey-aaa-2", 1744 + "{}", 1745 + Some("rev-aaa-2"), 1746 + None, 1747 + 10_003, 1748 + ); 1749 + write.insert_batch(batch.batch)?; 1750 + 1751 + let n = write.step_rollup()?; 1752 + assert_eq!(n, 3); // 3 collections 1753 + 1754 + let tops = read.get_top_collections()?; 1755 + assert_eq!( 1756 + tops, 1757 + TopCollections { 1758 + total_records: 4, 1759 + dids_estimate: 3, 1760 + nsid_child_segments: HashMap::from([( 1761 + "a".to_string(), 1762 + TopCollections { 1763 + total_records: 4, 1764 + dids_estimate: 3, 1765 + nsid_child_segments: HashMap::from([ 1766 + ( 1767 + "a".to_string(), 1768 + TopCollections { 1769 + total_records: 3, 1770 + dids_estimate: 2, 1771 + nsid_child_segments: HashMap::from([ 1772 + ( 1773 + "a".to_string(), 1774 + TopCollections { 1775 + total_records: 2, 1776 + dids_estimate: 1, 1777 + nsid_child_segments: HashMap::from([]), 1778 + }, 1779 + ), 1780 + ( 1781 + "b".to_string(), 1782 + TopCollections { 1783 + total_records: 1, 1784 + dids_estimate: 1, 1785 + nsid_child_segments: HashMap::from([]), 1786 + } 1787 + ), 1788 + ]), 1789 + }, 1790 + ), 1791 + ( 1792 + "b".to_string(), 1793 + TopCollections { 1794 + total_records: 1, 1795 + dids_estimate: 1, 1796 + nsid_child_segments: HashMap::from([( 1797 + "c".to_string(), 1798 + TopCollections { 1799 + total_records: 1, 1800 + dids_estimate: 1, 1801 + nsid_child_segments: HashMap::from([]), 1802 + }, 1803 + ),]), 1804 + }, 1805 + ), 1806 + ]), 1807 + }, 1808 + ),]), 1809 + } 1810 + ); 1811 + Ok(()) 1812 + } 1813 + }
+14 -3
ufos/src/store_types.rs
··· 211 211 impl SerdeBytes for EstimatedDidsValue {} 212 212 impl DbBytes for EstimatedDidsValue { 213 213 fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 214 - SerdeBytes::to_bytes(self) 214 + Ok(vec![1, 2, 3]) 215 + // SerdeBytes::to_bytes(self) 215 216 } 216 - fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 217 - SerdeBytes::from_bytes(bytes) 217 + fn from_db_bytes(_bytes: &[u8]) -> Result<(Self, usize), EncodingError> { 218 + Ok((Self(CardinalityEstimator::new()), 3)) 219 + // SerdeBytes::from_bytes(bytes) 218 220 } 219 221 } 220 222 ··· 398 400 let mut estimator = CardinalityEstimator::new(); 399 401 for i in 0..10 { 400 402 estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap()); 403 + } 404 + let original = CountsValue::new(123, estimator.clone()); 405 + let serialized = original.to_db_bytes()?; 406 + let (restored, bytes_consumed) = CountsValue::from_db_bytes(&serialized)?; 407 + assert_eq!(restored, original); 408 + assert_eq!(bytes_consumed, serialized.len()); 409 + 410 + for i in 10..10_000 { 411 + estimator.insert(&Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap()); 401 412 } 402 413 let original = CountsValue::new(123, estimator); 403 414 let serialized = original.to_db_bytes()?;