Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fjall: fix nullification handling + pipeline bulk inserts

dawn 206eed9f e4f29238

+95 -55
+95 -55
src/plc_fjall.rs
··· 4 4 }; 5 5 use anyhow::Context; 6 6 use data_encoding::BASE32_NOPAD; 7 - use fjall::{ 8 - Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, 9 - config::BlockSizePolicy, 10 - }; 7 + use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, config::BlockSizePolicy}; 11 8 use futures::Future; 12 9 use serde::{Deserialize, Serialize}; 13 10 use std::collections::BTreeMap; ··· 92 89 } 93 90 94 91 /// CID string → binary CID bytes 95 - #[derive(Debug, Clone, Serialize, Deserialize)] 92 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 96 93 struct PlcCid(#[serde(with = "serde_bytes")] Vec<u8>); 97 94 98 95 impl PlcCid { ··· 847 844 // 32mb is too low we can afford more 848 845 // this should be configurable though! 849 846 .cache_size(mb(256)) 847 + .manual_journal_persist(true) 850 848 .open()?; 851 849 let opts = KeyspaceCreateOptions::default; 852 850 let ops = db.keyspace("ops", || { ··· 877 875 Ok(()) 878 876 } 879 877 880 - pub fn persist(&self) -> fjall::Result<()> { 881 - self.inner.db.persist(PersistMode::SyncAll) 878 + pub fn persist(&self, mode: PersistMode) -> fjall::Result<()> { 879 + self.inner.db.persist(mode) 882 880 } 883 881 884 882 pub fn compact(&self) -> fjall::Result<()> { ··· 902 900 .map(Some) 903 901 } 904 902 905 - pub fn insert_op(&self, batch: &mut OwnedWriteBatch, op: &CommonOp) -> anyhow::Result<usize> { 903 + pub fn insert_op<const VERIFY: bool>(&self, op: &CommonOp) -> anyhow::Result<usize> { 906 904 let cid_bytes = decode_cid_str(&op.cid)?; 907 905 let cid_prefix = cid_bytes 908 906 .get(..30) ··· 927 925 return Ok(0); 928 926 } 929 927 930 - // get keys from either prev or genesis op 931 - let prev_op = self._ops_for_did(&op.did)?.rev().next().transpose()?; 932 - let keys = prev_op.as_ref().map_or_else( 933 - || operation.get_keys(), 934 - |(_, _, op)| op.operation.get_keys(), 935 - ); 928 + if VERIFY { 929 + let prev_op = operation 930 + .prev 931 + .as_ref() 932 + .map(|prev_cid| { 933 + self._ops_for_did(&op.did) 934 + .map(|ops| { 935 + ops.rev() 936 + .find(|r| r.as_ref().map_or(true, |(_, cid, _)| cid == prev_cid)) 937 + .transpose() 938 + }) 939 + .flatten() 940 + }) 941 + .transpose()? 942 + .flatten(); 936 943 937 - if keys.is_empty() { 938 - log::warn!("no keys for op {} {}", op.did, op.cid); 939 - return Ok(0); 940 - } 944 + let keys: Vec<&DidKey> = match &operation.prev { 945 + None => operation.get_keys(), 946 + Some(_) => match &prev_op { 947 + None => { 948 + log::error!( 949 + "op {} {} has prev but the prev op is not found", 950 + op.did, 951 + op.cid 952 + ); 953 + return Ok(0); 954 + } 955 + Some((_, _, prev)) => prev.operation.get_keys(), 956 + }, 957 + }; 941 958 942 - let data = { 943 - let serde_json::Value::Object(mut data) = operation.to_json_value() else { 944 - unreachable!("we checked if operation is valid already") 959 + if keys.is_empty() { 960 + log::warn!("no keys for op {} {}", op.did, op.cid); 961 + return Ok(0); 962 + } 963 + 964 + let data = { 965 + let serde_json::Value::Object(mut data) = operation.to_json_value() else { 966 + unreachable!("we checked if operation is valid already") 967 + }; 968 + data.remove("sig"); 969 + serde_json::Value::Object(data) 945 970 }; 946 - data.remove("sig"); 947 - serde_json::Value::Object(data) 948 - }; 949 - let results = assure_valid_sig(keys, &operation.sig, &data)?; 950 - if !results.valid { 951 - for err in results.errors { 952 - log::warn!("invalid signature for op {} {}: {err}", op.did, op.cid); 971 + let results = assure_valid_sig(keys, &operation.sig, &data)?; 972 + if !results.valid { 973 + for err in results.errors { 974 + log::warn!("invalid signature for op {} {}: {err}", op.did, op.cid); 975 + } 976 + return Ok(0); 953 977 } 954 - // don't accept invalid ops 955 - return Ok(0); 956 978 } 957 979 958 980 let db_op = DbOp { ··· 966 988 operation, 967 989 }; 968 990 991 + let mut batch = self.inner.db.batch(); 969 992 batch.insert( 970 993 &self.inner.ops, 971 994 op_key(&op.created_at, cid_suffix), ··· 976 999 by_did_key(&op.did, &op.created_at, cid_suffix)?, 977 1000 &[], 978 1001 ); 1002 + batch.commit()?; 979 1003 980 1004 Ok(1) 981 1005 } ··· 983 1007 fn _ops_for_did( 984 1008 &self, 985 1009 did: &str, 986 - ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<(Dt, String, DbOp)>> + '_> 1010 + ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<(Dt, PlcCid, DbOp)>> + '_> 987 1011 { 988 1012 let prefix = by_did_prefix(did)?; 989 1013 ··· 1016 1040 let mut full_cid_bytes = op.cid_prefix.clone(); 1017 1041 full_cid_bytes.extend_from_slice(cid_suffix); 1018 1042 1019 - let cid = decode_cid(&full_cid_bytes)?; 1043 + let cid = PlcCid(full_cid_bytes); 1020 1044 1021 1045 Ok((ts, cid, op)) 1022 1046 })) ··· 1029 1053 Ok(self._ops_for_did(did)?.map(|res| { 1030 1054 let (ts, cid, op) = res?; 1031 1055 1056 + let cid = decode_cid(&cid.0)?; 1032 1057 let did = decode_did(&op.did); 1033 1058 1034 1059 Ok(Op { ··· 1155 1180 1156 1181 let mut last_at = None; 1157 1182 let mut ops_inserted: usize = 0; 1183 + let mut insert_tasks: tokio::task::JoinSet<anyhow::Result<usize>> = tokio::task::JoinSet::new(); 1158 1184 1159 - while let Some(page) = pages.recv().await { 1160 - let should_track = notify_last_at.is_some(); 1161 - if should_track { 1162 - if let Some(s) = PageBoundaryState::new(&page) { 1163 - last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 1185 + loop { 1186 + let pages_finished = pages.is_closed(); 1187 + if pages_finished && insert_tasks.is_empty() { 1188 + break; 1189 + } 1190 + tokio::select! { 1191 + page = pages.recv(), if !pages_finished => { 1192 + let Some(page) = page else { continue; }; 1193 + if notify_last_at.is_some() { 1194 + if let Some(s) = PageBoundaryState::new(&page) { 1195 + last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 1196 + } 1197 + } 1198 + let db = db.clone(); 1199 + // we don't have to wait for inserts to finish, because insert_op 1200 + // without verification does not read anything from the db 1201 + insert_tasks.spawn_blocking(move || { 1202 + let mut count: usize = 0; 1203 + for op in &page.ops { 1204 + // we don't verify sigs for bulk, since pages might be out of order 1205 + count += db.insert_op::<false>(op)?; 1206 + } 1207 + db.persist(PersistMode::Buffer)?; 1208 + Ok(count) 1209 + }); 1210 + } 1211 + Some(res) = insert_tasks.join_next() => { 1212 + match res? { 1213 + Ok(count) => ops_inserted += count, 1214 + Err(e) => { 1215 + insert_tasks.abort_all(); 1216 + return Err(e); 1217 + } 1218 + } 1164 1219 } 1165 1220 } 1166 - 1167 - let db = db.clone(); 1168 - let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> { 1169 - let mut batch = db.inner.db.batch(); 1170 - let mut count: usize = 0; 1171 - for op in &page.ops { 1172 - count += db.insert_op(&mut batch, op)?; 1173 - } 1174 - batch.commit()?; 1175 - Ok(count) 1176 - }) 1177 - .await??; 1178 - ops_inserted += count; 1179 1221 } 1180 1222 log::debug!("finished receiving bulk pages"); 1181 1223 ··· 1186 1228 }; 1187 1229 } 1188 1230 1189 - let db = db.clone(); 1190 - tokio::task::spawn_blocking(move || db.persist()).await??; 1231 + tokio::task::spawn_blocking(move || db.persist(PersistMode::SyncAll)).await??; 1191 1232 1192 1233 log::info!( 1193 1234 "backfill_to_fjall: inserted {ops_inserted} ops in {:?}", ··· 1209 1250 log::trace!("writing page with {} ops", page.ops.len()); 1210 1251 let db = db.clone(); 1211 1252 let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> { 1212 - let mut batch = db.inner.db.batch(); 1213 1253 let mut count: usize = 0; 1214 1254 for op in &page.ops { 1215 - count += db.insert_op(&mut batch, op)?; 1255 + count += db.insert_op::<true>(op)?; 1216 1256 } 1217 - batch.commit()?; 1257 + db.persist(PersistMode::Buffer)?; 1218 1258 Ok(count) 1219 1259 }) 1220 1260 .await??;