//! forked from microcosm.blue/Allegedly
//! Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{
2 Dt, InvalidOp, Op as CommonOp,
3 crypto::{AssuranceResults, DidKey, Signature, assure_valid_sig},
4};
5use anyhow::Context;
6use data_encoding::BASE32_NOPAD;
7use fjall::{
8 Database, Keyspace, KeyspaceCreateOptions, PersistMode,
9 config::{BlockSizePolicy, RestartIntervalPolicy},
10};
11use ordered_varint::Variable;
12use serde::{Deserialize, Serialize};
13use std::collections::BTreeMap;
14use std::fmt;
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
19
/// Separator byte between the encoded DID and the seq varint in `by_did` keys.
const SEP: u8 = 0;
21
/// Encode a sequence number as the variable-length key for the `ops` keyspace.
fn seq_key(seq: u64) -> Vec<u8> {
    seq.to_variable_vec().expect("that seq number encodes")
}
25
/// Inverse of [`seq_key`]: recover the sequence number from an `ops` key.
fn decode_seq_key(key: &[u8]) -> anyhow::Result<u64> {
    u64::decode_variable(key).context("failed to decode seq key")
}
29
// alias so the CID multihash size parameter is written in exactly one place
type IpldCid = cid::CidGeneric<64>;
31
// 24 bytes -> 15 bytes
/// Append the compact binary form of a `did:plc:` identifier to `buf`,
/// returning the number of bytes written.
///
/// The `did:plc:` prefix is stripped and the remaining base32 suffix is
/// uppercased before decoding (BASE32_NOPAD expects the uppercase alphabet).
/// NOTE(review): on error `buf` may be left partially written — callers
/// currently discard it in that case, but confirm before reusing buffers.
fn encode_did(buf: &mut Vec<u8>, did: &str) -> anyhow::Result<usize> {
    let input = did.trim_start_matches("did:plc:").to_uppercase();
    // max decoded size for this input length (assumed exact for NOPAD base32
    // with valid input — TODO confirm against data_encoding docs)
    let len = BASE32_NOPAD
        .decode_len(input.len())
        .map_err(|_| anyhow::anyhow!("failed to calculate decode len for {did}"))?;

    let start = buf.len();
    buf.resize(start + len, 0);

    BASE32_NOPAD
        .decode_mut(input.as_bytes(), &mut buf[start..])
        .map_err(|_| anyhow::anyhow!("failed to encode did {did}"))
}
46
47// 59 bytes -> 36 bytes
48fn decode_cid_str(s: &str) -> anyhow::Result<Vec<u8>> {
49 let cid = IpldCid::try_from(s)?;
50 let mut buf = Vec::new();
51 cid.write_bytes(&mut buf)
52 .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}"))?;
53 Ok(buf)
54}
55
56fn decode_cid(bytes: &[u8]) -> anyhow::Result<String> {
57 IpldCid::try_from(bytes)
58 .map_err(|e| anyhow::anyhow!("failed to decode cid: {e}"))
59 .map(|cid| cid.to_string())
60}
61
62fn decode_did(bytes: &[u8]) -> String {
63 let decoded = BASE32_NOPAD.encode(bytes).to_lowercase();
64 format!("did:plc:{decoded}")
65}
66
67fn by_did_prefix(did: &str) -> anyhow::Result<Vec<u8>> {
68 let mut p = Vec::with_capacity(BASE32_NOPAD.decode_len(did.len())? + 1);
69 encode_did(&mut p, did)?;
70 p.push(SEP);
71 Ok(p)
72}
73
/// by_did key: [15 bytes encoded did][SEP][seq varint]
fn by_did_key(did: &str, seq: u64) -> anyhow::Result<Vec<u8>> {
    let mut key = by_did_prefix(did)?;
    // append the seq varint after the SEP terminator added by the prefix
    seq.encode_variable(&mut key)?;
    Ok(key)
}
80
81/// CID string → binary CID bytes
82// STABILITY: never reorder variants, only append.
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)]
84struct PlcCid(#[serde(with = "serde_bytes")] Vec<u8>);
85
86impl PlcCid {
87 fn from_cid_str(s: &str) -> anyhow::Result<Self> {
88 let cid = IpldCid::try_from(s)?;
89 let mut buf = Vec::new();
90 cid.write_bytes(&mut buf)
91 .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}"))?;
92 Ok(Self(buf))
93 }
94}
95
96impl fmt::Display for PlcCid {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 let cid = IpldCid::try_from(self.0.as_slice()).map_err(|_| fmt::Error)?;
99 write!(f, "{cid}")
100 }
101}
102
103// STABILITY: never reorder variants, only append.
104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)]
105enum Aka {
106 Other(String), // 0
107 Bluesky(String), // 1
108 Atproto(String), // 2
109}
110
111impl Aka {
112 fn from_str(s: &str) -> Self {
113 if let Some(stripped) = s.strip_prefix("at://") {
114 if let Some(handle) = stripped.strip_suffix(".bsky.social") {
115 Self::Bluesky(handle.to_string())
116 } else {
117 Self::Atproto(stripped.to_string())
118 }
119 } else {
120 Self::Other(s.to_string())
121 }
122 }
123}
124
125impl fmt::Display for Aka {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 match self {
128 Self::Bluesky(h) => write!(f, "at://{h}.bsky.social"),
129 Self::Atproto(h) => write!(f, "at://{h}"),
130 Self::Other(s) => f.write_str(s),
131 }
132 }
133}
134
135// STABILITY: never reorder variants, only append.
136#[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)]
137#[serde(rename_all = "snake_case")]
138enum OpType {
139 Other(String), // 0
140 PlcOperation, // 1
141 Create, // 2
142 PlcTombstone, // 3
143}
144
145impl OpType {
146 fn from_str(s: &str) -> Self {
147 match s {
148 "plc_operation" => Self::PlcOperation,
149 "create" => Self::Create,
150 "plc_tombstone" => Self::PlcTombstone,
151 other => Self::Other(other.to_string()),
152 }
153 }
154
155 fn as_str(&self) -> &str {
156 match self {
157 Self::PlcOperation => "plc_operation",
158 Self::Create => "create",
159 Self::PlcTombstone => "plc_tombstone",
160 Self::Other(s) => s,
161 }
162 }
163}
164
/// The canonical JSON field names of a PLC operation, one variant per field
/// so call sites cannot typo a key string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StoredOpField {
    Type,
    Sig,
    Prev,
    RotationKeys,
    VerificationMethods,
    AlsoKnownAs,
    Services,
    SigningKey,
    RecoveryKey,
    Handle,
    Service,
}

impl StoredOpField {
    /// The exact JSON key for this field (camelCase, matching the wire format).
    fn as_str(&self) -> &'static str {
        match *self {
            StoredOpField::Type => "type",
            StoredOpField::Sig => "sig",
            StoredOpField::Prev => "prev",
            StoredOpField::RotationKeys => "rotationKeys",
            StoredOpField::VerificationMethods => "verificationMethods",
            StoredOpField::AlsoKnownAs => "alsoKnownAs",
            StoredOpField::Services => "services",
            StoredOpField::SigningKey => "signingKey",
            StoredOpField::RecoveryKey => "recoveryKey",
            StoredOpField::Handle => "handle",
            StoredOpField::Service => "service",
        }
    }
}

// The conversion traits below all funnel through `as_str` so each key string
// lives in exactly one place.

impl AsRef<str> for StoredOpField {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}

impl std::ops::Deref for StoredOpField {
    type Target = str;
    fn deref(&self) -> &Self::Target {
        self.as_str()
    }
}

