Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1884 lines 66 kB view raw
1use crate::{ 2 Dt, InvalidOp, Op as CommonOp, 3 crypto::{AssuranceResults, DidKey, Signature, assure_valid_sig}, 4}; 5use anyhow::Context; 6use data_encoding::BASE32_NOPAD; 7use fjall::{ 8 Database, Keyspace, KeyspaceCreateOptions, PersistMode, 9 config::{BlockSizePolicy, RestartIntervalPolicy}, 10}; 11use ordered_varint::Variable; 12use serde::{Deserialize, Serialize}; 13use std::collections::BTreeMap; 14use std::fmt; 15use std::path::Path; 16use std::sync::Arc; 17use std::time::Instant; 18use tokio::sync::{Notify, futures::Notified, mpsc, oneshot}; 19 20const SEP: u8 = 0; 21 22fn seq_key(seq: u64) -> Vec<u8> { 23 seq.to_variable_vec().expect("that seq number encodes") 24} 25 26fn decode_seq_key(key: &[u8]) -> anyhow::Result<u64> { 27 u64::decode_variable(key).context("failed to decode seq key") 28} 29 30type IpldCid = cid::CidGeneric<64>; 31 32// 24 bytes -> 15 bytes 33fn encode_did(buf: &mut Vec<u8>, did: &str) -> anyhow::Result<usize> { 34 let input = did.trim_start_matches("did:plc:").to_uppercase(); 35 let len = BASE32_NOPAD 36 .decode_len(input.len()) 37 .map_err(|_| anyhow::anyhow!("failed to calculate decode len for {did}"))?; 38 39 let start = buf.len(); 40 buf.resize(start + len, 0); 41 42 BASE32_NOPAD 43 .decode_mut(input.as_bytes(), &mut buf[start..]) 44 .map_err(|_| anyhow::anyhow!("failed to encode did {did}")) 45} 46 47// 59 bytes -> 36 bytes 48fn decode_cid_str(s: &str) -> anyhow::Result<Vec<u8>> { 49 let cid = IpldCid::try_from(s)?; 50 let mut buf = Vec::new(); 51 cid.write_bytes(&mut buf) 52 .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}"))?; 53 Ok(buf) 54} 55 56fn decode_cid(bytes: &[u8]) -> anyhow::Result<String> { 57 IpldCid::try_from(bytes) 58 .map_err(|e| anyhow::anyhow!("failed to decode cid: {e}")) 59 .map(|cid| cid.to_string()) 60} 61 62fn decode_did(bytes: &[u8]) -> String { 63 let decoded = BASE32_NOPAD.encode(bytes).to_lowercase(); 64 format!("did:plc:{decoded}") 65} 66 67fn by_did_prefix(did: &str) -> 
anyhow::Result<Vec<u8>> { 68 let mut p = Vec::with_capacity(BASE32_NOPAD.decode_len(did.len())? + 1); 69 encode_did(&mut p, did)?; 70 p.push(SEP); 71 Ok(p) 72} 73 74/// by_did key: [15 bytes encoded did][SEP][seq varint] 75fn by_did_key(did: &str, seq: u64) -> anyhow::Result<Vec<u8>> { 76 let mut key = by_did_prefix(did)?; 77 seq.encode_variable(&mut key)?; 78 Ok(key) 79} 80 81/// CID string → binary CID bytes 82// STABILITY: never reorder variants, only append. 83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 84struct PlcCid(#[serde(with = "serde_bytes")] Vec<u8>); 85 86impl PlcCid { 87 fn from_cid_str(s: &str) -> anyhow::Result<Self> { 88 let cid = IpldCid::try_from(s)?; 89 let mut buf = Vec::new(); 90 cid.write_bytes(&mut buf) 91 .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}"))?; 92 Ok(Self(buf)) 93 } 94} 95 96impl fmt::Display for PlcCid { 97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 98 let cid = IpldCid::try_from(self.0.as_slice()).map_err(|_| fmt::Error)?; 99 write!(f, "{cid}") 100 } 101} 102 103// STABILITY: never reorder variants, only append. 
104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 105enum Aka { 106 Other(String), // 0 107 Bluesky(String), // 1 108 Atproto(String), // 2 109} 110 111impl Aka { 112 fn from_str(s: &str) -> Self { 113 if let Some(stripped) = s.strip_prefix("at://") { 114 if let Some(handle) = stripped.strip_suffix(".bsky.social") { 115 Self::Bluesky(handle.to_string()) 116 } else { 117 Self::Atproto(stripped.to_string()) 118 } 119 } else { 120 Self::Other(s.to_string()) 121 } 122 } 123} 124 125impl fmt::Display for Aka { 126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 127 match self { 128 Self::Bluesky(h) => write!(f, "at://{h}.bsky.social"), 129 Self::Atproto(h) => write!(f, "at://{h}"), 130 Self::Other(s) => f.write_str(s), 131 } 132 } 133} 134 135// STABILITY: never reorder variants, only append. 136#[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 137#[serde(rename_all = "snake_case")] 138enum OpType { 139 Other(String), // 0 140 PlcOperation, // 1 141 Create, // 2 142 PlcTombstone, // 3 143} 144 145impl OpType { 146 fn from_str(s: &str) -> Self { 147 match s { 148 "plc_operation" => Self::PlcOperation, 149 "create" => Self::Create, 150 "plc_tombstone" => Self::PlcTombstone, 151 other => Self::Other(other.to_string()), 152 } 153 } 154 155 fn as_str(&self) -> &str { 156 match self { 157 Self::PlcOperation => "plc_operation", 158 Self::Create => "create", 159 Self::PlcTombstone => "plc_tombstone", 160 Self::Other(s) => s, 161 } 162 } 163} 164 165#[derive(Debug, Clone, Copy, PartialEq, Eq)] 166enum StoredOpField { 167 Type, 168 Sig, 169 Prev, 170 RotationKeys, 171 VerificationMethods, 172 AlsoKnownAs, 173 Services, 174 SigningKey, 175 RecoveryKey, 176 Handle, 177 Service, 178} 179 180impl StoredOpField { 181 fn as_str(&self) -> &'static str { 182 match self { 183 Self::Type => "type", 184 Self::Sig => "sig", 185 Self::Prev => "prev", 186 Self::RotationKeys => "rotationKeys", 187 
Self::VerificationMethods => "verificationMethods", 188 Self::AlsoKnownAs => "alsoKnownAs", 189 Self::Services => "services", 190 Self::SigningKey => "signingKey", 191 Self::RecoveryKey => "recoveryKey", 192 Self::Handle => "handle", 193 Self::Service => "service", 194 } 195 } 196} 197 198impl AsRef<str> for StoredOpField { 199 fn as_ref(&self) -> &str { 200 self.as_str() 201 } 202} 203 204impl std::ops::Deref for StoredOpField { 205 type Target = str; 206 fn deref(&self) -> &Self::Target { 207 self.as_str() 208 } 209} 210 211impl fmt::Display for StoredOpField { 212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 213 f.write_str(self.as_str()) 214 } 215} 216 217#[derive(Debug, thiserror::Error)] 218enum StoredOpError { 219 #[error("operation is not an object")] 220 NotAnObject, 221 #[error("missing required field: {0}")] 222 MissingField(StoredOpField), 223 #[error("invalid field {0}: {1}")] 224 InvalidField(StoredOpField, #[source] anyhow::Error), 225 #[error("type mismatch for field {0}: expected {1}")] 226 TypeMismatch(StoredOpField, &'static str), 227} 228 229// STABILITY: never reorder variants, only append. 230#[derive( 231 Debug, 232 Clone, 233 Serialize, 234 Deserialize, 235 PartialEq, 236 Eq, 237 PartialOrd, 238 Ord, 239 bitcode::Encode, 240 bitcode::Decode, 241)] 242enum VerificationMethodKey { 243 Other(String), // 0 244 Atproto, // 1 245} 246 247impl VerificationMethodKey { 248 fn from_str(s: &str) -> Self { 249 match s { 250 "atproto" => Self::Atproto, 251 _ => Self::Other(s.to_string()), 252 } 253 } 254 255 fn as_str(&self) -> &str { 256 match self { 257 Self::Atproto => "atproto", 258 Self::Other(s) => s, 259 } 260 } 261} 262 263impl fmt::Display for VerificationMethodKey { 264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 265 f.write_str(self.as_str()) 266 } 267} 268 269// STABILITY: never reorder variants, only append. 
270#[derive( 271 Debug, 272 Clone, 273 Serialize, 274 Deserialize, 275 PartialEq, 276 Eq, 277 PartialOrd, 278 Ord, 279 bitcode::Encode, 280 bitcode::Decode, 281)] 282enum ServiceKey { 283 Other(String), // 0 284 AtprotoPds, // 1 285} 286 287impl ServiceKey { 288 fn from_str(s: &str) -> Self { 289 match s { 290 "atproto_pds" => Self::AtprotoPds, 291 _ => Self::Other(s.to_string()), 292 } 293 } 294 295 fn as_str(&self) -> &str { 296 match self { 297 Self::AtprotoPds => "atproto_pds", 298 Self::Other(s) => s, 299 } 300 } 301} 302 303impl fmt::Display for ServiceKey { 304 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 305 f.write_str(self.as_str()) 306 } 307} 308 309// STABILITY: never reorder variants, only append. 310#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 311enum ServiceType { 312 Other(String), // 0 313 AtprotoPersonalDataServer, // 1 314} 315 316impl ServiceType { 317 fn from_str(s: &str) -> Self { 318 match s { 319 "AtprotoPersonalDataServer" => Self::AtprotoPersonalDataServer, 320 _ => Self::Other(s.to_string()), 321 } 322 } 323 324 fn as_str(&self) -> &str { 325 match self { 326 Self::AtprotoPersonalDataServer => "AtprotoPersonalDataServer", 327 Self::Other(s) => s, 328 } 329 } 330} 331 332// STABILITY: never reorder variants, only append. 
333#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 334enum ServiceEndpoint { 335 Other(String), // 0 336 BlueskyPds(String), // 1 337 BlueskySocial, // 2 338} 339 340impl ServiceEndpoint { 341 fn from_str(s: &str) -> Self { 342 if let Some(host) = s 343 .strip_prefix("https://") 344 .and_then(|h| h.strip_suffix(".host.bsky.network")) 345 { 346 Self::BlueskyPds(host.to_string()) 347 } else if s == "https://bsky.social" { 348 Self::BlueskySocial 349 } else { 350 Self::Other(s.to_string()) 351 } 352 } 353 354 fn as_string(&self) -> String { 355 match self { 356 Self::BlueskyPds(h) => format!("https://{h}.host.bsky.network"), 357 Self::BlueskySocial => "https://bsky.social".to_string(), 358 Self::Other(s) => s.clone(), 359 } 360 } 361} 362 363// STABILITY: never reorder variants, only append. 364#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 365enum Handle { 366 Other(String), // 0 367 BskySocial(String), // 1 368} 369 370impl Handle { 371 fn from_str(s: &str) -> Self { 372 if let Some(handle) = s.strip_suffix(".bsky.social") { 373 Self::BskySocial(handle.to_string()) 374 } else { 375 Self::Other(s.to_string()) 376 } 377 } 378 379 fn as_string(&self) -> String { 380 match self { 381 Self::BskySocial(h) => format!("{h}.bsky.social"), 382 Self::Other(s) => s.clone(), 383 } 384 } 385} 386 387#[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 388struct StoredService { 389 r#type: ServiceType, 390 endpoint: ServiceEndpoint, 391} 392 393#[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 394struct StoredOp { 395 op_type: OpType, 396 sig: Signature, 397 prev: Option<PlcCid>, 398 399 rotation_keys: Option<Vec<DidKey>>, 400 verification_methods: Option<BTreeMap<VerificationMethodKey, DidKey>>, 401 also_known_as: Option<Vec<Aka>>, 402 services: Option<BTreeMap<ServiceKey, StoredService>>, 403 404 // legacy create fields 
405 signing_key: Option<DidKey>, 406 recovery_key: Option<DidKey>, 407 handle: Option<Handle>, 408 service: Option<ServiceEndpoint>, 409 410 // msgpack-encoded BTreeMap<String, serde_json::Value>. 411 // Vec<u8> is used because bitcode cannot handle serde_json::Value directly. 412 // empty vec when there are no unknown fields (the common case). 413 #[serde(skip)] 414 unknown_packed: Vec<u8>, 415} 416 417impl StoredOp { 418 fn get_keys(&self) -> Vec<&DidKey> { 419 let mut keys = Vec::with_capacity(self.rotation_keys.as_ref().map_or(2, |keys| keys.len())); 420 if let Some(rot_keys) = self.rotation_keys.as_ref() { 421 keys.extend(rot_keys.iter()); 422 } else { 423 // legacy genesis op 424 if let Some(recovery_key) = self.recovery_key.as_ref() { 425 keys.push(recovery_key); 426 } 427 if let Some(signing_key) = self.signing_key.as_ref() { 428 keys.push(signing_key); 429 } 430 } 431 keys 432 } 433 434 fn unknown(&self) -> BTreeMap<String, serde_json::Value> { 435 if self.unknown_packed.is_empty() { 436 return BTreeMap::new(); 437 } 438 rmp_serde::from_slice(&self.unknown_packed).unwrap_or_default() 439 } 440 441 fn pack_unknown(unknown: BTreeMap<String, serde_json::Value>) -> Vec<u8> { 442 if unknown.is_empty() { 443 return Vec::new(); 444 } 445 rmp_serde::to_vec(&unknown).expect("unknown fields are serializable") 446 } 447 fn from_json_value(v: serde_json::Value) -> (Option<Self>, Vec<StoredOpError>) { 448 let serde_json::Value::Object(mut obj) = v else { 449 return (None, vec![StoredOpError::NotAnObject]); 450 }; 451 452 let mut errors = Vec::new(); 453 let mut unknown = BTreeMap::new(); 454 455 let op_type = match obj.remove(&*StoredOpField::Type) { 456 Some(serde_json::Value::String(s)) => OpType::from_str(&s), 457 Some(v) => { 458 errors.push(StoredOpError::TypeMismatch(StoredOpField::Type, "string")); 459 unknown.insert(StoredOpField::Type.to_string(), v); 460 OpType::Other(String::new()) 461 } 462 Option::None => { 463 
errors.push(StoredOpError::MissingField(StoredOpField::Type)); 464 OpType::Other(String::new()) 465 } 466 }; 467 468 let sig = match obj.remove(&*StoredOpField::Sig) { 469 Some(serde_json::Value::String(s)) => match Signature::from_base64url(&s) { 470 Ok(sig) => sig, 471 Err(e) => { 472 errors.push(StoredOpError::InvalidField(StoredOpField::Sig, e)); 473 unknown.insert(StoredOpField::Sig.to_string(), serde_json::Value::String(s)); 474 Signature(Vec::new()) 475 } 476 }, 477 Some(v) => { 478 errors.push(StoredOpError::TypeMismatch(StoredOpField::Sig, "string")); 479 unknown.insert(StoredOpField::Sig.to_string(), v); 480 Signature(Vec::new()) 481 } 482 Option::None => { 483 errors.push(StoredOpError::MissingField(StoredOpField::Sig)); 484 Signature(Vec::new()) 485 } 486 }; 487 488 let prev = match obj.remove(&*StoredOpField::Prev) { 489 Some(serde_json::Value::Null) | Option::None => Option::None, 490 Some(serde_json::Value::String(s)) => match PlcCid::from_cid_str(&s) { 491 Ok(p) => Some(p), 492 Err(e) => { 493 errors.push(StoredOpError::InvalidField(StoredOpField::Prev, e)); 494 unknown.insert( 495 StoredOpField::Prev.to_string(), 496 serde_json::Value::String(s), 497 ); 498 Option::None 499 } 500 }, 501 Some(v) => { 502 errors.push(StoredOpError::TypeMismatch(StoredOpField::Prev, "string")); 503 unknown.insert(StoredOpField::Prev.to_string(), v); 504 Option::None 505 } 506 }; 507 508 let rotation_keys = match obj.remove(&*StoredOpField::RotationKeys) { 509 Some(serde_json::Value::Array(arr)) => { 510 let mut keys = Vec::with_capacity(arr.len()); 511 let mut failed = false; 512 for v in &arr { 513 match v { 514 serde_json::Value::String(s) => match DidKey::from_did_key(s) { 515 Ok(k) => keys.push(k), 516 Err(e) => { 517 errors.push(StoredOpError::InvalidField( 518 StoredOpField::RotationKeys, 519 e, 520 )); 521 failed = true; 522 break; 523 } 524 }, 525 _ => { 526 errors.push(StoredOpError::TypeMismatch( 527 StoredOpField::RotationKeys, 528 "string inside array", 
529 )); 530 failed = true; 531 break; 532 } 533 } 534 } 535 if failed { 536 unknown.insert( 537 StoredOpField::RotationKeys.to_string(), 538 serde_json::Value::Array(arr), 539 ); 540 Option::None 541 } else { 542 Some(keys) 543 } 544 } 545 Some(v) => { 546 errors.push(StoredOpError::TypeMismatch( 547 StoredOpField::RotationKeys, 548 "array", 549 )); 550 unknown.insert(StoredOpField::RotationKeys.to_string(), v); 551 Option::None 552 } 553 Option::None => Option::None, 554 }; 555 556 let verification_methods = match obj.remove(&*StoredOpField::VerificationMethods) { 557 Some(serde_json::Value::Object(map)) => { 558 let mut methods = BTreeMap::new(); 559 let mut failed = false; 560 for (k, v) in &map { 561 match v { 562 serde_json::Value::String(s) => match DidKey::from_did_key(s) { 563 Ok(key) => { 564 methods.insert(VerificationMethodKey::from_str(k), key); 565 } 566 Err(e) => { 567 errors.push(StoredOpError::InvalidField( 568 StoredOpField::VerificationMethods, 569 e, 570 )); 571 failed = true; 572 break; 573 } 574 }, 575 _ => { 576 errors.push(StoredOpError::TypeMismatch( 577 StoredOpField::VerificationMethods, 578 "string value in object", 579 )); 580 failed = true; 581 break; 582 } 583 } 584 } 585 if failed { 586 unknown.insert( 587 StoredOpField::VerificationMethods.to_string(), 588 serde_json::Value::Object(map), 589 ); 590 Option::None 591 } else { 592 Some(methods) 593 } 594 } 595 Some(v) => { 596 errors.push(StoredOpError::TypeMismatch( 597 StoredOpField::VerificationMethods, 598 "object", 599 )); 600 unknown.insert(StoredOpField::VerificationMethods.to_string(), v); 601 Option::None 602 } 603 Option::None => Option::None, 604 }; 605 606 let also_known_as = match obj.remove(&*StoredOpField::AlsoKnownAs) { 607 Some(serde_json::Value::Array(arr)) => { 608 let mut akas = Vec::with_capacity(arr.len()); 609 let mut failed = false; 610 for v in &arr { 611 match v { 612 serde_json::Value::String(s) => akas.push(Aka::from_str(s)), 613 _ => { 614 
errors.push(StoredOpError::TypeMismatch( 615 StoredOpField::AlsoKnownAs, 616 "string inside array", 617 )); 618 failed = true; 619 break; 620 } 621 } 622 } 623 if failed { 624 unknown.insert( 625 StoredOpField::AlsoKnownAs.to_string(), 626 serde_json::Value::Array(arr), 627 ); 628 Option::None 629 } else { 630 Some(akas) 631 } 632 } 633 Some(v) => { 634 errors.push(StoredOpError::TypeMismatch( 635 StoredOpField::AlsoKnownAs, 636 "array", 637 )); 638 unknown.insert(StoredOpField::AlsoKnownAs.to_string(), v); 639 Option::None 640 } 641 Option::None => Option::None, 642 }; 643 644 let services = match obj.remove(&*StoredOpField::Services) { 645 Some(serde_json::Value::Object(map)) => { 646 let mut svcs = BTreeMap::new(); 647 let mut failed = false; 648 for (k, v) in &map { 649 if let (Some(r#type), Some(endpoint)) = ( 650 v.get("type").and_then(|t| t.as_str()), 651 v.get("endpoint").and_then(|e| e.as_str()), 652 ) { 653 let svc = StoredService { 654 r#type: ServiceType::from_str(r#type), 655 endpoint: ServiceEndpoint::from_str(endpoint), 656 }; 657 svcs.insert(ServiceKey::from_str(k), svc); 658 } else { 659 errors.push(StoredOpError::TypeMismatch( 660 StoredOpField::Services, 661 "missing or invalid type/endpoint in service object", 662 )); 663 failed = true; 664 break; 665 } 666 } 667 if failed { 668 unknown.insert( 669 StoredOpField::Services.to_string(), 670 serde_json::Value::Object(map), 671 ); 672 Option::None 673 } else { 674 Some(svcs) 675 } 676 } 677 Some(v) => { 678 errors.push(StoredOpError::TypeMismatch( 679 StoredOpField::Services, 680 "object", 681 )); 682 unknown.insert(StoredOpField::Services.to_string(), v); 683 Option::None 684 } 685 Option::None => Option::None, 686 }; 687 688 let signing_key = match obj.remove(&*StoredOpField::SigningKey) { 689 Some(serde_json::Value::String(s)) => match DidKey::from_did_key(&s) { 690 Ok(key) => Some(key), 691 Err(e) => { 692 errors.push(StoredOpError::InvalidField(StoredOpField::SigningKey, e)); 693 
unknown.insert( 694 StoredOpField::SigningKey.to_string(), 695 serde_json::Value::String(s), 696 ); 697 Option::None 698 } 699 }, 700 Some(v) => { 701 errors.push(StoredOpError::TypeMismatch( 702 StoredOpField::SigningKey, 703 "string", 704 )); 705 unknown.insert(StoredOpField::SigningKey.to_string(), v); 706 Option::None 707 } 708 Option::None => Option::None, 709 }; 710 711 let recovery_key = match obj.remove(&*StoredOpField::RecoveryKey) { 712 Some(serde_json::Value::String(s)) => match DidKey::from_did_key(&s) { 713 Ok(key) => Some(key), 714 Err(e) => { 715 errors.push(StoredOpError::InvalidField(StoredOpField::RecoveryKey, e)); 716 unknown.insert( 717 StoredOpField::RecoveryKey.to_string(), 718 serde_json::Value::String(s), 719 ); 720 Option::None 721 } 722 }, 723 Some(v) => { 724 errors.push(StoredOpError::TypeMismatch( 725 StoredOpField::RecoveryKey, 726 "string", 727 )); 728 unknown.insert(StoredOpField::RecoveryKey.to_string(), v); 729 Option::None 730 } 731 Option::None => Option::None, 732 }; 733 734 let handle = match obj.remove(&*StoredOpField::Handle) { 735 Some(serde_json::Value::String(s)) => Some(Handle::from_str(&s)), 736 Some(v) => { 737 errors.push(StoredOpError::TypeMismatch(StoredOpField::Handle, "string")); 738 unknown.insert(StoredOpField::Handle.to_string(), v); 739 Option::None 740 } 741 Option::None => Option::None, 742 }; 743 744 let service = match obj.remove(&*StoredOpField::Service) { 745 Some(serde_json::Value::String(s)) => Some(ServiceEndpoint::from_str(&s)), 746 Some(v) => { 747 errors.push(StoredOpError::TypeMismatch( 748 StoredOpField::Service, 749 "string", 750 )); 751 unknown.insert(StoredOpField::Service.to_string(), v); 752 Option::None 753 } 754 Option::None => Option::None, 755 }; 756 757 for (k, v) in obj { 758 unknown.insert(k, v); 759 } 760 761 ( 762 Some(Self { 763 op_type, 764 sig, 765 prev, 766 rotation_keys, 767 verification_methods, 768 also_known_as, 769 services, 770 signing_key, 771 recovery_key, 772 handle, 773 
service, 774 unknown_packed: Self::pack_unknown(unknown), 775 }), 776 errors, 777 ) 778 } 779 780 fn to_json_value(&self) -> serde_json::Value { 781 let mut map = serde_json::Map::new(); 782 783 map.insert((*StoredOpField::Type).into(), self.op_type.as_str().into()); 784 map.insert((*StoredOpField::Sig).into(), self.sig.to_string().into()); 785 map.insert( 786 (*StoredOpField::Prev).into(), 787 self.prev 788 .as_ref() 789 .map(|c| serde_json::Value::String(c.to_string())) 790 .unwrap_or(serde_json::Value::Null), 791 ); 792 793 if let Some(keys) = &self.rotation_keys { 794 map.insert( 795 (*StoredOpField::RotationKeys).into(), 796 keys.iter() 797 .map(|k| serde_json::Value::String(k.to_string())) 798 .collect::<Vec<_>>() 799 .into(), 800 ); 801 } 802 803 if let Some(methods) = &self.verification_methods { 804 let obj: serde_json::Map<String, serde_json::Value> = methods 805 .iter() 806 .map(|(k, v)| { 807 ( 808 k.as_str().to_string(), 809 serde_json::Value::String(v.to_string()), 810 ) 811 }) 812 .collect(); 813 map.insert((*StoredOpField::VerificationMethods).into(), obj.into()); 814 } 815 816 if let Some(aka) = &self.also_known_as { 817 map.insert( 818 (*StoredOpField::AlsoKnownAs).into(), 819 aka.iter() 820 .map(|h| serde_json::Value::String(h.to_string())) 821 .collect::<Vec<_>>() 822 .into(), 823 ); 824 } 825 826 if let Some(services) = &self.services { 827 let obj: serde_json::Map<String, serde_json::Value> = services 828 .iter() 829 .map(|(k, svc)| { 830 ( 831 k.as_str().to_string(), 832 serde_json::json!({ 833 "type": svc.r#type.as_str(), 834 "endpoint": svc.endpoint.as_string(), 835 }), 836 ) 837 }) 838 .collect(); 839 map.insert((*StoredOpField::Services).into(), obj.into()); 840 } 841 842 // legacy create fields 843 if let Some(key) = &self.signing_key { 844 map.insert((*StoredOpField::SigningKey).into(), key.to_string().into()); 845 } 846 if let Some(key) = &self.recovery_key { 847 map.insert((*StoredOpField::RecoveryKey).into(), key.to_string().into()); 
848 } 849 if let Some(handle) = &self.handle { 850 map.insert((*StoredOpField::Handle).into(), handle.as_string().into()); 851 } 852 if let Some(service) = &self.service { 853 map.insert((*StoredOpField::Service).into(), service.as_string().into()); 854 } 855 856 for (k, v) in self.unknown() { 857 map.insert(k, v); 858 } 859 860 serde_json::Value::Object(map) 861 } 862} 863 864fn verify_op_sig(op: &StoredOp, prev: Option<&StoredOp>) -> anyhow::Result<AssuranceResults> { 865 let keys: Vec<&DidKey> = match &op.prev { 866 None => op.get_keys(), 867 Some(_) => match prev { 868 None => anyhow::bail!("prev cid exists but the op for that cid is missing"), 869 Some(p) => p.get_keys(), 870 }, 871 }; 872 873 if keys.is_empty() { 874 anyhow::bail!("no keys found for genesis op or prev op"); 875 } 876 877 let data = { 878 let serde_json::Value::Object(mut data) = op.to_json_value() else { 879 unreachable!("we know op is valid, because it comes from StoredOp") 880 }; 881 data.remove("sig"); 882 serde_json::Value::Object(data) 883 }; 884 885 let results = assure_valid_sig(keys, &op.sig, &data) 886 .expect("that our op is an object and we removed sig field"); 887 Ok(results) 888} 889 890// stored alongside the seq key in the ops keyspace 891// cid and created_at are in the value (not the key) in the new layout 892#[derive(Debug, Deserialize, Serialize, bitcode::Encode, bitcode::Decode)] 893#[serde(rename_all = "camelCase")] 894struct DbOp { 895 #[serde(with = "serde_bytes")] 896 pub did: Vec<u8>, 897 #[serde(with = "serde_bytes")] 898 pub cid: Vec<u8>, 899 pub created_at: u64, 900 pub nullified: bool, 901 pub operation: StoredOp, 902} 903 904// we have our own Op struct for fjall since we dont want to have to convert Value back to RawValue 905#[derive(Debug, Serialize, Deserialize, Clone)] 906pub struct Op { 907 pub seq: u64, 908 pub did: String, 909 pub cid: String, 910 #[serde(rename = "createdAt")] 911 pub created_at: Dt, 912 pub nullified: bool, 913 pub operation: 
serde_json::Value, 914} 915 916impl Op { 917 // todo: we should probably just have this in Op tbh as a `r#type: SequencedOpType` or something 918 /// adds the `type` field to the op 919 pub fn to_sequenced_json(&self) -> serde_json::Value { 920 let mut val = serde_json::to_value(self).expect("Op is serializable"); 921 if let serde_json::Value::Object(ref mut map) = val { 922 map.insert("type".to_string(), "sequenced_op".into()); 923 } 924 val 925 } 926} 927 928#[derive(Clone)] 929pub struct FjallDb { 930 inner: Arc<FjallInner>, 931} 932 933struct FjallInner { 934 db: Database, 935 /// primary keyspace: seq (varint) -> DbOp 936 ops: Keyspace, 937 /// secondary index: [encoded_did][SEP][seq_varint] -> [] 938 by_did: Keyspace, 939 notify_stream: Notify, 940} 941 942impl FjallDb { 943 pub fn open(path: impl AsRef<Path>) -> fjall::Result<Self> { 944 const fn kb(kb: u32) -> u32 { 945 kb * 1_024 946 } 947 const fn mb(mb: u32) -> u64 { 948 kb(mb) as u64 * 1_024 949 } 950 951 let db = Database::builder(path) 952 // 32mb is too low we can afford more 953 // this should be configurable though! 954 .cache_size(mb(256)) 955 .manual_journal_persist(true) 956 .open()?; 957 let opts = KeyspaceCreateOptions::default; 958 let ops = db.keyspace("ops", || { 959 opts() 960 // this is mainly for when backfilling 961 .max_memtable_size(mb(192)) 962 // this wont compress terribly well since its a bunch of CIDs and signatures and did:keys 963 // and we want to keep reads fast since we'll be reading a lot... 
964 .data_block_size_policy(BlockSizePolicy::new([kb(8), kb(32), kb(64), kb(128)])) 965 // this has no downsides, since the only point reads that might miss we do is on by_did 966 .expect_point_read_hits(true) 967 })?; 968 let by_did = db.keyspace("by_did", || { 969 opts() 970 .max_memtable_size(mb(64)) 971 // this isn't gonna compress well anyway, since its just keys (did + seq) 972 // and dids dont have many operations in the first place, so we can use small blocks 973 .data_block_size_policy(BlockSizePolicy::all(kb(2))) 974 // lower restart interval since plcs are hashes, and dids dont have 975 // many ops in themselves 976 .data_block_restart_interval_policy(RestartIntervalPolicy::all(8)) 977 })?; 978 Ok(Self { 979 inner: Arc::new(FjallInner { 980 db, 981 ops, 982 by_did, 983 notify_stream: Notify::new(), 984 }), 985 }) 986 } 987 988 pub fn clear(&self) -> fjall::Result<()> { 989 self.inner.ops.clear()?; 990 self.inner.by_did.clear()?; 991 Ok(()) 992 } 993 994 pub fn persist(&self, mode: PersistMode) -> fjall::Result<()> { 995 self.inner.db.persist(mode) 996 } 997 998 pub fn compact(&self) -> fjall::Result<()> { 999 self.inner.ops.major_compact()?; 1000 self.inner.by_did.major_compact()?; 1001 Ok(()) 1002 } 1003 1004 pub fn subscribe(&self) -> Notified<'_> { 1005 self.inner.notify_stream.notified() 1006 } 1007 1008 /// Returns `(seq, created_at)` for the last stored op, or `None` if empty. 
1009 pub fn get_latest(&self) -> anyhow::Result<Option<(u64, Dt)>> { 1010 let Some(guard) = self.inner.ops.last_key_value() else { 1011 return Ok(None); 1012 }; 1013 let (key, value) = guard 1014 .into_inner() 1015 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?; 1016 let seq = decode_seq_key(&key)?; 1017 let db_op: DbOp = bitcode::decode::<DbOp>(&value)?; 1018 let dt = Dt::from_timestamp_micros(db_op.created_at as i64) 1019 .ok_or_else(|| anyhow::anyhow!("invalid created_at in last op"))?; 1020 Ok(Some((seq, dt))) 1021 } 1022 1023 pub fn insert_op<const VERIFY: bool>(&self, op: &CommonOp, seq: u64) -> anyhow::Result<usize> { 1024 let cid_bytes = decode_cid_str(&op.cid)?; 1025 1026 let op_json: serde_json::Value = serde_json::from_str(op.operation.get())?; 1027 let (stored, mut errors) = StoredOp::from_json_value(op_json); 1028 1029 let Some(operation) = stored else { 1030 return Err(errors.remove(0)).context("fatal operation parse error"); 1031 }; 1032 1033 for err in &errors { 1034 tracing::warn!("dropping op {} {} (seq {seq}) parse error: {err}", op.did, op.cid); 1035 } 1036 if !errors.is_empty() { 1037 // if parse failed but not fatal, we just dont store it 1038 return Ok(0); 1039 } 1040 1041 if VERIFY { 1042 let prev_op = operation 1043 .prev 1044 .as_ref() 1045 .map(|prev_cid| { 1046 // TODO: we should have a cid -> seq lookup eventually maybe? 1047 // this is probably fine though we will only iter over like 2 ops at most 1048 // or so, its there to handle nullified... 1049 // but a cid lookup would also help us avoid duplicate ops! 1050 self._ops_for_did(&op.did) 1051 .map(|ops| { 1052 ops.rev() 1053 .find(|r| r.as_ref().map_or(true, |(_, _, cid, _)| cid == prev_cid)) 1054 .transpose() 1055 }) 1056 .flatten() 1057 }) 1058 .transpose()? 
1059 .flatten(); 1060 1061 let prev_stored = prev_op.as_ref().map(|(_, _, _, p)| &p.operation); 1062 1063 match verify_op_sig(&operation, prev_stored) { 1064 Ok(results) => { 1065 if !results.valid { 1066 let msg = results 1067 .errors 1068 .iter() 1069 .map(|e| e.to_string()) 1070 .collect::<Vec<_>>() 1071 .join("\n"); 1072 tracing::warn!("dropping op {} {} (seq {seq}) invalid sig:\n{msg}", op.did, op.cid); 1073 return Ok(0); 1074 } 1075 } 1076 Err(e) => { 1077 tracing::warn!("dropping op {} {} (seq {seq}): {e}", op.did, op.cid); 1078 return Ok(0); 1079 } 1080 } 1081 tracing::debug!("verified op {} {} (seq {seq})", op.did, op.cid); 1082 } 1083 1084 let db_op = DbOp { 1085 did: { 1086 let mut encoded_did = Vec::with_capacity(15); 1087 encode_did(&mut encoded_did, &op.did)?; 1088 encoded_did 1089 }, 1090 cid: cid_bytes, 1091 created_at: op.created_at.timestamp_micros() as u64, 1092 nullified: op.nullified, 1093 operation, 1094 }; 1095 1096 let seq_val = bitcode::encode(&db_op); 1097 let seq_key_bytes = seq_key(seq); 1098 let by_did_key_bytes = by_did_key(&op.did, seq)?; 1099 1100 let mut batch = self.inner.db.batch(); 1101 batch.insert(&self.inner.ops, seq_key_bytes, seq_val); 1102 batch.insert(&self.inner.by_did, by_did_key_bytes, &[]); 1103 batch.commit()?; 1104 1105 self.inner.notify_stream.notify_waiters(); 1106 1107 tracing::debug!("inserted op {} {} (seq {seq})", op.did, op.cid); 1108 Ok(1) 1109 } 1110 1111 pub(crate) fn get_op_at_or_after(&self, seq: u64) -> anyhow::Result<Option<Op>> { 1112 self.inner 1113 .ops 1114 .range(seq_key(seq)..) 1115 .next() 1116 .map(|v| { 1117 bitcode::decode::<DbOp>(&v.value()?) 
1118 .context("failed to decode op") 1119 .map(|op| { 1120 Ok(Op { 1121 seq, 1122 did: decode_did(&op.did), 1123 cid: decode_cid(&op.cid)?, 1124 created_at: Dt::from_timestamp_micros(op.created_at as i64) 1125 .ok_or_else(|| anyhow::anyhow!("invalid created_at in op"))?, 1126 nullified: op.nullified, 1127 operation: op.operation.to_json_value(), 1128 }) 1129 }) 1130 .flatten() 1131 }) 1132 .transpose() 1133 } 1134 1135 /// Decode a `by_did` entry: extract the seq from the key suffix, then 1136 /// look up the full `DbOp` in the `ops` keyspace. 1137 fn decode_by_did_entry( 1138 &self, 1139 by_did_key_bytes: &[u8], 1140 prefix_len: usize, 1141 ) -> anyhow::Result<(u64, Dt, PlcCid, DbOp)> { 1142 let key_suffix = by_did_key_bytes 1143 .get(prefix_len..) 1144 .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key_bytes:?}"))?; 1145 1146 let seq = 1147 u64::decode_variable(key_suffix).context("failed to decode seq from by_did key")?; 1148 1149 let value = self 1150 .inner 1151 .ops 1152 .get(seq_key(seq))? 
.ok_or_else(|| anyhow::anyhow!("op not found for seq {seq}"))?;

        let op: DbOp = bitcode::decode::<DbOp>(&value)?;
        let ts = Dt::from_timestamp_micros(op.created_at as i64)
            .ok_or_else(|| anyhow::anyhow!("invalid created_at_micros {}", op.created_at))?;
        let cid = PlcCid(op.cid.clone());

        Ok((seq, ts, cid, op))
    }

    /// Iterates the `by_did` index entries stored under `did`'s prefix and
    /// decodes each one into `(seq, created_at, cid, stored op)`.
    ///
    /// # Errors
    ///
    /// Fails up front if the DID cannot be turned into an index prefix; each
    /// yielded item fails individually on a read or decode error.
    fn _ops_for_did(
        &self,
        did: &str,
    ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<(u64, Dt, PlcCid, DbOp)>> + '_>
    {
        let prefix = by_did_prefix(did)?;

        Ok(self.inner.by_did.prefix(&prefix).map(move |guard| {
            let (by_did_key, _) = guard
                .into_inner()
                .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
            self.decode_by_did_entry(&by_did_key, prefix.len())
        }))
    }

    /// Like [`Self::_ops_for_did`], but converts every entry into a public
    /// [`Op`] with a string CID, a string DID and the operation as JSON.
    pub fn ops_for_did(
        &self,
        did: &str,
    ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<Op>> + '_> {
        Ok(self._ops_for_did(did)?.map(|res| {
            let (seq, ts, cid, op) = res?;
            let cid = decode_cid(&cid.0)?;
            let did = decode_did(&op.did);
            Ok(Op {
                seq,
                did,
                cid,
                created_at: ts,
                nullified: op.nullified,
                operation: op.operation.to_json_value(),
            })
        }))
    }

    /// Iterates the ops whose sequence number falls inside `range`, decoding
    /// each stored value into a public [`Op`].
    ///
    /// The bounds are re-encoded with `seq_key` so that they compare the same
    /// way as the keys of the `ops` keyspace.
    pub fn export_ops(
        &self,
        range: impl std::ops::RangeBounds<u64>,
    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Op>> + '_> {
        use std::ops::Bound;
        let map_bound = |b: Bound<&u64>| -> Bound<Vec<u8>> {
            match b {
                Bound::Included(seq) => Bound::Included(seq_key(*seq)),
                Bound::Excluded(seq) => Bound::Excluded(seq_key(*seq)),
                Bound::Unbounded => Bound::Unbounded,
            }
        };
        let range = (map_bound(range.start_bound()), map_bound(range.end_bound()));

        Ok(self
            .inner
            .ops
            .range(range)
            .map(|item| -> anyhow::Result<Op> {
                let (key, value) = item
                    .into_inner()
                    .map_err(|e: fjall::Error| anyhow::anyhow!("fjall read error: {e}"))?;
                let seq = decode_seq_key(&key)?;
                let db_op: DbOp = bitcode::decode::<DbOp>(&value)?;
                let created_at =
                    Dt::from_timestamp_micros(db_op.created_at as i64).ok_or_else(|| {
                        anyhow::anyhow!("invalid created_at_micros {}", db_op.created_at)
                    })?;
                let cid = decode_cid(&db_op.cid)?;
                let did = decode_did(&db_op.did);
                Ok(Op {
                    seq,
                    did,
                    cid,
                    created_at,
                    nullified: db_op.nullified,
                    operation: db_op.operation.to_json_value(),
                })
            }))
    }

    /// Removes the op identified by `cid` from both the `ops` keyspace and the
    /// `by_did` index of `did_str`.
    ///
    /// The DID's index entries are scanned and the first entry whose stored CID
    /// equals `cid` is deleted (drop_op is rare, so a scan is fine). Earlier
    /// this function ignored `cid` and always removed the last entry of the
    /// DID, which deleted the wrong op whenever the target was not the newest.
    ///
    /// `created_at` is only used for diagnostics. If no entry matches, a
    /// warning is logged and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Fails on read or decode errors while scanning, or if the batch commit
    /// fails.
    pub fn drop_op(&self, did_str: &str, created_at: &Dt, cid: &str) -> anyhow::Result<()> {
        let prefix = by_did_prefix(did_str)?;
        let mut target: Option<(u64, Vec<u8>)> = None;

        for guard in self.inner.by_did.prefix(&prefix) {
            let (key, _) = guard
                .into_inner()
                .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
            let (seq, _, entry_cid, _) = self
                .decode_by_did_entry(&key, prefix.len())
                .context("decode by_did entry in drop_op")?;
            // compare in string form, the same representation `ops_for_did` hands out
            if decode_cid(&entry_cid.0)? == cid {
                target = Some((seq, key.to_vec()));
                break;
            }
        }

        let Some((seq, by_did_key_bytes)) = target else {
            tracing::warn!(
                "drop_op: by_did entry not found for {did_str} (cid {cid}, created at {created_at})"
            );
            return Ok(());
        };

        // remove the op and its index entry together
        let mut batch = self.inner.db.batch();
        batch.remove(&self.inner.ops, seq_key(seq));
        batch.remove(&self.inner.by_did, by_did_key_bytes);
        batch.commit()?;

        Ok(())
    }

    /// Verifies every op reachable through the `by_did` index and reports the
    /// invalid ones on `invalid_ops_tx`.
    ///
    /// Two scan threads walk the index (one forwards, one backwards), group
    /// entries per DID and hand each group to one of `workers` checker threads.
    /// A checker flags an op when its `prev` CID is not among the DID's ops or
    /// when `verify_op_sig` rejects it or errors.
    ///
    /// Returns `(checked, failed)` summed over all checker threads.
    ///
    /// This blocks the calling thread (`blocking_send`, thread joins), so it
    /// must not be called directly from async code.
    ///
    /// NOTE(review): each scan stops at the first DID boundary after its share
    /// of ops. When the midpoint falls inside a DID, both scans appear to
    /// cover that DID, so it would be checked and reported twice. Confirm
    /// whether duplicates matter to consumers.
    pub fn audit(&self, invalid_ops_tx: mpsc::Sender<InvalidOp>) -> anyhow::Result<(usize, usize)> {
        // shadows the tokio `mpsc` inside this body only; the parameter above
        // still names the tokio sender
        use std::sync::mpsc;

        let ops = self.inner.by_did.len()?;

        let workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        // (16 byte by_did prefix, ops of that DID)
        type Batch = (Vec<u8>, Vec<(Dt, PlcCid, DbOp)>);
        let (result_tx, result_rx) = mpsc::sync_channel::<anyhow::Result<(usize, usize)>>(workers);

        let channels: Vec<_> = (0..workers)
            .map(|_| mpsc::sync_channel::<Batch>(512))
            .collect();
        let senders: Vec<_> = channels.iter().map(|(tx, _)| tx.clone()).collect();

        std::thread::scope(|s| {
            // checker threads: one per channel, each reports its totals once
            for (_, rx) in channels {
                let result_tx = result_tx.clone();
                let invalid_ops_tx = invalid_ops_tx.clone();
                s.spawn(move || {
                    let mut checked: usize = 0;
                    let mut failed: usize = 0;
                    while let Ok((did_prefix, ops)) = rx.recv() {
                        // the last prefix byte is not part of the encoded did
                        let did = decode_did(&did_prefix[..did_prefix.len() - 1]);
                        for (ts, cid, op) in &ops {
                            let send_invalid = || {
                                let _ = invalid_ops_tx.blocking_send(InvalidOp {
                                    did: did.clone(),
                                    at: ts.clone(),
                                    cid: cid.to_string(),
                                });
                            };
                            checked += 1;
                            let prev_op = op.operation.prev.as_ref().and_then(|expected| {
                                ops.iter().find(|(_, c, _)| c == expected)
                            });
                            let prev_cid_ok = op.operation.prev.is_none() || prev_op.is_some();
                            if !prev_cid_ok {
                                tracing::error!("audit: op {did} {cid} prev cid mismatch or missing predecessor, is db corrupted?");
                                failed += 1;
                                send_invalid();
                                continue;
                            }
                            let prev_stored = prev_op.map(|(_, _, p)| &p.operation);
                            match verify_op_sig(&op.operation, prev_stored) {
                                Ok(results) => {
                                    if !results.valid {
                                        let msg = results
                                            .errors
                                            .iter()
                                            .map(|e| e.to_string())
                                            .collect::<Vec<_>>()
                                            .join("\n ");
                                        tracing::warn!("audit: invalid op {} {}:\n {msg}", did, cid);
                                        failed += 1;
                                        send_invalid();
                                    }
                                }
                                Err(e) => {
                                    tracing::warn!("audit: invalid op {} {}: {e}", did, cid);
                                    failed += 1;
                                    send_invalid();
                                }
                            }
                        }
                    }
                    let _ = result_tx.send(Ok((checked, failed)));
                });
            }
            // only the clones held by the checkers remain, so `result_rx`
            // ends once every checker has reported
            drop(result_tx);

            // todo: probably dont use a macro...
            macro_rules! spawn_scan_thread {
                ($iter_method:ident, $start_idx:expr, $reverse:expr, $limit:expr) => {{
                    let senders = senders.clone();
                    let mut iter = self.inner.by_did.iter();

                    s.spawn(move || -> anyhow::Result<()> {
                        let mut current_prefix: Option<[u8; 16]> = None;
                        let mut did_ops: Vec<(Dt, PlcCid, DbOp)> = Vec::new();
                        let mut idx = $start_idx;
                        let mut processed_ops: usize = 0;

                        while let Some(guard) = iter.$iter_method() {
                            let (by_did_key, _) = guard
                                .into_inner()
                                .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;

                            let mut prefix_array = [0u8; 16];
                            prefix_array.copy_from_slice(by_did_key.get(..16).ok_or_else(
                                || anyhow::anyhow!("by_did key too short: {by_did_key:?}"),
                            )?);

                            let op = self.decode_by_did_entry(&by_did_key, 16)?;

                            if current_prefix.map_or(true, |cp| cp != prefix_array) {
                                // new did, push the ops
                                if let Some(prefix) = current_prefix.take() {
                                    // a backwards scan collects ops newest-first
                                    if $reverse {
                                        did_ops.reverse();
                                    }
                                    senders[idx % workers]
                                        .send((prefix.to_vec(), std::mem::take(&mut did_ops)))
                                        .ok();
                                    idx += 1;

                                    // only stop on a did boundary so a did is never split
                                    if processed_ops >= $limit {
                                        break;
                                    }
                                }
                                current_prefix = Some(prefix_array);
                            }

                            did_ops.push((op.1, op.2, op.3));
                            processed_ops += 1;
                        }

                        // flush the did that was still being collected
                        if let Some(prefix) = current_prefix {
                            if $reverse {
                                did_ops.reverse();
                            }
                            senders[idx % workers].send((prefix.to_vec(), did_ops)).ok();
                        }

                        Ok(())
                    })
                }};
            }

            // we can start two threads, one for forward iteration and one for reverse iteration
            // this way we have two scans in parallel which should be faster!
            let f_count = ops / 2;
            let f_handle = spawn_scan_thread!(next, 0, false, f_count);
            let b_count = ops - f_count;
            let b_handle = spawn_scan_thread!(next_back, workers / 2, true, b_count);

            f_handle.join().unwrap()?;
            b_handle.join().unwrap()?;

            // closing the last senders lets the checkers leave their recv loops
            drop(senders);

            let mut total_checked: usize = 0;
            let mut total_failed: usize = 0;
            for res in result_rx {
                let (c, f) = res?;
                total_checked += c;
                total_failed += f;
            }

            Ok((total_checked, total_failed))
        })
    }
}

/// Bulk-loads pages of sequenced ops into fjall without verifying signatures.
///
/// - `reset`: clear the database before inserting.
/// - `pages`: source of pages; the function finishes once it is closed *and*
///   drained, and all insert tasks have completed.
/// - `notify_last_at`: if given, receives the largest `created_at` among the
///   last ops of all received pages (`None` if no page carried an op).
///
/// Every page is inserted on its own blocking task; the first failing task
/// aborts the remaining ones and its error is returned.
pub async fn backfill_to_fjall(
    db: FjallDb,
    reset: bool,
    mut pages: mpsc::Receiver<crate::SeqPage>,
    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
) -> anyhow::Result<&'static str> {
    let t0 = Instant::now();

    if reset {
        let db = db.clone();
        tokio::task::spawn_blocking(move || db.clear()).await??;
        tracing::warn!("fjall reset: cleared all data");
    }

    let mut last_at = None;
    let mut ops_inserted: usize = 0;
    let mut insert_tasks: tokio::task::JoinSet<anyhow::Result<usize>> = tokio::task::JoinSet::new();
    // Set only once `recv()` has returned `None`, i.e. the channel is closed
    // AND empty. `Receiver::is_closed()` must not be used for this: it turns
    // true as soon as all senders are gone, even with pages still buffered,
    // and those pages would then never be received.
    let mut pages_finished = false;

    loop {
        // we can stop if we have no more pages and all the insert tasks are finished
        if pages_finished && insert_tasks.is_empty() {
            break;
        }
        tokio::select! {
            page = pages.recv(), if !pages_finished => {
                let Some(page) = page else {
                    pages_finished = true;
                    continue;
                };
                if notify_last_at.is_some() {
                    // SeqPage ops are always in order, so we can just grab the last one
                    if let Some(last_op) = page.ops.last() {
                        last_at = last_at.filter(|&l| l >= last_op.created_at).or(Some(last_op.created_at));
                    }
                }

                let db = db.clone();

                // we don't have to wait for inserts to finish, because insert_op
                // without verification does not read anything from the db
                insert_tasks.spawn_blocking(move || {
                    let mut count: usize = 0;
                    for seq_op in &page.ops {
                        let op = CommonOp {
                            did: seq_op.did.clone(),
                            cid: seq_op.cid.clone(),
                            created_at: seq_op.created_at,
                            nullified: seq_op.nullified,
                            operation: seq_op.operation.clone(),
                        };
                        // we don't verify sigs for bulk, since pages might be out of order (and we trust for backfills)
                        count += db.insert_op::<false>(&op, seq_op.seq)?;
                    }
                    db.persist(PersistMode::Buffer)?;
                    Ok(count)
                });
            }
            Some(res) = insert_tasks.join_next() => {
                match res? {
                    Ok(count) => ops_inserted += count,
                    Err(e) => {
                        insert_tasks.abort_all();
                        return Err(e);
                    }
                }
            }
        }
    }
    tracing::debug!("finished receiving bulk pages");

    if let Some(notify) = notify_last_at {
        tracing::trace!("notifying last_at: {last_at:?}");
        if notify.send(last_at).is_err() {
            tracing::error!("receiver for last_at dropped, can't notify");
        };
    }

    tokio::task::spawn_blocking(move || db.persist(PersistMode::SyncAll)).await??;

    tracing::info!(
        "backfill_to_fjall: inserted {ops_inserted} ops in {:?}",
        t0.elapsed()
    );
    Ok("backfill_to_fjall")
}

/// Write sequenced ops (with PLC seq numbers) into fjall.
1514pub async fn seq_pages_to_fjall( 1515 db: FjallDb, 1516 mut pages: mpsc::Receiver<crate::SeqPage>, 1517) -> anyhow::Result<&'static str> { 1518 tracing::info!("starting seq_pages_to_fjall writer..."); 1519 1520 let t0 = Instant::now(); 1521 let mut ops_inserted: usize = 0; 1522 1523 while let Some(page) = pages.recv().await { 1524 let first_seq = page.ops.first().map(|op| op.seq); 1525 let last_seq = page.ops.last().map(|op| op.seq); 1526 tracing::debug!( 1527 "seq_pages: received page with {} ops, seq {:?}..{:?}", 1528 page.ops.len(), 1529 first_seq, 1530 last_seq 1531 ); 1532 let page_len = page.ops.len(); 1533 let db = db.clone(); 1534 let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> { 1535 let mut count: usize = 0; 1536 for seq_op in &page.ops { 1537 tracing::debug!("seq_pages: processing op {} {} (seq {})", seq_op.did, seq_op.cid, seq_op.seq); 1538 let common_op = CommonOp { 1539 did: seq_op.did.clone(), 1540 cid: seq_op.cid.clone(), 1541 created_at: seq_op.created_at, 1542 nullified: seq_op.nullified, 1543 operation: seq_op.operation.clone(), 1544 }; 1545 count += db.insert_op::<true>(&common_op, seq_op.seq)?; 1546 } 1547 db.persist(PersistMode::Buffer)?; 1548 Ok(count) 1549 }) 1550 .await??; 1551 if count < page_len { 1552 tracing::warn!( 1553 "seq_pages: page seq {:?}..{:?} inserted {count}/{page_len} ops ({} dropped)", 1554 first_seq, 1555 last_seq, 1556 page_len - count 1557 ); 1558 } 1559 ops_inserted += count; 1560 } 1561 1562 tracing::info!( 1563 "no more seq pages. 
inserted {ops_inserted} ops in {:?}", 1564 t0.elapsed() 1565 ); 1566 Ok("seq_pages_to_fjall") 1567} 1568 1569pub async fn audit( 1570 db: FjallDb, 1571 invalid_ops_tx: mpsc::Sender<InvalidOp>, 1572) -> anyhow::Result<&'static str> { 1573 tracing::info!("starting fjall audit..."); 1574 let t0 = std::time::Instant::now(); 1575 let (checked, failed) = tokio::task::spawn_blocking(move || db.audit(invalid_ops_tx)).await??; 1576 tracing::info!( 1577 "fjall audit complete in {:?}, {checked} ops checked", 1578 t0.elapsed() 1579 ); 1580 if failed > 0 { 1581 tracing::error!("audit found {failed} invalid operations"); 1582 } 1583 Ok("audit_fjall") 1584} 1585 1586pub async fn fix_ops( 1587 db: FjallDb, 1588 upstream: reqwest::Url, 1589 only_drop: bool, 1590 mut invalid_ops_rx: mpsc::Receiver<InvalidOp>, 1591) -> anyhow::Result<&'static str> { 1592 tracing::info!("starting fjall fix ops..."); 1593 let mut fixed_dids = std::collections::HashSet::new(); 1594 let mut count = 0; 1595 1596 let latest_at = db 1597 .get_latest()? 1598 .ok_or_else(|| anyhow::anyhow!("db not backfilled? expected at least one op")) 1599 .map(|(_, dt)| dt)?; 1600 1601 // local seq counter for newly fetched ops 1602 let mut next_seq = db.get_latest()?.map(|(s, _)| s).unwrap_or(0) + 1; 1603 1604 while let Some(op) = invalid_ops_rx.recv().await { 1605 let InvalidOp { did, at, cid, .. 
} = op; 1606 1607 if only_drop { 1608 db.drop_op(&did, &at, &cid)?; 1609 db.persist(PersistMode::Buffer)?; 1610 count += 1; 1611 continue; 1612 } 1613 1614 if fixed_dids.contains(&did) { 1615 continue; 1616 } 1617 1618 tracing::trace!("fetching upstream ops to fix did: {did}"); 1619 let mut url = upstream.clone(); 1620 url.set_path(&format!("/{did}/log/audit")); 1621 1622 let resp = crate::CLIENT.get(url).send().await?; 1623 1624 use reqwest::StatusCode; 1625 let ops: Vec<CommonOp> = match resp.status() { 1626 StatusCode::OK => match resp.json().await { 1627 Ok(ops) => ops, 1628 Err(e) => { 1629 tracing::warn!("failed to parse upstream ops for {did}: {e}"); 1630 continue; 1631 } 1632 }, 1633 StatusCode::NOT_FOUND => { 1634 tracing::trace!("did not found upstream: {did}"); 1635 Vec::new() // this essentially means drop the whole did 1636 } 1637 s => { 1638 tracing::warn!("failed to fetch upstream for {did}: {s}"); 1639 continue; 1640 } 1641 }; 1642 1643 tracing::trace!("fetched {} ops for {did}", ops.len()); 1644 1645 // we drop all ops first just to be safe 1646 let existing = db.ops_for_did(&did)?; 1647 for op in existing { 1648 let op = op?; 1649 db.drop_op(&did, &op.created_at, &op.cid)?; 1650 } 1651 1652 // then insert the fresh ops 1653 for op in ops { 1654 // skip newer ops, since we will fill them in later anyway 1655 // if we don't skip these we might miss some ops in between 1656 // the latest_at we started with vs the one we ended up with 1657 if op.created_at > latest_at { 1658 tracing::trace!( 1659 "skipping op {} for {did} because it is newer than latest_at {latest_at}", 1660 op.cid 1661 ); 1662 continue; 1663 } 1664 1665 let seq = next_seq; 1666 next_seq += 1; 1667 count += db.insert_op::<true>(&op, seq)?; 1668 } 1669 1670 db.persist(PersistMode::Buffer)?; 1671 fixed_dids.insert(did); 1672 } 1673 1674 tracing::info!("fixed {count} ops"); 1675 1676 Ok("fix_ops_fjall") 1677} 1678 1679#[cfg(test)] 1680mod tests { 1681 use super::*; 1682 1683 #[test] 1684 
fn plc_cid_roundtrip() { 1685 let original = "bafyreigp6shzy6dlcxuowwoxz7u5nemdrkad2my5zwzpwilcnhih7bw6zm"; 1686 let cid = PlcCid::from_cid_str(original).unwrap(); 1687 assert_eq!(cid.to_string(), original); 1688 } 1689 1690 #[test] 1691 fn bsky_aka_roundtrip() { 1692 let h = Aka::from_str("at://alice.bsky.social"); 1693 assert_eq!(h, Aka::Bluesky("alice".to_string())); 1694 assert_eq!(h.to_string(), "at://alice.bsky.social"); 1695 } 1696 1697 #[test] 1698 fn atproto_aka_roundtrip() { 1699 let h = Aka::from_str("at://alice.example.com"); 1700 assert_eq!(h, Aka::Atproto("alice.example.com".to_string())); 1701 assert_eq!(h.to_string(), "at://alice.example.com"); 1702 } 1703 1704 #[test] 1705 fn other_aka_roundtrip() { 1706 let h = Aka::from_str("https://something.else"); 1707 assert_eq!(h, Aka::Other("https://something.else".to_string())); 1708 assert_eq!(h.to_string(), "https://something.else"); 1709 } 1710 1711 #[test] 1712 fn handle_bsky_social_roundtrip() { 1713 let h = Handle::from_str("alice.bsky.social"); 1714 assert_eq!(h, Handle::BskySocial("alice".to_string())); 1715 assert_eq!(h.as_string(), "alice.bsky.social"); 1716 } 1717 1718 #[test] 1719 fn handle_other_roundtrip() { 1720 let h = Handle::from_str("user.example.com"); 1721 assert_eq!(h, Handle::Other("user.example.com".to_string())); 1722 assert_eq!(h.as_string(), "user.example.com"); 1723 } 1724 1725 #[test] 1726 fn verification_method_key_roundtrip() { 1727 let k1 = VerificationMethodKey::from_str("atproto"); 1728 assert_eq!(k1, VerificationMethodKey::Atproto); 1729 assert_eq!(k1.to_string(), "atproto"); 1730 1731 let k2 = VerificationMethodKey::from_str("other_key"); 1732 assert_eq!(k2, VerificationMethodKey::Other("other_key".to_string())); 1733 assert_eq!(k2.to_string(), "other_key"); 1734 } 1735 1736 #[test] 1737 fn service_key_roundtrip() { 1738 let k1 = ServiceKey::from_str("atproto_pds"); 1739 assert_eq!(k1, ServiceKey::AtprotoPds); 1740 assert_eq!(k1.to_string(), "atproto_pds"); 1741 1742 let 
k2 = ServiceKey::from_str("other_svc"); 1743 assert_eq!(k2, ServiceKey::Other("other_svc".to_string())); 1744 assert_eq!(k2.to_string(), "other_svc"); 1745 } 1746 1747 #[test] 1748 fn service_type_roundtrip() { 1749 let t1 = ServiceType::from_str("AtprotoPersonalDataServer"); 1750 assert_eq!(t1, ServiceType::AtprotoPersonalDataServer); 1751 assert_eq!(t1.as_str(), "AtprotoPersonalDataServer"); 1752 1753 let t2 = ServiceType::from_str("OtherType"); 1754 assert_eq!(t2, ServiceType::Other("OtherType".to_string())); 1755 assert_eq!(t2.as_str(), "OtherType"); 1756 } 1757 1758 #[test] 1759 fn service_endpoint_roundtrip() { 1760 let e1 = ServiceEndpoint::from_str("https://example.host.bsky.network"); 1761 assert_eq!(e1, ServiceEndpoint::BlueskyPds("example".to_string())); 1762 assert_eq!(e1.as_string(), "https://example.host.bsky.network"); 1763 1764 let e2 = ServiceEndpoint::from_str("https://bsky.social"); 1765 assert_eq!(e2, ServiceEndpoint::BlueskySocial); 1766 assert_eq!(e2.as_string(), "https://bsky.social"); 1767 1768 let e3 = ServiceEndpoint::from_str("https://other.endpoint.com"); 1769 assert_eq!( 1770 e3, 1771 ServiceEndpoint::Other("https://other.endpoint.com".to_string()) 1772 ); 1773 assert_eq!(e3.as_string(), "https://other.endpoint.com"); 1774 } 1775 1776 #[test] 1777 fn op_type_roundtrip() { 1778 assert_eq!(OpType::from_str("plc_operation").as_str(), "plc_operation"); 1779 assert_eq!(OpType::from_str("create").as_str(), "create"); 1780 assert_eq!(OpType::from_str("plc_tombstone").as_str(), "plc_tombstone"); 1781 assert_eq!(OpType::from_str("weird_thing").as_str(), "weird_thing"); 1782 } 1783 1784 #[test] 1785 fn stored_op_fixture_roundtrip() { 1786 let mut fixtures: Vec<_> = std::fs::read_dir("tests/fixtures") 1787 .unwrap() 1788 .filter_map(|e| e.ok()) 1789 .map(|e| e.path()) 1790 .filter(|p| p.extension().map_or(false, |ext| ext == "json")) 1791 .collect(); 1792 fixtures.sort(); 1793 1794 let mut total_json_size = 0; 1795 let mut total_packed_size = 0; 
1796 1797 for path in &fixtures { 1798 let data = std::fs::read_to_string(path).unwrap(); 1799 let entries: Vec<serde_json::Value> = serde_json::from_str(&data).unwrap(); 1800 1801 for entry in &entries { 1802 let op = &entry["operation"]; 1803 let (stored, errors) = StoredOp::from_json_value(op.clone()); 1804 if !errors.is_empty() { 1805 let mut msg = format!("failed to parse op in {}:\n", path.display()); 1806 for e in errors { 1807 msg.push_str(&format!(" - {e:?}\n")); 1808 } 1809 msg.push_str(&format!("op: {op}\n")); 1810 panic!("{msg}"); 1811 } 1812 let stored = stored.unwrap(); 1813 1814 let packed = bitcode::encode(&stored); 1815 let unpacked: StoredOp = bitcode::decode::<StoredOp>(&packed).unwrap(); 1816 1817 let reconstructed = unpacked.to_json_value(); 1818 assert_eq!( 1819 *op, 1820 reconstructed, 1821 "roundtrip mismatch in {}", 1822 path.display() 1823 ); 1824 1825 total_json_size += serde_json::to_vec(op).unwrap().len(); 1826 total_packed_size += packed.len(); 1827 } 1828 } 1829 1830 println!( 1831 "json size: {} bytes, bitcode size: {} bytes, saved: {} bytes", 1832 total_json_size, 1833 total_packed_size, 1834 total_json_size as isize - total_packed_size as isize 1835 ); 1836 } 1837 1838 #[test] 1839 fn stored_op_fixture_sig_roundtrip() { 1840 let mut fixtures: Vec<_> = std::fs::read_dir("tests/fixtures") 1841 .unwrap() 1842 .filter_map(|e| e.ok()) 1843 .map(|e| e.path()) 1844 .filter(|p| p.extension().map_or(false, |ext| ext == "json")) 1845 .collect(); 1846 fixtures.sort(); 1847 1848 for path in &fixtures { 1849 let data = std::fs::read_to_string(path).unwrap(); 1850 let entries: Vec<serde_json::Value> = serde_json::from_str(&data).unwrap(); 1851 1852 // build a cid -> StoredOp map so we can look up prev ops 1853 let mut by_cid: std::collections::HashMap<String, StoredOp> = 1854 std::collections::HashMap::new(); 1855 1856 for entry in &entries { 1857 let cid = entry["cid"].as_str().unwrap().to_string(); 1858 let op_json = 
entry["operation"].clone(); 1859 1860 let (stored, errors) = StoredOp::from_json_value(op_json); 1861 assert!( 1862 errors.is_empty(), 1863 "{} {cid}: parse errors: {errors:?}", 1864 path.display() 1865 ); 1866 let stored = stored.unwrap(); 1867 1868 let prev = stored.prev.as_ref().map(|c| c.to_string()); 1869 let prev_stored = prev.as_deref().and_then(|c| by_cid.get(c)); 1870 1871 let results = verify_op_sig(&stored, prev_stored) 1872 .unwrap_or_else(|e| panic!("{} {cid}: {e}", path.display())); 1873 assert!( 1874 results.valid, 1875 "{} {cid}: sig invalid after StoredOp roundtrip: {:?}", 1876 path.display(), 1877 results.errors 1878 ); 1879 1880 by_cid.insert(cid, stored); 1881 } 1882 } 1883 } 1884}