Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

read any fixtures in the folder instead of hardcoding the paths

dawn 9f092383 3624c03b

+98 -34
+9 -9
flake.lock
··· 2 2 "nodes": { 3 3 "nixpkgs": { 4 4 "locked": { 5 - "lastModified": 1771177547, 6 - "narHash": "sha256-trTtk3WTOHz7hSw89xIIvahkgoFJYQ0G43IlqprFoMA=", 5 + "lastModified": 1776255774, 6 + "narHash": "sha256-psVTpH6PK3q1htMJpmdz1hLF5pQgEshu7gQWgKO6t6Y=", 7 7 "owner": "nixos", 8 8 "repo": "nixpkgs", 9 - "rev": "ac055f38c798b0d87695240c7b761b82fc7e5bc2", 9 + "rev": "566acc07c54dc807f91625bb286cb9b321b5f42a", 10 10 "type": "github" 11 11 }, 12 12 "original": { ··· 18 18 }, 19 19 "nixpkgs-lib": { 20 20 "locked": { 21 - "lastModified": 1769909678, 22 - "narHash": "sha256-cBEymOf4/o3FD5AZnzC3J9hLbiZ+QDT/KDuyHXVJOpM=", 21 + "lastModified": 1774748309, 22 + "narHash": "sha256-+U7gF3qxzwD5TZuANzZPeJTZRHS29OFQgkQ2kiTJBIQ=", 23 23 "owner": "nix-community", 24 24 "repo": "nixpkgs.lib", 25 - "rev": "72716169fe93074c333e8d0173151350670b824c", 25 + "rev": "333c4e0545a6da976206c74db8773a1645b5870a", 26 26 "type": "github" 27 27 }, 28 28 "original": { ··· 36 36 "nixpkgs-lib": "nixpkgs-lib" 37 37 }, 38 38 "locked": { 39 - "lastModified": 1769996383, 40 - "narHash": "sha256-AnYjnFWgS49RlqX7LrC4uA+sCCDBj0Ry/WOJ5XWAsa0=", 39 + "lastModified": 1775087534, 40 + "narHash": "sha256-91qqW8lhL7TLwgQWijoGBbiD4t7/q75KTi8NxjVmSmA=", 41 41 "owner": "hercules-ci", 42 42 "repo": "flake-parts", 43 - "rev": "57928607ea566b5db3ad13af0e57e921e6b12381", 43 + "rev": "3107b77cd68437b9a76194f0f7f9c55f2329ca5b", 44 44 "type": "github" 45 45 }, 46 46 "original": {
+4 -1
src/bin/mirror.rs
··· 177 177 if let Some(last) = page.ops.last() { 178 178 last_seq_from_poll = last.seq; 179 179 } 180 - let _ = send_page_bg.send(page).await; 180 + if send_page_bg.send(page).await.is_err() { 181 + poll_task.abort(); 182 + return anyhow::Ok("fjall-poll-stream (dest closed)"); 183 + } 181 184 if near_tip { 182 185 break; 183 186 }
+12 -10
src/crypto.rs
··· 159 159 160 160 #[test] 161 161 fn test_fixture_signatures() { 162 - let fixtures = [ 163 - "tests/fixtures/log_bskyapp.json", 164 - "tests/fixtures/log_legacy_dholms.json", 165 - "tests/fixtures/log_nullification.json", 166 - "tests/fixtures/log_tombstone.json", 167 - ]; 162 + let mut fixtures: Vec<_> = std::fs::read_dir("tests/fixtures") 163 + .unwrap() 164 + .filter_map(|e| e.ok()) 165 + .map(|e| e.path()) 166 + .filter(|p| p.extension().map_or(false, |ext| ext == "json")) 167 + .collect(); 168 + fixtures.sort(); 168 169 169 - for path in fixtures { 170 + for path in &fixtures { 170 171 let data = std::fs::read_to_string(path).unwrap(); 171 172 let entries: Vec<serde_json::Value> = serde_json::from_str(&data).unwrap(); 172 173 ··· 199 200 200 201 assert!( 201 202 !valid_keys.is_empty(), 202 - "{path}/{cid}: no keys to verify against" 203 + "{}/{cid}: no keys to verify against", 204 + path.display() 203 205 ); 204 206 205 207 let results = assure_valid_sig(&valid_keys, &sig, &data) 206 208 .expect("that we used the function correctly"); 207 209 for err in results.errors { 208 - println!("{path}/{cid}: {err}"); 210 + println!("{}/{cid}: {err}", path.display()); 209 211 } 210 212 if !results.valid { 211 - panic!("signature verification failed in {path}/{cid}"); 213 + panic!("signature verification failed in {}/{cid}", path.display()); 212 214 } 213 215 214 216 ops_by_cid.insert(cid, data);
+73 -14
src/plc_fjall.rs
··· 4 4 }; 5 5 use anyhow::Context; 6 6 use data_encoding::BASE32_NOPAD; 7 - use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, config::BlockSizePolicy}; 7 + use fjall::{ 8 + Database, Keyspace, KeyspaceCreateOptions, PersistMode, 9 + config::{BlockSizePolicy, RestartIntervalPolicy}, 10 + }; 8 11 use ordered_varint::Variable; 9 12 use serde::{Deserialize, Serialize}; 10 13 use std::collections::BTreeMap; ··· 360 363 // STABILITY: never reorder variants, only append. 361 364 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 362 365 enum Handle { 363 - Other(String), // 0 366 + Other(String), // 0 364 367 BskySocial(String), // 1 365 368 } 366 369 ··· 958 961 .max_memtable_size(mb(192)) 959 962 // this wont compress terribly well since its a bunch of CIDs and signatures and did:keys 960 963 // and we want to keep reads fast since we'll be reading a lot... 961 - .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32)])) 964 + .data_block_size_policy(BlockSizePolicy::new([kb(8), kb(32), kb(64), kb(128)])) 962 965 // this has no downsides, since the only point reads that might miss we do is on by_did 963 966 .expect_point_read_hits(true) 964 967 })?; ··· 968 971 // this isn't gonna compress well anyway, since its just keys (did + seq) 969 972 // and dids dont have many operations in the first place, so we can use small blocks 970 973 .data_block_size_policy(BlockSizePolicy::all(kb(2))) 974 + // lower restart interval since plcs are hashes, and dids dont have 975 + // many ops in themselves 976 + .data_block_restart_interval_policy(RestartIntervalPolicy::all(8)) 971 977 })?; 972 978 Ok(Self { 973 979 inner: Arc::new(FjallInner { ··· 1063 1069 .map(|e| e.to_string()) 1064 1070 .collect::<Vec<_>>() 1065 1071 .join("\n"); 1066 - log::warn!("invalid op {} {}:\n{msg}", op.did, op.cid); 1072 + log::warn!("dropping op {} {} (invalid sig):\n{msg}", op.did, op.cid); 1067 1073 return Ok(0); 1068 1074 } 1069 1075 } 1070 1076 Err(e) => { 1071 - log::warn!("invalid op {} {}: {e}", op.did, op.cid); 1077 + log::warn!("dropping op {} {}: {e}", op.did, op.cid); 1072 1078 return Ok(0); 1073 1079 } 1074 1080 } ··· 1759 1765 1760 1766 #[test] 1761 1767 fn stored_op_fixture_roundtrip() { 1762 - let fixtures = [ 1763 - "tests/fixtures/log_bskyapp.json", 1764 - "tests/fixtures/log_legacy_dholms.json", 1765 - "tests/fixtures/log_nullification.json", 1766 - "tests/fixtures/log_tombstone.json", 1767 - ]; 1768 + let mut fixtures: Vec<_> = std::fs::read_dir("tests/fixtures") 1769 + .unwrap() 1770 + .filter_map(|e| e.ok()) 1771 + .map(|e| e.path()) 1772 + .filter(|p| p.extension().map_or(false, |ext| ext == "json")) 1773 + .collect(); 1774 + fixtures.sort(); 1768 1775 1769 1776 let mut total_json_size = 0; 1770 1777 let mut total_packed_size = 0; 1771 1778 1772 - for path in fixtures { 1779 + for path in &fixtures { 1773 1780 let data = std::fs::read_to_string(path).unwrap(); 1774 1781 let entries: Vec<serde_json::Value> = serde_json::from_str(&data).unwrap(); 1775 1782 ··· 1777 1784 let op = &entry["operation"]; 1778 1785 let (stored, errors) = StoredOp::from_json_value(op.clone()); 1779 1786 if !errors.is_empty() { 1780 - let mut msg = format!("failed to parse op in {path}:\n"); 1787 + let mut msg = format!("failed to parse op in {}:\n", path.display()); 1781 1788 for e in errors { 1782 1789 msg.push_str(&format!(" - {e:?}\n")); 1783 1790 } ··· 1790 1797 let unpacked: StoredOp = bitcode::decode::<StoredOp>(&packed).unwrap(); 1791 1798 1792 1799 let reconstructed = unpacked.to_json_value(); 1793 - assert_eq!(*op, reconstructed, "roundtrip mismatch in {path}"); 1800 + assert_eq!( 1801 + *op, 1802 + reconstructed, 1803 + "roundtrip mismatch in {}", 1804 + path.display() 1805 + ); 1794 1806 1795 1807 total_json_size += serde_json::to_vec(op).unwrap().len(); 1796 1808 total_packed_size += packed.len(); ··· 1803 1815 total_packed_size, 1804 1816 total_json_size as isize - total_packed_size as isize 1805 1817 ); 1818 + } 1819 + 1820 + #[test] 1821 + fn stored_op_fixture_sig_roundtrip() { 1822 + let mut fixtures: Vec<_> = std::fs::read_dir("tests/fixtures") 1823 + .unwrap() 1824 + .filter_map(|e| e.ok()) 1825 + .map(|e| e.path()) 1826 + .filter(|p| p.extension().map_or(false, |ext| ext == "json")) 1827 + .collect(); 1828 + fixtures.sort(); 1829 + 1830 + for path in &fixtures { 1831 + let data = std::fs::read_to_string(path).unwrap(); 1832 + let entries: Vec<serde_json::Value> = serde_json::from_str(&data).unwrap(); 1833 + 1834 + // build a cid -> StoredOp map so we can look up prev ops 1835 + let mut by_cid: std::collections::HashMap<String, StoredOp> = 1836 + std::collections::HashMap::new(); 1837 + 1838 + for entry in &entries { 1839 + let cid = entry["cid"].as_str().unwrap().to_string(); 1840 + let op_json = entry["operation"].clone(); 1841 + 1842 + let (stored, errors) = StoredOp::from_json_value(op_json); 1843 + assert!( 1844 + errors.is_empty(), 1845 + "{} {cid}: parse errors: {errors:?}", 1846 + path.display() 1847 + ); 1848 + let stored = stored.unwrap(); 1849 + 1850 + let prev = stored.prev.as_ref().map(|c| c.to_string()); 1851 + let prev_stored = prev.as_deref().and_then(|c| by_cid.get(c)); 1852 + 1853 + let results = verify_op_sig(&stored, prev_stored) 1854 + .unwrap_or_else(|e| panic!("{} {cid}: {e}", path.display())); 1855 + assert!( 1856 + results.valid, 1857 + "{} {cid}: sig invalid after StoredOp roundtrip: {:?}", 1858 + path.display(), 1859 + results.errors 1860 + ); 1861 + 1862 + by_cid.insert(cid, stored); 1863 + } 1864 + } 1806 1865 } 1807 1866 }