impl fmt::Display for StoredOpField {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
216
/// Problems found while parsing a raw op into a [`StoredOp`]. Only
/// `NotAnObject` is fatal; the rest leave the offending field preserved in
/// the op's unknown-field side channel.
#[derive(Debug, thiserror::Error)]
enum StoredOpError {
    #[error("operation is not an object")]
    NotAnObject,
    #[error("missing required field: {0}")]
    MissingField(StoredOpField),
    #[error("invalid field {0}: {1}")]
    InvalidField(StoredOpField, #[source] anyhow::Error),
    #[error("type mismatch for field {0}: expected {1}")]
    TypeMismatch(StoredOpField, &'static str),
}
228
229// STABILITY: never reorder variants, only append.
230#[derive(
231 Debug,
232 Clone,
233 Serialize,
234 Deserialize,
235 PartialEq,
236 Eq,
237 PartialOrd,
238 Ord,
239 bitcode::Encode,
240 bitcode::Decode,
241)]
242enum VerificationMethodKey {
243 Other(String), // 0
244 Atproto, // 1
245}
246
247impl VerificationMethodKey {
248 fn from_str(s: &str) -> Self {
249 match s {
250 "atproto" => Self::Atproto,
251 _ => Self::Other(s.to_string()),
252 }
253 }
254
255 fn as_str(&self) -> &str {
256 match self {
257 Self::Atproto => "atproto",
258 Self::Other(s) => s,
259 }
260 }
261}
262
263impl fmt::Display for VerificationMethodKey {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 f.write_str(self.as_str())
266 }
267}
268
269// STABILITY: never reorder variants, only append.
270#[derive(
271 Debug,
272 Clone,
273 Serialize,
274 Deserialize,
275 PartialEq,
276 Eq,
277 PartialOrd,
278 Ord,
279 bitcode::Encode,
280 bitcode::Decode,
281)]
282enum ServiceKey {
283 Other(String), // 0
284 AtprotoPds, // 1
285}
286
287impl ServiceKey {
288 fn from_str(s: &str) -> Self {
289 match s {
290 "atproto_pds" => Self::AtprotoPds,
291 _ => Self::Other(s.to_string()),
292 }
293 }
294
295 fn as_str(&self) -> &str {
296 match self {
297 Self::AtprotoPds => "atproto_pds",
298 Self::Other(s) => s,
299 }
300 }
301}
302
303impl fmt::Display for ServiceKey {
304 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305 f.write_str(self.as_str())
306 }
307}
308
309// STABILITY: never reorder variants, only append.
310#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)]
311enum ServiceType {
312 Other(String), // 0
313 AtprotoPersonalDataServer, // 1
314}
315
316impl ServiceType {
317 fn from_str(s: &str) -> Self {
318 match s {
319 "AtprotoPersonalDataServer" => Self::AtprotoPersonalDataServer,
320 _ => Self::Other(s.to_string()),
321 }
322 }
323
324 fn as_str(&self) -> &str {
325 match self {
326 Self::AtprotoPersonalDataServer => "AtprotoPersonalDataServer",
327 Self::Other(s) => s,
328 }
329 }
330}
331
332// STABILITY: never reorder variants, only append.
333#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)]
334enum ServiceEndpoint {
335 Other(String), // 0
336 BlueskyPds(String), // 1
337 BlueskySocial, // 2
338}
339
340impl ServiceEndpoint {
341 fn from_str(s: &str) -> Self {
342 if let Some(host) = s
343 .strip_prefix("https://")
344 .and_then(|h| h.strip_suffix(".host.bsky.network"))
345 {
346 Self::BlueskyPds(host.to_string())
347 } else if s == "https://bsky.social" {
348 Self::BlueskySocial
349 } else {
350 Self::Other(s.to_string())
351 }
352 }
353
354 fn as_string(&self) -> String {
355 match self {
356 Self::BlueskyPds(h) => format!("https://{h}.host.bsky.network"),
357 Self::BlueskySocial => "https://bsky.social".to_string(),
358 Self::Other(s) => s.clone(),
359 }
360 }
361}
362
363// STABILITY: never reorder variants, only append.
364#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)]
365enum Handle {
366 Other(String), // 0
367 BskySocial(String), // 1
368}
369
370impl Handle {
371 fn from_str(s: &str) -> Self {
372 if let Some(handle) = s.strip_suffix(".bsky.social") {
373 Self::BskySocial(handle.to_string())
374 } else {
375 Self::Other(s.to_string())
376 }
377 }
378
379 fn as_string(&self) -> String {
380 match self {
381 Self::BskySocial(h) => format!("{h}.bsky.social"),
382 Self::Other(s) => s.clone(),
383 }
384 }
385}
386
/// A single `services` map entry (type + endpoint), in compacted form.
#[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)]
struct StoredService {
    r#type: ServiceType,
    endpoint: ServiceEndpoint,
}
392
/// Compact, bitcode-serializable form of a PLC operation.
///
/// Wire strings are interned into the space-saving enums above; fields that
/// failed to parse — and any unrecognized fields — are preserved verbatim in
/// `unknown_packed` so `to_json_value` can reconstruct the original JSON.
#[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)]
struct StoredOp {
    op_type: OpType,
    sig: Signature,
    // cid of the previous op in this did's chain; None for genesis ops
    prev: Option<PlcCid>,

    rotation_keys: Option<Vec<DidKey>>,
    verification_methods: Option<BTreeMap<VerificationMethodKey, DidKey>>,
    also_known_as: Option<Vec<Aka>>,
    services: Option<BTreeMap<ServiceKey, StoredService>>,

    // legacy create fields
    signing_key: Option<DidKey>,
    recovery_key: Option<DidKey>,
    handle: Option<Handle>,
    service: Option<ServiceEndpoint>,

    // msgpack-encoded BTreeMap<String, serde_json::Value>.
    // Vec<u8> is used because bitcode cannot handle serde_json::Value directly.
    // empty vec when there are no unknown fields (the common case).
    #[serde(skip)]
    unknown_packed: Vec<u8>,
}
416
417impl StoredOp {
418 fn get_keys(&self) -> Vec<&DidKey> {
419 let mut keys = Vec::with_capacity(self.rotation_keys.as_ref().map_or(2, |keys| keys.len()));
420 if let Some(rot_keys) = self.rotation_keys.as_ref() {
421 keys.extend(rot_keys.iter());
422 } else {
423 // legacy genesis op
424 if let Some(recovery_key) = self.recovery_key.as_ref() {
425 keys.push(recovery_key);
426 }
427 if let Some(signing_key) = self.signing_key.as_ref() {
428 keys.push(signing_key);
429 }
430 }
431 keys
432 }
433
434 fn unknown(&self) -> BTreeMap<String, serde_json::Value> {
435 if self.unknown_packed.is_empty() {
436 return BTreeMap::new();
437 }
438 rmp_serde::from_slice(&self.unknown_packed).unwrap_or_default()
439 }
440
441 fn pack_unknown(unknown: BTreeMap<String, serde_json::Value>) -> Vec<u8> {
442 if unknown.is_empty() {
443 return Vec::new();
444 }
445 rmp_serde::to_vec(&unknown).expect("unknown fields are serializable")
446 }
    /// Parse a raw PLC operation JSON object into a compact `StoredOp`.
    ///
    /// Returns the parsed op (or `None` only when the value is not an object)
    /// together with every non-fatal parse error encountered. Any field that
    /// fails to parse — and any field we don't recognize at all — is preserved
    /// verbatim in the msgpack `unknown` side channel so `to_json_value` can
    /// reconstruct the original JSON.
    fn from_json_value(v: serde_json::Value) -> (Option<Self>, Vec<StoredOpError>) {
        let serde_json::Value::Object(mut obj) = v else {
            return (None, vec![StoredOpError::NotAnObject]);
        };

        let mut errors = Vec::new();
        let mut unknown = BTreeMap::new();

        // required field; on mismatch the raw value is kept in `unknown` and
        // an empty `Other` stands in as a placeholder
        let op_type = match obj.remove(&*StoredOpField::Type) {
            Some(serde_json::Value::String(s)) => OpType::from_str(&s),
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(StoredOpField::Type, "string"));
                unknown.insert(StoredOpField::Type.to_string(), v);
                OpType::Other(String::new())
            }
            Option::None => {
                errors.push(StoredOpError::MissingField(StoredOpField::Type));
                OpType::Other(String::new())
            }
        };

        // required field; placeholder is an empty signature
        let sig = match obj.remove(&*StoredOpField::Sig) {
            Some(serde_json::Value::String(s)) => match Signature::from_base64url(&s) {
                Ok(sig) => sig,
                Err(e) => {
                    errors.push(StoredOpError::InvalidField(StoredOpField::Sig, e));
                    unknown.insert(StoredOpField::Sig.to_string(), serde_json::Value::String(s));
                    Signature(Vec::new())
                }
            },
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(StoredOpField::Sig, "string"));
                unknown.insert(StoredOpField::Sig.to_string(), v);
                Signature(Vec::new())
            }
            Option::None => {
                errors.push(StoredOpError::MissingField(StoredOpField::Sig));
                Signature(Vec::new())
            }
        };

        // optional; explicit JSON null and absence are both stored as None
        let prev = match obj.remove(&*StoredOpField::Prev) {
            Some(serde_json::Value::Null) | Option::None => Option::None,
            Some(serde_json::Value::String(s)) => match PlcCid::from_cid_str(&s) {
                Ok(p) => Some(p),
                Err(e) => {
                    errors.push(StoredOpError::InvalidField(StoredOpField::Prev, e));
                    unknown.insert(
                        StoredOpField::Prev.to_string(),
                        serde_json::Value::String(s),
                    );
                    Option::None
                }
            },
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(StoredOpField::Prev, "string"));
                unknown.insert(StoredOpField::Prev.to_string(), v);
                Option::None
            }
        };

        // all-or-nothing: a single bad element sends the entire array to
        // `unknown` (the loop breaks at the first failure)
        let rotation_keys = match obj.remove(&*StoredOpField::RotationKeys) {
            Some(serde_json::Value::Array(arr)) => {
                let mut keys = Vec::with_capacity(arr.len());
                let mut failed = false;
                for v in &arr {
                    match v {
                        serde_json::Value::String(s) => match DidKey::from_did_key(s) {
                            Ok(k) => keys.push(k),
                            Err(e) => {
                                errors.push(StoredOpError::InvalidField(
                                    StoredOpField::RotationKeys,
                                    e,
                                ));
                                failed = true;
                                break;
                            }
                        },
                        _ => {
                            errors.push(StoredOpError::TypeMismatch(
                                StoredOpField::RotationKeys,
                                "string inside array",
                            ));
                            failed = true;
                            break;
                        }
                    }
                }
                if failed {
                    unknown.insert(
                        StoredOpField::RotationKeys.to_string(),
                        serde_json::Value::Array(arr),
                    );
                    Option::None
                } else {
                    Some(keys)
                }
            }
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(
                    StoredOpField::RotationKeys,
                    "array",
                ));
                unknown.insert(StoredOpField::RotationKeys.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        // all-or-nothing, same policy as rotation_keys
        let verification_methods = match obj.remove(&*StoredOpField::VerificationMethods) {
            Some(serde_json::Value::Object(map)) => {
                let mut methods = BTreeMap::new();
                let mut failed = false;
                for (k, v) in &map {
                    match v {
                        serde_json::Value::String(s) => match DidKey::from_did_key(s) {
                            Ok(key) => {
                                methods.insert(VerificationMethodKey::from_str(k), key);
                            }
                            Err(e) => {
                                errors.push(StoredOpError::InvalidField(
                                    StoredOpField::VerificationMethods,
                                    e,
                                ));
                                failed = true;
                                break;
                            }
                        },
                        _ => {
                            errors.push(StoredOpError::TypeMismatch(
                                StoredOpField::VerificationMethods,
                                "string value in object",
                            ));
                            failed = true;
                            break;
                        }
                    }
                }
                if failed {
                    unknown.insert(
                        StoredOpField::VerificationMethods.to_string(),
                        serde_json::Value::Object(map),
                    );
                    Option::None
                } else {
                    Some(methods)
                }
            }
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(
                    StoredOpField::VerificationMethods,
                    "object",
                ));
                unknown.insert(StoredOpField::VerificationMethods.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        // all-or-nothing; Aka parsing itself is infallible, only the element
        // type can mismatch
        let also_known_as = match obj.remove(&*StoredOpField::AlsoKnownAs) {
            Some(serde_json::Value::Array(arr)) => {
                let mut akas = Vec::with_capacity(arr.len());
                let mut failed = false;
                for v in &arr {
                    match v {
                        serde_json::Value::String(s) => akas.push(Aka::from_str(s)),
                        _ => {
                            errors.push(StoredOpError::TypeMismatch(
                                StoredOpField::AlsoKnownAs,
                                "string inside array",
                            ));
                            failed = true;
                            break;
                        }
                    }
                }
                if failed {
                    unknown.insert(
                        StoredOpField::AlsoKnownAs.to_string(),
                        serde_json::Value::Array(arr),
                    );
                    Option::None
                } else {
                    Some(akas)
                }
            }
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(
                    StoredOpField::AlsoKnownAs,
                    "array",
                ));
                unknown.insert(StoredOpField::AlsoKnownAs.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        // all-or-nothing; every service value must have string `type` and
        // `endpoint` members
        let services = match obj.remove(&*StoredOpField::Services) {
            Some(serde_json::Value::Object(map)) => {
                let mut svcs = BTreeMap::new();
                let mut failed = false;
                for (k, v) in &map {
                    if let (Some(r#type), Some(endpoint)) = (
                        v.get("type").and_then(|t| t.as_str()),
                        v.get("endpoint").and_then(|e| e.as_str()),
                    ) {
                        let svc = StoredService {
                            r#type: ServiceType::from_str(r#type),
                            endpoint: ServiceEndpoint::from_str(endpoint),
                        };
                        svcs.insert(ServiceKey::from_str(k), svc);
                    } else {
                        errors.push(StoredOpError::TypeMismatch(
                            StoredOpField::Services,
                            "missing or invalid type/endpoint in service object",
                        ));
                        failed = true;
                        break;
                    }
                }
                if failed {
                    unknown.insert(
                        StoredOpField::Services.to_string(),
                        serde_json::Value::Object(map),
                    );
                    Option::None
                } else {
                    Some(svcs)
                }
            }
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(
                    StoredOpField::Services,
                    "object",
                ));
                unknown.insert(StoredOpField::Services.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        // legacy create fields below: all optional
        let signing_key = match obj.remove(&*StoredOpField::SigningKey) {
            Some(serde_json::Value::String(s)) => match DidKey::from_did_key(&s) {
                Ok(key) => Some(key),
                Err(e) => {
                    errors.push(StoredOpError::InvalidField(StoredOpField::SigningKey, e));
                    unknown.insert(
                        StoredOpField::SigningKey.to_string(),
                        serde_json::Value::String(s),
                    );
                    Option::None
                }
            },
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(
                    StoredOpField::SigningKey,
                    "string",
                ));
                unknown.insert(StoredOpField::SigningKey.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        let recovery_key = match obj.remove(&*StoredOpField::RecoveryKey) {
            Some(serde_json::Value::String(s)) => match DidKey::from_did_key(&s) {
                Ok(key) => Some(key),
                Err(e) => {
                    errors.push(StoredOpError::InvalidField(StoredOpField::RecoveryKey, e));
                    unknown.insert(
                        StoredOpField::RecoveryKey.to_string(),
                        serde_json::Value::String(s),
                    );
                    Option::None
                }
            },
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(
                    StoredOpField::RecoveryKey,
                    "string",
                ));
                unknown.insert(StoredOpField::RecoveryKey.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        let handle = match obj.remove(&*StoredOpField::Handle) {
            Some(serde_json::Value::String(s)) => Some(Handle::from_str(&s)),
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(StoredOpField::Handle, "string"));
                unknown.insert(StoredOpField::Handle.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        let service = match obj.remove(&*StoredOpField::Service) {
            Some(serde_json::Value::String(s)) => Some(ServiceEndpoint::from_str(&s)),
            Some(v) => {
                errors.push(StoredOpError::TypeMismatch(
                    StoredOpField::Service,
                    "string",
                ));
                unknown.insert(StoredOpField::Service.to_string(), v);
                Option::None
            }
            Option::None => Option::None,
        };

        // whatever is left in the object was never recognized: keep it
        for (k, v) in obj {
            unknown.insert(k, v);
        }

        (
            Some(Self {
                op_type,
                sig,
                prev,
                rotation_keys,
                verification_methods,
                also_known_as,
                services,
                signing_key,
                recovery_key,
                handle,
                service,
                unknown_packed: Self::pack_unknown(unknown),
            }),
            errors,
        )
    }
779
    /// Rebuild the operation's JSON form, including any preserved unknown
    /// fields. Intended to round-trip with `from_json_value` (verified
    /// signatures depend on this — see `verify_op_sig`).
    fn to_json_value(&self) -> serde_json::Value {
        let mut map = serde_json::Map::new();

        // always-present fields; `prev` serializes as explicit null when absent
        map.insert((*StoredOpField::Type).into(), self.op_type.as_str().into());
        map.insert((*StoredOpField::Sig).into(), self.sig.to_string().into());
        map.insert(
            (*StoredOpField::Prev).into(),
            self.prev
                .as_ref()
                .map(|c| serde_json::Value::String(c.to_string()))
                .unwrap_or(serde_json::Value::Null),
        );

        if let Some(keys) = &self.rotation_keys {
            map.insert(
                (*StoredOpField::RotationKeys).into(),
                keys.iter()
                    .map(|k| serde_json::Value::String(k.to_string()))
                    .collect::<Vec<_>>()
                    .into(),
            );
        }

        if let Some(methods) = &self.verification_methods {
            let obj: serde_json::Map<String, serde_json::Value> = methods
                .iter()
                .map(|(k, v)| {
                    (
                        k.as_str().to_string(),
                        serde_json::Value::String(v.to_string()),
                    )
                })
                .collect();
            map.insert((*StoredOpField::VerificationMethods).into(), obj.into());
        }

        if let Some(aka) = &self.also_known_as {
            map.insert(
                (*StoredOpField::AlsoKnownAs).into(),
                aka.iter()
                    .map(|h| serde_json::Value::String(h.to_string()))
                    .collect::<Vec<_>>()
                    .into(),
            );
        }

        if let Some(services) = &self.services {
            let obj: serde_json::Map<String, serde_json::Value> = services
                .iter()
                .map(|(k, svc)| {
                    (
                        k.as_str().to_string(),
                        serde_json::json!({
                            "type": svc.r#type.as_str(),
                            "endpoint": svc.endpoint.as_string(),
                        }),
                    )
                })
                .collect();
            map.insert((*StoredOpField::Services).into(), obj.into());
        }

        // legacy create fields
        if let Some(key) = &self.signing_key {
            map.insert((*StoredOpField::SigningKey).into(), key.to_string().into());
        }
        if let Some(key) = &self.recovery_key {
            map.insert((*StoredOpField::RecoveryKey).into(), key.to_string().into());
        }
        if let Some(handle) = &self.handle {
            map.insert((*StoredOpField::Handle).into(), handle.as_string().into());
        }
        if let Some(service) = &self.service {
            map.insert((*StoredOpField::Service).into(), service.as_string().into());
        }

        // unknown/unparsable fields ride along verbatim
        for (k, v) in self.unknown() {
            map.insert(k, v);
        }

        serde_json::Value::Object(map)
    }
862}
863
/// Check `op`'s signature against the keys authoritative for it: its own key
/// set for a genesis op (no `prev`), otherwise the key set of the previous op
/// in the chain.
///
/// Errors if `op.prev` is set but `prev` was not supplied, or if no candidate
/// keys exist; otherwise returns the signature assurance results.
fn verify_op_sig(op: &StoredOp, prev: Option<&StoredOp>) -> anyhow::Result<AssuranceResults> {
    let keys: Vec<&DidKey> = match &op.prev {
        None => op.get_keys(),
        Some(_) => match prev {
            None => anyhow::bail!("prev cid exists but the op for that cid is missing"),
            Some(p) => p.get_keys(),
        },
    };

    if keys.is_empty() {
        anyhow::bail!("no keys found for genesis op or prev op");
    }

    // the signed payload is the op's JSON *without* the `sig` field itself
    let data = {
        let serde_json::Value::Object(mut data) = op.to_json_value() else {
            unreachable!("we know op is valid, because it comes from StoredOp")
        };
        data.remove("sig");
        serde_json::Value::Object(data)
    };

    let results = assure_valid_sig(keys, &op.sig, &data)
        .expect("that our op is an object and we removed sig field");
    Ok(results)
}
889
// stored alongside the seq key in the ops keyspace
// cid and created_at are in the value (not the key) in the new layout
#[derive(Debug, Deserialize, Serialize, bitcode::Encode, bitcode::Decode)]
#[serde(rename_all = "camelCase")]
struct DbOp {
    // compact binary did (see `encode_did`)
    #[serde(with = "serde_bytes")]
    pub did: Vec<u8>,
    // compact binary cid (see `decode_cid_str`)
    #[serde(with = "serde_bytes")]
    pub cid: Vec<u8>,
    // microseconds since the unix epoch (see `insert_op`)
    pub created_at: u64,
    pub nullified: bool,
    pub operation: StoredOp,
}
903
// we have our own Op struct for fjall since we dont want to have to convert Value back to RawValue
/// A fully decoded operation as handed out to consumers of this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Op {
    pub seq: u64,
    pub did: String,
    pub cid: String,
    #[serde(rename = "createdAt")]
    pub created_at: Dt,
    pub nullified: bool,
    pub operation: serde_json::Value,
}
915
916impl Op {
917 // todo: we should probably just have this in Op tbh as a `r#type: SequencedOpType` or something
918 /// adds the `type` field to the op
919 pub fn to_sequenced_json(&self) -> serde_json::Value {
920 let mut val = serde_json::to_value(self).expect("Op is serializable");
921 if let serde_json::Value::Object(ref mut map) = val {
922 map.insert("type".to_string(), "sequenced_op".into());
923 }
924 val
925 }
926}
927
/// Cheaply cloneable handle to the fjall-backed PLC op store.
#[derive(Clone)]
pub struct FjallDb {
    inner: Arc<FjallInner>,
}
932
struct FjallInner {
    db: Database,
    /// primary keyspace: seq (varint) -> DbOp
    ops: Keyspace,
    /// secondary index: [encoded_did][SEP][seq_varint] -> []
    by_did: Keyspace,
    /// woken after every committed insert so tailers can look for new ops
    notify_stream: Notify,
}
941
942impl FjallDb {
    /// Open (or create) the database at `path` along with its two keyspaces.
    ///
    /// Journal persistence is manual — callers must invoke [`Self::persist`]
    /// to make writes durable.
    pub fn open(path: impl AsRef<Path>) -> fjall::Result<Self> {
        // size helpers: kb() yields bytes as u32, mb() bytes as u64
        const fn kb(kb: u32) -> u32 {
            kb * 1_024
        }
        const fn mb(mb: u32) -> u64 {
            kb(mb) as u64 * 1_024
        }

        let db = Database::builder(path)
            // 32mb is too low we can afford more
            // this should be configurable though!
            .cache_size(mb(256))
            .manual_journal_persist(true)
            .open()?;
        let opts = KeyspaceCreateOptions::default;
        let ops = db.keyspace("ops", || {
            opts()
                // this is mainly for when backfilling
                .max_memtable_size(mb(192))
                // this wont compress terribly well since its a bunch of CIDs and signatures and did:keys
                // and we want to keep reads fast since we'll be reading a lot...
                .data_block_size_policy(BlockSizePolicy::new([kb(8), kb(32), kb(64), kb(128)]))
                // this has no downsides, since the only point reads that might miss we do is on by_did
                .expect_point_read_hits(true)
        })?;
        let by_did = db.keyspace("by_did", || {
            opts()
                .max_memtable_size(mb(64))
                // this isn't gonna compress well anyway, since its just keys (did + seq)
                // and dids dont have many operations in the first place, so we can use small blocks
                .data_block_size_policy(BlockSizePolicy::all(kb(2)))
                // lower restart interval since plcs are hashes, and dids dont have
                // many ops in themselves
                .data_block_restart_interval_policy(RestartIntervalPolicy::all(8))
        })?;
        Ok(Self {
            inner: Arc::new(FjallInner {
                db,
                ops,
                by_did,
                notify_stream: Notify::new(),
            }),
        })
    }
987
988 pub fn clear(&self) -> fjall::Result<()> {
989 self.inner.ops.clear()?;
990 self.inner.by_did.clear()?;
991 Ok(())
992 }
993
    /// Flush the journal according to `mode` (persistence is manual; see `open`).
    pub fn persist(&self, mode: PersistMode) -> fjall::Result<()> {
        self.inner.db.persist(mode)
    }
997
998 pub fn compact(&self) -> fjall::Result<()> {
999 self.inner.ops.major_compact()?;
1000 self.inner.by_did.major_compact()?;
1001 Ok(())
1002 }
1003
    /// Future that resolves the next time `insert_op` commits a write.
    pub fn subscribe(&self) -> Notified<'_> {
        self.inner.notify_stream.notified()
    }
1007
1008 /// Returns `(seq, created_at)` for the last stored op, or `None` if empty.
1009 pub fn get_latest(&self) -> anyhow::Result<Option<(u64, Dt)>> {
1010 let Some(guard) = self.inner.ops.last_key_value() else {
1011 return Ok(None);
1012 };
1013 let (key, value) = guard
1014 .into_inner()
1015 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
1016 let seq = decode_seq_key(&key)?;
1017 let db_op: DbOp = bitcode::decode::<DbOp>(&value)?;
1018 let dt = Dt::from_timestamp_micros(db_op.created_at as i64)
1019 .ok_or_else(|| anyhow::anyhow!("invalid created_at in last op"))?;
1020 Ok(Some((seq, dt)))
1021 }
1022
    /// Parse, optionally signature-verify, and store one operation at `seq`.
    ///
    /// Returns how many ops were written: 1 on success, 0 when the op was
    /// dropped (non-fatal parse errors, or — with `VERIFY` — an invalid or
    /// unverifiable signature). Only fatal parse errors and storage failures
    /// become `Err`.
    pub fn insert_op<const VERIFY: bool>(&self, op: &CommonOp, seq: u64) -> anyhow::Result<usize> {
        let cid_bytes = decode_cid_str(&op.cid)?;

        let op_json: serde_json::Value = serde_json::from_str(op.operation.get())?;
        let (stored, mut errors) = StoredOp::from_json_value(op_json);

        let Some(operation) = stored else {
            return Err(errors.remove(0)).context("fatal operation parse error");
        };

        for err in &errors {
            tracing::warn!("dropping op {} {} (seq {seq}) parse error: {err}", op.did, op.cid);
        }
        if !errors.is_empty() {
            // if parse failed but not fatal, we just dont store it
            return Ok(0);
        }

        if VERIFY {
            // find the op this one chains from by scanning this did's ops
            // newest-first for a matching cid (an Err entry also stops the
            // scan so it can be propagated)
            let prev_op = operation
                .prev
                .as_ref()
                .map(|prev_cid| {
                    // TODO: we should have a cid -> seq lookup eventually maybe?
                    // this is probably fine though we will only iter over like 2 ops at most
                    // or so, its there to handle nullified...
                    // but a cid lookup would also help us avoid duplicate ops!
                    self._ops_for_did(&op.did)
                        .map(|ops| {
                            ops.rev()
                                .find(|r| r.as_ref().map_or(true, |(_, _, cid, _)| cid == prev_cid))
                                .transpose()
                        })
                        .flatten()
                })
                .transpose()?
                .flatten();

            let prev_stored = prev_op.as_ref().map(|(_, _, _, p)| &p.operation);

            match verify_op_sig(&operation, prev_stored) {
                Ok(results) => {
                    if !results.valid {
                        let msg = results
                            .errors
                            .iter()
                            .map(|e| e.to_string())
                            .collect::<Vec<_>>()
                            .join("\n");
                        tracing::warn!("dropping op {} {} (seq {seq}) invalid sig:\n{msg}", op.did, op.cid);
                        return Ok(0);
                    }
                }
                Err(e) => {
                    tracing::warn!("dropping op {} {} (seq {seq}): {e}", op.did, op.cid);
                    return Ok(0);
                }
            }
            tracing::debug!("verified op {} {} (seq {seq})", op.did, op.cid);
        }

        let db_op = DbOp {
            did: {
                let mut encoded_did = Vec::with_capacity(15);
                encode_did(&mut encoded_did, &op.did)?;
                encoded_did
            },
            cid: cid_bytes,
            created_at: op.created_at.timestamp_micros() as u64,
            nullified: op.nullified,
            operation,
        };

        let seq_val = bitcode::encode(&db_op);
        let seq_key_bytes = seq_key(seq);
        let by_did_key_bytes = by_did_key(&op.did, seq)?;

        // write the op and its by_did index entry atomically
        let mut batch = self.inner.db.batch();
        batch.insert(&self.inner.ops, seq_key_bytes, seq_val);
        batch.insert(&self.inner.by_did, by_did_key_bytes, &[]);
        batch.commit()?;

        // wake anyone blocked in `subscribe`
        self.inner.notify_stream.notify_waiters();

        tracing::debug!("inserted op {} {} (seq {seq})", op.did, op.cid);
        Ok(1)
    }
1110
1111 pub(crate) fn get_op_at_or_after(&self, seq: u64) -> anyhow::Result<Option<Op>> {
1112 self.inner
1113 .ops
1114 .range(seq_key(seq)..)
1115 .next()
1116 .map(|v| {
1117 bitcode::decode::<DbOp>(&v.value()?)
1118 .context("failed to decode op")
1119 .map(|op| {
1120 Ok(Op {
1121 seq,
1122 did: decode_did(&op.did),
1123 cid: decode_cid(&op.cid)?,
1124 created_at: Dt::from_timestamp_micros(op.created_at as i64)
1125 .ok_or_else(|| anyhow::anyhow!("invalid created_at in op"))?,
1126 nullified: op.nullified,
1127 operation: op.operation.to_json_value(),
1128 })
1129 })
1130 .flatten()
1131 })
1132 .transpose()
1133 }
1134
    /// Decode a `by_did` entry: extract the seq from the key suffix, then
    /// look up the full `DbOp` in the `ops` keyspace.
    ///
    /// `prefix_len` is the byte length of the `[encoded did][SEP]` portion
    /// that precedes the seq varint in the index key.
    fn decode_by_did_entry(
        &self,
        by_did_key_bytes: &[u8],
        prefix_len: usize,
    ) -> anyhow::Result<(u64, Dt, PlcCid, DbOp)> {
        let key_suffix = by_did_key_bytes
            .get(prefix_len..)
            .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key_bytes:?}"))?;

        let seq =
            u64::decode_variable(key_suffix).context("failed to decode seq from by_did key")?;

        // point read into the primary keyspace; a miss means the index and
        // the ops keyspace are out of sync
        let value = self
            .inner
            .ops
            .get(seq_key(seq))?
            .ok_or_else(|| anyhow::anyhow!("op not found for seq {seq}"))?;

        let op: DbOp = bitcode::decode::<DbOp>(&value)?;
        let ts = Dt::from_timestamp_micros(op.created_at as i64)
            .ok_or_else(|| anyhow::anyhow!("invalid created_at_micros {}", op.created_at))?;
        let cid = PlcCid(op.cid.clone());

        Ok((seq, ts, cid, op))
    }
1162
1163 fn _ops_for_did(
1164 &self,
1165 did: &str,
1166 ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<(u64, Dt, PlcCid, DbOp)>> + '_>
1167 {
1168 let prefix = by_did_prefix(did)?;
1169
1170 Ok(self.inner.by_did.prefix(&prefix).map(move |guard| {
1171 let (by_did_key, _) = guard
1172 .into_inner()
1173 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
1174 self.decode_by_did_entry(&by_did_key, prefix.len())
1175 }))
1176 }
1177
1178 pub fn ops_for_did(
1179 &self,
1180 did: &str,
1181 ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<Op>> + '_> {
1182 Ok(self._ops_for_did(did)?.map(|res| {
1183 let (seq, ts, cid, op) = res?;
1184 let cid = decode_cid(&cid.0)?;
1185 let did = decode_did(&op.did);
1186 Ok(Op {
1187 seq,
1188 did,
1189 cid,
1190 created_at: ts,
1191 nullified: op.nullified,
1192 operation: op.operation.to_json_value(),
1193 })
1194 }))
1195 }
1196
1197 pub fn export_ops(
1198 &self,
1199 range: impl std::ops::RangeBounds<u64>,
1200 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Op>> + '_> {
1201 use std::ops::Bound;
1202 let map_bound = |b: Bound<&u64>| -> Bound<Vec<u8>> {
1203 match b {
1204 Bound::Included(seq) => Bound::Included(seq_key(*seq)),
1205 Bound::Excluded(seq) => Bound::Excluded(seq_key(*seq)),
1206 Bound::Unbounded => Bound::Unbounded,
1207 }
1208 };
1209 let range = (map_bound(range.start_bound()), map_bound(range.end_bound()));
1210
1211 Ok(self
1212 .inner
1213 .ops
1214 .range(range)
1215 .map(|item| -> anyhow::Result<Op> {
1216 let (key, value) = item
1217 .into_inner()
1218 .map_err(|e: fjall::Error| anyhow::anyhow!("fjall read error: {e}"))?;
1219 let seq = decode_seq_key(&key)?;
1220 let db_op: DbOp = bitcode::decode::<DbOp>(&value)?;
1221 let created_at =
1222 Dt::from_timestamp_micros(db_op.created_at as i64).ok_or_else(|| {
1223 anyhow::anyhow!("invalid created_at_micros {}", db_op.created_at)
1224 })?;
1225 let cid = decode_cid(&db_op.cid)?;
1226 let did = decode_did(&db_op.did);
1227 Ok(Op {
1228 seq,
1229 did,
1230 cid,
1231 created_at,
1232 nullified: db_op.nullified,
1233 operation: db_op.operation.to_json_value(),
1234 })
1235 }))
1236 }
1237
1238 pub fn drop_op(&self, did_str: &str, _created_at: &Dt, _cid: &str) -> anyhow::Result<()> {
1239 // scan the by_did index for this DID and find the op that matches
1240 // (in practice drop_op is rare so a scan is fine)
1241 let prefix = by_did_prefix(did_str)?;
1242 let mut found_seq: Option<u64> = None;
1243 let mut found_by_did_key: Option<Vec<u8>> = None;
1244
1245 for guard in self.inner.by_did.prefix(&prefix) {
1246 let (key, _) = guard
1247 .into_inner()
1248 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
1249 let suffix = &key[prefix.len()..];
1250 let seq = u64::decode_variable(suffix).context("decode seq in drop_op")?;
1251 found_seq = Some(seq);
1252 found_by_did_key = Some(key.to_vec());
1253 // if there were multiple ops for this DID we'd need to match by cid,
1254 // but for now take the last matched (they're in seq order)
1255 }
1256
1257 let (seq, by_did_key_bytes) = match (found_seq, found_by_did_key) {
1258 (Some(s), Some(k)) => (s, k),
1259 _ => {
1260 tracing::warn!("drop_op: by_did entry not found for {did_str}");
1261 return Ok(());
1262 }
1263 };
1264
1265 let mut batch = self.inner.db.batch();
1266 batch.remove(&self.inner.ops, seq_key(seq));
1267 batch.remove(&self.inner.by_did, by_did_key_bytes);
1268 batch.commit()?;
1269
1270 Ok(())
1271 }
1272
    /// Verify every stored op and report invalid ones on `invalid_ops_tx`.
    ///
    /// Each op's `prev` CID must resolve to another op stored for the same
    /// DID, and its signature must validate (`verify_op_sig`). Returns
    /// `(checked, failed)` counts.
    ///
    /// Parallelism layout: one verifier thread per core consumes per-DID
    /// batches from its own bounded channel, while two scanner threads walk
    /// the `by_did` index toward each other (one forward, one backward),
    /// grouping entries by DID prefix and fanning complete groups out to the
    /// verifiers round-robin.
    ///
    /// NOTE(review): each scanner only checks its `$limit` after flushing a
    /// full DID group, so the DID straddling the midpoint may be emitted by
    /// both scans and `checked` can slightly exceed the index length —
    /// confirm whether exact counts matter to callers.
    pub fn audit(&self, invalid_ops_tx: mpsc::Sender<InvalidOp>) -> anyhow::Result<(usize, usize)> {
        // std channels here (blocking threads); shadows the tokio mpsc import
        use std::sync::mpsc;

        // total index entries; used below to split the scan roughly in half
        let ops = self.inner.by_did.len()?;

        let workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        // one batch = (16-byte did prefix key, all ops stored for that DID)
        type Batch = (Vec<u8>, Vec<(Dt, PlcCid, DbOp)>);
        let (result_tx, result_rx) = mpsc::sync_channel::<anyhow::Result<(usize, usize)>>(workers);

        // a bounded channel per worker; scanners round-robin batches across them
        let channels: Vec<_> = (0..workers)
            .map(|_| mpsc::sync_channel::<Batch>(512))
            .collect();
        let senders: Vec<_> = channels.iter().map(|(tx, _)| tx.clone()).collect();

        std::thread::scope(|s| {
            // verifier workers: drain batches until all senders are dropped
            for (_, rx) in channels {
                let result_tx = result_tx.clone();
                let invalid_ops_tx = invalid_ops_tx.clone();
                s.spawn(move || {
                    let mut checked: usize = 0;
                    let mut failed: usize = 0;
                    while let Ok((did_prefix, ops)) = rx.recv() {
                        // strip the trailing separator byte before decoding the DID
                        let did = decode_did(&did_prefix[..did_prefix.len() - 1]);
                        for (ts, cid, op) in &ops {
                            // report this op on the invalid-ops channel
                            let send_invalid = || {
                                let _ = invalid_ops_tx.blocking_send(InvalidOp {
                                    did: did.clone(),
                                    at: ts.clone(),
                                    cid: cid.to_string(),
                                });
                            };
                            checked += 1;
                            // `prev` must point at another op in this same DID's group
                            let prev_op = op.operation.prev.as_ref().and_then(|expected| {
                                ops.iter().find(|(_, c, _)| c == expected)
                            });
                            let prev_cid_ok = op.operation.prev.is_none() || prev_op.is_some();
                            if !prev_cid_ok {
                                tracing::error!("audit: op {did} {cid} prev cid mismatch or missing predecessor, is db corrupted?");
                                failed += 1;
                                send_invalid();
                                continue;
                            }
                            let prev_stored = prev_op.map(|(_, _, p)| &p.operation);
                            match verify_op_sig(&op.operation, prev_stored) {
                                Ok(results) => {
                                    if !results.valid {
                                        let msg = results
                                            .errors
                                            .iter()
                                            .map(|e| e.to_string())
                                            .collect::<Vec<_>>()
                                            .join("\n ");
                                        tracing::warn!("audit: invalid op {} {}:\n {msg}", did, cid);
                                        failed += 1;
                                        send_invalid();
                                    }
                                }
                                Err(e) => {
                                    tracing::warn!("audit: invalid op {} {}: {e}", did, cid);
                                    failed += 1;
                                    send_invalid();
                                }
                            }
                        }
                    }
                    // channel closed: report this worker's totals
                    let _ = result_tx.send(Ok((checked, failed)));
                });
            }
            // drop our own result sender so `result_rx` ends once workers finish
            drop(result_tx);

            // todo: probably dont use a macro...
            // spawns a scanner thread; parameterized over iteration direction
            // ($iter_method = next / next_back), the starting round-robin
            // worker index, whether per-DID batches need reversing (backward
            // scan sees ops newest-first), and how many entries to process.
            macro_rules! spawn_scan_thread {
                ($iter_method:ident, $start_idx:expr, $reverse:expr, $limit:expr) => {{
                    let senders = senders.clone();
                    let mut iter = self.inner.by_did.iter();

                    s.spawn(move || -> anyhow::Result<()> {
                        let mut current_prefix: Option<[u8; 16]> = None;
                        let mut did_ops: Vec<(Dt, PlcCid, DbOp)> = Vec::new();
                        let mut idx = $start_idx;
                        let mut processed_ops: usize = 0;

                        while let Some(guard) = iter.$iter_method() {
                            let (by_did_key, _) = guard
                                .into_inner()
                                .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;

                            // first 16 bytes of the key identify the DID
                            let mut prefix_array = [0u8; 16];
                            prefix_array.copy_from_slice(by_did_key.get(..16).ok_or_else(
                                || anyhow::anyhow!("by_did key too short: {by_did_key:?}"),
                            )?);

                            let op = self.decode_by_did_entry(&by_did_key, 16)?;

                            if current_prefix.map_or(true, |cp| cp != prefix_array) {
                                // new did, push the ops
                                if let Some(prefix) = current_prefix.take() {
                                    if $reverse {
                                        did_ops.reverse();
                                    }
                                    senders[idx % workers]
                                        .send((prefix.to_vec(), std::mem::take(&mut did_ops)))
                                        .ok();
                                    idx += 1;

                                    // only stop on a DID boundary so no group is split
                                    if processed_ops >= $limit {
                                        break;
                                    }
                                }
                                current_prefix = Some(prefix_array);
                            }

                            did_ops.push((op.1, op.2, op.3));
                            processed_ops += 1;
                        }

                        // flush the final (possibly partial-scan) DID group
                        if let Some(prefix) = current_prefix {
                            if $reverse {
                                did_ops.reverse();
                            }
                            senders[idx % workers].send((prefix.to_vec(), did_ops)).ok();
                        }

                        Ok(())
                    })
                }};
            }

            // we can start two threads, one for forward iteration and one for reverse iteration
            // this way we have two scans in parallel which should be faster!
            let f_count = ops / 2;
            let f_handle = spawn_scan_thread!(next, 0, false, f_count);
            let b_count = ops - f_count;
            let b_handle = spawn_scan_thread!(next_back, workers / 2, true, b_count);

            f_handle.join().unwrap()?;
            b_handle.join().unwrap()?;

            // drop the batch senders so every worker's recv loop terminates
            drop(senders);

            // sum the per-worker (checked, failed) tallies
            let mut total_checked: usize = 0;
            let mut total_failed: usize = 0;
            for res in result_rx {
                let (c, f) = res?;
                total_checked += c;
                total_failed += f;
            }

            Ok((total_checked, total_failed))
        })
    }
1427}
1428
1429pub async fn backfill_to_fjall(
1430 db: FjallDb,
1431 reset: bool,
1432 mut pages: mpsc::Receiver<crate::SeqPage>,
1433 notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
1434) -> anyhow::Result<&'static str> {
1435 let t0 = Instant::now();
1436
1437 if reset {
1438 let db = db.clone();
1439 tokio::task::spawn_blocking(move || db.clear()).await??;
1440 tracing::warn!("fjall reset: cleared all data");
1441 }
1442
1443 let mut last_at = None;
1444 let mut ops_inserted: usize = 0;
1445 let mut insert_tasks: tokio::task::JoinSet<anyhow::Result<usize>> = tokio::task::JoinSet::new();
1446
1447 loop {
1448 let pages_finished = pages.is_closed();
1449 // we can stop if we have no more pages and all the insert tasks are finished
1450 if pages_finished && insert_tasks.is_empty() {
1451 break;
1452 }
1453 tokio::select! {
1454 page = pages.recv(), if !pages_finished => {
1455 let Some(page) = page else { continue; };
1456 if notify_last_at.is_some() {
1457 // SeqPage ops are always in order, so we can just grab the last one
1458 if let Some(last_op) = page.ops.last() {
1459 last_at = last_at.filter(|&l| l >= last_op.created_at).or(Some(last_op.created_at));
1460 }
1461 }
1462
1463 let db = db.clone();
1464
1465 // we don't have to wait for inserts to finish, because insert_op
1466 // without verification does not read anything from the db
1467 insert_tasks.spawn_blocking(move || {
1468 let mut count: usize = 0;
1469 for seq_op in &page.ops {
1470 let op = CommonOp {
1471 did: seq_op.did.clone(),
1472 cid: seq_op.cid.clone(),
1473 created_at: seq_op.created_at,
1474 nullified: seq_op.nullified,
1475 operation: seq_op.operation.clone(),
1476 };
1477 // we don't verify sigs for bulk, since pages might be out of order (and we trust for backfills)
1478 count += db.insert_op::<false>(&op, seq_op.seq)?;
1479 }
1480 db.persist(PersistMode::Buffer)?;
1481 Ok(count)
1482 });
1483 }
1484 Some(res) = insert_tasks.join_next() => {
1485 match res? {
1486 Ok(count) => ops_inserted += count,
1487 Err(e) => {
1488 insert_tasks.abort_all();
1489 return Err(e);
1490 }
1491 }
1492 }
1493 }
1494 }
1495 tracing::debug!("finished receiving bulk pages");
1496
1497 if let Some(notify) = notify_last_at {
1498 tracing::trace!("notifying last_at: {last_at:?}");
1499 if notify.send(last_at).is_err() {
1500 tracing::error!("receiver for last_at dropped, can't notify");
1501 };
1502 }
1503
1504 tokio::task::spawn_blocking(move || db.persist(PersistMode::SyncAll)).await??;
1505
1506 tracing::info!(
1507 "backfill_to_fjall: inserted {ops_inserted} ops in {:?}",
1508 t0.elapsed()
1509 );
1510 Ok("backfill_to_fjall")
1511}
1512
1513/// Write sequenced ops (with PLC seq numbers) into fjall.
1514pub async fn seq_pages_to_fjall(
1515 db: FjallDb,
1516 mut pages: mpsc::Receiver<crate::SeqPage>,
1517) -> anyhow::Result<&'static str> {
1518 tracing::info!("starting seq_pages_to_fjall writer...");
1519
1520 let t0 = Instant::now();
1521 let mut ops_inserted: usize = 0;
1522
1523 while let Some(page) = pages.recv().await {
1524 let first_seq = page.ops.first().map(|op| op.seq);
1525 let last_seq = page.ops.last().map(|op| op.seq);
1526 tracing::debug!(
1527 "seq_pages: received page with {} ops, seq {:?}..{:?}",
1528 page.ops.len(),
1529 first_seq,
1530 last_seq
1531 );
1532 let page_len = page.ops.len();
1533 let db = db.clone();
1534 let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
1535 let mut count: usize = 0;
1536 for seq_op in &page.ops {
1537 tracing::debug!("seq_pages: processing op {} {} (seq {})", seq_op.did, seq_op.cid, seq_op.seq);
1538 let common_op = CommonOp {
1539 did: seq_op.did.clone(),
1540 cid: seq_op.cid.clone(),
1541 created_at: seq_op.created_at,
1542 nullified: seq_op.nullified,
1543 operation: seq_op.operation.clone(),
1544 };
1545 count += db.insert_op::<true>(&common_op, seq_op.seq)?;
1546 }
1547 db.persist(PersistMode::Buffer)?;
1548 Ok(count)
1549 })
1550 .await??;
1551 if count < page_len {
1552 tracing::warn!(
1553 "seq_pages: page seq {:?}..{:?} inserted {count}/{page_len} ops ({} dropped)",
1554 first_seq,
1555 last_seq,
1556 page_len - count
1557 );
1558 }
1559 ops_inserted += count;
1560 }
1561
1562 tracing::info!(
1563 "no more seq pages. inserted {ops_inserted} ops in {:?}",
1564 t0.elapsed()
1565 );
1566 Ok("seq_pages_to_fjall")
1567}
1568
1569pub async fn audit(
1570 db: FjallDb,
1571 invalid_ops_tx: mpsc::Sender<InvalidOp>,
1572) -> anyhow::Result<&'static str> {
1573 tracing::info!("starting fjall audit...");
1574 let t0 = std::time::Instant::now();
1575 let (checked, failed) = tokio::task::spawn_blocking(move || db.audit(invalid_ops_tx)).await??;
1576 tracing::info!(
1577 "fjall audit complete in {:?}, {checked} ops checked",
1578 t0.elapsed()
1579 );
1580 if failed > 0 {
1581 tracing::error!("audit found {failed} invalid operations");
1582 }
1583 Ok("audit_fjall")
1584}
1585
1586pub async fn fix_ops(
1587 db: FjallDb,
1588 upstream: reqwest::Url,
1589 only_drop: bool,
1590 mut invalid_ops_rx: mpsc::Receiver<InvalidOp>,
1591) -> anyhow::Result<&'static str> {
1592 tracing::info!("starting fjall fix ops...");
1593 let mut fixed_dids = std::collections::HashSet::new();
1594 let mut count = 0;
1595
1596 let latest_at = db
1597 .get_latest()?
1598 .ok_or_else(|| anyhow::anyhow!("db not backfilled? expected at least one op"))
1599 .map(|(_, dt)| dt)?;
1600
1601 // local seq counter for newly fetched ops
1602 let mut next_seq = db.get_latest()?.map(|(s, _)| s).unwrap_or(0) + 1;
1603
1604 while let Some(op) = invalid_ops_rx.recv().await {
1605 let InvalidOp { did, at, cid, .. } = op;
1606
1607 if only_drop {
1608 db.drop_op(&did, &at, &cid)?;
1609 db.persist(PersistMode::Buffer)?;
1610 count += 1;
1611 continue;
1612 }
1613
1614 if fixed_dids.contains(&did) {
1615 continue;
1616 }
1617
1618 tracing::trace!("fetching upstream ops to fix did: {did}");
1619 let mut url = upstream.clone();
1620 url.set_path(&format!("/{did}/log/audit"));
1621
1622 let resp = crate::CLIENT.get(url).send().await?;
1623
1624 use reqwest::StatusCode;
1625 let ops: Vec<CommonOp> = match resp.status() {
1626 StatusCode::OK => match resp.json().await {
1627 Ok(ops) => ops,
1628 Err(e) => {
1629 tracing::warn!("failed to parse upstream ops for {did}: {e}");
1630 continue;
1631 }
1632 },
1633 StatusCode::NOT_FOUND => {
1634 tracing::trace!("did not found upstream: {did}");
1635 Vec::new() // this essentially means drop the whole did
1636 }
1637 s => {
1638 tracing::warn!("failed to fetch upstream for {did}: {s}");
1639 continue;
1640 }
1641 };
1642
1643 tracing::trace!("fetched {} ops for {did}", ops.len());
1644
1645 // we drop all ops first just to be safe
1646 let existing = db.ops_for_did(&did)?;
1647 for op in existing {
1648 let op = op?;
1649 db.drop_op(&did, &op.created_at, &op.cid)?;
1650 }
1651
1652 // then insert the fresh ops
1653 for op in ops {
1654 // skip newer ops, since we will fill them in later anyway
1655 // if we don't skip these we might miss some ops in between
1656 // the latest_at we started with vs the one we ended up with
1657 if op.created_at > latest_at {
1658 tracing::trace!(
1659 "skipping op {} for {did} because it is newer than latest_at {latest_at}",
1660 op.cid
1661 );
1662 continue;
1663 }
1664
1665 let seq = next_seq;
1666 next_seq += 1;
1667 count += db.insert_op::<true>(&op, seq)?;
1668 }
1669
1670 db.persist(PersistMode::Buffer)?;
1671 fixed_dids.insert(did);
1672 }
1673
1674 tracing::info!("fixed {count} ops");
1675
1676 Ok("fix_ops_fjall")
1677}
1678
#[cfg(test)]
mod tests {
    use super::*;

    /// Collect all `.json` fixture files, sorted for deterministic order.
    fn fixture_paths() -> Vec<std::path::PathBuf> {
        let mut paths: Vec<_> = std::fs::read_dir("tests/fixtures")
            .unwrap()
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| path.extension().map_or(false, |ext| ext == "json"))
            .collect();
        paths.sort();
        paths
    }

    #[test]
    fn plc_cid_roundtrip() {
        let text = "bafyreigp6shzy6dlcxuowwoxz7u5nemdrkad2my5zwzpwilcnhih7bw6zm";
        let parsed = PlcCid::from_cid_str(text).unwrap();
        assert_eq!(parsed.to_string(), text);
    }

    #[test]
    fn bsky_aka_roundtrip() {
        let aka = Aka::from_str("at://alice.bsky.social");
        assert_eq!(aka, Aka::Bluesky("alice".to_string()));
        assert_eq!(aka.to_string(), "at://alice.bsky.social");
    }

    #[test]
    fn atproto_aka_roundtrip() {
        let aka = Aka::from_str("at://alice.example.com");
        assert_eq!(aka, Aka::Atproto("alice.example.com".to_string()));
        assert_eq!(aka.to_string(), "at://alice.example.com");
    }

    #[test]
    fn other_aka_roundtrip() {
        let aka = Aka::from_str("https://something.else");
        assert_eq!(aka, Aka::Other("https://something.else".to_string()));
        assert_eq!(aka.to_string(), "https://something.else");
    }

    #[test]
    fn handle_bsky_social_roundtrip() {
        let handle = Handle::from_str("alice.bsky.social");
        assert_eq!(handle, Handle::BskySocial("alice".to_string()));
        assert_eq!(handle.as_string(), "alice.bsky.social");
    }

    #[test]
    fn handle_other_roundtrip() {
        let handle = Handle::from_str("user.example.com");
        assert_eq!(handle, Handle::Other("user.example.com".to_string()));
        assert_eq!(handle.as_string(), "user.example.com");
    }

    #[test]
    fn verification_method_key_roundtrip() {
        let known = VerificationMethodKey::from_str("atproto");
        assert_eq!(known, VerificationMethodKey::Atproto);
        assert_eq!(known.to_string(), "atproto");

        let unknown = VerificationMethodKey::from_str("other_key");
        assert_eq!(unknown, VerificationMethodKey::Other("other_key".to_string()));
        assert_eq!(unknown.to_string(), "other_key");
    }

    #[test]
    fn service_key_roundtrip() {
        let known = ServiceKey::from_str("atproto_pds");
        assert_eq!(known, ServiceKey::AtprotoPds);
        assert_eq!(known.to_string(), "atproto_pds");

        let unknown = ServiceKey::from_str("other_svc");
        assert_eq!(unknown, ServiceKey::Other("other_svc".to_string()));
        assert_eq!(unknown.to_string(), "other_svc");
    }

    #[test]
    fn service_type_roundtrip() {
        let known = ServiceType::from_str("AtprotoPersonalDataServer");
        assert_eq!(known, ServiceType::AtprotoPersonalDataServer);
        assert_eq!(known.as_str(), "AtprotoPersonalDataServer");

        let unknown = ServiceType::from_str("OtherType");
        assert_eq!(unknown, ServiceType::Other("OtherType".to_string()));
        assert_eq!(unknown.as_str(), "OtherType");
    }

    #[test]
    fn service_endpoint_roundtrip() {
        let bsky_pds = ServiceEndpoint::from_str("https://example.host.bsky.network");
        assert_eq!(bsky_pds, ServiceEndpoint::BlueskyPds("example".to_string()));
        assert_eq!(bsky_pds.as_string(), "https://example.host.bsky.network");

        let bsky_social = ServiceEndpoint::from_str("https://bsky.social");
        assert_eq!(bsky_social, ServiceEndpoint::BlueskySocial);
        assert_eq!(bsky_social.as_string(), "https://bsky.social");

        let other = ServiceEndpoint::from_str("https://other.endpoint.com");
        assert_eq!(
            other,
            ServiceEndpoint::Other("https://other.endpoint.com".to_string())
        );
        assert_eq!(other.as_string(), "https://other.endpoint.com");
    }

    #[test]
    fn op_type_roundtrip() {
        // known and unknown op types should all round-trip through as_str
        for ty in ["plc_operation", "create", "plc_tombstone", "weird_thing"] {
            assert_eq!(OpType::from_str(ty).as_str(), ty);
        }
    }

    #[test]
    fn stored_op_fixture_roundtrip() {
        let mut total_json_size = 0;
        let mut total_packed_size = 0;

        for path in &fixture_paths() {
            let data = std::fs::read_to_string(path).unwrap();
            let entries: Vec<serde_json::Value> = serde_json::from_str(&data).unwrap();

            for entry in &entries {
                let op = &entry["operation"];
                let (stored, errors) = StoredOp::from_json_value(op.clone());
                if !errors.is_empty() {
                    let mut msg = format!("failed to parse op in {}:\n", path.display());
                    for e in errors {
                        msg.push_str(&format!(" - {e:?}\n"));
                    }
                    msg.push_str(&format!("op: {op}\n"));
                    panic!("{msg}");
                }
                let stored = stored.unwrap();

                // encode -> decode -> back to JSON must be lossless
                let packed = bitcode::encode(&stored);
                let unpacked = bitcode::decode::<StoredOp>(&packed).unwrap();
                assert_eq!(
                    *op,
                    unpacked.to_json_value(),
                    "roundtrip mismatch in {}",
                    path.display()
                );

                total_json_size += serde_json::to_vec(op).unwrap().len();
                total_packed_size += packed.len();
            }
        }

        println!(
            "json size: {} bytes, bitcode size: {} bytes, saved: {} bytes",
            total_json_size,
            total_packed_size,
            total_json_size as isize - total_packed_size as isize
        );
    }

    #[test]
    fn stored_op_fixture_sig_roundtrip() {
        for path in &fixture_paths() {
            let data = std::fs::read_to_string(path).unwrap();
            let entries: Vec<serde_json::Value> = serde_json::from_str(&data).unwrap();

            // build a cid -> StoredOp map so we can look up prev ops
            let mut by_cid: std::collections::HashMap<String, StoredOp> =
                std::collections::HashMap::new();

            for entry in &entries {
                let cid = entry["cid"].as_str().unwrap().to_string();

                let (stored, errors) = StoredOp::from_json_value(entry["operation"].clone());
                assert!(
                    errors.is_empty(),
                    "{} {cid}: parse errors: {errors:?}",
                    path.display()
                );
                let stored = stored.unwrap();

                let prev_cid = stored.prev.as_ref().map(|c| c.to_string());
                let prev_stored = prev_cid.as_deref().and_then(|c| by_cid.get(c));

                let results = verify_op_sig(&stored, prev_stored)
                    .unwrap_or_else(|e| panic!("{} {cid}: {e}", path.display()));
                assert!(
                    results.valid,
                    "{} {cid}: sig invalid after StoredOp roundtrip: {:?}",
                    path.display(),
                    results.errors
                );

                by_cid.insert(cid, stored);
            }
        }
    }
}